From d576e085ff078b0ddabb77a85950eeb63aff0940 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Dec 2019 17:50:25 +0800 Subject: [PATCH 1/2] add query qps api --- communicator/config.go | 2 + communicator/qps.go | 82 +++++++++++++++++++++++++++++++++ docs/capture_rate.md | 5 ++ model/query_piece_pool.go | 7 ++- session-dealer/mysql/session.go | 29 ++++++------ 5 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 communicator/qps.go diff --git a/communicator/config.go b/communicator/config.go index baee665..0a1fe05 100644 --- a/communicator/config.go +++ b/communicator/config.go @@ -9,6 +9,7 @@ import ( const ( CAPTURE_PACKET_RATE = "capture_packet_rate" + QPS = "qps" ) var ( @@ -38,4 +39,5 @@ func init() { func regsiterConfig() { configMap[CAPTURE_PACKET_RATE] = catpurePacketRate + configMap[QPS] = &qpsConfig{} } diff --git a/communicator/qps.go b/communicator/qps.go new file mode 100644 index 0000000..08d736c --- /dev/null +++ b/communicator/qps.go @@ -0,0 +1,82 @@ +package communicator + +import ( + "fmt" + "sync" + "time" +) + +const ( + CACHE_SIZE = 1024 + STATISTIC_SECONDS int64 = 5 +) + +var ( + execTimeChan chan int64 + execTimeCache []int64 + qpsLock sync.Mutex +) + +func init() { + execTimeChan = make(chan int64, 256) + execTimeCache = make([]int64, 0, CACHE_SIZE) + go updateCachedExecTime() +} + +type qpsConfig struct{} + +func (qc *qpsConfig) setVal(val interface{}) (err error) { + err = fmt.Errorf("cannot set QPS on sniffer") + return +} + +func (qc *qpsConfig) getVal() (val interface{}) { + return computeQPS() +} + +func ReceiveExecTime(execTime int64) { + select { + case execTimeChan <- execTime: + default: + return + } +} + +func updateCachedExecTime() { + for et := range execTimeChan { + qpsLock.Lock() + execTimeCache = append(execTimeCache, et) + if len(execTimeCache) > CACHE_SIZE { + execTimeCache = execTimeCache[1:] + } + qpsLock.Unlock() + } +} + +func computeQPS() (qps int64) { + qpsLock.Lock() + defer qpsLock.Unlock() + + // only deal execute time last 10 second + nowNano := time.Now().UnixNano() + lastTimeNano := nowNano - time.Second.Nanoseconds()*STATISTIC_SECONDS + minExecTimeNano := nowNano + var recentRecordNum int64 + for _, et := range execTimeCache { + // ignore execute time before 10 second + if et < lastTimeNano { + continue + } + + recentRecordNum += 1 + if et < minExecTimeNano { + minExecTimeNano = et + } + } + + if recentRecordNum < 1 || nowNano == minExecTimeNano { + return 0 + } + + return time.Second.Nanoseconds() / ((nowNano - minExecTimeNano) / recentRecordNum) +} diff --git a/docs/capture_rate.md b/docs/capture_rate.md index fd64239..7a74f47 100644 --- a/docs/capture_rate.md +++ b/docs/capture_rate.md @@ -21,3 +21,8 @@ curl 'http://127.0.0.1:8088/get_config?config_name=capture_packet_rate' curl -XPOST -d'{"config_name":"capture_packet_rate","value":0.01}' 'http://127.0.0.1:8088/set_config?config_name=capture_packet_rate' ``` +#### Get QPS +为了调整抓包率,sniffer提供了实时查询qps的功能 +``` +curl 'http://127.0.0.1:8088/get_config?config_name=qps' +``` \ No newline at end of file diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go index 358c29f..ff1fed7 100644 --- a/model/query_piece_pool.go +++ b/model/query_piece_pool.go @@ -12,11 +12,10 @@ type PooledMysqlQueryPiece struct { func NewPooledMysqlQueryPiece( sessionID, clientIP, visitUser, visitDB, serverIP *string, - clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) ( + clientPort, serverPort int, throwPacketRate float64, stmtBeginTimeNano int64) ( pmqp *PooledMysqlQueryPiece) { pmqp = mqpp.Dequeue() - nowInMS := time.Now().UnixNano() / millSecondUnit pmqp.SessionID = sessionID pmqp.ClientHost = clientIP pmqp.ClientPort = clientPort @@ -26,8 +25,8 @@ func NewPooledMysqlQueryPiece( pmqp.VisitDB = visitDB pmqp.SyncSend = false pmqp.CapturePacketRate = throwPacketRate - pmqp.EventTime = stmtBeginTime - pmqp.CostTimeInMS = nowInMS - stmtBeginTime + pmqp.EventTime = stmtBeginTimeNano / millSecondUnit + pmqp.CostTimeInMS = (time.Now().UnixNano() - stmtBeginTimeNano) / millSecondUnit pmqp.recoverPool = mqpp return diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 7016f00..270587c 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -12,16 +12,16 @@ import ( ) type MysqlSession struct { - connectionID *string - visitUser *string - visitDB *string - clientIP *string - clientPort int - srcIP *string - srcPort int - serverIP *string - serverPort int - stmtBeginTime int64 + connectionID *string + visitUser *string + visitDB *string + clientIP *string + clientPort int + srcIP *string + srcPort int + serverIP *string + serverPort int + stmtBeginTimeNano int64 // packageOffset int64 beginSeqID int64 endSeqID int64 @@ -58,7 +58,7 @@ func NewMysqlSession( srcPort: srcPort, serverIP: serverIP, serverPort: serverPort, - stmtBeginTime: time.Now().UnixNano() / millSecondUnit, + stmtBeginTimeNano: time.Now().UnixNano(), cachedPrepareStmt: make(map[int][]byte, 8), queryPieceReceiver: receiver, closeConn: make(chan bool, 1), @@ -100,7 +100,7 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { } func (ms *MysqlSession) resetBeginTime() { - ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit + ms.stmtBeginTimeNano = time.Now().UnixNano() } func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) { @@ -318,7 +318,8 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { if mqp == nil { return nil } - mqp.GenerateJsonBytes() + + communicator.ReceiveExecTime(ms.stmtBeginTimeNano) return mqp } @@ -346,5 +347,5 @@ func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) { } return model.NewPooledMysqlQueryPiece( ms.connectionID, clientIP, ms.visitUser, ms.visitDB, ms.serverIP, - clientPort, ms.serverPort, communicator.GetMysqlCapturePacketRate(), ms.stmtBeginTime) + clientPort, ms.serverPort, communicator.GetMysqlCapturePacketRate(), ms.stmtBeginTimeNano) } From 4114906ece7f0265bee736db6c937af17b5004d4 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Dec 2019 18:15:46 +0800 Subject: [PATCH 2/2] =?UTF-8?q?qps=E8=80=83=E8=99=91=E6=8A=93=E5=8C=85?= =?UTF-8?q?=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- communicator/qps.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/communicator/qps.go b/communicator/qps.go index 08d736c..42fd6ae 100644 --- a/communicator/qps.go +++ b/communicator/qps.go @@ -2,6 +2,7 @@ package communicator import ( "fmt" + "math" "sync" "time" ) @@ -54,6 +55,10 @@ func updateCachedExecTime() { } func computeQPS() (qps int64) { + if catpurePacketRate.mysqlCPR <= 0 { + return 0 + } + qpsLock.Lock() defer qpsLock.Unlock() @@ -78,5 +83,8 @@ func computeQPS() (qps int64) { return 0 } - return time.Second.Nanoseconds() / ((nowNano - minExecTimeNano) / recentRecordNum) + qpsVal := float64(time.Second.Nanoseconds() / + ((nowNano - minExecTimeNano) / recentRecordNum)) / + catpurePacketRate.mysqlCPR + return int64(math.Floor(qpsVal)) }