fix bug when recover memory

This commit is contained in:
hebo 2019-11-01 17:51:57 +08:00
parent c9bdcceba9
commit 2b42cd4b3a
4 changed files with 23 additions and 67 deletions

View File

@ -93,13 +93,6 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) {
} }
func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { func (nc *networkCard) Listen() (receiver chan model.QueryPiece) {
// if inParallel {
// nc.listenInParallel()
//
// } else {
// nc.listenNormal()
// }
nc.listenNormal() nc.listenNormal()
return nc.receiver return nc.receiver
} }
@ -163,62 +156,6 @@ func (nc *networkCard) listenNormal() {
return return
} }
// Listen get a connection.
func (nc *networkCard) listenInParallel() {
type captureInfo struct {
bytes []byte
captureInfo gopacket.CaptureInfo
}
rawDataChan := make(chan *captureInfo, 20)
packageChan := make(chan gopacket.Packet, 20)
// read packet
go func() {
defer func() {
close(packageChan)
}()
handler := initEthernetHandlerFromPacpgo()
for {
var data []byte
// data, ci, err := handler.ZeroCopyReadPacketData()
data, ci, err := handler.ReadPacketData()
if err != nil {
log.Error(err.Error())
time.Sleep(time.Second*3)
continue
}
rawDataChan <- &captureInfo{
bytes: data,
captureInfo: ci,
}
}
}()
// parse package
go func() {
for captureInfo := range rawDataChan {
packet := gopacket.NewPacket(captureInfo.bytes, layers.LayerTypeEthernet, gopacket.NoCopy)
m := packet.Metadata()
m.CaptureInfo = captureInfo.captureInfo
m.Truncated = m.Truncated || captureInfo.captureInfo.CaptureLength < captureInfo.captureInfo.Length
packageChan <- packet
}
}()
// parse package
go func() {
for packet := range packageChan {
nc.parseTCPPackage(packet)
}
}()
return
}
func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) { func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) {
var err error var err error
defer func() { defer func() {

View File

@ -30,7 +30,12 @@ func (sbp *sliceBufferPool) Enqueue(buffer []byte) {
return return
} }
sbp.queue <- buffer select {
case sbp.queue <- buffer:
return
default:
buffer = nil
}
} }
func (sbp *sliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) { func (sbp *sliceBufferPool) DequeueWithInit(initSize int) (buffer []byte) {

View File

@ -19,7 +19,13 @@ func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) {
mqpp.lock.Lock() mqpp.lock.Lock()
defer mqpp.lock.Unlock() defer mqpp.lock.Unlock()
mqpp.queue <- pmqp select {
case mqpp.queue <- pmqp:
return
default:
pmqp = nil
return
}
} }
func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) { func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) {
@ -29,6 +35,7 @@ func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) {
select { select {
case pmqp = <- mqpp.queue: case pmqp = <- mqpp.queue:
return return
default: default:
pmqp = &PooledMysqlQueryPiece{ pmqp = &PooledMysqlQueryPiece{
MysqlQueryPiece: MysqlQueryPiece{}, MysqlQueryPiece: MysqlQueryPiece{},

View File

@ -100,13 +100,19 @@ func (crp *coveragePool) NewCoverage(begin, end int64)(cn *coverageNode) {
return return
} }
func (crp *coveragePool) Enqueue(cn *coverageNode) { func (crp *coveragePool) Enqueue(cn *coverageNode) {
// log.Debugf("coveragePool enqueue: %d", len(crp.queue)) // log.Debugf("coveragePool enqueue: %d", len(crp.queue))
if cn == nil { if cn == nil {
return return
} }
crp.queue <- cn select {
case crp.queue <- cn:
return
default:
cn = nil
}
} }
func (crp *coveragePool) Dequeue() (cn *coverageNode) { func (crp *coveragePool) Dequeue() (cn *coverageNode) {
@ -122,6 +128,7 @@ func (crp *coveragePool) Dequeue() (cn *coverageNode) {
select { select {
case cn = <- crp.queue: case cn = <- crp.queue:
return return
default: default:
cn = &coverageNode{} cn = &coverageNode{}
return return