From f60f43046d61c5e5377d63fc00ecbd400c84341a Mon Sep 17 00:00:00 2001 From: hebo Date: Wed, 25 Sep 2019 15:56:58 +0800 Subject: [PATCH 01/17] change git ignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 0d72c1f..32835eb 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ sniffer-agent # vendor/github.com profile* *.svg +nohup.out +*/nohup.out From f5b8337638602c7620c6a3fe0d2dc1a9fd2a5d3d Mon Sep 17 00:00:00 2001 From: hebo Date: Thu, 26 Sep 2019 08:59:42 +0800 Subject: [PATCH 02/17] fix bug of read to many bytes --- session-dealer/mysql/session.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 99f6a96..418c263 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -12,17 +12,17 @@ import ( ) type MysqlSession struct { - connectionID *string - visitUser *string - visitDB *string - clientHost *string - clientPort int - serverIP *string - serverPort int - stmtBeginTime int64 + connectionID *string + visitUser *string + visitDB *string + clientHost *string + clientPort int + serverIP *string + serverPort int + stmtBeginTime int64 // packageOffset int64 - beginSeqID int64 - endSeqID int64 + beginSeqID int64 + endSeqID int64 coverRanges *coverRanges expectReceiveSize int expectSendSize int @@ -59,7 +59,7 @@ func NewMysqlSession( queryPieceReceiver: receiver, closeConn: make(chan bool, 1), expectReceiveSize: -1, - coverRanges: NewCoverRanges(), + coverRanges: NewCoverRanges(), ignoreAckID: -1, sendSize: 0, pkgCacheLock: sync.Mutex{}, @@ -100,7 +100,7 @@ func (ms *MysqlSession) resetBeginTime() { } func (ms *MysqlSession) readFromServer(bytes []byte) { - if ms.expectSendSize < 1 { + if ms.expectSendSize < 1 && len(bytes) > 4 { ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) contents := bytes[4:] if ms.prepareInfo != nil && contents[0] == 0 { @@ -115,7 +115,7 @@ func (ms *MysqlSession) checkFinish() bool { } checkNode := ms.coverRanges.head.next - if checkNode.end - checkNode.begin == int64(len(ms.cachedStmtBytes)) { + if checkNode.end-checkNode.begin == int64(len(ms.cachedStmtBytes)) { return true } @@ -160,7 +160,6 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { ms.beginSeqID = seqID ms.endSeqID = seqID - if int64(ms.expectReceiveSize) < int64(len(contents)) { log.Warnf("receive invalid mysql packet") return @@ -192,7 +191,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { if seqOffset+contentSize > int64(len(ms.cachedStmtBytes)) { // not in a normal mysql packet log.Debugf("receive an unexpect packet") - ms.clear() + ms.clear() return } @@ -204,7 +203,6 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { // ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize) } - func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { defer ms.clear() From fcdfbfeb0da14ac96199b2074bdaa90a4c69f56f Mon Sep 17 00:00:00 2001 From: hebo Date: Thu, 26 Sep 2019 10:43:31 +0800 Subject: [PATCH 03/17] split session id --- model/mysql_query_piece.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index bebe694..7843436 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -9,9 +9,9 @@ import ( type MysqlQueryPiece struct { BaseQueryPiece - SessionID *string `json:"cid"` - ClientHost *string `json:"-"` - ClientPort int `json:"-"` + SessionID *string `json:"-"` + ClientHost *string `json:"cip"` + ClientPort int `json:"cport"` VisitUser *string `json:"user"` VisitDB *string `json:"db"` From b57c87a86d514b17b130b0f868cf86915acefe76 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Sep 2019 12:42:59 +0800 Subject: [PATCH 04/17] change default capture rate --- communicator/config.go | 2 +- communicator/model.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/communicator/config.go b/communicator/config.go index aebf222..e8d2910 100644 --- a/communicator/config.go +++ b/communicator/config.go @@ -28,7 +28,7 @@ func init() { flag.IntVar(&communicatePort, "communicate_port", 8088, "http server port. Default is 8088") var cpr float64 - flag.Float64Var(&cpr, CAPTURE_PACKET_RATE, 1, "capture packet rate. Default is 1.0") + flag.Float64Var(&cpr, CAPTURE_PACKET_RATE, 0.01, "capture packet rate. Default is 0.01") _ = catpurePacketRate.setVal(cpr) configMap = make(map[string]configItem) diff --git a/communicator/model.go b/communicator/model.go index 44ad29e..c1c6a3d 100644 --- a/communicator/model.go +++ b/communicator/model.go @@ -32,6 +32,7 @@ func (cprc *capturePacketRateConfig) setVal (val interface{}) (err error){ return } + fmt.Printf("set config %s: %v", CAPTURE_PACKET_RATE, realVal) cprc.mysqlTPR = realVal cprc.tcpTPR = math.Sqrt(realVal) return From 653c6c124fb693a69d8b6980112f17bf197ea4f4 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Sep 2019 12:45:40 +0800 Subject: [PATCH 05/17] change default capture rate --- session-dealer/mysql/session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 418c263..0a0ec77 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -161,7 +161,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { ms.endSeqID = seqID if int64(ms.expectReceiveSize) < int64(len(contents)) { - log.Warnf("receive invalid mysql packet") + log.Debug("receive invalid mysql packet") return } From b5dd173c017b71ea9975c52c98d05f6692fa5f46 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Sep 2019 12:53:54 +0800 Subject: [PATCH 06/17] change default capture rate --- capture/network.go | 40 ++++++++++++++++++++-------------------- communicator/model.go | 2 +- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/capture/network.go b/capture/network.go index a981347..0308730 100644 --- a/capture/network.go +++ b/capture/network.go @@ -113,33 +113,33 @@ func (nc *networkCard) listenNormal() { var ci gopacket.CaptureInfo var err error - // capture packets according to a certain probability - capturePacketRate := communicator.GetTCPCapturePacketRate() - if capturePacketRate <= 0 { - time.Sleep(time.Second*1) - aliveCounter += 1 - if aliveCounter >= checkCount { - aliveCounter = 0 - nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, capturePacketRate) - } - continue - - } else if 0 < capturePacketRate && capturePacketRate < 1.0 { - // fall into throw range - rn := rand.Float64() - if rn > capturePacketRate { - continue - } - } - - aliveCounter = 0 data, ci, err = handler.ZeroCopyReadPacketData() if err != nil { log.Error(err.Error()) time.Sleep(time.Second*3) continue } + + // capture packets according to a certain probability + tcpCapturePacketRate := communicator.GetTCPCapturePacketRate() + if tcpCapturePacketRate <= 0 { + time.Sleep(time.Second*1) + aliveCounter += 1 + if aliveCounter >= checkCount { + aliveCounter = 0 + nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, tcpCapturePacketRate) + } + continue + } else if 0 < tcpCapturePacketRate && tcpCapturePacketRate < 1.0 { + // fall into throw range + rn := rand.Float64() + if rn > tcpCapturePacketRate { + continue + } + } + + aliveCounter = 0 packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) m := packet.Metadata() m.CaptureInfo = ci diff --git a/communicator/model.go b/communicator/model.go index c1c6a3d..ec420f3 100644 --- a/communicator/model.go +++ b/communicator/model.go @@ -32,7 +32,7 @@ func (cprc *capturePacketRateConfig) setVal (val interface{}) (err error){ return } - fmt.Printf("set config %s: %v", CAPTURE_PACKET_RATE, realVal) + fmt.Printf("set config %s: %v\n", CAPTURE_PACKET_RATE, realVal) cprc.mysqlTPR = realVal cprc.tcpTPR = math.Sqrt(realVal) return From e2a924c776f8a8d9984c67870022612917390b13 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Sep 2019 14:13:59 +0800 Subject: [PATCH 07/17] fix bug when deal invalid package --- capture/network.go | 2 +- session-dealer/mysql/session.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/capture/network.go b/capture/network.go index 0308730..9f00792 100644 --- a/capture/network.go +++ b/capture/network.go @@ -119,7 +119,7 @@ func (nc *networkCard) listenNormal() { time.Sleep(time.Second*3) continue } - + // capture packets according to a certain probability tcpCapturePacketRate := communicator.GetTCPCapturePacketRate() if tcpCapturePacketRate <= 0 { diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 0a0ec77..b2194da 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -149,6 +149,11 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { return } + // ignore invalid head package + if len(bytes) <= 4{ + return + } + contents := bytes[4:] // add prepare info if contents[0] == ComStmtPrepare { From b79e9f6867a7be1b01d744e78b65d3dd2f80cd96 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Sep 2019 16:48:47 +0800 Subject: [PATCH 08/17] do not ignore FIN tcp packet --- capture/network.go | 40 +++++++++++++++++++++++++------------- communicator/config.go | 6 ++---- communicator/controller.go | 6 ++++++ 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/capture/network.go b/capture/network.go index 9f00792..d5ee4b0 100644 --- a/capture/network.go +++ b/capture/network.go @@ -20,13 +20,13 @@ import ( var ( DeviceName string snifferPort int - inParallel bool + // inParallel bool ) func init() { flag.StringVar(&DeviceName, "interface", "eth0", "network device name. Default is eth0") flag.IntVar(&snifferPort, "port", 3306, "sniffer port. Default is 3306") - flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false") + // flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false") } // networkCard is network device @@ -93,21 +93,27 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) { } func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { - if inParallel { - nc.listenInParallel() - - } else { - nc.listenNormal() - } + // if inParallel { + // nc.listenInParallel() + // + // } else { + // nc.listenNormal() + // } + nc.listenNormal() return nc.receiver } +func isFINPacket(data []byte) (isFIN bool) { + return +} + // Listen get a connection. func (nc *networkCard) listenNormal() { go func() { aliveCounter := 0 - handler := initEthernetHandlerFromPacpgo() + handler := initEthernetHandlerFromPacp() + for { var data []byte var ci gopacket.CaptureInfo @@ -120,6 +126,18 @@ func (nc *networkCard) listenNormal() { continue } + // packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) + packet := gopacket.NewPacket(data, handler.LinkType(), gopacket.NoCopy) + m := packet.Metadata() + m.CaptureInfo = ci + + // send FIN tcp packet to avoid not complete session cannot be released + tcpPkt := packet.TransportLayer().(*layers.TCP) + if tcpPkt.FIN { + nc.parseTCPPackage(packet) + continue + } + // capture packets according to a certain probability tcpCapturePacketRate := communicator.GetTCPCapturePacketRate() if tcpCapturePacketRate <= 0 { @@ -140,10 +158,6 @@ func (nc *networkCard) listenNormal() { } aliveCounter = 0 - packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) - m := packet.Metadata() - m.CaptureInfo = ci - m.Truncated = m.Truncated || ci.CaptureLength < ci.Length nc.parseTCPPackage(packet) } }() diff --git a/communicator/config.go b/communicator/config.go index e8d2910..73806a2 100644 --- a/communicator/config.go +++ b/communicator/config.go @@ -13,7 +13,6 @@ const ( var ( communicatePort int - // capturePacketRate float64 router = mux.NewRouter() ) @@ -21,15 +20,14 @@ var ( configMapLock sync.RWMutex configMap map[string]configItem catpurePacketRate *capturePacketRateConfig + catpurePacketRateVal float64 ) func init() { catpurePacketRate = newCapturePacketRateConfig() flag.IntVar(&communicatePort, "communicate_port", 8088, "http server port. Default is 8088") - var cpr float64 - flag.Float64Var(&cpr, CAPTURE_PACKET_RATE, 0.01, "capture packet rate. Default is 0.01") - _ = catpurePacketRate.setVal(cpr) + flag.Float64Var(&catpurePacketRateVal, CAPTURE_PACKET_RATE, 0.01, "capture packet rate. Default is 0.01") configMap = make(map[string]configItem) } diff --git a/communicator/controller.go b/communicator/controller.go index 5b97185..43a1e44 100644 --- a/communicator/controller.go +++ b/communicator/controller.go @@ -11,6 +11,8 @@ import ( ) func Server() { + initConfig() + server := &http.Server{ Addr: "0.0.0.0:" + strconv.Itoa(communicatePort), IdleTimeout: time.Second * 5, @@ -22,6 +24,10 @@ func Server() { } } +func initConfig() { + _ = catpurePacketRate.setVal(catpurePacketRateVal) +} + func outletCheckAlive(resp http.ResponseWriter, req *http.Request) { mp := hu.NewMouthpiece(resp) defer func() { From 6b87d5bf0620cffabb0efc423ee971196fb0a715 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Sep 2019 16:52:20 +0800 Subject: [PATCH 09/17] change warn log to debug --- session-dealer/mysql/session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index b2194da..11c0706 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -181,7 +181,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { } if ms.beginSeqID == -1 { - log.Warnf("cover range is empty") + log.Debug("cover range is empty") return } From a01de02f9a44a9ed6cd5ab9d1621be50d995a5b8 Mon Sep 17 00:00:00 2001 From: hebo Date: Sun, 29 Sep 2019 09:21:55 +0800 Subject: [PATCH 10/17] fix bug of read short packet --- session-dealer/mysql/session.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 11c0706..6640479 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -143,14 +143,14 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { contentSize := int64(len(bytes)) if ms.expectReceiveSize == -1 { - ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4]) - // ignore too big mysql packet - if ms.expectReceiveSize >= MaxMysqlPacketLen { + // ignore invalid head package + if len(bytes) <= 4{ return } - // ignore invalid head package - if len(bytes) <= 4{ + ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4]) + // ignore too big mysql packet + if ms.expectReceiveSize >= MaxMysqlPacketLen { return } From 2c54f183746932741412acf636028a16ae2fe870 Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 8 Oct 2019 14:23:37 +0800 Subject: [PATCH 11/17] change recover order --- model/mysql_query_piece.go | 2 +- model/query_piece.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index 7843436..19180c5 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -71,7 +71,7 @@ func (mqp *MysqlQueryPiece) GetSQL() (str *string) { } func (pmqp *PooledMysqlQueryPiece) Recovery() { - pmqp.recoverPool.Enqueue(pmqp) pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) pmqp.jsonContent = nil + pmqp.recoverPool.Enqueue(pmqp) } diff --git a/model/query_piece.go b/model/query_piece.go index 440a664..e0d3a74 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -83,6 +83,10 @@ func (bqp *BaseQueryPiece) Recovery() { func marsharQueryPiece(qp interface{}) []byte { var cacheBuffer = localSliceBufferPool.Dequeue() + if len(cacheBuffer) > 0 { + panic("there already have bytes in buffer") + } + buffer := bytes.NewBuffer(cacheBuffer) err := json.NewEncoder(buffer).Encode(qp) if err != nil { From c3531819bdf6d237ab1484baa15ff1b3a86f16bb Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 8 Oct 2019 15:16:42 +0800 Subject: [PATCH 12/17] change marshal strategy --- Godeps/Godeps.json | 2 +- model/mysql_query_piece.go | 2 +- model/query_piece.go | 17 ++++++++++++++--- vendor/github.com/zr-hebo/util-db/query_db.go | 4 +++- .../zr-hebo/util-db/query_pooled_db.go | 2 +- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 9476f6d..8fa4383 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -109,7 +109,7 @@ }, { "ImportPath": "github.com/zr-hebo/util-db", - "Rev": "3ff29f916f7b712b3adc53c4b9b19b13b8bbed87" + "Rev": "06948bca5665b2d80078f9ff5c015eabb43c54f8" }, { "ImportPath": "github.com/zr-hebo/util-http", diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index 19180c5..5ef6791 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -62,7 +62,7 @@ func (mqp *MysqlQueryPiece) Bytes() (content []byte) { return mqp.jsonContent } - mqp.jsonContent = marsharQueryPiece(mqp) + mqp.jsonContent = marsharQueryPieceMonopolize(mqp) return mqp.jsonContent } diff --git a/model/query_piece.go b/model/query_piece.go index e0d3a74..80f207d 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -1,13 +1,15 @@ package model import ( - // "github.com/json-iterator/go" "bytes" "encoding/json" + jsoniter "github.com/json-iterator/go" "github.com/pingcap/tidb/util/hack" "time" ) +var jsonIterator = jsoniter.ConfigCompatibleWithStandardLibrary + type QueryPiece interface { String() *string Bytes() []byte @@ -70,7 +72,7 @@ func (bqp *BaseQueryPiece) Bytes() (content []byte) { return bqp.jsonContent } - bqp.jsonContent = marsharQueryPiece(bqp) + bqp.jsonContent = marsharQueryPieceMonopolize(bqp) return bqp.jsonContent } @@ -81,7 +83,7 @@ func (bqp *BaseQueryPiece) GetSQL() (*string) { func (bqp *BaseQueryPiece) Recovery() { } -func marsharQueryPiece(qp interface{}) []byte { +func marsharQueryPieceShareMemory(qp interface{}) []byte { var cacheBuffer = localSliceBufferPool.Dequeue() if len(cacheBuffer) > 0 { panic("there already have bytes in buffer") @@ -95,3 +97,12 @@ func marsharQueryPiece(qp interface{}) []byte { return buffer.Bytes() } + +func marsharQueryPieceMonopolize(qp interface{}) (content []byte) { + content, err := jsonIterator.Marshal(qp) + if err != nil { + return []byte(err.Error()) + } + + return content +} \ No newline at end of file diff --git a/vendor/github.com/zr-hebo/util-db/query_db.go b/vendor/github.com/zr-hebo/util-db/query_db.go index 574ae53..64b0f22 100644 --- a/vendor/github.com/zr-hebo/util-db/query_db.go +++ b/vendor/github.com/zr-hebo/util-db/query_db.go @@ -43,12 +43,14 @@ type MysqlDB struct { DatabaseType string DBName string ConnectTimeout int + QueryTimeout int } // NewMysqlDB 创建MySQL数据库 func NewMysqlDB() (md *MysqlDB) { md = new(MysqlDB) md.DatabaseType = dbTypeMysql + md.QueryTimeout = 5 return } @@ -352,7 +354,7 @@ func (md *MysqlDB) fillConnStr() string { md.UserName, md.Passwd, md.IP, md.Port, md.DBName) if md.ConnectTimeout > 0 { dbServerInfoStr = fmt.Sprintf("%s?timeout=%ds&readTimeout=%ds&writeTimeout=%ds", - dbServerInfoStr, md.ConnectTimeout, md.ConnectTimeout, md.ConnectTimeout) + dbServerInfoStr, md.ConnectTimeout, md.QueryTimeout, md.QueryTimeout) } return dbServerInfoStr diff --git a/vendor/github.com/zr-hebo/util-db/query_pooled_db.go b/vendor/github.com/zr-hebo/util-db/query_pooled_db.go index 051c35d..dbe820e 100644 --- a/vendor/github.com/zr-hebo/util-db/query_pooled_db.go +++ b/vendor/github.com/zr-hebo/util-db/query_pooled_db.go @@ -6,7 +6,6 @@ import ( "time" ) - // PooledMysqlDB Mysql主机实例 type PooledMysqlDB struct { MysqlDB @@ -46,6 +45,7 @@ func NewPooledMysqlDBWithAllParam( func NewPooledMysqlDB() (pmd *PooledMysqlDB) { pmd = new(PooledMysqlDB) pmd.DatabaseType = dbTypeMysql + pmd.QueryTimeout = 5 pmd.lock = new(sync.Mutex) return } From 1d64e41b58b9557b1a5b8b2413f7bfcfa9a7baa5 Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 8 Oct 2019 16:01:15 +0800 Subject: [PATCH 13/17] do not recover cache --- model/mysql_query_piece.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index 5ef6791..2261ac3 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -71,7 +71,7 @@ func (mqp *MysqlQueryPiece) GetSQL() (str *string) { } func (pmqp *PooledMysqlQueryPiece) Recovery() { - pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) + // pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) pmqp.jsonContent = nil pmqp.recoverPool.Enqueue(pmqp) } From d019ac76cff8a1a36a3333dfb095d8f421570861 Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 8 Oct 2019 17:54:25 +0800 Subject: [PATCH 14/17] fix bug of recover early --- model/mysql_query_piece.go | 7 ++++++- model/query_piece_pool.go | 1 - session-dealer/mysql/session.go | 8 ++++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index 2261ac3..6998835 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -62,10 +62,15 @@ func (mqp *MysqlQueryPiece) Bytes() (content []byte) { return mqp.jsonContent } - mqp.jsonContent = marsharQueryPieceMonopolize(mqp) + mqp.GenerateJsonBytes() return mqp.jsonContent } +func (mqp *MysqlQueryPiece) GenerateJsonBytes() { + mqp.jsonContent = marsharQueryPieceMonopolize(mqp) + return +} + func (mqp *MysqlQueryPiece) GetSQL() (str *string) { return mqp.QuerySQL } diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go index c3eaa0f..3155cd5 100644 --- a/model/query_piece_pool.go +++ b/model/query_piece_pool.go @@ -19,7 +19,6 @@ func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) { mqpp.lock.Lock() defer mqpp.lock.Unlock() - mqpp.queue <- pmqp } diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 6640479..472b65b 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -301,10 +301,14 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { } } - return filterQueryPieceBySQL(mqp, querySQLInBytes) + mqp = filterQueryPieceBySQL(mqp, querySQLInBytes) + if mqp != nil { + mqp.GenerateJsonBytes() + } + return mqp } -func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (model.QueryPiece) { +func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (*model.PooledMysqlQueryPiece) { if mqp == nil || querySQL == nil { return nil From 3fa975a29bb283ae3fba4c4c35aedfd6b7563fc6 Mon Sep 17 00:00:00 2001 From: hebo Date: Wed, 9 Oct 2019 09:31:39 +0800 Subject: [PATCH 15/17] fix bug of nil pointer --- session-dealer/mysql/session.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 472b65b..26c91fa 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -302,9 +302,10 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { } mqp = filterQueryPieceBySQL(mqp, querySQLInBytes) - if mqp != nil { - mqp.GenerateJsonBytes() + if mqp == nil { + return nil } + mqp.GenerateJsonBytes() return mqp } From 39b5661e9fecbd5e70760b68feeb0ec9f24de14b Mon Sep 17 00:00:00 2001 From: hebo Date: Wed, 9 Oct 2019 10:42:49 +0800 Subject: [PATCH 16/17] change capture rate --- capture/network.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/capture/network.go b/capture/network.go index d5ee4b0..e9bada0 100644 --- a/capture/network.go +++ b/capture/network.go @@ -104,9 +104,6 @@ func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { return nc.receiver } -func isFINPacket(data []byte) (isFIN bool) { - return -} // Listen get a connection. func (nc *networkCard) listenNormal() { @@ -119,6 +116,18 @@ func (nc *networkCard) listenNormal() { var ci gopacket.CaptureInfo var err error + // capture packets according to a certain probability + capturePacketRate := communicator.GetMysqlCapturePacketRate() + if capturePacketRate <= 0 { + time.Sleep(time.Second*1) + aliveCounter += 1 + if aliveCounter >= checkCount { + aliveCounter = 0 + nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, capturePacketRate) + } + continue + } + data, ci, err = handler.ZeroCopyReadPacketData() if err != nil { log.Error(err.Error()) @@ -138,21 +147,10 @@ func (nc *networkCard) listenNormal() { continue } - // capture packets according to a certain probability - tcpCapturePacketRate := communicator.GetTCPCapturePacketRate() - if tcpCapturePacketRate <= 0 { - time.Sleep(time.Second*1) - aliveCounter += 1 - if aliveCounter >= checkCount { - aliveCounter = 0 - nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, tcpCapturePacketRate) - } - continue - - } else if 0 < tcpCapturePacketRate && tcpCapturePacketRate < 1.0 { + if 0 < capturePacketRate && capturePacketRate < 1.0 { // fall into throw range rn := rand.Float64() - if rn > tcpCapturePacketRate { + if rn > capturePacketRate { continue } } From fce024683af3f8c02c81449f21e946f459599cfd Mon Sep 17 00:00:00 2001 From: hebo Date: Thu, 10 Oct 2019 09:49:46 +0800 Subject: [PATCH 17/17] fix bug of set tcp packet rate --- capture/network.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capture/network.go b/capture/network.go index e9bada0..33cdedc 100644 --- a/capture/network.go +++ b/capture/network.go @@ -117,7 +117,7 @@ func (nc *networkCard) listenNormal() { var err error // capture packets according to a certain probability - capturePacketRate := communicator.GetMysqlCapturePacketRate() + capturePacketRate := communicator.GetTCPCapturePacketRate() if capturePacketRate <= 0 { time.Sleep(time.Second*1) aliveCounter += 1