diff --git a/capture/network.go b/capture/network.go index 33cdedc..a9b2424 100644 --- a/capture/network.go +++ b/capture/network.go @@ -31,17 +31,17 @@ func init() { // networkCard is network device type networkCard struct { - name string + name string listenPort int - receiver chan model.QueryPiece + receiver chan model.QueryPiece } func NewNetworkCard() (nc *networkCard) { // init device return &networkCard{ - name: DeviceName, + name: DeviceName, listenPort: snifferPort, - receiver: make(chan model.QueryPiece, 100), + receiver: make(chan model.QueryPiece, 100), } } @@ -93,18 +93,10 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) { } func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { - // if inParallel { - // nc.listenInParallel() - // - // } else { - // nc.listenNormal() - // } - nc.listenNormal() return nc.receiver } - // Listen get a connection. func (nc *networkCard) listenNormal() { go func() { @@ -119,7 +111,7 @@ func (nc *networkCard) listenNormal() { // capture packets according to a certain probability capturePacketRate := communicator.GetTCPCapturePacketRate() if capturePacketRate <= 0 { - time.Sleep(time.Second*1) + time.Sleep(time.Second * 1) aliveCounter += 1 if aliveCounter >= checkCount { aliveCounter = 0 @@ -131,7 +123,7 @@ func (nc *networkCard) listenNormal() { data, ci, err = handler.ZeroCopyReadPacketData() if err != nil { log.Error(err.Error()) - time.Sleep(time.Second*3) + time.Sleep(time.Second * 3) continue } @@ -142,7 +134,9 @@ func (nc *networkCard) listenNormal() { // send FIN tcp packet to avoid not complete session cannot be released tcpPkt := packet.TransportLayer().(*layers.TCP) - if tcpPkt.FIN { + // deal FIN packet + // deal auth packet + if tcpPkt.FIN || sd.IsAuthPacket(tcpPkt.Payload) { nc.parseTCPPackage(packet) continue } @@ -163,62 +157,6 @@ func (nc *networkCard) listenNormal() { return } -// Listen get a connection. -func (nc *networkCard) listenInParallel() { - type captureInfo struct { - bytes []byte - captureInfo gopacket.CaptureInfo - } - - rawDataChan := make(chan *captureInfo, 20) - packageChan := make(chan gopacket.Packet, 20) - - // read packet - go func() { - defer func() { - close(packageChan) - }() - - handler := initEthernetHandlerFromPacpgo() - for { - var data []byte - // data, ci, err := handler.ZeroCopyReadPacketData() - data, ci, err := handler.ReadPacketData() - if err != nil { - log.Error(err.Error()) - time.Sleep(time.Second*3) - continue - } - - rawDataChan <- &captureInfo{ - bytes: data, - captureInfo: ci, - } - } - }() - - // parse package - go func() { - for captureInfo := range rawDataChan { - packet := gopacket.NewPacket(captureInfo.bytes, layers.LayerTypeEthernet, gopacket.NoCopy) - m := packet.Metadata() - m.CaptureInfo = captureInfo.captureInfo - m.Truncated = m.Truncated || captureInfo.captureInfo.CaptureLength < captureInfo.captureInfo.Length - - packageChan <- packet - } - }() - - // parse package - go func() { - for packet := range packageChan { - nc.parseTCPPackage(packet) - } - }() - - return -} - func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) { var err error defer func() { @@ -337,4 +275,3 @@ func readToServerPackage( return } - diff --git a/model/cache_pool.go b/model/cache_pool.go index 8e5e348..8019211 100644 --- a/model/cache_pool.go +++ b/model/cache_pool.go @@ -30,7 +30,12 @@ func (sbp *sliceBufferPool) Enqueue(buffer []byte) { return } - sbp.queue <- buffer + select { + case sbp.queue <- buffer: + return + default: + buffer = nil + } } func (sbp *sliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) { diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go index 3155cd5..de3fabf 100644 --- a/model/query_piece_pool.go +++ b/model/query_piece_pool.go @@ -19,7 +19,13 @@ func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) { mqpp.lock.Lock() defer mqpp.lock.Unlock() - mqpp.queue <- pmqp + select { + case mqpp.queue <- pmqp: + return + default: + pmqp = nil + return + } } func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) { @@ -29,6 +35,7 @@ func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) { select { case pmqp = <- mqpp.queue: return + default: pmqp = &PooledMysqlQueryPiece{ MysqlQueryPiece: MysqlQueryPiece{}, diff --git a/session-dealer/controller.go b/session-dealer/controller.go index 72b7572..cbd8541 100644 --- a/session-dealer/controller.go +++ b/session-dealer/controller.go @@ -24,3 +24,13 @@ func CheckParams() { mysql.CheckParams() } } + +func IsAuthPacket(payload []byte) bool { + switch serviceType { + case ServiceTypeMysql: + return len(payload) >= 5 && mysql.IsAuth(payload[4]) + + default: + return false + } +} diff --git a/session-dealer/mysql/cover_range.go b/session-dealer/mysql/cover_range.go index 7654c30..da5b4bb 100644 --- a/session-dealer/mysql/cover_range.go +++ b/session-dealer/mysql/cover_range.go @@ -100,13 +100,19 @@ func (crp *coveragePool) NewCoverage(begin, end int64)(cn *coverageNode) { return } -func (crp *coveragePool) Enqueue(cn *coverageNode) { +func (crp *coveragePool) Enqueue(cn *coverageNode) { // log.Debugf("coveragePool enqueue: %d", len(crp.queue)) if cn == nil { return } - crp.queue <- cn + select { + case crp.queue <- cn: + return + + default: + cn = nil + } } func (crp *coveragePool) Dequeue() (cn *coverageNode) { @@ -122,6 +128,7 @@ func (crp *coveragePool) Dequeue() (cn *coverageNode) { select { case cn = <- crp.queue: return + default: cn = &coverageNode{} return diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 26c91fa..2b65f0c 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -144,7 +144,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { if ms.expectReceiveSize == -1 { // ignore invalid head package - if len(bytes) <= 4{ + if len(bytes) <= 4 { return } @@ -208,6 +208,10 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { // ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize) } +func IsAuth(val byte) bool { + return val > 32 +} + func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { defer ms.clear() @@ -227,7 +231,7 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { var mqp *model.PooledMysqlQueryPiece var querySQLInBytes []byte - if ms.cachedStmtBytes[0] > 32 { + if IsAuth(ms.cachedStmtBytes[0]) { userName, dbName, err := parseAuthInfo(ms.cachedStmtBytes) if err != nil { log.Errorf("parse auth info failed <-- %s", err.Error())