add query qps api

This commit is contained in:
hebo 2019-12-27 17:50:25 +08:00
parent 467b0681c6
commit d576e085ff
5 changed files with 107 additions and 18 deletions

View File

@ -9,6 +9,7 @@ import (
const ( const (
CAPTURE_PACKET_RATE = "capture_packet_rate" CAPTURE_PACKET_RATE = "capture_packet_rate"
QPS = "qps"
) )
var ( var (
@ -38,4 +39,5 @@ func init() {
func regsiterConfig() { func regsiterConfig() {
configMap[CAPTURE_PACKET_RATE] = catpurePacketRate configMap[CAPTURE_PACKET_RATE] = catpurePacketRate
configMap[QPS] = &qpsConfig{}
} }

82
communicator/qps.go Normal file
View File

@ -0,0 +1,82 @@
package communicator
import (
"fmt"
"sync"
"time"
)
const (
CACHE_SIZE = 1024
STATISTIC_SECONDS int64 = 5
)
var (
execTimeChan chan int64
execTimeCache []int64
qpsLock sync.Mutex
)
func init() {
execTimeChan = make(chan int64, 256)
execTimeCache = make([]int64, 0, CACHE_SIZE)
go updateCachedExecTime()
}
type qpsConfig struct{}
func (qc *qpsConfig) setVal(val interface{}) (err error) {
err = fmt.Errorf("cannot set QPS on sniffer")
return
}
func (qc *qpsConfig) getVal() (val interface{}) {
return computeQPS()
}
func ReceiveExecTime(execTime int64) {
select {
case execTimeChan <- execTime:
default:
return
}
}
func updateCachedExecTime() {
for et := range execTimeChan {
qpsLock.Lock()
execTimeCache = append(execTimeCache, et)
if len(execTimeCache) > CACHE_SIZE {
execTimeCache = execTimeCache[1:]
}
qpsLock.Unlock()
}
}
func computeQPS() (qps int64) {
qpsLock.Lock()
defer qpsLock.Unlock()
// only deal execute time last 10 second
nowNano := time.Now().UnixNano()
lastTimeNano := nowNano - time.Second.Nanoseconds()*STATISTIC_SECONDS
minExecTimeNano := nowNano
var recentRecordNum int64
for _, et := range execTimeCache {
// ignore execute time before 10 second
if et < lastTimeNano {
continue
}
recentRecordNum += 1
if et < minExecTimeNano {
minExecTimeNano = et
}
}
if recentRecordNum < 1 || nowNano == minExecTimeNano {
return 0
}
return time.Second.Nanoseconds() / ((nowNano - minExecTimeNano) / recentRecordNum)
}

View File

@ -21,3 +21,8 @@ curl 'http://127.0.0.1:8088/get_config?config_name=capture_packet_rate'
curl -XPOST -d'{"config_name":"capture_packet_rate","value":0.01}' 'http://127.0.0.1:8088/set_config?config_name=capture_packet_rate' curl -XPOST -d'{"config_name":"capture_packet_rate","value":0.01}' 'http://127.0.0.1:8088/set_config?config_name=capture_packet_rate'
``` ```
#### Get QPS
为了调整抓包率sniffer提供了实时查询qps的功能
```
curl 'http://127.0.0.1:8088/get_config?config_name=qps'
```

View File

@ -12,11 +12,10 @@ type PooledMysqlQueryPiece struct {
func NewPooledMysqlQueryPiece( func NewPooledMysqlQueryPiece(
sessionID, clientIP, visitUser, visitDB, serverIP *string, sessionID, clientIP, visitUser, visitDB, serverIP *string,
clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) ( clientPort, serverPort int, throwPacketRate float64, stmtBeginTimeNano int64) (
pmqp *PooledMysqlQueryPiece) { pmqp *PooledMysqlQueryPiece) {
pmqp = mqpp.Dequeue() pmqp = mqpp.Dequeue()
nowInMS := time.Now().UnixNano() / millSecondUnit
pmqp.SessionID = sessionID pmqp.SessionID = sessionID
pmqp.ClientHost = clientIP pmqp.ClientHost = clientIP
pmqp.ClientPort = clientPort pmqp.ClientPort = clientPort
@ -26,8 +25,8 @@ func NewPooledMysqlQueryPiece(
pmqp.VisitDB = visitDB pmqp.VisitDB = visitDB
pmqp.SyncSend = false pmqp.SyncSend = false
pmqp.CapturePacketRate = throwPacketRate pmqp.CapturePacketRate = throwPacketRate
pmqp.EventTime = stmtBeginTime pmqp.EventTime = stmtBeginTimeNano / millSecondUnit
pmqp.CostTimeInMS = nowInMS - stmtBeginTime pmqp.CostTimeInMS = (time.Now().UnixNano() - stmtBeginTimeNano) / millSecondUnit
pmqp.recoverPool = mqpp pmqp.recoverPool = mqpp
return return

View File

@ -12,16 +12,16 @@ import (
) )
type MysqlSession struct { type MysqlSession struct {
connectionID *string connectionID *string
visitUser *string visitUser *string
visitDB *string visitDB *string
clientIP *string clientIP *string
clientPort int clientPort int
srcIP *string srcIP *string
srcPort int srcPort int
serverIP *string serverIP *string
serverPort int serverPort int
stmtBeginTime int64 stmtBeginTimeNano int64
// packageOffset int64 // packageOffset int64
beginSeqID int64 beginSeqID int64
endSeqID int64 endSeqID int64
@ -58,7 +58,7 @@ func NewMysqlSession(
srcPort: srcPort, srcPort: srcPort,
serverIP: serverIP, serverIP: serverIP,
serverPort: serverPort, serverPort: serverPort,
stmtBeginTime: time.Now().UnixNano() / millSecondUnit, stmtBeginTimeNano: time.Now().UnixNano(),
cachedPrepareStmt: make(map[int][]byte, 8), cachedPrepareStmt: make(map[int][]byte, 8),
queryPieceReceiver: receiver, queryPieceReceiver: receiver,
closeConn: make(chan bool, 1), closeConn: make(chan bool, 1),
@ -100,7 +100,7 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
} }
func (ms *MysqlSession) resetBeginTime() { func (ms *MysqlSession) resetBeginTime() {
ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit ms.stmtBeginTimeNano = time.Now().UnixNano()
} }
func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) { func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) {
@ -318,7 +318,8 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
if mqp == nil { if mqp == nil {
return nil return nil
} }
mqp.GenerateJsonBytes()
communicator.ReceiveExecTime(ms.stmtBeginTimeNano)
return mqp return mqp
} }
@ -346,5 +347,5 @@ func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) {
} }
return model.NewPooledMysqlQueryPiece( return model.NewPooledMysqlQueryPiece(
ms.connectionID, clientIP, ms.visitUser, ms.visitDB, ms.serverIP, ms.connectionID, clientIP, ms.visitUser, ms.visitDB, ms.serverIP,
clientPort, ms.serverPort, communicator.GetMysqlCapturePacketRate(), ms.stmtBeginTime) clientPort, ms.serverPort, communicator.GetMysqlCapturePacketRate(), ms.stmtBeginTimeNano)
} }