diff --git a/capture/network.go b/capture/network.go index 33cdedc..b063389 100644 --- a/capture/network.go +++ b/capture/network.go @@ -93,13 +93,6 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) { } func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { - // if inParallel { - // nc.listenInParallel() - // - // } else { - // nc.listenNormal() - // } - nc.listenNormal() return nc.receiver } @@ -163,62 +156,6 @@ func (nc *networkCard) listenNormal() { return } -// Listen get a connection. -func (nc *networkCard) listenInParallel() { - type captureInfo struct { - bytes []byte - captureInfo gopacket.CaptureInfo - } - - rawDataChan := make(chan *captureInfo, 20) - packageChan := make(chan gopacket.Packet, 20) - - // read packet - go func() { - defer func() { - close(packageChan) - }() - - handler := initEthernetHandlerFromPacpgo() - for { - var data []byte - // data, ci, err := handler.ZeroCopyReadPacketData() - data, ci, err := handler.ReadPacketData() - if err != nil { - log.Error(err.Error()) - time.Sleep(time.Second*3) - continue - } - - rawDataChan <- &captureInfo{ - bytes: data, - captureInfo: ci, - } - } - }() - - // parse package - go func() { - for captureInfo := range rawDataChan { - packet := gopacket.NewPacket(captureInfo.bytes, layers.LayerTypeEthernet, gopacket.NoCopy) - m := packet.Metadata() - m.CaptureInfo = captureInfo.captureInfo - m.Truncated = m.Truncated || captureInfo.captureInfo.CaptureLength < captureInfo.captureInfo.Length - - packageChan <- packet - } - }() - - // parse package - go func() { - for packet := range packageChan { - nc.parseTCPPackage(packet) - } - }() - - return -} - func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) { var err error defer func() { diff --git a/model/cache_pool.go b/model/cache_pool.go index 8e5e348..8019211 100644 --- a/model/cache_pool.go +++ b/model/cache_pool.go @@ -30,7 +30,12 @@ func (sbp *sliceBufferPool) Enqueue(buffer []byte) { return } - sbp.queue <- buffer + select { + case sbp.queue <- buffer: + return + default: + buffer = nil + } } func (sbp *sliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) { diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go index 3155cd5..de3fabf 100644 --- a/model/query_piece_pool.go +++ b/model/query_piece_pool.go @@ -19,7 +19,13 @@ func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) { mqpp.lock.Lock() defer mqpp.lock.Unlock() - mqpp.queue <- pmqp + select { + case mqpp.queue <- pmqp: + return + default: + pmqp = nil + return + } } func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) { @@ -29,6 +35,7 @@ func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) { select { case pmqp = <- mqpp.queue: return + default: pmqp = &PooledMysqlQueryPiece{ MysqlQueryPiece: MysqlQueryPiece{}, diff --git a/session-dealer/mysql/cover_range.go b/session-dealer/mysql/cover_range.go index 7654c30..da5b4bb 100644 --- a/session-dealer/mysql/cover_range.go +++ b/session-dealer/mysql/cover_range.go @@ -100,13 +100,19 @@ func (crp *coveragePool) NewCoverage(begin, end int64)(cn *coverageNode) { return } -func (crp *coveragePool) Enqueue(cn *coverageNode) { +func (crp *coveragePool) Enqueue(cn *coverageNode) { // log.Debugf("coveragePool enqueue: %d", len(crp.queue)) if cn == nil { return } - crp.queue <- cn + select { + case crp.queue <- cn: + return + + default: + cn = nil + } } func (crp *coveragePool) Dequeue() (cn *coverageNode) { @@ -122,6 +128,7 @@ func (crp *coveragePool) Dequeue() (cn *coverageNode) { select { case cn = <- crp.queue: return + default: cn = &coverageNode{} return