From 16a5d8db2670da21d1fae06d9c4ff067ed6e6cbd Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 16 Aug 2019 23:02:08 +0800 Subject: [PATCH] deal multi packages --- capture/network.go | 29 +++++-- exporter/cli.go | 2 +- scripts/sysbench.sh | 4 + session-dealer/model.go | 3 + session-dealer/mysql/session.go | 140 +++++++++++++++++++++++++++----- session-dealer/mysql/util.go | 7 +- 6 files changed, 158 insertions(+), 27 deletions(-) create mode 100755 scripts/sysbench.sh diff --git a/capture/network.go b/capture/network.go index f80a6c9..9c90c26 100644 --- a/capture/network.go +++ b/capture/network.go @@ -35,7 +35,7 @@ func NewNetworkCard() (nc *networkCard) { return &networkCard{name: DeviceName, listenPort: snifferPort} } -func initEthernetHandler() (handler *pcapgo.EthernetHandle) { +func initEthernetHandlerFromPacpgo() (handler *pcapgo.EthernetHandle) { // handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) handler, err := pcapgo.NewEthernetHandle(DeviceName) if err != nil { @@ -67,6 +67,20 @@ func initEthernetHandler() (handler *pcapgo.EthernetHandle) { return } +func initEthernetHandlerFromPacp() (handler *pcap.Handle) { + handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) + if err != nil { + panic(fmt.Sprintf("cannot open network interface %s <-- %s", DeviceName, err.Error())) + } + + err = handler.SetBPFFilter(fmt.Sprintf("tcp and (port %d)", snifferPort)) + if err != nil { + panic(err.Error()) + } + + return +} + // Listen get a connection. func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { receiver = make(chan model.QueryPiece, 100) @@ -76,10 +90,11 @@ func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { close(receiver) }() - handler := initEthernetHandler() + handler := initEthernetHandlerFromPacp() for { var data []byte - data, ci, err := handler.ZeroCopyReadPacketData() + // data, ci, err := handler.ZeroCopyReadPacketData() + data, ci, err := handler.ReadPacketData() if err != nil { log.Error(err.Error()) continue @@ -130,14 +145,14 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPi dstIP := ipInfo.DstIP.String() srcPort := int(tcpConn.SrcPort) dstPort := int(tcpConn.DstPort) - if dstIP == *localIPAddr && dstPort == nc.listenPort { + if dstPort == nc.listenPort { // deal mysql server response err = readToServerPackage(&srcIP, srcPort, tcpConn) if err != nil { return } - } else if srcIP == *localIPAddr && srcPort == nc.listenPort { + } else if srcPort == nc.listenPort { // deal mysql client request qp, err = readFromServerPackage(&dstIP, dstPort, tcpConn) if err != nil { @@ -206,6 +221,10 @@ func readToServerPackage(srcIP *string, srcPort int, tcpConn *layers.TCP) (err e session.ResetBeginTime() session.ReadFromClient(tcpPayload) + a := session.ReadOnePackageFinish() + if a { + session.ResetCache() + } return } diff --git a/exporter/cli.go b/exporter/cli.go index 3ba7718..cfedfa9 100644 --- a/exporter/cli.go +++ b/exporter/cli.go @@ -13,6 +13,6 @@ func NewCliExporter() *cliExporter { } func (c *cliExporter) Export (qp model.QueryPiece) (err error){ - fmt.Println(qp.String()) + fmt.Println(*qp.String()) return } \ No newline at end of file diff --git a/scripts/sysbench.sh b/scripts/sysbench.sh new file mode 100755 index 0000000..08e5f29 --- /dev/null +++ b/scripts/sysbench.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +sysbench /usr/share/sysbench/oltp_read_only.lua --mysql-host=192.XX.XX.XX --mysql-port=3306 --mysql-user=root --mysql-password='' --mysql-db=sniffer --db-driver=mysql --tables=10 --table-size=1000000 --report-interval=10 --threads=128 --time=120 prepare +sysbench /usr/share/sysbench/oltp_read_only.lua --mysql-host=192.XX.XX.XX --mysql-port=3306 --mysql-user=root --mysql-password='' --mysql-db=sniffer --db-driver=mysql --tables=10 --table-size=1000000 --report-interval=10 --threads=128 --time=120 run \ No newline at end of file diff --git a/session-dealer/model.go b/session-dealer/model.go index d8c242c..9a42e21 100644 --- a/session-dealer/model.go +++ b/session-dealer/model.go @@ -7,4 +7,7 @@ type ConnSession interface { ReadFromServer(bytes []byte) ResetBeginTime() GenerateQueryPiece() (qp model.QueryPiece) + ReadAllPackageFinish() bool + ReadOnePackageFinish() bool + ResetCache() } diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 3bb2293..6a27de8 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -18,8 +18,12 @@ type MysqlSession struct { serverIP *string serverPort int stmtBeginTime int64 - expectSize int + expectReceiveSize int + packageBaseSize int + packageComplete bool + expectSendSize int prepareInfo *prepareInfo + sizeCount map[int]int64 cachedPrepareStmt map[int]*string tcpCache []byte cachedStmtBytes []byte @@ -29,6 +33,11 @@ type prepareInfo struct { prepareStmtID int } +const ( + defaultCacheSize = 1<<16 + maxBeyondCount = 3 +) + func NewMysqlSession(sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int) (ms *MysqlSession) { ms = &MysqlSession{ connectionID: sessionKey, @@ -39,6 +48,11 @@ func NewMysqlSession(sessionKey *string, clientIP *string, clientPort int, serve stmtBeginTime: time.Now().UnixNano() / millSecondUnit, cachedPrepareStmt: make(map[int]*string, 8), } + ms.tcpCache = make([]byte, 0, defaultCacheSize) + ms.cachedStmtBytes = make([]byte, 0, defaultCacheSize) + ms.packageBaseSize = 512 + ms.sizeCount = make(map[int]int64) + return } @@ -47,46 +61,133 @@ func (ms *MysqlSession) ResetBeginTime() { } func (ms *MysqlSession) ReadFromServer(bytes []byte) { - if ms.expectSize < 1 { - ms.expectSize = extractMysqlPayloadSize(bytes) + if ms.expectSendSize < 1 { + ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) contents := bytes[4:] if ms.prepareInfo != nil && contents[0] == 0 { ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5]) } - ms.expectSize = ms.expectSize - len(contents) + fmt.Printf("Init ms.expectSendSize: %v\n", ms.expectSendSize) + ms.expectSendSize = ms.expectSendSize - len(contents) } else { - ms.expectSize = ms.expectSize - len(bytes) + ms.expectSendSize = ms.expectSendSize - len(bytes) } } func (ms *MysqlSession) ReadFromClient(bytes []byte) { - if ms.expectSize < 1 { - ms.expectSize = extractMysqlPayloadSize(bytes) + if ms.expectReceiveSize < 1 { + ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4]) contents := bytes[4:] if contents[0] == ComStmtPrepare { ms.prepareInfo = &prepareInfo{} } - ms.expectSize = ms.expectSize - len(contents) ms.tcpCache = append(ms.tcpCache, contents...) + ms.expectReceiveSize = ms.expectReceiveSize - len(contents) } else { - ms.expectSize = ms.expectSize - len(bytes) ms.tcpCache = append(ms.tcpCache, bytes...) - if len(ms.tcpCache) == MaxPayloadLen { + ms.expectReceiveSize = ms.expectReceiveSize - len(bytes) + } + + readSize := len(bytes) + readTail := readSize % ms.packageBaseSize + if readTail != 0 { + if ms.expectReceiveSize == 0 { ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) - ms.tcpCache = ms.tcpCache[:0] - ms.expectSize = 0 + ms.tcpCache = ms.tcpCache[0:0] + ms.packageComplete = true + + } else { + ms.packageComplete = false + } + + } else if readTail == 0 && ms.expectReceiveSize == 0 { + ms.packageComplete = true + } + + miniMatchSize := 1 << 16 + for size := range ms.sizeCount { + if readSize % size == 0 && miniMatchSize > size { + miniMatchSize = size } } + if miniMatchSize < 1 << 16 { + ms.sizeCount[miniMatchSize] = ms.sizeCount[miniMatchSize] + 1 + } else if (ms.expectReceiveSize != 0) { + ms.sizeCount[readSize] = 1 + } + + + + mostFrequentSize := ms.packageBaseSize + miniSize := ms.packageBaseSize + mostFrequentCount := int64(0) + for size, count := range ms.sizeCount { + if count > mostFrequentCount { + mostFrequentSize = size + mostFrequentCount = count + } + + if miniSize > size { + miniSize = size + } + } + + ms.packageBaseSize = mostFrequentSize + + // fmt.Printf("read %v bytes: %v\n", len(bytes), string(bytes)) + fmt.Printf("ms.expectReceiveSize: %v\n", ms.expectReceiveSize) + fmt.Printf("ms.sizeCount: %#v\n", ms.sizeCount) + fmt.Printf("len(ms.tcpCache): %#v\n", len(ms.tcpCache)) + fmt.Printf("packageComplete in read: %v\n", ms.packageComplete) + log.Infof("ms.packageBaseSize: %v", ms.packageBaseSize) +} + +func (ms *MysqlSession) ReadOnePackageFinish() bool { + if len(ms.tcpCache) == MaxPayloadLen { + return true + } + + return false +} + +func (ms *MysqlSession) ReadAllPackageFinish() bool { + // fmt.Printf("len(ms.tcpCache): %v\n", len(ms.tcpCache)) + // fmt.Printf("ms.expectReceiveSize: %v\n", ms.expectReceiveSize) + + if len(ms.tcpCache) < MaxPayloadLen { + return true + } + + return false +} + +func (ms *MysqlSession) ResetCache() { + ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) + ms.tcpCache = ms.tcpCache[0:0] } func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { + defer func() { + ms.tcpCache = ms.tcpCache[0:0] + ms.cachedStmtBytes = ms.cachedStmtBytes[0:0] + ms.expectReceiveSize = 0 + ms.expectSendSize = 0 + ms.prepareInfo = nil + ms.packageComplete = false + }() + if len(ms.cachedStmtBytes) < 1 && len(ms.tcpCache) < 1 { return } + // fmt.Printf("packageComplete in generate: %v\n", ms.packageComplete) + if !ms.packageComplete { + return + } + var mqp *model.PooledMysqlQueryPiece var querySQLInBytes []byte ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) @@ -96,7 +197,7 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { var err error userName, dbName, err = parseAuthInfo(ms.cachedStmtBytes) if err != nil { - return + log.Errorf("parse auth info failed <-- %s", err.Error()) } ms.visitUser = &userName ms.visitDB = &dbName @@ -109,9 +210,13 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { // update session database ms.visitDB = &newDBName - case ComCreateDB: case ComDropDB: - case ComQuery: + dbName := string(ms.cachedStmtBytes[1:]) + dropSQL := fmt.Sprintf("drop database %s", dbName) + mqp = ms.composeQueryPiece() + mqp.QuerySQL = &dropSQL + + case ComCreateDB, ComQuery: mqp = ms.composeQueryPiece() querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:])) copy(querySQLInBytes, ms.cachedStmtBytes[1:]) @@ -139,6 +244,7 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { log.Debugf("remove prepare statement:%d", prepareStmtID) default: + return } if strictMode && mqp != nil && mqp.VisitUser == nil { @@ -151,10 +257,6 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { } } - ms.tcpCache = ms.tcpCache[:0] - ms.cachedStmtBytes = ms.cachedStmtBytes[:0] - ms.expectSize = 0 - ms.prepareInfo = nil return filterQueryPieceBySQL(mqp, querySQLInBytes) } diff --git a/session-dealer/mysql/util.go b/session-dealer/mysql/util.go index 9153bcf..9b5d6d6 100644 --- a/session-dealer/mysql/util.go +++ b/session-dealer/mysql/util.go @@ -3,6 +3,7 @@ package mysql import ( "bytes" "encoding/binary" + "fmt" ) // parseHandshakeResponseHeader parses the common header of SSLRequest and HandshakeResponse41. @@ -121,8 +122,10 @@ func parseLengthEncodedInt(b []byte) (num uint64, isNull bool, n int) { return } -func extractMysqlPayloadSize(payload []byte) int { - header := payload[:4] +func extractMysqlPayloadSize(header []byte) int { + fmt.Printf("==== package header: %v\n", header) + + // return int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) return int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) }