Use cache to reduce malloc

This commit is contained in:
hebo 2019-09-09 21:36:18 +08:00
parent 3294a0fc5f
commit 4086028217
12 changed files with 280 additions and 169 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@ one_key.sh
sniffer-agent sniffer-agent
# vendor/github.com # vendor/github.com
profile* profile*
*.svg

View File

@ -117,7 +117,7 @@ func (nc *networkCard) listenNormal() {
} }
// throw packets according to a certain probability // throw packets according to a certain probability
throwPacketRate := communicator.GetThrowPacketRate() throwPacketRate := communicator.GetTCPThrowPacketRate()
if throwPacketRate >= 1.0 { if throwPacketRate >= 1.0 {
time.Sleep(time.Second*3) time.Sleep(time.Second*3)
continue continue
@ -255,7 +255,11 @@ func readFromServerPackage(
if tcpPkt.FIN { if tcpPkt.FIN {
sessionKey := spliceSessionKey(srcIP, srcPort) sessionKey := spliceSessionKey(srcIP, srcPort)
session := sessionPool[*sessionKey]
if session != nil {
session.Close()
delete(sessionPool, *sessionKey) delete(sessionPool, *sessionKey)
}
return return
} }
@ -285,7 +289,11 @@ func readToServerPackage(
// when client try close connection remove session from session pool // when client try close connection remove session from session pool
if tcpPkt.FIN { if tcpPkt.FIN {
sessionKey := spliceSessionKey(srcIP, srcPort) sessionKey := spliceSessionKey(srcIP, srcPort)
session := sessionPool[*sessionKey]
if session != nil {
session.Close()
delete(sessionPool, *sessionKey) delete(sessionPool, *sessionKey)
}
log.Debugf("close connection from %s", *sessionKey) log.Debugf("close connection from %s", *sessionKey)
return return
} }

View File

@ -82,6 +82,10 @@ func outletSetConfig(resp http.ResponseWriter, req *http.Request) {
mp.Err = SetConfig(ep.ConfigName, ep.Value) mp.Err = SetConfig(ep.ConfigName, ep.Value)
} }
func GetThrowPacketRate() float64 { func GetTCPThrowPacketRate() float64 {
return throwPacketRate.value return throwPacketRate.tcpTPR
}
func GetMysqlThrowPacketRate() float64 {
return throwPacketRate.mysqlTPR
} }

View File

@ -1,6 +1,9 @@
package communicator package communicator
import "fmt" import (
"fmt"
"math"
)
type configItem interface { type configItem interface {
setVal (interface{}) error setVal (interface{}) error
@ -9,13 +12,15 @@ type configItem interface {
type throwPacketRateConfig struct { type throwPacketRateConfig struct {
name string name string
value float64 tcpTPR float64
mysqlTPR float64
} }
func newThrowPacketRateConfig() (tpr *throwPacketRateConfig) { func newThrowPacketRateConfig() (tpr *throwPacketRateConfig) {
tpr = &throwPacketRateConfig{ tpr = &throwPacketRateConfig{
name: THROW_PACKET_RATE, name: THROW_PACKET_RATE,
value: 0.0, tcpTPR: 0.0,
mysqlTPR: 0.0,
} }
return return
} }
@ -27,14 +32,11 @@ func (tc *throwPacketRateConfig) setVal (val interface{}) (err error){
return return
} }
tc.value = realVal tc.mysqlTPR = realVal
tc.tcpTPR = math.Sqrt(realVal)
return return
} }
func (tc *throwPacketRateConfig) getVal () (val interface{}){ func (tc *throwPacketRateConfig) getVal () (val interface{}){
return tc.value return tc.mysqlTPR
}
func (tc *throwPacketRateConfig) GetValFloat64 () (val float64){
return tc.value
} }

66
model/cache_pool.go Normal file
View File

@ -0,0 +1,66 @@
package model
import (
"fmt"
"reflect"
"unsafe"
log "github.com/sirupsen/logrus"
)
// sliceBufferPool recycles byte slices through a buffered channel to
// reduce allocations on hot paths.
type sliceBufferPool struct {
	queue      chan []byte // recycled buffers; the bounded channel is the pool itself
	bufferSize int         // capacity used when allocating a fresh buffer in Dequeue
	name       string      // pool name, used only in debug log messages
}
// NewSliceBufferPool creates a byte-slice pool with the given name.
// Buffers freshly allocated by the pool have capacity bufferSize.
func NewSliceBufferPool(name string, bufferSize int) (sbp *sliceBufferPool) {
	const poolSlots = 512
	pool := new(sliceBufferPool)
	pool.queue = make(chan []byte, poolSlots)
	pool.bufferSize = bufferSize
	pool.name = name
	return pool
}
// Enqueue returns a used buffer to the pool for reuse.
// Zero-capacity buffers are ignored. If the pool is already full, the
// buffer is dropped and left to the GC instead of blocking the caller:
// the original unconditional channel send could stall the packet hot
// path indefinitely once the pool filled up.
func (sbp *sliceBufferPool) Enqueue(buffer []byte) {
	defer func() {
		log.Debugf("after enqueue from %s, there is %d elements", sbp.name, len(sbp.queue))
	}()

	if cap(buffer) < 1 {
		return
	}

	select {
	case sbp.queue <- buffer:
	default:
		// pool is full: drop the buffer rather than block
	}
}
// DequeueWithInit takes a buffer from the pool and reslices it to
// initSize. The returned bytes are not zeroed: a recycled buffer may
// still hold stale data, and callers are expected to overwrite it.
// Panics when initSize does not fit within the pool's max buffer size.
func (sbp *sliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) {
	if initSize >= sbp.bufferSize {
		panic(fmt.Sprintf("package size bigger than max buffer size need deal:%d",
			sbp.bufferSize))
	}

	buffer = sbp.Dequeue()
	if cap(buffer) < initSize {
		// A recycled buffer can be smaller than bufferSize (Enqueue only
		// rejects zero-capacity slices). The previous implementation
		// rewrote Len through reflect.SliceHeader/unsafe, which could
		// yield a slice with Len > Cap — memory-unsafe. Allocate a
		// properly sized buffer instead.
		return make([]byte, initSize, sbp.bufferSize)
	}
	// reslice within capacity; no unsafe needed
	return buffer[:initSize]
}
// Dequeue hands out a zero-length buffer: a recycled one when the pool
// has any, otherwise a fresh slice with capacity bufferSize.
func (sbp *sliceBufferPool) Dequeue() (buffer []byte) {
	select {
	case buffer = <-sbp.queue:
	default:
		buffer = make([]byte, 0, sbp.bufferSize)
	}
	log.Debugf("after dequeue from %s, there is %d elements", sbp.name, len(sbp.queue))
	return
}

View File

@ -1,14 +1,16 @@
package model package model
import ( import (
// "encoding/json" "bytes"
"github.com/json-iterator/go"
"encoding/json"
// "github.com/json-iterator/go"
"time" "time"
"github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/hack"
) )
var json = jsoniter.ConfigCompatibleWithStandardLibrary // var json = jsoniter.ConfigCompatibleWithStandardLibrary
type QueryPiece interface { type QueryPiece interface {
String() *string String() *string
@ -32,20 +34,23 @@ type MysqlQueryPiece struct {
ThrowPacketRate float64 `json:"tpr"` ThrowPacketRate float64 `json:"tpr"`
BeginTime int64 `json:"bt"` BeginTime int64 `json:"bt"`
CostTimeInMS int64 `json:"cms"` CostTimeInMS int64 `json:"cms"`
jsonContent []byte `json:"-"`
} }
type PooledMysqlQueryPiece struct { type PooledMysqlQueryPiece struct {
MysqlQueryPiece MysqlQueryPiece
recoverPool *mysqlQueryPiecePool recoverPool *mysqlQueryPiecePool
sliceBufferPool *sliceBufferPool
} }
const ( const (
datetimeFormat = "2006-01-02 15:04:05"
millSecondUnit = int64(time.Millisecond) millSecondUnit = int64(time.Millisecond)
) )
var ( var (
mqpp = NewMysqlQueryPiecePool() mqpp = NewMysqlQueryPiecePool()
localSliceBufferPool = NewSliceBufferPool("json cache", 2*1024*1024)
) )
func NewPooledMysqlQueryPiece( func NewPooledMysqlQueryPiece(
@ -53,11 +58,6 @@ func NewPooledMysqlQueryPiece(
clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) ( clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) (
mqp *PooledMysqlQueryPiece) { mqp *PooledMysqlQueryPiece) {
mqp = mqpp.Dequeue() mqp = mqpp.Dequeue()
if mqp == nil {
mqp = &PooledMysqlQueryPiece{
MysqlQueryPiece: MysqlQueryPiece{},
}
}
nowInMS := time.Now().UnixNano() / millSecondUnit nowInMS := time.Now().UnixNano() / millSecondUnit
mqp.SessionID = sessionID mqp.SessionID = sessionID
@ -73,6 +73,7 @@ func NewPooledMysqlQueryPiece(
mqp.BeginTime = stmtBeginTime mqp.BeginTime = stmtBeginTime
mqp.CostTimeInMS = nowInMS - stmtBeginTime mqp.CostTimeInMS = nowInMS - stmtBeginTime
mqp.recoverPool = mqpp mqp.recoverPool = mqpp
mqp.sliceBufferPool = localSliceBufferPool
return return
} }
@ -83,13 +84,23 @@ func (mqp *MysqlQueryPiece) String() (*string) {
return &contentStr return &contentStr
} }
func (mqp *MysqlQueryPiece) Bytes() (bytes []byte) { func (mqp *MysqlQueryPiece) Bytes() (content []byte) {
content, err := json.Marshal(mqp) // content, err := json.Marshal(mqp)
if err != nil { if len(mqp.jsonContent) > 0 {
return []byte(err.Error()) return mqp.jsonContent
} }
return content var cacheBuffer = localSliceBufferPool.Dequeue()
buffer := bytes.NewBuffer(cacheBuffer)
err := json.NewEncoder(buffer).Encode(mqp)
if err != nil {
mqp.jsonContent = []byte(err.Error())
} else {
mqp.jsonContent = buffer.Bytes()
}
return mqp.jsonContent
} }
func (mqp *MysqlQueryPiece) GetSQL() (str *string) { func (mqp *MysqlQueryPiece) GetSQL() (str *string) {
@ -106,4 +117,6 @@ func (mqp *MysqlQueryPiece) SetNeedSyncSend(syncSend bool) {
func (pmqp *PooledMysqlQueryPiece) Recovery() { func (pmqp *PooledMysqlQueryPiece) Recovery() {
pmqp.recoverPool.Enqueue(pmqp) pmqp.recoverPool.Enqueue(pmqp)
pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0])
pmqp.jsonContent = nil
} }

View File

@ -5,13 +5,13 @@ import (
) )
type mysqlQueryPiecePool struct { type mysqlQueryPiecePool struct {
queue []*PooledMysqlQueryPiece queue chan *PooledMysqlQueryPiece
lock sync.Mutex lock sync.Mutex
} }
func NewMysqlQueryPiecePool() (mqpp *mysqlQueryPiecePool) { func NewMysqlQueryPiecePool() (mqpp *mysqlQueryPiecePool) {
return &mysqlQueryPiecePool{ return &mysqlQueryPiecePool{
queue: make([]*PooledMysqlQueryPiece, 0, 5000), queue: make(chan *PooledMysqlQueryPiece, 1024),
} }
} }
@ -19,18 +19,21 @@ func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) {
mqpp.lock.Lock() mqpp.lock.Lock()
defer mqpp.lock.Unlock() defer mqpp.lock.Unlock()
mqpp.queue = append(mqpp.queue, pmqp)
mqpp.queue <- pmqp
} }
func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) { func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) {
mqpp.lock.Lock() mqpp.lock.Lock()
defer mqpp.lock.Unlock() defer mqpp.lock.Unlock()
if len(mqpp.queue) < 1 { select {
return nil case pmqp = <- mqpp.queue:
}
pmqp = mqpp.queue[0]
mqpp.queue = mqpp.queue[1:]
return return
default:
pmqp = &PooledMysqlQueryPiece{
MysqlQueryPiece: MysqlQueryPiece{},
}
return
}
} }

View File

@ -4,4 +4,5 @@ import "github.com/zr-hebo/sniffer-agent/model"
type ConnSession interface { type ConnSession interface {
ReceiveTCPPacket(*model.TCPPacket) ReceiveTCPPacket(*model.TCPPacket)
Close()
} }

View File

@ -3,6 +3,7 @@ package mysql
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/zr-hebo/sniffer-agent/model"
"regexp" "regexp"
) )
@ -17,6 +18,7 @@ var (
adminPasswd string adminPasswd string
coverRangePool = NewCoveragePool() coverRangePool = NewCoveragePool()
localStmtCache = model.NewSliceBufferPool("statement cache", MaxMysqlPacketLen)
) )
func init() { func init() {

View File

@ -0,0 +1,133 @@
package mysql
import (
log "github.com/sirupsen/logrus"
)
// coverageNode record tcp package begin and end seq id
// Nodes form a singly linked list (via next), kept sorted by begin,
// and are recycled through the coveragePool referenced by crp.
type coverageNode struct {
	begin int64         // first seq id of the covered range
	end   int64         // last seq id of the covered range
	next  *coverageNode // next range in ascending begin order
	crp   *coveragePool // owning pool used by Recovery; may be nil (see newCoverage)
}
// newCoverage allocates a stand-alone range node covering [begin, end].
// NOTE(review): the crp field is left nil here, unlike nodes handed out
// by coveragePool.Dequeue — confirm callers never Recovery such a node.
func newCoverage(begin, end int64) *coverageNode {
	node := new(coverageNode)
	node.begin = begin
	node.end = end
	return node
}
// Recovery returns the node to its owning pool for reuse.
// Nodes built directly via newCoverage have no owning pool (crp is
// nil); the previous version dereferenced crp unconditionally and
// would panic on such nodes, so guard and let the GC take them.
func (crn *coverageNode) Recovery() {
	if crn.crp != nil {
		crn.crp.Enqueue(crn)
	}
}
// coverRanges is a sorted singly linked list of coverageNode ranges
// with a sentinel head node (begin == -1); real ranges start at
// head.next. Presumably tracks which tcp seq id ranges have been
// received — see the coverageNode comment.
type coverRanges struct {
	head *coverageNode // sentinel node; never recycled
}
// NewCoverRanges builds an empty range list headed by a sentinel node.
func NewCoverRanges() *coverRanges {
	sentinel := &coverageNode{begin: -1}
	return &coverRanges{head: sentinel}
}
// clear recycles every range in the list and leaves it empty.
func (crs *coverRanges) clear() {
	for node := crs.head.next; node != nil; {
		// capture the successor before recycling the node
		next := node.next
		node.Recovery()
		node = next
	}
	crs.head.next = nil
}
// addRange splices node into the list, keeping it sorted by begin in
// ascending order, then merges any newly adjacent ranges.
func (crs *coverRanges) addRange(node *coverageNode) {
	// walk to the first successor whose begin is >= node.begin and
	// insert node just before it
	prev := crs.head
	for prev.next != nil {
		if prev.next.begin >= node.begin {
			node.next = prev.next
			prev.next = node
			node = nil
			break
		}
		prev = prev.next
	}
	// no successor found: node belongs at the tail
	if node != nil {
		prev.next = node
	}
	crs.mergeRanges()
}
// mergeRanges collapses overlapping neighbours in the sorted list into
// single ranges, recycling the absorbed nodes.
//
// The previous condition (end >= checkRange.begin && end < checkRange.end)
// only merged when the neighbour extended the current range, so a
// neighbour fully contained in the current range was left in the list
// and blocked merging with later overlapping ranges (e.g. [0,10],[2,5],
// [8,12] never collapsed). Absorb any overlapping neighbour instead,
// extending end only when the neighbour reaches further.
func (crs *coverRanges) mergeRanges() {
	currRange := crs.head.next
	for currRange != nil && currRange.next != nil {
		checkRange := currRange.next
		if currRange.end >= checkRange.begin {
			// overlap: absorb checkRange into currRange
			if checkRange.end > currRange.end {
				currRange.end = checkRange.end
			}
			currRange.next = checkRange.next
			checkRange.Recovery()
		} else {
			currRange = currRange.next
		}
	}
}
// coveragePool recycles coverageNode objects through a buffered
// channel to avoid allocating a node per received packet.
type coveragePool struct {
	queue chan *coverageNode // recycled nodes; the bounded channel is the pool
}
// NewCoveragePool creates a node pool backed by a 256-slot channel.
func NewCoveragePool() (cp *coveragePool) {
	cp = &coveragePool{queue: make(chan *coverageNode, 256)}
	return
}
// NewCoverage hands out a pooled node initialized to cover [begin, end].
func (crp *coveragePool) NewCoverage(begin, end int64) (cn *coverageNode) {
	node := crp.Dequeue()
	node.begin, node.end = begin, end
	return node
}
// Enqueue puts a coverage node back into the pool for reuse.
// nil nodes are ignored. If the pool is already full, the node is
// dropped and left to the GC instead of blocking the caller: the
// original unconditional channel send could stall packet processing
// indefinitely once the 256-slot pool filled up.
func (crp *coveragePool) Enqueue(cn *coverageNode) {
	log.Debugf("coveragePool enqueue: %d", len(crp.queue))
	if cn == nil {
		return
	}

	select {
	case crp.queue <- cn:
	default:
		// pool is full: drop the node rather than block
	}
}
// Dequeue hands out a node — recycled when available, freshly
// allocated otherwise — with its fields reset to the pool's sentinel
// values (begin/end -1, next nil) and crp pointing back to this pool.
func (crp *coveragePool) Dequeue() (cn *coverageNode) {
	log.Debugf("coveragePool dequeue: %d", len(crp.queue))
	select {
	case cn = <-crp.queue:
	default:
		cn = new(coverageNode)
	}
	cn.begin = -1
	cn.end = -1
	cn.next = nil
	cn.crp = crp
	return
}

View File

@ -7,128 +7,3 @@ type handshakeResponse41 struct {
DBName string DBName string
Auth []byte Auth []byte
} }
// coverageNode record tcp package begin and end seq id
type coverageNode struct {
begin int64
end int64
next *coverageNode
crp *coveragePool
}
func newCoverage(begin, end int64) (*coverageNode) {
return &coverageNode{
begin: begin,
end: end,
}
}
func (crn *coverageNode) Recovery() {
crn.crp.Enqueue(crn)
}
type coveragePool struct {
queue []*coverageNode
}
func NewCoveragePool() (cp *coveragePool) {
return &coveragePool{
queue: make([]*coverageNode, 0, 256),
}
}
func (crp *coveragePool) Enqueue(cn *coverageNode) {
if cn == nil {
return
}
crp.queue = append(crp.queue, cn)
}
func (crp *coveragePool) NewCoverage(begin, end int64)(cn *coverageNode) {
cn = crp.Dequeue()
cn.begin = begin
cn.end = end
return
}
func (crp *coveragePool) Dequeue() (cn *coverageNode) {
defer func() {
cn.begin = -1
cn.end = -1
cn.next = nil
cn.crp = crp
}()
if len(crp.queue) < 1 {
cn = &coverageNode{}
return
}
cn = crp.queue[0]
crp.queue = crp.queue[1:]
return
}
type coverRanges struct {
head *coverageNode
}
func NewCoverRanges() *coverRanges {
return &coverRanges{
head: &coverageNode{
begin: -1,
},
}
}
func (crs *coverRanges) clear() {
currRange := crs.head.next;
if currRange != nil {
node := currRange
currRange = currRange.next
node.Recovery()
}
crs.head.next = nil
}
func (crs *coverRanges) addRange(node *coverageNode) {
// insert range in asc order
var currRange = crs.head;
for currRange != nil && currRange.next != nil {
checkRange := currRange.next
if checkRange != nil && checkRange.begin >= node.begin {
currRange.next = node
node.next = checkRange
node = nil
break
} else {
currRange = checkRange
}
}
if node != nil {
currRange.next = node
}
crs.mergeRanges()
}
func (crs *coverRanges) mergeRanges() {
// merge ranges
currRange := crs.head.next
for currRange != nil && currRange.next != nil {
checkRange := currRange.next
if currRange.end >= checkRange.begin && currRange.end < checkRange.end {
currRange.end = checkRange.end
currRange.next = checkRange.next
checkRange.Recovery()
} else {
currRange = currRange.next
}
}
}

View File

@ -122,7 +122,12 @@ func (ms *MysqlSession) checkFinish() bool {
return false return false
} }
func (ms *MysqlSession) Close() {
ms.clear()
}
func (ms *MysqlSession) clear() { func (ms *MysqlSession) clear() {
localStmtCache.Enqueue(ms.cachedStmtBytes)
ms.cachedStmtBytes = nil ms.cachedStmtBytes = nil
ms.expectReceiveSize = -1 ms.expectReceiveSize = -1
ms.expectSendSize = -1 ms.expectSendSize = -1
@ -155,15 +160,13 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
ms.beginSeqID = seqID ms.beginSeqID = seqID
ms.endSeqID = seqID ms.endSeqID = seqID
// if len(ms.cachedStmtBytes) > 0 {
// copy(newCache[:len(ms.cachedStmtBytes)], ms.cachedStmtBytes)
// }
if int64(ms.expectReceiveSize) < int64(len(contents)) { if int64(ms.expectReceiveSize) < int64(len(contents)) {
log.Warnf("receive invalid mysql packet") log.Warnf("receive invalid mysql packet")
return return
} }
newCache := make([]byte, ms.expectReceiveSize) newCache := localStmtCache.DequeueWithInit(ms.expectReceiveSize)
copy(newCache[:len(contents)], contents) copy(newCache[:len(contents)], contents)
ms.cachedStmtBytes = newCache ms.cachedStmtBytes = newCache
@ -310,5 +313,5 @@ func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (m
func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) { func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) {
return model.NewPooledMysqlQueryPiece( return model.NewPooledMysqlQueryPiece(
ms.connectionID, ms.clientHost, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP, ms.connectionID, ms.clientHost, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP,
ms.clientPort, ms.serverPort, communicator.GetThrowPacketRate(), ms.stmtBeginTime) ms.clientPort, ms.serverPort, communicator.GetMysqlThrowPacketRate(), ms.stmtBeginTime)
} }