deal incomplete sql

This commit is contained in:
hebo 2019-08-23 15:05:21 +08:00
parent 8b86db5907
commit 29abb4a212
5 changed files with 148 additions and 152 deletions

View File

@ -21,9 +21,9 @@ sniffer-agent采用模块化结构支持用户添加自己的解析模块
目前输出的内容都是解析结果组成的json。 目前输出的内容都是解析结果组成的json。
MySQL协议的解析结果示例如下 MySQL协议的解析结果示例如下
``` ```
{"sid":"10.XX.XX.XX:54656","sip":"192.168.XX.XX","sport":3306,"user":"root","db":"sniffer","sql":"show tables","bt":"2019-08-05 18:23:09","cms":15} {"cip":"192.168.XXX.XXX","cport":63888,"sip":"192.168.XX.XX","sport":3306,"user":"root","db":"sniffer","sql":"show tables","bt":"2019-08-05 18:23:09","cms":15}
``` ```
其中sid代表客户端ipport组成的session标识sip代表server ipsport代表server portuser代表查询用户db代表当前连接的库名sql代表查询语句bt代表查询开始时间cms代表查询消耗的时间单位是毫秒 其中cip代表客户端ipcport代表客户端port(客户端ipport组成session标识)sip代表server ipsport代表server portuser代表查询用户db代表当前连接的库名sql代表查询语句bt代表查询开始时间cms代表查询消耗的时间单位是毫秒
### Exporter ### Exporter

View File

