sniffer-agent/model/query_piece_pool.go

114 lines
2.4 KiB
Go

package model
import (
"github.com/zr-hebo/sniffer-agent/util"
"sync"
"time"
)
// localSliceBufferPool is the package-level byte-slice pool that
// NewPooledMysqlQueryPiece attaches to every piece it returns.
// NOTE(review): the arguments look like a pool name and a per-buffer size of
// (128+1)*1024 bytes; confirm against util.NewSliceBufferPool.
var (
localSliceBufferPool = util.NewSliceBufferPool("json cache", (128+1)*1024)
)
// PooledMysqlQueryPiece is a MysqlQueryPiece that remembers the pools its
// resources came from, so that Recovery can hand them back for reuse.
type PooledMysqlQueryPiece struct {
MysqlQueryPiece
// recoverPool is the piece pool that Recovery returns this object to.
recoverPool *mysqlQueryPiecePool
// sliceBufferPool supplies the buffer used by GenerateJsonBytes; when it is
// nil, GenerateJsonBytes calls marsharQueryPieceMonopolize instead.
sliceBufferPool *util.SliceBufferPool
}
// NewPooledMysqlQueryPiece takes a piece from the package pool mqpp and fills
// in the connection and timing fields for one statement.
//
// The *string arguments are stored as given; they are neither dereferenced
// nor copied. throwPacketRate is stored in CapturePacketRate and SyncSend is
// reset to false. stmtBeginTime is stored in EventTime, and CostTimeInMS is
// set to the current time (UnixNano divided by millSecondUnit) minus
// stmtBeginTime.
// NOTE(review): this presumes stmtBeginTime is expressed in the same unit as
// the divided clock value (presumably milliseconds); confirm against callers.
//
// The returned piece references mqpp and localSliceBufferPool, which Recovery
// later uses to give the piece and its JSON buffer back.
func NewPooledMysqlQueryPiece(
	sessionID, clientIP, visitUser, visitDB, serverIP *string,
	clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) (
	pmqp *PooledMysqlQueryPiece) {
	piece := mqpp.Dequeue()
	elapsed := time.Now().UnixNano()/millSecondUnit - stmtBeginTime

	// Pool bookkeeping.
	piece.recoverPool = mqpp
	piece.sliceBufferPool = localSliceBufferPool

	// Who is talking to whom.
	piece.SessionID = sessionID
	piece.ClientHost, piece.ClientPort = clientIP, clientPort
	piece.ServerIP, piece.ServerPort = serverIP, serverPort
	piece.VisitUser, piece.VisitDB = visitUser, visitDB

	// Capture and timing details.
	piece.SyncSend = false
	piece.CapturePacketRate = throwPacketRate
	piece.EventTime = stmtBeginTime
	piece.CostTimeInMS = elapsed

	return piece
}
// Recovery gives the piece's resources back to their pools.
//
// When a slice buffer pool is attached, the JSON buffer is returned to it
// truncated to zero length. The piece's jsonContent is then cleared and the
// piece itself is enqueued on recoverPool.
func (pmqp *PooledMysqlQueryPiece) Recovery() {
	if bufPool := pmqp.sliceBufferPool; bufPool != nil {
		bufPool.Enqueue(pmqp.jsonContent[:0])
	}
	pmqp.jsonContent = nil

	// Enqueue last: after this call the piece can be handed out again by
	// Dequeue, so it is not touched afterwards.
	pmqp.recoverPool.Enqueue(pmqp)
}
// Bytes returns the piece's JSON content. The content is produced by
// GenerateJsonBytes the first time it is needed; while jsonContent is
// non-empty, later calls return it as is.
func (pmqp *PooledMysqlQueryPiece) Bytes() (content []byte) {
	if len(pmqp.jsonContent) == 0 {
		pmqp.GenerateJsonBytes()
	}
	return pmqp.jsonContent
}
// GenerateJsonBytes fills jsonContent with the marshalled piece.
//
// Without a slice buffer pool the content comes from
// marsharQueryPieceMonopolize. Otherwise a buffer is taken from the pool and
// passed to marsharQueryPieceShareMemory. The buffer taken from the pool must
// have length zero; a non-empty buffer causes a panic.
func (pmqp *PooledMysqlQueryPiece) GenerateJsonBytes() {
	bufPool := pmqp.sliceBufferPool
	if bufPool == nil {
		pmqp.jsonContent = marsharQueryPieceMonopolize(pmqp)
		return
	}

	scratch := bufPool.Dequeue()
	if len(scratch) != 0 {
		panic("there already have bytes in buffer")
	}
	pmqp.jsonContent = marsharQueryPieceShareMemory(pmqp, scratch)
}
// mysqlQueryPiecePool keeps idle PooledMysqlQueryPiece objects for reuse.
type mysqlQueryPiecePool struct {
// queue holds the idle pieces; Enqueue sends to it and Dequeue receives
// from it, both without blocking.
queue chan *PooledMysqlQueryPiece
// lock is held by Enqueue and Dequeue for the duration of each call.
lock sync.Mutex
}
// NewMysqlQueryPiecePool returns an empty pool whose queue can hold up to 512
// idle pieces.
func NewMysqlQueryPiecePool() (mqpp *mysqlQueryPiecePool) {
	pool := new(mysqlQueryPiecePool)
	pool.queue = make(chan *PooledMysqlQueryPiece, 512)
	return pool
}
// Enqueue offers pmqp to the pool for later reuse and never blocks.
//
// If the queue is already full the piece is simply not retained by the pool.
// A nil piece is ignored, so that Dequeue never hands a nil pointer to its
// caller.
func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) {
	if pmqp == nil {
		return
	}

	mqpp.lock.Lock()
	defer mqpp.lock.Unlock()

	select {
	case mqpp.queue <- pmqp:
	default:
		// Queue is full: drop the piece. Nothing to clean up here; the
		// parameter is a copy of the caller's pointer, so assigning nil to it
		// would have no effect.
	}
}
// Dequeue returns an idle piece from the pool if one is immediately
// available; otherwise it allocates a new zero-valued PooledMysqlQueryPiece.
// It never blocks on the queue.
func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) {
	mqpp.lock.Lock()
	defer mqpp.lock.Unlock()

	select {
	case recycled := <-mqpp.queue:
		return recycled
	default:
	}
	return new(PooledMysqlQueryPiece)
}