From 92e592ed738de23846ef84b99a89d21ddff903c7 Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 10 Sep 2019 18:34:40 +0800 Subject: [PATCH 1/2] add doc --- docs/capture_rate.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 docs/capture_rate.md diff --git a/docs/capture_rate.md b/docs/capture_rate.md new file mode 100644 index 0000000..e69de29 From a9f0ee309eb2f270e540d5b08ab010e72b5b6f1f Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 24 Sep 2019 22:40:17 +0800 Subject: [PATCH 2/2] deal uncached prepare qps --- capture/const.go | 4 +- capture/network.go | 24 ++++-- model/mysql_query_piece.go | 77 +++++++++++++++++++ model/query_piece.go | 129 +++++++++++++------------------- session-dealer/mysql/config.go | 2 + session-dealer/mysql/session.go | 12 ++- 6 files changed, 159 insertions(+), 89 deletions(-) create mode 100644 model/mysql_query_piece.go diff --git a/capture/const.go b/capture/const.go index 647d52b..357d8bf 100644 --- a/capture/const.go +++ b/capture/const.go @@ -1 +1,3 @@ -package capture \ No newline at end of file +package capture + +const checkCount = 30 \ No newline at end of file diff --git a/capture/network.go b/capture/network.go index f7d715e..a981347 100644 --- a/capture/network.go +++ b/capture/network.go @@ -106,20 +106,22 @@ func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { // Listen get a connection. func (nc *networkCard) listenNormal() { go func() { + aliveCounter := 0 handler := initEthernetHandlerFromPacpgo() for { var data []byte - data, ci, err := handler.ZeroCopyReadPacketData() - if err != nil { - log.Error(err.Error()) - time.Sleep(time.Second*3) - continue - } + var ci gopacket.CaptureInfo + var err error // capture packets according to a certain probability capturePacketRate := communicator.GetTCPCapturePacketRate() if capturePacketRate <= 0 { - time.Sleep(time.Second*3) + time.Sleep(time.Second*1) + aliveCounter += 1 + if aliveCounter >= checkCount { + aliveCounter = 0 + nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, capturePacketRate) + } continue } else if 0 < capturePacketRate && capturePacketRate < 1.0 { @@ -130,6 +132,14 @@ func (nc *networkCard) listenNormal() { } } + aliveCounter = 0 + data, ci, err = handler.ZeroCopyReadPacketData() + if err != nil { + log.Error(err.Error()) + time.Sleep(time.Second*3) + continue + } + packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) m := packet.Metadata() m.CaptureInfo = ci diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go new file mode 100644 index 0000000..bebe694 --- /dev/null +++ b/model/mysql_query_piece.go @@ -0,0 +1,77 @@ +package model + +import ( + "github.com/pingcap/tidb/util/hack" + "time" +) + +// MysqlQueryPiece 查询信息 +type MysqlQueryPiece struct { + BaseQueryPiece + + SessionID *string `json:"cid"` + ClientHost *string `json:"-"` + ClientPort int `json:"-"` + + VisitUser *string `json:"user"` + VisitDB *string `json:"db"` + QuerySQL *string `json:"sql"` + CostTimeInMS int64 `json:"cms"` +} + +type PooledMysqlQueryPiece struct { + MysqlQueryPiece + recoverPool *mysqlQueryPiecePool + sliceBufferPool *sliceBufferPool +} + +func NewPooledMysqlQueryPiece( + sessionID, clientIP, visitUser, visitDB, clientHost, serverIP *string, + clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) ( + mqp *PooledMysqlQueryPiece) { + mqp = mqpp.Dequeue() + + nowInMS := time.Now().UnixNano() / millSecondUnit + mqp.SessionID = sessionID + mqp.ClientHost = clientIP + mqp.ClientPort = clientPort + mqp.ClientHost = clientHost + mqp.ServerIP = serverIP + mqp.ServerPort = serverPort + mqp.VisitUser = visitUser + mqp.VisitDB = visitDB + mqp.SyncSend = false + mqp.ThrowPacketRate = throwPacketRate + mqp.BeginTime = stmtBeginTime + mqp.CostTimeInMS = nowInMS - stmtBeginTime + mqp.recoverPool = mqpp + mqp.sliceBufferPool = localSliceBufferPool + + return +} + +func (mqp *MysqlQueryPiece) String() (*string) { + content := mqp.Bytes() + contentStr := hack.String(content) + return &contentStr +} + +func (mqp *MysqlQueryPiece) Bytes() (content []byte) { + // content, err := json.Marshal(mqp) + if len(mqp.jsonContent) > 0 { + return mqp.jsonContent + } + + mqp.jsonContent = marsharQueryPiece(mqp) + return mqp.jsonContent +} + +func (mqp *MysqlQueryPiece) GetSQL() (str *string) { + return mqp.QuerySQL +} + +func (pmqp *PooledMysqlQueryPiece) Recovery() { + pmqp.recoverPool.Enqueue(pmqp) + pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) + pmqp.jsonContent = nil +} diff --git a/model/query_piece.go b/model/query_piece.go index f70ec28..440a664 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -1,13 +1,11 @@ package model import ( - "bytes" - - "encoding/json" // "github.com/json-iterator/go" - "time" - + "bytes" + "encoding/json" "github.com/pingcap/tidb/util/hack" + "time" ) type QueryPiece interface { @@ -18,28 +16,14 @@ type QueryPiece interface { Recovery() } -// MysqlQueryPiece 查询信息 -type MysqlQueryPiece struct { - SessionID *string `json:"cid"` - ClientHost *string `json:"-"` - ClientPort int `json:"-"` - SyncSend bool `json:"-"` - ServerIP *string `json:"sip"` - ServerPort int `json:"sport"` - VisitUser *string `json:"user"` - VisitDB *string `json:"db"` - QuerySQL *string `json:"sql"` - ThrowPacketRate float64 `json:"tpr"` - BeginTime int64 `json:"bt"` - CostTimeInMS int64 `json:"cms"` - - jsonContent []byte `json:"-"` -} - -type PooledMysqlQueryPiece struct { - MysqlQueryPiece - recoverPool *mysqlQueryPiecePool - sliceBufferPool *sliceBufferPool +// BaseQueryPiece 查询信息 +type BaseQueryPiece struct { + SyncSend bool `json:"-"` + ServerIP *string `json:"sip"` + ServerPort int `json:"sport"` + ThrowPacketRate float64 `json:"tpr"` + BeginTime int64 `json:"bt"` + jsonContent []byte `json:"-"` } const ( @@ -47,74 +31,63 @@ const ( ) var ( - mqpp = NewMysqlQueryPiecePool() + mqpp = NewMysqlQueryPiecePool() localSliceBufferPool = NewSliceBufferPool("json cache", 2*1024*1024) ) -func NewPooledMysqlQueryPiece( - sessionID, clientIP, visitUser, visitDB, clientHost, serverIP *string, - clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) ( - mqp *PooledMysqlQueryPiece) { - mqp = mqpp.Dequeue() +var commonBaseQueryPiece = &BaseQueryPiece{} - nowInMS := time.Now().UnixNano() / millSecondUnit - mqp.SessionID = sessionID - mqp.ClientHost = clientIP - mqp.ClientPort = clientPort - mqp.ClientHost = clientHost - mqp.ServerIP = serverIP - mqp.ServerPort = serverPort - mqp.VisitUser = visitUser - mqp.VisitDB = visitDB - mqp.SyncSend = false - mqp.ThrowPacketRate = throwPacketRate - mqp.BeginTime = stmtBeginTime - mqp.CostTimeInMS = nowInMS - stmtBeginTime - mqp.recoverPool = mqpp - mqp.sliceBufferPool = localSliceBufferPool +func NewBaseQueryPiece( + serverIP *string, serverPort int, throwPacketRate float64) ( + bqp *BaseQueryPiece) { + bqp = commonBaseQueryPiece + bqp.ServerIP = serverIP + bqp.ServerPort = serverPort + bqp.SyncSend = false + bqp.ThrowPacketRate = throwPacketRate + bqp.BeginTime = time.Now().UnixNano() / millSecondUnit return } -func (mqp *MysqlQueryPiece) String() (*string) { - content := mqp.Bytes() +func (bqp *BaseQueryPiece) NeedSyncSend() (bool) { + return bqp.SyncSend +} + +func (bqp *BaseQueryPiece) SetNeedSyncSend(syncSend bool) { + bqp.SyncSend = syncSend +} + +func (bqp *BaseQueryPiece) String() (*string) { + content := bqp.Bytes() contentStr := hack.String(content) return &contentStr } -func (mqp *MysqlQueryPiece) Bytes() (content []byte) { - // content, err := json.Marshal(mqp) - if len(mqp.jsonContent) > 0 { - return mqp.jsonContent +func (bqp *BaseQueryPiece) Bytes() (content []byte) { + // content, err := json.Marshal(bqp) + if bqp.jsonContent != nil && len(bqp.jsonContent) > 0 { + return bqp.jsonContent } + bqp.jsonContent = marsharQueryPiece(bqp) + return bqp.jsonContent +} + +func (bqp *BaseQueryPiece) GetSQL() (*string) { + return nil +} + +func (bqp *BaseQueryPiece) Recovery() { +} + +func marsharQueryPiece(qp interface{}) []byte { var cacheBuffer = localSliceBufferPool.Dequeue() buffer := bytes.NewBuffer(cacheBuffer) - err := json.NewEncoder(buffer).Encode(mqp) + err := json.NewEncoder(buffer).Encode(qp) if err != nil { - mqp.jsonContent = []byte(err.Error()) - - } else { - mqp.jsonContent = buffer.Bytes() + return []byte(err.Error()) } - return mqp.jsonContent -} - -func (mqp *MysqlQueryPiece) GetSQL() (str *string) { - return mqp.QuerySQL -} - -func (mqp *MysqlQueryPiece) NeedSyncSend() (bool) { - return mqp.SyncSend -} - -func (mqp *MysqlQueryPiece) SetNeedSyncSend(syncSend bool) { - mqp.SyncSend = syncSend -} - -func (pmqp *PooledMysqlQueryPiece) Recovery() { - pmqp.recoverPool.Enqueue(pmqp) - pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) - pmqp.jsonContent = nil + return buffer.Bytes() } diff --git a/session-dealer/mysql/config.go b/session-dealer/mysql/config.go index f622a4c..b03d16e 100644 --- a/session-dealer/mysql/config.go +++ b/session-dealer/mysql/config.go @@ -19,6 +19,8 @@ var ( coverRangePool = NewCoveragePool() localStmtCache = model.NewSliceBufferPool("statement cache", MaxMysqlPacketLen) + + PrepareStatement = []byte(":prepare") ) func init() { diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 4da966d..99f6a96 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -258,7 +258,8 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { case ComStmtPrepare: mqp = ms.composeQueryPiece() - querySQLInBytes = ms.cachedStmtBytes[1:] + querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:])) + copy(querySQLInBytes, ms.cachedStmtBytes[1:]) querySQL := hack.String(querySQLInBytes) mqp.QuerySQL = &querySQL ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = querySQLInBytes @@ -267,10 +268,15 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { case ComStmtExecute: prepareStmtID := bytesToInt(ms.cachedStmtBytes[1:5]) mqp = ms.composeQueryPiece() - querySQLInBytes = ms.cachedPrepareStmt[prepareStmtID] + var ok bool + querySQLInBytes, ok = ms.cachedPrepareStmt[prepareStmtID] + if !ok { + querySQLInBytes = PrepareStatement + } querySQL := hack.String(querySQLInBytes) mqp.QuerySQL = &querySQL - log.Debugf("execute prepare statement:%d", prepareStmtID) + + // log.Debugf("execute prepare statement:%d", prepareStmtID) case ComStmtClose: prepareStmtID := bytesToInt(ms.cachedStmtBytes[1:5])