This commit is contained in:
hebo 2019-09-06 15:54:48 +08:00
parent c9f514feed
commit 0506fa2f63
4 changed files with 67 additions and 165 deletions

View File

@ -73,13 +73,13 @@ func initEthernetHandlerFromPacpgo() (handler *pcapgo.EthernetHandle) {
panic(err.Error())
}
_ = handler.SetCaptureLength(1024*1024*5)
_ = handler.SetCaptureLength(65536)
return
}
func initEthernetHandlerFromPacp() (handler *pcap.Handle) {
handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever)
handler, err := pcap.OpenLive(DeviceName, 65536, false, pcap.BlockForever)
if err != nil {
panic(fmt.Sprintf("cannot open network interface %s <-- %s", DeviceName, err.Error()))
}
@ -256,7 +256,6 @@ func readFromServerPackage(
if tcpPkt.FIN {
sessionKey := spliceSessionKey(srcIP, srcPort)
delete(sessionPool, *sessionKey)
log.Debugf("close connection from %s", *sessionKey)
return
}

View File

@ -27,6 +27,7 @@ type MysqlQueryPiece struct {
VisitUser *string `json:"user"`
VisitDB *string `json:"db"`
QuerySQL *string `json:"sql"`
ThrowPacketRate float64 `json:"tpr"`
BeginTime int64 `json:"bt"`
CostTimeInMS int64 `json:"cms"`
}
@ -47,7 +48,7 @@ var (
func NewPooledMysqlQueryPiece(
sessionID, clientIP, visitUser, visitDB, clientHost, serverIP *string,
clientPort, serverPort int, stmtBeginTime int64) (
clientPort, serverPort int, throwPacketRate float64, stmtBeginTime int64) (
mqp *PooledMysqlQueryPiece) {
mqp = mqpp.Dequeue()
if mqp == nil {
@ -66,6 +67,7 @@ func NewPooledMysqlQueryPiece(
mqp.VisitUser = visitUser
mqp.VisitDB = visitDB
mqp.SyncSend = false
mqp.ThrowPacketRate = throwPacketRate
mqp.BeginTime = stmtBeginTime
mqp.CostTimeInMS = nowInMS - stmtBeginTime
mqp.recoverPool = mqpp

View File

@ -1,9 +1,5 @@
package mysql
import (
log "github.com/sirupsen/logrus"
)
type handshakeResponse41 struct {
Capability uint32
Collation uint8
@ -12,52 +8,8 @@ type handshakeResponse41 struct {
Auth []byte
}
// jigsaw record tcp package begin and end seq id
type jigsaw struct {
b int64
e int64
// receiveRange record mysql package begin and end seq id
type receiveRange struct {
beginSeqID int64
endSeqID int64
}
type packageWindowCounter struct {
sizeCount map[int]int64
visitCount int64
suggestSize int
}
func newPackageWindowCounter() *packageWindowCounter {
return &packageWindowCounter{
sizeCount: make(map[int]int64, 4),
suggestSize: 512,
}
}
func (pwc *packageWindowCounter) refresh (readSize int, isLastPackage bool) {
if pwc.visitCount > 10000 {
return
}
log.Debugf("WindowCounter: %#v", pwc.sizeCount)
pwc.visitCount += 1
miniMatchSize := maxIPPackageSize
for size := range pwc.sizeCount {
if readSize % size == 0 && miniMatchSize > size {
miniMatchSize = size
}
}
if miniMatchSize < maxIPPackageSize {
pwc.sizeCount[miniMatchSize] = pwc.sizeCount[miniMatchSize] + 1
} else if !isLastPackage {
pwc.sizeCount[readSize] = 1
}
mostFrequentSize := pwc.suggestSize
mostFrequentCount := int64(0)
for size, count := range pwc.sizeCount {
if count > mostFrequentCount {
mostFrequentSize = size
mostFrequentCount = count
}
}
pwc.suggestSize = mostFrequentSize
}

View File

@ -2,7 +2,9 @@ package mysql
import (
"fmt"
"strings"
"github.com/zr-hebo/sniffer-agent/communicator"
// "strings"
"sync"
"time"
@ -20,11 +22,13 @@ type MysqlSession struct {
serverIP *string
serverPort int
stmtBeginTime int64
beginSeqID int64
packageOffset int64
expectReceiveSize int
coverRanges []*jigsaw
coverRange *jigsaw
// coverRanges []*receiveRange
// coverRange *receiveRange
beginSeqID int64
expectSeqID int64
expectSendSize int
prepareInfo *prepareInfo
cachedPrepareStmt map[int][]byte
@ -34,7 +38,6 @@ type MysqlSession struct {
tcpPacketCache []*model.TCPPacket
queryPieceReceiver chan model.QueryPiece
lastSeq int64
closeConn chan bool
pkgCacheLock sync.Mutex
@ -46,14 +49,6 @@ type prepareInfo struct {
prepareStmtID int
}
var (
windowSizeCache = make(map[string]*packageWindowCounter, 16)
)
const (
maxIPPackageSize = 1 << 16
)
func NewMysqlSession(
sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int,
receiver chan model.QueryPiece) (ms *MysqlSession) {
@ -65,10 +60,10 @@ func NewMysqlSession(
serverPort: serverPort,
stmtBeginTime: time.Now().UnixNano() / millSecondUnit,
cachedPrepareStmt: make(map[int][]byte, 8),
coverRanges: make([]*jigsaw, 0, 4),
queryPieceReceiver: receiver,
closeConn: make(chan bool, 1),
lastSeq: -1,
expectReceiveSize: -1,
expectSeqID: -1,
ignoreAckID: -1,
sendSize: 0,
pkgCacheLock: sync.Mutex{},
@ -118,49 +113,6 @@ func (ms *MysqlSession) readFromServer(bytes []byte) {
}
}
func (ms *MysqlSession) mergeRanges() {
if len(ms.coverRanges) > 1 {
newRange, newPkgRanges := mergeRanges(ms.coverRanges[0], ms.coverRanges[1:])
tmpRanges := make([]*jigsaw, len(newPkgRanges)+1)
tmpRanges[0] = newRange
if len(newPkgRanges) > 0 {
copy(tmpRanges[1:], newPkgRanges)
}
ms.coverRanges = tmpRanges
}
}
func mergeRanges(currRange *jigsaw, pkgRanges []*jigsaw) (mergedRange *jigsaw, newPkgRanges []*jigsaw) {
var nextRange *jigsaw
newPkgRanges = make([]*jigsaw, 0, 4)
if len(pkgRanges) < 1 {
return currRange, newPkgRanges
} else if len(pkgRanges) == 1 {
//
nextRange = pkgRanges[0]
} else {
nextRange, newPkgRanges = mergeRanges(pkgRanges[0], pkgRanges[1:])
}
if currRange.e >= nextRange.b {
mergedRange = &jigsaw{b: currRange.b, e: nextRange.e}
} else {
tmpRanges := make([]*jigsaw, len(newPkgRanges)+1)
tmpRanges[0] = nextRange
if len(newPkgRanges) > 0 {
copy(tmpRanges[1:], newPkgRanges)
}
newPkgRanges = tmpRanges
mergedRange = currRange
}
return
}
func (ms *MysqlSession) oneMysqlPackageFinish() bool {
if int64(len(ms.cachedStmtBytes))%MaxMysqlPacketLen == 0 {
return true
@ -170,30 +122,30 @@ func (ms *MysqlSession) oneMysqlPackageFinish() bool {
}
func (ms *MysqlSession) checkFinish() bool {
if len(ms.coverRanges) != 1 {
ranges := make([]string, 0, len(ms.coverRanges))
for _, cr := range ms.coverRanges {
ranges = append(ranges, fmt.Sprintf("[%d -- %d]", cr.b, cr.e))
}
log.Debugf("in session %s get invalid range: %s", *ms.connectionID, strings.Join(ranges, ", "))
return false
if ms.beginSeqID != -1 && ms.expectReceiveSize == 0 {
return true
}
firstRange := ms.coverRanges[0]
if firstRange.e-firstRange.b != int64(len(ms.cachedStmtBytes)) {
return false
}
return false
}
return true
func (ms *MysqlSession) clear() {
ms.cachedStmtBytes = nil
ms.expectReceiveSize = -1
ms.expectSendSize = -1
ms.prepareInfo = nil
ms.beginSeqID = -1
ms.expectSeqID = -1
ms.ignoreAckID = -1
ms.sendSize = 0
}
func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
contentSize := int64(len(bytes))
if ms.expectReceiveSize == 0 || ms.oneMysqlPackageFinish() {
if ms.expectReceiveSize == -1 || ms.oneMysqlPackageFinish() {
ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4])
ms.packageOffset = int64(len(ms.cachedStmtBytes))
// ms.packageOffset = int64(len(ms.cachedStmtBytes))
contents := bytes[4:]
if contents[0] == ComStmtPrepare {
@ -203,6 +155,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
contentSize = int64(len(contents))
seqID += 4
ms.beginSeqID = seqID
newCache := make([]byte, ms.expectReceiveSize+len(ms.cachedStmtBytes))
if len(ms.cachedStmtBytes) > 0 {
copy(newCache[:len(ms.cachedStmtBytes)], ms.cachedStmtBytes)
@ -214,52 +167,47 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
}
} else {
if seqID < ms.beginSeqID {
log.Debugf("in session %s get outdate package with Seq:%d", *ms.connectionID, seqID)
if ms.beginSeqID == -1 {
log.Warnf("cover range is empty")
return
}
seqOffset := seqID - ms.beginSeqID
if ms.packageOffset+seqOffset+int64(len(bytes)) <= int64(ms.expectReceiveSize) {
copy(ms.cachedStmtBytes[ms.packageOffset+seqOffset:ms.packageOffset+seqOffset+int64(len(bytes))], bytes)
if seqID < ms.beginSeqID {
// out date packet
log.Debugf("in session %s get outdate package with Seq:%d, beginSeq:%d",
*ms.connectionID, seqID, ms.beginSeqID)
return
} else if seqID + int64(len(bytes)) <= ms.beginSeqID {
// repeat packet
log.Debugf("receive repeat packet")
return
} else if seqID > ms.expectSeqID {
// discontinuous packet
log.Debugf("receive discontinuous packet")
ms.clear()
return
}
}
insertIdx := len(ms.coverRanges)
for idx, cr := range ms.coverRanges {
if seqID < cr.b {
insertIdx = idx
break
seqOffset := seqID - ms.beginSeqID + ms.packageOffset
if seqOffset+int64(len(bytes)) > int64(len(ms.cachedStmtBytes)) {
// is not a normal mysql packet
log.Debugf("receive an unexpect packet")
ms.clear()
return
}
// add byte to stmt cache
copy(ms.cachedStmtBytes[ms.packageOffset+seqOffset:ms.packageOffset+seqOffset+contentSize], bytes)
}
cr := &jigsaw{b: seqID, e: seqID + contentSize}
if len(ms.coverRanges) < 1 || insertIdx == len(ms.coverRanges) {
ms.coverRanges = append(ms.coverRanges, cr)
} else {
newCoverRanges := make([]*jigsaw, len(ms.coverRanges)+1)
copy(newCoverRanges[:insertIdx], ms.coverRanges[:insertIdx])
newCoverRanges[insertIdx] = cr
copy(newCoverRanges[insertIdx+1:], ms.coverRanges[insertIdx:])
ms.coverRanges = newCoverRanges
}
ms.mergeRanges()
ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize)
ms.expectSeqID = seqID + contentSize
}
func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
defer func() {
ms.cachedStmtBytes = nil
ms.expectReceiveSize = 0
ms.expectSendSize = 0
ms.prepareInfo = nil
ms.coverRanges = make([]*jigsaw, 0, 4)
ms.coverRange = nil
ms.lastSeq = -1
ms.ignoreAckID = -1
ms.sendSize = 0
}()
defer ms.clear()
if len(ms.cachedStmtBytes) < 1 {
return
@ -366,5 +314,6 @@ func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (m
func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) {
return model.NewPooledMysqlQueryPiece(
ms.connectionID, ms.clientHost, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP,
ms.clientPort, ms.serverPort, ms.stmtBeginTime)
ms.clientPort, ms.serverPort, communicator.GetConfig(communicator.THROW_PACKET_RATE).(float64),
ms.stmtBeginTime)
}