From a7bdc409767a894ce2191b2da3e0d147e0365a3f Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 20 Aug 2019 19:44:03 +0800 Subject: [PATCH] capture big mysql package --- capture/network.go | 79 +++++++++--- model/query_piece.go | 21 +-- session-dealer/model.go | 5 +- session-dealer/mysql/const.go | 4 +- session-dealer/mysql/model.go | 6 + session-dealer/mysql/session.go | 219 ++++++++++++++++++++------------ session-dealer/mysql/util.go | 4 - 7 files changed, 219 insertions(+), 119 deletions(-) diff --git a/capture/network.go b/capture/network.go index 9c90c26..c6d0fff 100644 --- a/capture/network.go +++ b/capture/network.go @@ -7,6 +7,7 @@ import ( "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "golang.org/x/net/bpf" + "time" "github.com/google/gopacket/pcapgo" log "github.com/sirupsen/logrus" @@ -36,7 +37,6 @@ func NewNetworkCard() (nc *networkCard) { } func initEthernetHandlerFromPacpgo() (handler *pcapgo.EthernetHandle) { - // handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) handler, err := pcapgo.NewEthernetHandle(DeviceName) if err != nil { panic(fmt.Sprintf("cannot open network interface %s <-- %s", DeviceName, err.Error())) @@ -50,13 +50,13 @@ func initEthernetHandlerFromPacpgo() (handler *pcapgo.EthernetHandle) { } bpfIns := []bpf.RawInstruction{} for _, ins := range pcapBPF { - bpfIns2 := bpf.RawInstruction{ + bpfIn := bpf.RawInstruction{ Op: ins.Code, Jt: ins.Jt, Jf: ins.Jf, K: ins.K, } - bpfIns = append(bpfIns, bpfIns2) + bpfIns = append(bpfIns, bpfIn) } err = handler.SetBPF(bpfIns) @@ -64,10 +64,15 @@ func initEthernetHandlerFromPacpgo() (handler *pcapgo.EthernetHandle) { panic(err.Error()) } + // fmt.Printf("++++ handler.CaptureLength: %d\n", handler.GetCaptureLength()) + _ = handler.SetCaptureLength(65535) + // fmt.Printf("++++ handler.CaptureLength: %d\n", handler.GetCaptureLength()) + return } func initEthernetHandlerFromPacp() (handler *pcap.Handle) { + // handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) if err != nil { panic(fmt.Sprintf("cannot open network interface %s <-- %s", DeviceName, err.Error())) @@ -77,6 +82,7 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) { if err != nil { panic(err.Error()) } + handler.SnapLen() return } @@ -86,17 +92,14 @@ func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { receiver = make(chan model.QueryPiece, 100) go func() { - defer func() { - close(receiver) - }() - - handler := initEthernetHandlerFromPacp() + handler := initEthernetHandlerFromPacpgo() for { var data []byte - // data, ci, err := handler.ZeroCopyReadPacketData() - data, ci, err := handler.ReadPacketData() + data, ci, err := handler.ZeroCopyReadPacketData() + // data, ci, err := handler.ReadPacketData() if err != nil { log.Error(err.Error()) + time.Sleep(time.Second*3) continue } @@ -115,6 +118,54 @@ func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { return } +// Listen get a connection. +func (nc *networkCard) ListenInParallel() (receiver chan model.QueryPiece) { + receiver = make(chan model.QueryPiece, 100) + packageChan := make(chan gopacket.Packet, 10) + + // read packet + go func() { + defer func() { + close(packageChan) + }() + + handler := initEthernetHandlerFromPacpgo() + for { + var data []byte + // data, ci, err := handler.ZeroCopyReadPacketData() + data, ci, err := handler.ReadPacketData() + if err != nil { + log.Error(err.Error()) + time.Sleep(time.Second*3) + continue + } + + packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) + m := packet.Metadata() + m.CaptureInfo = ci + m.Truncated = m.Truncated || ci.CaptureLength < ci.Length + + packageChan <- packet + } + }() + + // deal packet + go func() { + defer func() { + close(receiver) + }() + + for packet := range packageChan { + qp := nc.parseTCPPackage(packet) + if qp != nil { + receiver <- qp + } + } + }() + + return +} + func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPiece) { var err error defer func() { @@ -182,6 +233,8 @@ func readFromServerPackage(srcIP *string, srcPort int, tcpConn *layers.TCP) (qp return } + _ = tcpConn.Seq + sessionKey := spliceSessionKey(srcIP, srcPort) session := sessionPool[*sessionKey] if session != nil { @@ -220,11 +273,7 @@ func readToServerPackage(srcIP *string, srcPort int, tcpConn *layers.TCP) (err e } session.ResetBeginTime() - session.ReadFromClient(tcpPayload) - a := session.ReadOnePackageFinish() - if a { - session.ResetCache() - } + session.ReadFromClient(int64(tcpConn.Seq), tcpPayload) return } diff --git a/model/query_piece.go b/model/query_piece.go index 2dc7636..7ca21dd 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -7,6 +7,7 @@ import ( "github.com/pingcap/tidb/util/hack" ) + type QueryPiece interface { String() *string Bytes() []byte @@ -68,14 +69,14 @@ func NewPooledMysqlQueryPiece( return } -func (qp *MysqlQueryPiece) String() (*string) { - content := qp.Bytes() +func (mqp *MysqlQueryPiece) String() (*string) { + content := mqp.Bytes() contentStr := hack.String(content) return &contentStr } -func (qp *MysqlQueryPiece) Bytes() (bytes []byte) { - content, err := json.Marshal(qp) +func (mqp *MysqlQueryPiece) Bytes() (bytes []byte) { + content, err := json.Marshal(mqp) if err != nil { return []byte(err.Error()) } @@ -83,16 +84,16 @@ func (qp *MysqlQueryPiece) Bytes() (bytes []byte) { return content } -func (qp *MysqlQueryPiece) GetSQL() (str *string) { - return qp.QuerySQL +func (mqp *MysqlQueryPiece) GetSQL() (str *string) { + return mqp.QuerySQL } -func (qp *MysqlQueryPiece) NeedSyncSend() (bool) { - return qp.SyncSend +func (mqp *MysqlQueryPiece) NeedSyncSend() (bool) { + return mqp.SyncSend } -func (qp *MysqlQueryPiece) SetNeedSyncSend(syncSend bool) { - qp.SyncSend = syncSend +func (mqp *MysqlQueryPiece) SetNeedSyncSend(syncSend bool) { + mqp.SyncSend = syncSend } func (pmqp *PooledMysqlQueryPiece) Recovery() { diff --git a/session-dealer/model.go b/session-dealer/model.go index 9a42e21..bdbb9ac 100644 --- a/session-dealer/model.go +++ b/session-dealer/model.go @@ -3,11 +3,8 @@ package session_dealer import "github.com/zr-hebo/sniffer-agent/model" type ConnSession interface { - ReadFromClient(bytes []byte) + ReadFromClient(seqID int64, bytes []byte) ReadFromServer(bytes []byte) ResetBeginTime() GenerateQueryPiece() (qp model.QueryPiece) - ReadAllPackageFinish() bool - ReadOnePackageFinish() bool - ResetCache() } diff --git a/session-dealer/mysql/const.go b/session-dealer/mysql/const.go index a3b87ac..b4afad0 100644 --- a/session-dealer/mysql/const.go +++ b/session-dealer/mysql/const.go @@ -95,8 +95,8 @@ const ( // Identifier length limitations. // See https://dev.mysql.com/doc/refman/5.7/en/identifiers.html const ( - // MaxPayloadLen is the max packet payload length. - MaxPayloadLen = 1<<24 - 1 + // MaxMysqlPacketLen is the max packet payload length. + MaxMysqlPacketLen = 1<<24 - 1 ) const ( diff --git a/session-dealer/mysql/model.go b/session-dealer/mysql/model.go index a33b40c..d47e30a 100644 --- a/session-dealer/mysql/model.go +++ b/session-dealer/mysql/model.go @@ -7,3 +7,9 @@ type handshakeResponse41 struct { DBName string Auth []byte } + +// jigsaw record tcp package begin and end seq id +type jigsaw struct { + b int64 + e int64 +} diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 6a27de8..2c43ab2 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -2,11 +2,11 @@ package mysql import ( "fmt" - "github.com/siddontang/go/hack" "time" - "github.com/zr-hebo/sniffer-agent/model" + "github.com/siddontang/go/hack" log "github.com/sirupsen/logrus" + "github.com/zr-hebo/sniffer-agent/model" ) type MysqlSession struct { @@ -18,15 +18,17 @@ type MysqlSession struct { serverIP *string serverPort int stmtBeginTime int64 + beginSeqID int64 + packageOffset int64 expectReceiveSize int - packageBaseSize int - packageComplete bool + coverRanges []*jigsaw + tcpWindowSize int expectSendSize int prepareInfo *prepareInfo - sizeCount map[int]int64 + sizeCount map[int]int64 cachedPrepareStmt map[int]*string - tcpCache []byte cachedStmtBytes []byte + computeWindowSizeCounter int } type prepareInfo struct { @@ -34,8 +36,8 @@ type prepareInfo struct { } const ( - defaultCacheSize = 1<<16 - maxBeyondCount = 3 + defaultCacheSize = 1 << 16 + maxIPPackageSize = 1 << 16 ) func NewMysqlSession(sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int) (ms *MysqlSession) { @@ -48,9 +50,8 @@ func NewMysqlSession(sessionKey *string, clientIP *string, clientPort int, serve stmtBeginTime: time.Now().UnixNano() / millSecondUnit, cachedPrepareStmt: make(map[int]*string, 8), } - ms.tcpCache = make([]byte, 0, defaultCacheSize) - ms.cachedStmtBytes = make([]byte, 0, defaultCacheSize) - ms.packageBaseSize = 512 + ms.tcpWindowSize = 512 + ms.coverRanges = make([]*jigsaw, 0, 4) ms.sizeCount = make(map[int]int64) return @@ -67,62 +68,143 @@ func (ms *MysqlSession) ReadFromServer(bytes []byte) { if ms.prepareInfo != nil && contents[0] == 0 { ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5]) } - fmt.Printf("Init ms.expectSendSize: %v\n", ms.expectSendSize) - ms.expectSendSize = ms.expectSendSize - len(contents) - - } else { - ms.expectSendSize = ms.expectSendSize - len(bytes) } } -func (ms *MysqlSession) ReadFromClient(bytes []byte) { - if ms.expectReceiveSize < 1 { +func (ms *MysqlSession) mergeRanges() { + if len(ms.coverRanges) > 1 { + newRange, newPkgRanges := mergeRanges(ms.coverRanges[0], ms.coverRanges[1:]) + newPkgRanges = append(newPkgRanges, newRange) + ms.coverRanges = newPkgRanges + } +} + +func mergeRanges(currRange *jigsaw, pkgRanges []*jigsaw) (mergedRange *jigsaw, newPkgRanges []*jigsaw) { + var nextRange *jigsaw + if len(pkgRanges) < 1 { + return currRange, make([]*jigsaw, 0) + + } else if len(pkgRanges) == 1 { + nextRange = pkgRanges[0] + newPkgRanges = make([]*jigsaw, 0, 4) + + } else { + nextRange, newPkgRanges = mergeRanges(pkgRanges[0], pkgRanges[1:]) + } + + if currRange.e >= nextRange.b { + mergedRange = &jigsaw{b: currRange.b, e: nextRange.e} + + } else { + newPkgRanges = append(newPkgRanges, nextRange) + mergedRange = currRange + } + return +} + +func (ms *MysqlSession) oneMysqlPackageFinish() bool { + if int64(len(ms.cachedStmtBytes)) % MaxMysqlPacketLen == 0 { + return true + } + + return false +} + +func (ms *MysqlSession) checkFinish() bool { + if len(ms.coverRanges) != 1 { + return true + } + + firstRange := ms.coverRanges[0] + if firstRange.e - firstRange.b != int64(len(ms.cachedStmtBytes)) { + return false + } + + return true +} + +func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) { + readSize := len(bytes) + contentSize := int64(len(bytes)) + + if ms.expectReceiveSize == 0 || ms.oneMysqlPackageFinish() { ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4]) + ms.packageOffset = int64(len(ms.cachedStmtBytes)) + contents := bytes[4:] if contents[0] == ComStmtPrepare { ms.prepareInfo = &prepareInfo{} } - ms.tcpCache = append(ms.tcpCache, contents...) - ms.expectReceiveSize = ms.expectReceiveSize - len(contents) + contentSize = int64(len(contents)) + seqID += 4 + ms.beginSeqID = seqID + newCache := make([]byte, ms.expectReceiveSize+len(ms.cachedStmtBytes)) + if len(ms.cachedStmtBytes) > 0 { + copy(newCache[:len(ms.cachedStmtBytes)], ms.cachedStmtBytes) + } + copy(newCache[ms.packageOffset:ms.packageOffset+int64(len(contents))], contents) + ms.cachedStmtBytes = newCache } else { - ms.tcpCache = append(ms.tcpCache, bytes...) - ms.expectReceiveSize = ms.expectReceiveSize - len(bytes) - } - - readSize := len(bytes) - readTail := readSize % ms.packageBaseSize - if readTail != 0 { - if ms.expectReceiveSize == 0 { - ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) - ms.tcpCache = ms.tcpCache[0:0] - ms.packageComplete = true - - } else { - ms.packageComplete = false + if seqID < ms.beginSeqID { + log.Debugf("outdate package with Seq:%d", seqID) + return } - } else if readTail == 0 && ms.expectReceiveSize == 0 { - ms.packageComplete = true + seqOffset := seqID - ms.beginSeqID + if ms.packageOffset+seqOffset+int64(len(bytes)) <= int64(ms.expectReceiveSize) { + copy(ms.cachedStmtBytes[ms.packageOffset+seqOffset:ms.packageOffset+seqOffset+int64(len(bytes))], bytes) + } } - miniMatchSize := 1 << 16 + ms.refreshWindowSize(readSize) + + insertIdx := len(ms.coverRanges) + for idx, cr := range ms.coverRanges { + if seqID < cr.b { + insertIdx = idx + break + } + } + + cr := &jigsaw{b: seqID, e: seqID+int64(contentSize)} + if insertIdx == len(ms.coverRanges) - 1 { + ms.coverRanges = append(ms.coverRanges, cr) + + } else { + newCoverRanges := make([]*jigsaw, len(ms.coverRanges)+1) + copy(newCoverRanges[:insertIdx], ms.coverRanges[:insertIdx]) + newCoverRanges[insertIdx] = cr + copy(newCoverRanges[insertIdx+1:], ms.coverRanges[insertIdx:]) + ms.coverRanges = newCoverRanges + } + ms.mergeRanges() + +} + +func (ms *MysqlSession) refreshWindowSize(readSize int) { + if ms.computeWindowSizeCounter > 5000 { + return + } + + log.Debugf("sizeCount: %#v", ms.sizeCount) + + ms.computeWindowSizeCounter += 1 + miniMatchSize := maxIPPackageSize for size := range ms.sizeCount { if readSize % size == 0 && miniMatchSize > size { miniMatchSize = size } } - if miniMatchSize < 1 << 16 { + if miniMatchSize < maxIPPackageSize { ms.sizeCount[miniMatchSize] = ms.sizeCount[miniMatchSize] + 1 - } else if (ms.expectReceiveSize != 0) { + } else if ms.checkFinish() { ms.sizeCount[readSize] = 1 } - - - mostFrequentSize := ms.packageBaseSize - miniSize := ms.packageBaseSize + mostFrequentSize := ms.tcpWindowSize + miniSize := ms.tcpWindowSize mostFrequentCount := int64(0) for size, count := range ms.sizeCount { if count > mostFrequentCount { @@ -135,62 +217,33 @@ func (ms *MysqlSession) ReadFromClient(bytes []byte) { } } - ms.packageBaseSize = mostFrequentSize - - // fmt.Printf("read %v bytes: %v\n", len(bytes), string(bytes)) - fmt.Printf("ms.expectReceiveSize: %v\n", ms.expectReceiveSize) - fmt.Printf("ms.sizeCount: %#v\n", ms.sizeCount) - fmt.Printf("len(ms.tcpCache): %#v\n", len(ms.tcpCache)) - fmt.Printf("packageComplete in read: %v\n", ms.packageComplete) - log.Infof("ms.packageBaseSize: %v", ms.packageBaseSize) + ms.tcpWindowSize = mostFrequentSize } -func (ms *MysqlSession) ReadOnePackageFinish() bool { - if len(ms.tcpCache) == MaxPayloadLen { - return true - } - - return false -} - -func (ms *MysqlSession) ReadAllPackageFinish() bool { - // fmt.Printf("len(ms.tcpCache): %v\n", len(ms.tcpCache)) - // fmt.Printf("ms.expectReceiveSize: %v\n", ms.expectReceiveSize) - - if len(ms.tcpCache) < MaxPayloadLen { - return true - } - - return false -} - -func (ms *MysqlSession) ResetCache() { - ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) - ms.tcpCache = ms.tcpCache[0:0] -} func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { defer func() { - ms.tcpCache = ms.tcpCache[0:0] - ms.cachedStmtBytes = ms.cachedStmtBytes[0:0] + // ms.tcpCache = ms.tcpCache[0:0] + ms.cachedStmtBytes = nil ms.expectReceiveSize = 0 ms.expectSendSize = 0 ms.prepareInfo = nil - ms.packageComplete = false + ms.coverRanges = make([]*jigsaw, 0, 4) + // ms.packageComplete = false }() - if len(ms.cachedStmtBytes) < 1 && len(ms.tcpCache) < 1 { + if len(ms.cachedStmtBytes) < 1 { return } // fmt.Printf("packageComplete in generate: %v\n", ms.packageComplete) - if !ms.packageComplete { + if !ms.checkFinish() { + log.Errorf("is not a complete cover") return } var mqp *model.PooledMysqlQueryPiece var querySQLInBytes []byte - ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) switch ms.cachedStmtBytes[0] { case ComAuth: var userName, dbName string @@ -218,15 +271,13 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { case ComCreateDB, ComQuery: mqp = ms.composeQueryPiece() - querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:])) - copy(querySQLInBytes, ms.cachedStmtBytes[1:]) + querySQLInBytes = ms.cachedStmtBytes[1:] querySQL := hack.String(querySQLInBytes) mqp.QuerySQL = &querySQL case ComStmtPrepare: mqp = ms.composeQueryPiece() - querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:])) - copy(querySQLInBytes, ms.cachedStmtBytes[1:]) + querySQLInBytes = ms.cachedStmtBytes[1:] querySQL := hack.String(querySQLInBytes) mqp.QuerySQL = &querySQL ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = &querySQL diff --git a/session-dealer/mysql/util.go b/session-dealer/mysql/util.go index 9b5d6d6..53c1752 100644 --- a/session-dealer/mysql/util.go +++ b/session-dealer/mysql/util.go @@ -3,7 +3,6 @@ package mysql import ( "bytes" "encoding/binary" - "fmt" ) // parseHandshakeResponseHeader parses the common header of SSLRequest and HandshakeResponse41. @@ -123,9 +122,6 @@ func parseLengthEncodedInt(b []byte) (num uint64, isNull bool, n int) { } func extractMysqlPayloadSize(header []byte) int { - fmt.Printf("==== package header: %v\n", header) - - // return int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) return int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) }