From 29abb4a212a3d2696ba510eacbd5fad5ddc6d98e Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 23 Aug 2019 15:05:21 +0800 Subject: [PATCH] deal incomplete sql --- README.md | 4 +- capture/network.go | 30 ++-- model/query_piece.go | 10 +- session-dealer/model.go | 6 - session-dealer/mysql/session.go | 250 ++++++++++++++++---------------- 5 files changed, 148 insertions(+), 152 deletions(-) diff --git a/README.md b/README.md index 8767d42..0f560bd 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,9 @@ sniffer-agent采用模块化结构,支持用户添加自己的解析模块, 目前输出的内容都是解析结果组成的json。 MySQL协议的解析结果示例如下: ``` -{"sid":"10.XX.XX.XX:54656","sip":"192.168.XX.XX","sport":3306,"user":"root","db":"sniffer","sql":"show tables","bt":"2019-08-05 18:23:09","cms":15} +{"cip":"192.168.XXX.XXX","cport":63888,"sip":"192.168.XX.XX","sport":3306,"user":"root","db":"sniffer","sql":"show tables","bt":"2019-08-05 18:23:09","cms":15} ``` -其中sid代表客户端ip:port组成的session标识,sip代表server ip,sport代表server port,user代表查询用户,db代表当前连接的库名,sql代表查询语句,bt代表查询开始时间,cms代表查询消耗的时间,单位是毫秒 +其中cip代表客户端ip,cport代表客户端port(客户端ip:port组成session标识),sip代表server ip,sport代表server port,user代表查询用户,db代表当前连接的库名,sql代表查询语句,bt代表查询开始时间,cms代表查询消耗的时间,单位是毫秒 ### Exporter diff --git a/capture/network.go b/capture/network.go index e66e026..e51be5e 100644 --- a/capture/network.go +++ b/capture/network.go @@ -71,7 +71,7 @@ func initEthernetHandlerFromPacpgo() (handler *pcapgo.EthernetHandle) { panic(err.Error()) } - _ = handler.SetCaptureLength(65535) + _ = handler.SetCaptureLength(1024*1024*10) return } @@ -92,16 +92,19 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) { func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { if inParallel { - return nc.listenInParallel() + nc.listenInParallel() + + } else { + nc.listenNormal() } - return nc.listenNormal() + return nc.receiver } // Listen get a connection. -func (nc *networkCard) listenNormal() (receiver chan model.QueryPiece) { +func (nc *networkCard) listenNormal() { go func() { - handler := initEthernetHandlerFromPacp() + handler := initEthernetHandlerFromPacpgo() for { var data []byte data, ci, err := handler.ZeroCopyReadPacketData() @@ -123,7 +126,7 @@ func (nc *networkCard) listenNormal() (receiver chan model.QueryPiece) { } // Listen get a connection. -func (nc *networkCard) listenInParallel() (receiver chan model.QueryPiece) { +func (nc *networkCard) listenInParallel() { type captureInfo struct { bytes []byte captureInfo gopacket.CaptureInfo @@ -158,16 +161,19 @@ func (nc *networkCard) listenInParallel() (receiver chan model.QueryPiece) { // parse package go func() { - defer func() { - close(receiver) - }() - for captureInfo := range rawDataChan { packet := gopacket.NewPacket(captureInfo.bytes, layers.LayerTypeEthernet, gopacket.NoCopy) m := packet.Metadata() m.CaptureInfo = captureInfo.captureInfo m.Truncated = m.Truncated || captureInfo.captureInfo.CaptureLength < captureInfo.captureInfo.Length + packageChan <- packet + } + }() + + // parse package + go func() { + for packet := range packageChan { nc.parseTCPPackage(packet) } }() @@ -246,7 +252,7 @@ func readFromServerPackage( sessionKey := spliceSessionKey(srcIP, srcPort) session := sessionPool[*sessionKey] if session != nil { - // session.ReadFromServer(tcpPayload) + // session.readFromServer(tcpPayload) // qp = session.GenerateQueryPiece() pkt := model.NewTCPPacket(tcpPayload, int64(tcpPkt.Ack), false) session.ReceiveTCPPacket(pkt) @@ -286,8 +292,6 @@ func readToServerPackage( pkt := model.NewTCPPacket(tcpPayload, int64(tcpPkt.Seq), true) session.ReceiveTCPPacket(pkt) - // session.ResetBeginTime() - // session.ReadFromClient(int64(tcpPkt.Seq), tcpPayload) return } diff --git a/model/query_piece.go b/model/query_piece.go index 7ca21dd..b25fadb 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -18,8 +18,9 @@ type QueryPiece interface { // MysqlQueryPiece 查询信息 type MysqlQueryPiece struct { - SessionID *string `json:"sid"` - ClientHost *string `json:"-"` + SessionID *string `json:"-"` + ClientHost *string `json:"cip"` + ClientPort int `json:"cport"` SyncSend bool `json:"-"` ServerIP *string `json:"sip"` ServerPort int `json:"sport"` @@ -45,7 +46,8 @@ var ( ) func NewPooledMysqlQueryPiece( - sessionID, visitUser, visitDB, clientHost, serverIP *string, serverPort int, stmtBeginTime int64) ( + sessionID, clientIP, visitUser, visitDB, clientHost, serverIP *string, + clientPort, serverPort int, stmtBeginTime int64) ( mqp *PooledMysqlQueryPiece) { mqp = mqpp.Dequeue() if mqp == nil { @@ -56,6 +58,8 @@ func NewPooledMysqlQueryPiece( nowInMS := time.Now().UnixNano() / millSecondUnit mqp.SessionID = sessionID + mqp.ClientHost = clientIP + mqp.ClientPort = clientPort mqp.ClientHost = clientHost mqp.ServerIP = serverIP mqp.ServerPort = serverPort diff --git a/session-dealer/model.go b/session-dealer/model.go index c023565..9258c7c 100644 --- a/session-dealer/model.go +++ b/session-dealer/model.go @@ -3,11 +3,5 @@ package session_dealer import "github.com/zr-hebo/sniffer-agent/model" type ConnSession interface { - ReadFromClient(seqID int64, bytes []byte) - ReadFromServer(bytes []byte) - ResetBeginTime() - GenerateQueryPiece() (qp model.QueryPiece) - ReceiveTCPPacket(*model.TCPPacket) - Stop() } diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index a9b5291..8e342b7 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -3,6 +3,7 @@ package mysql import ( "fmt" "strings" + "sync" "time" "github.com/siddontang/go/hack" @@ -11,31 +12,32 @@ import ( ) type MysqlSession struct { - connectionID *string - visitUser *string - visitDB *string - clientHost *string - clientPort int - serverIP *string - serverPort int - stmtBeginTime int64 - beginSeqID int64 - packageOffset int64 - expectReceiveSize int - coverRanges []*jigsaw - expectSendSize int - prepareInfo *prepareInfo - cachedPrepareStmt map[int]*string - cachedStmtBytes []byte + connectionID *string + visitUser *string + visitDB *string + clientHost *string + clientPort int + serverIP *string + serverPort int + stmtBeginTime int64 + beginSeqID int64 + packageOffset int64 + expectReceiveSize int + coverRanges []*jigsaw + expectSendSize int + prepareInfo *prepareInfo + cachedPrepareStmt map[int]*string + cachedStmtBytes []byte computeWindowSizeCounter int tcpPacketCache []*model.TCPPacket queryPieceReceiver chan model.QueryPiece - lastSeq int64 - keepAlive chan bool + lastSeq int64 + closeConn chan bool + pkgCacheLock sync.Mutex - ackID int64 + ignoreAckID int64 sendSize int64 } @@ -55,118 +57,127 @@ func NewMysqlSession( sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int, receiver chan model.QueryPiece) (ms *MysqlSession) { ms = &MysqlSession{ - connectionID: sessionKey, - clientHost: clientIP, - clientPort: clientPort, - serverIP: serverIP, - serverPort: serverPort, - stmtBeginTime: time.Now().UnixNano() / millSecondUnit, - cachedPrepareStmt: make(map[int]*string, 8), - coverRanges: make([]*jigsaw, 0, 4), + connectionID: sessionKey, + clientHost: clientIP, + clientPort: clientPort, + serverIP: serverIP, + serverPort: serverPort, + stmtBeginTime: time.Now().UnixNano() / millSecondUnit, + cachedPrepareStmt: make(map[int]*string, 8), + coverRanges: make([]*jigsaw, 0, 4), queryPieceReceiver: receiver, - keepAlive: make(chan bool, 1), - lastSeq: -1, - ackID: -1, - sendSize: 0, + closeConn: make(chan bool, 1), + lastSeq: -1, + ignoreAckID: -1, + sendSize: 0, + pkgCacheLock: sync.Mutex{}, } - go ms.haha() - return } -func (ms *MysqlSession) Stop() { - ms.keepAlive <- false -} -func (ms *MysqlSession) haha() { +func (ms *MysqlSession) dealTCPPacket() { + for { + select { + case closeConn := <-ms.closeConn: - for true { - // select { - // case <- ms.keepAlive: - // return - // default: - // } - - if len(ms.tcpPacketCache) < 1 { - // log.Debugf("there are %d packages in tcp packet cache", ) - time.Sleep(1) - continue - } - - beginIdx := -1 - if ms.lastSeq < 0 { - ms.lastSeq = ms.tcpPacketCache[0].Seq - } - for idx := 0; idx < len(ms.tcpPacketCache); idx++ { - pkt := ms.tcpPacketCache[idx] - if ms.lastSeq == pkt.Seq { - beginIdx = idx - ms.lastSeq = pkt.Seq + int64(len(pkt.Payload)) - - } else { - break + if closeConn { + return } - } - - if beginIdx < 0 { - return - } - - inOrderPkgs := ms.tcpPacketCache[:beginIdx+1] - if beginIdx == len(ms.tcpPacketCache) - 1 { - ms.tcpPacketCache = make([]*model.TCPPacket, 0, 4) - } else { - ms.tcpPacketCache = ms.tcpPacketCache[beginIdx+1:] - } - - for _, pkg := range inOrderPkgs { - if pkg.ToServer { - ms.ReadFromClient(pkg.Seq, pkg.Payload) + default: + if len(ms.tcpPacketCache) > 0 { + ms.parseTCPPacket() } else { - ms.ReadFromServer(pkg.Payload) - ms.queryPieceReceiver <- ms.GenerateQueryPiece() + log.Debugf("no package need deal in session:%s, so sleep", *ms.connectionID) + time.Sleep(time.Second) } } } } -func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { - if !newPkt.ToServer && ms.ackID + ms.sendSize == newPkt.Seq { +func (ms *MysqlSession) parseTCPPacket() { + ms.pkgCacheLock.Lock() + var pkg *model.TCPPacket + if len(ms.tcpPacketCache) < 1 { + ms.pkgCacheLock.Unlock() + return + } + + pkg = ms.tcpPacketCache[0] + ms.tcpPacketCache = ms.tcpPacketCache[1:] + ms.pkgCacheLock.Unlock() + + if pkg.ToServer { + ms.resetBeginTime() + ms.readFromClient(pkg.Seq, pkg.Payload) + + } else { + ms.readFromServer(pkg.Payload) + qp := ms.GenerateQueryPiece() + if qp != nil { + ms.queryPieceReceiver <- qp + } + } + + return +} + +func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { + if newPkt == nil { + return + } + + if !newPkt.ToServer && ms.ignoreAckID == newPkt.Seq { // ignore to response to client data - ms.ackID = ms.ackID + newPkt.Seq - ms.sendSize = ms.sendSize + int64(len(newPkt.Payload)) + ms.ignoreAckID = ms.ignoreAckID + int64(len(newPkt.Payload)) return } else if !newPkt.ToServer { - ms.ackID = newPkt.Seq - ms.sendSize = int64(len(newPkt.Payload)) + ms.ignoreAckID = newPkt.Seq + int64(len(newPkt.Payload)) } - insertIdx := len(ms.tcpPacketCache) - for idx, pkt := range ms.tcpPacketCache { - if pkt.Seq > newPkt.Seq { - insertIdx = idx + if newPkt.ToServer { + ms.resetBeginTime() + ms.readFromClient(newPkt.Seq, newPkt.Payload) + + } else { + ms.readFromServer(newPkt.Payload) + qp := ms.GenerateQueryPiece() + if qp != nil { + ms.queryPieceReceiver <- qp } } - if insertIdx == len(ms.tcpPacketCache) { - ms.tcpPacketCache = append(ms.tcpPacketCache, newPkt) - } else { - newCache := make([]*model.TCPPacket, len(ms.tcpPacketCache)+1) - copy(newCache[:insertIdx], ms.tcpPacketCache[:insertIdx]) - newCache[insertIdx] = newPkt - copy(newCache[insertIdx+1:], ms.tcpPacketCache[insertIdx:]) - } + // ms.pkgCacheLock.Lock() + // defer ms.pkgCacheLock.Unlock() + // + // insertIdx := len(ms.tcpPacketCache) + // for idx, pkt := range ms.tcpPacketCache { + // if pkt.Seq > newPkt.Seq { + // insertIdx = idx + // } + // } + // + // if insertIdx == len(ms.tcpPacketCache) { + // ms.tcpPacketCache = append(ms.tcpPacketCache, newPkt) + // } else { + // newCache := make([]*model.TCPPacket, len(ms.tcpPacketCache)+1) + // copy(newCache[:insertIdx], ms.tcpPacketCache[:insertIdx]) + // newCache[insertIdx] = newPkt + // copy(newCache[insertIdx+1:], ms.tcpPacketCache[insertIdx:]) + // ms.tcpPacketCache = newCache + // } + } -func (ms *MysqlSession) ResetBeginTime() { +func (ms *MysqlSession) resetBeginTime() { ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit } -func (ms *MysqlSession) ReadFromServer(bytes []byte) { +func (ms *MysqlSession) readFromServer(bytes []byte) { if ms.expectSendSize < 1 { ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) contents := bytes[4:] @@ -176,7 +187,7 @@ func (ms *MysqlSession) ReadFromServer(bytes []byte) { } } -func (ms *MysqlSession) mergeRanges() { +func (ms *MysqlSession) mergeRanges() { if len(ms.coverRanges) > 1 { newRange, newPkgRanges := mergeRanges(ms.coverRanges[0], ms.coverRanges[1:]) tmpRanges := make([]*jigsaw, len(newPkgRanges)+1) @@ -219,7 +230,7 @@ func mergeRanges(currRange *jigsaw, pkgRanges []*jigsaw) (mergedRange *jigsaw, n } func (ms *MysqlSession) oneMysqlPackageFinish() bool { - if int64(len(ms.cachedStmtBytes)) % MaxMysqlPacketLen == 0 { + if int64(len(ms.cachedStmtBytes))%MaxMysqlPacketLen == 0 { return true } @@ -230,25 +241,22 @@ func (ms *MysqlSession) checkFinish() bool { if len(ms.coverRanges) != 1 { ranges := make([]string, 0, len(ms.coverRanges)) for _, cr := range ms.coverRanges { - log.Errorf("miss values: %s", string(ms.cachedStmtBytes[cr.b-ms.beginSeqID: cr.e-ms.beginSeqID])) - ranges = append(ranges, fmt.Sprintf("[%d -- %d]", cr.b, cr.e)) } - - log.Errorf("in session %s get invalid range: %s", *ms.connectionID, strings.Join(ranges, ", ")) + log.Debugf("in session %s get invalid range: %s", *ms.connectionID, strings.Join(ranges, ", ")) return false } firstRange := ms.coverRanges[0] - if firstRange.e - firstRange.b != int64(len(ms.cachedStmtBytes)) { + if firstRange.e-firstRange.b != int64(len(ms.cachedStmtBytes)) { return false } return true } -func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) { +func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { contentSize := int64(len(bytes)) if ms.expectReceiveSize == 0 || ms.oneMysqlPackageFinish() { @@ -268,12 +276,9 @@ func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) { copy(newCache[:len(ms.cachedStmtBytes)], ms.cachedStmtBytes) } - if int64(ms.expectReceiveSize+len(ms.cachedStmtBytes)) > ms.packageOffset+int64(len(contents)) { + if int64(ms.expectReceiveSize+len(ms.cachedStmtBytes)) >= ms.packageOffset+int64(len(contents)) { copy(newCache[ms.packageOffset:ms.packageOffset+int64(len(contents))], contents) ms.cachedStmtBytes = newCache - } else { - log.Debugf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXxxx") - return } } else { @@ -296,7 +301,7 @@ func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) { } } - cr := &jigsaw{b: seqID, e: seqID+contentSize} + cr := &jigsaw{b: seqID, e: seqID + contentSize} if len(ms.coverRanges) < 1 || insertIdx == len(ms.coverRanges) { ms.coverRanges = append(ms.coverRanges, cr) @@ -311,17 +316,6 @@ func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) { ms.mergeRanges() } -func (ms *MysqlSession) refreshWindowSize(readSize int) { - windowCounter := windowSizeCache[*ms.clientHost] - if windowCounter == nil { - windowCounter = newPackageWindowCounter() - windowSizeCache[*ms.clientHost] = windowCounter - } - - // windowCounter.refresh(readSize, ms.checkFinish()) - // ms.tcpWindowSize = windowCounter.suggestSize -} - func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { defer func() { ms.cachedStmtBytes = nil @@ -330,7 +324,7 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { ms.prepareInfo = nil ms.coverRanges = make([]*jigsaw, 0, 4) ms.lastSeq = -1 - ms.ackID = -1 + ms.ignoreAckID = -1 ms.sendSize = 0 }() @@ -338,9 +332,8 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { return } - // fmt.Printf("packageComplete in generate: %v\n", ms.packageComplete) if !ms.checkFinish() { - log.Errorf("receive a not complete cover") + log.Debugf("receive a not complete cover") return } @@ -430,5 +423,6 @@ func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (m func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) { return model.NewPooledMysqlQueryPiece( - ms.connectionID, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP, ms.serverPort, ms.stmtBeginTime) + ms.connectionID, ms.clientHost, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP, + ms.clientPort, ms.serverPort, ms.stmtBeginTime) }