@ -71,7 +71,7 @@ func initEthernetHandlerFromPacpgo() (handler *pcapgo.EthernetHandle) {
panic(err.Error()) panic(err.Error())
} }
_ = handler.SetCaptureLength(65535) _ = handler.SetCaptureLength(1024*1024*10)
return return
} }
@ -92,16 +92,19 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) {
func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { func (nc *networkCard) Listen() (receiver chan model.QueryPiece) {
if inParallel { if inParallel {
return nc.listenInParallel() nc.listenInParallel()
} else {
nc.listenNormal()
} }
return nc.listenNormal() return nc.receiver
} }
// Listen get a connection. // Listen get a connection.
func (nc *networkCard) listenNormal() (receiver chan model.QueryPiece) { func (nc *networkCard) listenNormal() {
go func() { go func() {
handler := initEthernetHandlerFromPacp() handler := initEthernetHandlerFromPacpgo()
for { for {
var data []byte var data []byte
data, ci, err := handler.ZeroCopyReadPacketData() data, ci, err := handler.ZeroCopyReadPacketData()
@ -123,7 +126,7 @@ func (nc *networkCard) listenNormal() (receiver chan model.QueryPiece) {
} }
// Listen get a connection. // Listen get a connection.
func (nc *networkCard) listenInParallel() (receiver chan model.QueryPiece) { func (nc *networkCard) listenInParallel() {
type captureInfo struct { type captureInfo struct {
bytes []byte bytes []byte
captureInfo gopacket.CaptureInfo captureInfo gopacket.CaptureInfo
@ -158,16 +161,19 @@ func (nc *networkCard) listenInParallel() (receiver chan model.QueryPiece) {
// parse package // parse package
go func() { go func() {
defer func() {
close(receiver)
}()
for captureInfo := range rawDataChan { for captureInfo := range rawDataChan {
packet := gopacket.NewPacket(captureInfo.bytes, layers.LayerTypeEthernet, gopacket.NoCopy) packet := gopacket.NewPacket(captureInfo.bytes, layers.LayerTypeEthernet, gopacket.NoCopy)
m := packet.Metadata() m := packet.Metadata()
m.CaptureInfo = captureInfo.captureInfo m.CaptureInfo = captureInfo.captureInfo
m.Truncated = m.Truncated || captureInfo.captureInfo.CaptureLength < captureInfo.captureInfo.Length m.Truncated = m.Truncated || captureInfo.captureInfo.CaptureLength < captureInfo.captureInfo.Length
packageChan <- packet
}
}()
// parse package
go func() {
for packet := range packageChan {
nc.parseTCPPackage(packet) nc.parseTCPPackage(packet)
} }
}() }()
@ -246,7 +252,7 @@ func readFromServerPackage(
sessionKey := spliceSessionKey(srcIP, srcPort) sessionKey := spliceSessionKey(srcIP, srcPort)
session := sessionPool[*sessionKey] session := sessionPool[*sessionKey]
if session != nil { if session != nil {
// session.ReadFromServer(tcpPayload) // session.readFromServer(tcpPayload)
// qp = session.GenerateQueryPiece() // qp = session.GenerateQueryPiece()
pkt := model.NewTCPPacket(tcpPayload, int64(tcpPkt.Ack), false) pkt := model.NewTCPPacket(tcpPayload, int64(tcpPkt.Ack), false)
session.ReceiveTCPPacket(pkt) session.ReceiveTCPPacket(pkt)
@ -286,8 +292,6 @@ func readToServerPackage(
pkt := model.NewTCPPacket(tcpPayload, int64(tcpPkt.Seq), true) pkt := model.NewTCPPacket(tcpPayload, int64(tcpPkt.Seq), true)
session.ReceiveTCPPacket(pkt) session.ReceiveTCPPacket(pkt)
// session.ResetBeginTime()
// session.ReadFromClient(int64(tcpPkt.Seq), tcpPayload)
return return
} }

View File

@ -18,8 +18,9 @@ type QueryPiece interface {
// MysqlQueryPiece 查询信息 // MysqlQueryPiece 查询信息
type MysqlQueryPiece struct { type MysqlQueryPiece struct {
SessionID *string `json:"sid"` SessionID *string `json:"-"`
ClientHost *string `json:"-"` ClientHost *string `json:"cip"`
ClientPort int `json:"cport"`
SyncSend bool `json:"-"` SyncSend bool `json:"-"`
ServerIP *string `json:"sip"` ServerIP *string `json:"sip"`
ServerPort int `json:"sport"` ServerPort int `json:"sport"`
@ -45,7 +46,8 @@ var (
) )
func NewPooledMysqlQueryPiece( func NewPooledMysqlQueryPiece(
sessionID, visitUser, visitDB, clientHost, serverIP *string, serverPort int, stmtBeginTime int64) ( sessionID, clientIP, visitUser, visitDB, clientHost, serverIP *string,
clientPort, serverPort int, stmtBeginTime int64) (
mqp *PooledMysqlQueryPiece) { mqp *PooledMysqlQueryPiece) {
mqp = mqpp.Dequeue() mqp = mqpp.Dequeue()
if mqp == nil { if mqp == nil {
@ -56,6 +58,8 @@ func NewPooledMysqlQueryPiece(
nowInMS := time.Now().UnixNano() / millSecondUnit nowInMS := time.Now().UnixNano() / millSecondUnit
mqp.SessionID = sessionID mqp.SessionID = sessionID
mqp.ClientHost = clientIP
mqp.ClientPort = clientPort
mqp.ClientHost = clientHost mqp.ClientHost = clientHost
mqp.ServerIP = serverIP mqp.ServerIP = serverIP
mqp.ServerPort = serverPort mqp.ServerPort = serverPort

View File

@ -3,11 +3,5 @@ package session_dealer
import "github.com/zr-hebo/sniffer-agent/model" import "github.com/zr-hebo/sniffer-agent/model"
type ConnSession interface { type ConnSession interface {
ReadFromClient(seqID int64, bytes []byte)
ReadFromServer(bytes []byte)
ResetBeginTime()
GenerateQueryPiece() (qp model.QueryPiece)
ReceiveTCPPacket(*model.TCPPacket) ReceiveTCPPacket(*model.TCPPacket)
Stop()
} }

View File

@ -3,6 +3,7 @@ package mysql
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
@ -11,31 +12,32 @@ import (
) )
type MysqlSession struct { type MysqlSession struct {
connectionID *string connectionID *string
visitUser *string visitUser *string
visitDB *string visitDB *string
clientHost *string clientHost *string
clientPort int clientPort int
serverIP *string serverIP *string
serverPort int serverPort int
stmtBeginTime int64 stmtBeginTime int64
beginSeqID int64 beginSeqID int64
packageOffset int64 packageOffset int64
expectReceiveSize int expectReceiveSize int
coverRanges []*jigsaw coverRanges []*jigsaw
expectSendSize int expectSendSize int
prepareInfo *prepareInfo prepareInfo *prepareInfo
cachedPrepareStmt map[int]*string cachedPrepareStmt map[int]*string
cachedStmtBytes []byte cachedStmtBytes []byte
computeWindowSizeCounter int computeWindowSizeCounter int
tcpPacketCache []*model.TCPPacket tcpPacketCache []*model.TCPPacket
queryPieceReceiver chan model.QueryPiece queryPieceReceiver chan model.QueryPiece
lastSeq int64 lastSeq int64
keepAlive chan bool closeConn chan bool
pkgCacheLock sync.Mutex
ackID int64 ignoreAckID int64
sendSize int64 sendSize int64
} }
@ -55,118 +57,127 @@ func NewMysqlSession(
sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int, sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int,
receiver chan model.QueryPiece) (ms *MysqlSession) { receiver chan model.QueryPiece) (ms *MysqlSession) {
ms = &MysqlSession{ ms = &MysqlSession{
connectionID: sessionKey, connectionID: sessionKey,
clientHost: clientIP, clientHost: clientIP,
clientPort: clientPort, clientPort: clientPort,
serverIP: serverIP, serverIP: serverIP,
serverPort: serverPort, serverPort: serverPort,
stmtBeginTime: time.Now().UnixNano() / millSecondUnit, stmtBeginTime: time.Now().UnixNano() / millSecondUnit,
cachedPrepareStmt: make(map[int]*string, 8), cachedPrepareStmt: make(map[int]*string, 8),
coverRanges: make([]*jigsaw, 0, 4), coverRanges: make([]*jigsaw, 0, 4),
queryPieceReceiver: receiver, queryPieceReceiver: receiver,
keepAlive: make(chan bool, 1), closeConn: make(chan bool, 1),
lastSeq: -1, lastSeq: -1,
ackID: -1, ignoreAckID: -1,
sendSize: 0, sendSize: 0,
pkgCacheLock: sync.Mutex{},
} }
go ms.haha()
return return
} }
func (ms *MysqlSession) Stop() {
ms.keepAlive <- false
}
func (ms *MysqlSession) haha() { func (ms *MysqlSession) dealTCPPacket() {
for {
select {
case closeConn := <-ms.closeConn:
for true { if closeConn {
// select { return
// case <- ms.keepAlive:
// return
// default:
// }
if len(ms.tcpPacketCache) < 1 {
// log.Debugf("there are %d packages in tcp packet cache", )
time.Sleep(1)
continue
}
beginIdx := -1
if ms.lastSeq < 0 {
ms.lastSeq = ms.tcpPacketCache[0].Seq
}
for idx := 0; idx < len(ms.tcpPacketCache); idx++ {
pkt := ms.tcpPacketCache[idx]
if ms.lastSeq == pkt.Seq {
beginIdx = idx
ms.lastSeq = pkt.Seq + int64(len(pkt.Payload))
} else {
break
} }
}
default:
if beginIdx < 0 { if len(ms.tcpPacketCache) > 0 {
return ms.parseTCPPacket()
}
inOrderPkgs := ms.tcpPacketCache[:beginIdx+1]
if beginIdx == len(ms.tcpPacketCache) - 1 {
ms.tcpPacketCache = make([]*model.TCPPacket, 0, 4)
} else {
ms.tcpPacketCache = ms.tcpPacketCache[beginIdx+1:]
}
for _, pkg := range inOrderPkgs {
if pkg.ToServer {
ms.ReadFromClient(pkg.Seq, pkg.Payload)
} else { } else {
ms.ReadFromServer(pkg.Payload) log.Debugf("no package need deal in session:%s, so sleep", *ms.connectionID)
ms.queryPieceReceiver <- ms.GenerateQueryPiece() time.Sleep(time.Second)
} }
} }
} }
} }
func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { func (ms *MysqlSession) parseTCPPacket() {
if !newPkt.ToServer && ms.ackID + ms.sendSize == newPkt.Seq { ms.pkgCacheLock.Lock()
var pkg *model.TCPPacket
if len(ms.tcpPacketCache) < 1 {
ms.pkgCacheLock.Unlock()
return
}
pkg = ms.tcpPacketCache[0]
ms.tcpPacketCache = ms.tcpPacketCache[1:]
ms.pkgCacheLock.Unlock()
if pkg.ToServer {
ms.resetBeginTime()
ms.readFromClient(pkg.Seq, pkg.Payload)
} else {
ms.readFromServer(pkg.Payload)
qp := ms.GenerateQueryPiece()
if qp != nil {
ms.queryPieceReceiver <- qp
}
}
return
}
func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
if newPkt == nil {
return
}
if !newPkt.ToServer && ms.ignoreAckID == newPkt.Seq {
// ignore to response to client data // ignore to response to client data
ms.ackID = ms.ackID + newPkt.Seq ms.ignoreAckID = ms.ignoreAckID + int64(len(newPkt.Payload))
ms.sendSize = ms.sendSize + int64(len(newPkt.Payload))
return return
} else if !newPkt.ToServer { } else if !newPkt.ToServer {
ms.ackID = newPkt.Seq ms.ignoreAckID = newPkt.Seq + int64(len(newPkt.Payload))
ms.sendSize = int64(len(newPkt.Payload))
} }
insertIdx := len(ms.tcpPacketCache) if newPkt.ToServer {
for idx, pkt := range ms.tcpPacketCache { ms.resetBeginTime()
if pkt.Seq > newPkt.Seq { ms.readFromClient(newPkt.Seq, newPkt.Payload)
insertIdx = idx
} else {
ms.readFromServer(newPkt.Payload)
qp := ms.GenerateQueryPiece()
if qp != nil {
ms.queryPieceReceiver <- qp
} }
} }
if insertIdx == len(ms.tcpPacketCache) { // ms.pkgCacheLock.Lock()
ms.tcpPacketCache = append(ms.tcpPacketCache, newPkt) // defer ms.pkgCacheLock.Unlock()
} else { //
newCache := make([]*model.TCPPacket, len(ms.tcpPacketCache)+1) // insertIdx := len(ms.tcpPacketCache)
copy(newCache[:insertIdx], ms.tcpPacketCache[:insertIdx]) // for idx, pkt := range ms.tcpPacketCache {
newCache[insertIdx] = newPkt // if pkt.Seq > newPkt.Seq {
copy(newCache[insertIdx+1:], ms.tcpPacketCache[insertIdx:]) // insertIdx = idx
} // }
// }
//
// if insertIdx == len(ms.tcpPacketCache) {
// ms.tcpPacketCache = append(ms.tcpPacketCache, newPkt)
// } else {
// newCache := make([]*model.TCPPacket, len(ms.tcpPacketCache)+1)
// copy(newCache[:insertIdx], ms.tcpPacketCache[:insertIdx])
// newCache[insertIdx] = newPkt
// copy(newCache[insertIdx+1:], ms.tcpPacketCache[insertIdx:])
// ms.tcpPacketCache = newCache
// }
} }
func (ms *MysqlSession) ResetBeginTime() { func (ms *MysqlSession) resetBeginTime() {
ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit
} }
func (ms *MysqlSession) ReadFromServer(bytes []byte) { func (ms *MysqlSession) readFromServer(bytes []byte) {
if ms.expectSendSize < 1 { if ms.expectSendSize < 1 {
ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) ms.expectSendSize = extractMysqlPayloadSize(bytes[:4])
contents := bytes[4:] contents := bytes[4:]
@ -176,7 +187,7 @@ func (ms *MysqlSession) ReadFromServer(bytes []byte) {
} }
} }
func (ms *MysqlSession) mergeRanges() { func (ms *MysqlSession) mergeRanges() {
if len(ms.coverRanges) > 1 { if len(ms.coverRanges) > 1 {
newRange, newPkgRanges := mergeRanges(ms.coverRanges[0], ms.coverRanges[1:]) newRange, newPkgRanges := mergeRanges(ms.coverRanges[0], ms.coverRanges[1:])
tmpRanges := make([]*jigsaw, len(newPkgRanges)+1) tmpRanges := make([]*jigsaw, len(newPkgRanges)+1)
@ -219,7 +230,7 @@ func mergeRanges(currRange *jigsaw, pkgRanges []*jigsaw) (mergedRange *jigsaw, n
} }
func (ms *MysqlSession) oneMysqlPackageFinish() bool { func (ms *MysqlSession) oneMysqlPackageFinish() bool {
if int64(len(ms.cachedStmtBytes)) % MaxMysqlPacketLen == 0 { if int64(len(ms.cachedStmtBytes))%MaxMysqlPacketLen == 0 {
return true return true
} }
@ -230,25 +241,22 @@ func (ms *MysqlSession) checkFinish() bool {
if len(ms.coverRanges) != 1 { if len(ms.coverRanges) != 1 {
ranges := make([]string, 0, len(ms.coverRanges)) ranges := make([]string, 0, len(ms.coverRanges))
for _, cr := range ms.coverRanges { for _, cr := range ms.coverRanges {
log.Errorf("miss values: %s", string(ms.cachedStmtBytes[cr.b-ms.beginSeqID: cr.e-ms.beginSeqID]))
ranges = append(ranges, fmt.Sprintf("[%d -- %d]", cr.b, cr.e)) ranges = append(ranges, fmt.Sprintf("[%d -- %d]", cr.b, cr.e))
} }
log.Debugf("in session %s get invalid range: %s", *ms.connectionID, strings.Join(ranges, ", "))
log.Errorf("in session %s get invalid range: %s", *ms.connectionID, strings.Join(ranges, ", "))
return false return false
} }
firstRange := ms.coverRanges[0] firstRange := ms.coverRanges[0]
if firstRange.e - firstRange.b != int64(len(ms.cachedStmtBytes)) { if firstRange.e-firstRange.b != int64(len(ms.cachedStmtBytes)) {
return false return false
} }
return true return true
} }
func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) { func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
contentSize := int64(len(bytes)) contentSize := int64(len(bytes))
if ms.expectReceiveSize == 0 || ms.oneMysqlPackageFinish() { if ms.expectReceiveSize == 0 || ms.oneMysqlPackageFinish() {
@ -268,12 +276,9 @@ func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) {
copy(newCache[:len(ms.cachedStmtBytes)], ms.cachedStmtBytes) copy(newCache[:len(ms.cachedStmtBytes)], ms.cachedStmtBytes)
} }
if int64(ms.expectReceiveSize+len(ms.cachedStmtBytes)) > ms.packageOffset+int64(len(contents)) { if int64(ms.expectReceiveSize+len(ms.cachedStmtBytes)) >= ms.packageOffset+int64(len(contents)) {
copy(newCache[ms.packageOffset:ms.packageOffset+int64(len(contents))], contents) copy(newCache[ms.packageOffset:ms.packageOffset+int64(len(contents))], contents)
ms.cachedStmtBytes = newCache ms.cachedStmtBytes = newCache
} else {
log.Debugf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXxxx")
return
} }
} else { } else {
@ -296,7 +301,7 @@ func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) {
} }
} }
cr := &jigsaw{b: seqID, e: seqID+contentSize} cr := &jigsaw{b: seqID, e: seqID + contentSize}
if len(ms.coverRanges) < 1 || insertIdx == len(ms.coverRanges) { if len(ms.coverRanges) < 1 || insertIdx == len(ms.coverRanges) {
ms.coverRanges = append(ms.coverRanges, cr) ms.coverRanges = append(ms.coverRanges, cr)
@ -311,17 +316,6 @@ func (ms *MysqlSession) ReadFromClient(seqID int64, bytes []byte) {
ms.mergeRanges() ms.mergeRanges()
} }
func (ms *MysqlSession) refreshWindowSize(readSize int) {
windowCounter := windowSizeCache[*ms.clientHost]
if windowCounter == nil {
windowCounter = newPackageWindowCounter()
windowSizeCache[*ms.clientHost] = windowCounter
}
// windowCounter.refresh(readSize, ms.checkFinish())
// ms.tcpWindowSize = windowCounter.suggestSize
}
func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
defer func() { defer func() {
ms.cachedStmtBytes = nil ms.cachedStmtBytes = nil
@ -330,7 +324,7 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
ms.prepareInfo = nil ms.prepareInfo = nil
ms.coverRanges = make([]*jigsaw, 0, 4) ms.coverRanges = make([]*jigsaw, 0, 4)
ms.lastSeq = -1 ms.lastSeq = -1
ms.ackID = -1 ms.ignoreAckID = -1
ms.sendSize = 0 ms.sendSize = 0
}() }()
@ -338,9 +332,8 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
return return
} }
// fmt.Printf("packageComplete in generate: %v\n", ms.packageComplete)
if !ms.checkFinish() { if !ms.checkFinish() {
log.Errorf("receive a not complete cover") log.Debugf("receive a not complete cover")
return return
} }
@ -430,5 +423,6 @@ func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (m
func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) { func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) {
return model.NewPooledMysqlQueryPiece( return model.NewPooledMysqlQueryPiece(
ms.connectionID, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP, ms.serverPort, ms.stmtBeginTime) ms.connectionID, ms.clientHost, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP,
ms.clientPort, ms.serverPort, ms.stmtBeginTime)
} }