From b79e9f6867a7be1b01d744e78b65d3dd2f80cd96 Mon Sep 17 00:00:00 2001 From: hebo Date: Fri, 27 Sep 2019 16:48:47 +0800 Subject: [PATCH] do not ignore FIN tcp packet --- capture/network.go | 40 +++++++++++++++++++++++++------------- communicator/config.go | 6 ++---- communicator/controller.go | 6 ++++++ 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/capture/network.go b/capture/network.go index 9f00792..d5ee4b0 100644 --- a/capture/network.go +++ b/capture/network.go @@ -20,13 +20,13 @@ import ( var ( DeviceName string snifferPort int - inParallel bool + // inParallel bool ) func init() { flag.StringVar(&DeviceName, "interface", "eth0", "network device name. Default is eth0") flag.IntVar(&snifferPort, "port", 3306, "sniffer port. Default is 3306") - flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false") + // flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false") } // networkCard is network device @@ -93,21 +93,27 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) { } func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { - if inParallel { - nc.listenInParallel() - - } else { - nc.listenNormal() - } + // if inParallel { + // nc.listenInParallel() + // + // } else { + // nc.listenNormal() + // } + nc.listenNormal() return nc.receiver } +func isFINPacket(data []byte) (isFIN bool) { + return +} + // Listen get a connection. func (nc *networkCard) listenNormal() { go func() { aliveCounter := 0 - handler := initEthernetHandlerFromPacpgo() + handler := initEthernetHandlerFromPacp() + for { var data []byte var ci gopacket.CaptureInfo @@ -120,6 +126,18 @@ func (nc *networkCard) listenNormal() { continue } + // packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) + packet := gopacket.NewPacket(data, handler.LinkType(), gopacket.NoCopy) + m := packet.Metadata() + m.CaptureInfo = ci + + // send FIN tcp packet to avoid not complete session cannot be released + tcpPkt := packet.TransportLayer().(*layers.TCP) + if tcpPkt.FIN { + nc.parseTCPPackage(packet) + continue + } + // capture packets according to a certain probability tcpCapturePacketRate := communicator.GetTCPCapturePacketRate() if tcpCapturePacketRate <= 0 { @@ -140,10 +158,6 @@ func (nc *networkCard) listenNormal() { } aliveCounter = 0 - packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) - m := packet.Metadata() - m.CaptureInfo = ci - m.Truncated = m.Truncated || ci.CaptureLength < ci.Length nc.parseTCPPackage(packet) } }() diff --git a/communicator/config.go b/communicator/config.go index e8d2910..73806a2 100644 --- a/communicator/config.go +++ b/communicator/config.go @@ -13,7 +13,6 @@ const ( var ( communicatePort int - // capturePacketRate float64 router = mux.NewRouter() ) @@ -21,15 +20,14 @@ var ( configMapLock sync.RWMutex configMap map[string]configItem catpurePacketRate *capturePacketRateConfig + catpurePacketRateVal float64 ) func init() { catpurePacketRate = newCapturePacketRateConfig() flag.IntVar(&communicatePort, "communicate_port", 8088, "http server port. Default is 8088") - var cpr float64 - flag.Float64Var(&cpr, CAPTURE_PACKET_RATE, 0.01, "capture packet rate. Default is 0.01") - _ = catpurePacketRate.setVal(cpr) + flag.Float64Var(&catpurePacketRateVal, CAPTURE_PACKET_RATE, 0.01, "capture packet rate. Default is 0.01") configMap = make(map[string]configItem) } diff --git a/communicator/controller.go b/communicator/controller.go index 5b97185..43a1e44 100644 --- a/communicator/controller.go +++ b/communicator/controller.go @@ -11,6 +11,8 @@ import ( ) func Server() { + initConfig() + server := &http.Server{ Addr: "0.0.0.0:" + strconv.Itoa(communicatePort), IdleTimeout: time.Second * 5, @@ -22,6 +24,10 @@ func Server() { } } +func initConfig() { + _ = catpurePacketRate.setVal(catpurePacketRateVal) +} + func outletCheckAlive(resp http.ResponseWriter, req *http.Request) { mp := hu.NewMouthpiece(resp) defer func() {