From 8db6c6f4fa7cd877828a27e99a9772fb3fafb22c Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 10 Dec 2019 19:31:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9buffer=20pool=E5=A4=A7?= =?UTF-8?q?=E5=B0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 2 +- model/mysql_query_piece.go | 37 ----------------- model/query_piece.go | 15 +++---- model/query_piece_pool.go | 71 ++++++++++++++++++++++++++++++++- session-dealer/mysql/config.go | 4 +- session-dealer/mysql/session.go | 1 + {model => util}/cache_pool.go | 16 ++++---- 7 files changed, 87 insertions(+), 59 deletions(-) rename {model => util}/cache_pool.go (73%) diff --git a/main.go b/main.go index c8b3a8e..39c7972 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ var ( ) func init() { - flag.StringVar(&logLevel, "log_level", "info", "log level. Default is info") + flag.StringVar(&logLevel, "log_level", "warn", "log level. Default is info") } func initLog() { diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index a4587e7..fe19aaf 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -2,7 +2,6 @@ package model import ( "github.com/pingcap/tidb/util/hack" - "time" ) // MysqlQueryPiece 查询信息 @@ -19,36 +18,6 @@ type MysqlQueryPiece struct { CostTimeInMS int64 `json:"cms"` } -type PooledMysqlQueryPiece struct { - MysqlQueryPiece - recoverPool *mysqlQueryPiecePool - sliceBufferPool *sliceBufferPool -} - -func NewPooledMysqlQueryPiece( - sessionID, clientIP, visitUser, visitDB, serverIP *string, - clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) ( - mqp *PooledMysqlQueryPiece) { - mqp = mqpp.Dequeue() - - nowInMS := time.Now().UnixNano() / millSecondUnit - mqp.SessionID = sessionID - mqp.ClientHost = clientIP - mqp.ClientPort = clientPort - mqp.ServerIP = serverIP - mqp.ServerPort = serverPort - mqp.VisitUser = visitUser - mqp.VisitDB = visitDB - mqp.SyncSend = false - mqp.CapturePacketRate = throwPacketRate - mqp.BeginTime = stmtBeginTime - mqp.CostTimeInMS = nowInMS - stmtBeginTime - mqp.recoverPool = mqpp - mqp.sliceBufferPool = localSliceBufferPool - - return -} - func (mqp *MysqlQueryPiece) String() (*string) { content := mqp.Bytes() contentStr := hack.String(content) @@ -73,9 +42,3 @@ func (mqp *MysqlQueryPiece) GenerateJsonBytes() { func (mqp *MysqlQueryPiece) GetSQL() (str *string) { return mqp.QuerySQL } - -func (pmqp *PooledMysqlQueryPiece) Recovery() { - // pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) - pmqp.jsonContent = nil - pmqp.recoverPool.Enqueue(pmqp) -} diff --git a/model/query_piece.go b/model/query_piece.go index c0375de..a99d446 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -24,7 +24,7 @@ type BaseQueryPiece struct { ServerIP *string `json:"sip"` ServerPort int `json:"sport"` CapturePacketRate float64 `json:"cpr"` - BeginTime int64 `json:"bt"` + EventTime int64 `json:"bt"` jsonContent []byte `json:"-"` } @@ -33,8 +33,7 @@ const ( ) var ( - mqpp = NewMysqlQueryPiecePool() - localSliceBufferPool = NewSliceBufferPool("json cache", 256*1024) + mqpp = NewMysqlQueryPiecePool() ) var commonBaseQueryPiece = &BaseQueryPiece{} @@ -47,7 +46,7 @@ func NewBaseQueryPiece( bqp.ServerPort = serverPort bqp.SyncSend = false bqp.CapturePacketRate = capturePacketRate - bqp.BeginTime = time.Now().UnixNano() / millSecondUnit + bqp.EventTime = time.Now().UnixNano() / millSecondUnit return } @@ -83,11 +82,7 @@ func (bqp *BaseQueryPiece) GetSQL() (*string) { func (bqp *BaseQueryPiece) Recovery() { } -func marsharQueryPieceShareMemory(qp interface{}) []byte { - var cacheBuffer = localSliceBufferPool.Dequeue() - if len(cacheBuffer) > 0 { - panic("there already have bytes in buffer") - } +func marsharQueryPieceShareMemory(qp interface{}, cacheBuffer []byte) []byte { buffer := bytes.NewBuffer(cacheBuffer) err := json.NewEncoder(buffer).Encode(qp) @@ -105,4 +100,4 @@ func marsharQueryPieceMonopolize(qp interface{}) (content []byte) { } return content -} \ No newline at end of file +} diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go index de3fabf..7a58d67 100644 --- a/model/query_piece_pool.go +++ b/model/query_piece_pool.go @@ -1,9 +1,78 @@ package model import ( + "github.com/zr-hebo/sniffer-agent/util" "sync" + "time" ) +var ( + localSliceBufferPool = util.NewSliceBufferPool("json cache", (128+1)*1024) +) + +type PooledMysqlQueryPiece struct { + MysqlQueryPiece + recoverPool *mysqlQueryPiecePool + sliceBufferPool *util.SliceBufferPool +} + +func NewPooledMysqlQueryPiece( + sessionID, clientIP, visitUser, visitDB, serverIP *string, + clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) ( + pmqp *PooledMysqlQueryPiece) { + pmqp = mqpp.Dequeue() + + pmqp.sliceBufferPool = localSliceBufferPool + nowInMS := time.Now().UnixNano() / millSecondUnit + pmqp.SessionID = sessionID + pmqp.ClientHost = clientIP + pmqp.ClientPort = clientPort + pmqp.ServerIP = serverIP + pmqp.ServerPort = serverPort + pmqp.VisitUser = visitUser + pmqp.VisitDB = visitDB + pmqp.SyncSend = false + pmqp.CapturePacketRate = throwPacketRate + pmqp.EventTime = stmtBeginTime + pmqp.CostTimeInMS = nowInMS - stmtBeginTime + pmqp.recoverPool = mqpp + + return +} + +func (pmqp *PooledMysqlQueryPiece) Recovery() { + if pmqp.sliceBufferPool != nil { + pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) + } + pmqp.jsonContent = nil + pmqp.recoverPool.Enqueue(pmqp) +} + +func (pmqp *PooledMysqlQueryPiece) Bytes() (content []byte) { + // content, err := json.Marshal(mqp) + if len(pmqp.jsonContent) > 0 { + return pmqp.jsonContent + } + + pmqp.GenerateJsonBytes() + return pmqp.jsonContent +} + +func (pmqp *PooledMysqlQueryPiece) GenerateJsonBytes() { + if pmqp.sliceBufferPool == nil { + pmqp.jsonContent = marsharQueryPieceMonopolize(pmqp) + return + } + + var cacheBuffer = pmqp.sliceBufferPool.Dequeue() + if len(cacheBuffer) > 0 { + panic("there already have bytes in buffer") + } + + pmqp.jsonContent = marsharQueryPieceShareMemory(pmqp, cacheBuffer) + return +} + type mysqlQueryPiecePool struct { queue chan *PooledMysqlQueryPiece lock sync.Mutex @@ -11,7 +80,7 @@ type mysqlQueryPiecePool struct { func NewMysqlQueryPiecePool() (mqpp *mysqlQueryPiecePool) { return &mysqlQueryPiecePool{ - queue: make(chan *PooledMysqlQueryPiece, 1024), + queue: make(chan *PooledMysqlQueryPiece, 512), } } diff --git a/session-dealer/mysql/config.go b/session-dealer/mysql/config.go index b03d16e..a12eeed 100644 --- a/session-dealer/mysql/config.go +++ b/session-dealer/mysql/config.go @@ -3,7 +3,7 @@ package mysql import ( "flag" "fmt" - "github.com/zr-hebo/sniffer-agent/model" + "github.com/zr-hebo/sniffer-agent/util" "regexp" ) @@ -18,7 +18,7 @@ var ( adminPasswd string coverRangePool = NewCoveragePool() - localStmtCache = model.NewSliceBufferPool("statement cache", MaxMysqlPacketLen) + localStmtCache = util.NewSliceBufferPool("statement cache", MaxMysqlPacketLen) PrepareStatement = []byte(":prepare") ) diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 79440b2..7016f00 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -159,6 +159,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4]) // ignore too big mysql packet if ms.expectReceiveSize >= MaxMysqlPacketLen { + log.Infof("expect receive size is bigger than max deal size: %d", MaxMysqlPacketLen) return } diff --git a/model/cache_pool.go b/util/cache_pool.go similarity index 73% rename from model/cache_pool.go rename to util/cache_pool.go index 121d811..26e9ee9 100644 --- a/model/cache_pool.go +++ b/util/cache_pool.go @@ -1,4 +1,4 @@ -package model +package util import ( "fmt" @@ -6,22 +6,22 @@ import ( "unsafe" ) - -type sliceBufferPool struct { +// sliceBufferPool bytes buffer for reuse +type SliceBufferPool struct { queue chan []byte bufferSize int name string } -func NewSliceBufferPool(name string, bufferSize int) (sbp *sliceBufferPool) { - return &sliceBufferPool{ +func NewSliceBufferPool(name string, bufferSize int) (sbp *SliceBufferPool) { + return &SliceBufferPool{ queue: make(chan []byte, 256), bufferSize: bufferSize, name: name, } } -func (sbp *sliceBufferPool) Enqueue(buffer []byte) { +func (sbp *SliceBufferPool) Enqueue(buffer []byte) { // defer func() { // log.Debugf("after enqueue from %s, there is %d elements", sbp.name, len(sbp.queue)) // }() @@ -38,7 +38,7 @@ func (sbp *sliceBufferPool) Enqueue(buffer []byte) { } } -func (sbp *sliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) { +func (sbp *SliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) { if initSize >= sbp.bufferSize { panic(fmt.Sprintf("package size bigger than max buffer size need deal:%d", sbp.bufferSize)) @@ -54,7 +54,7 @@ func (sbp *sliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) { return } -func (sbp *sliceBufferPool) Dequeue() (buffer []byte) { +func (sbp *SliceBufferPool) Dequeue() (buffer []byte) { // defer func() { // log.Debugf("after dequeue from %s, there is %d elements", sbp.name, len(sbp.queue)) // }()