diff --git a/.gitignore b/.gitignore index 0d72c1f..32835eb 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ sniffer-agent # vendor/github.com profile* *.svg +nohup.out +*/nohup.out diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 9476f6d..8fa4383 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -109,7 +109,7 @@ }, { "ImportPath": "github.com/zr-hebo/util-db", - "Rev": "3ff29f916f7b712b3adc53c4b9b19b13b8bbed87" + "Rev": "06948bca5665b2d80078f9ff5c015eabb43c54f8" }, { "ImportPath": "github.com/zr-hebo/util-http", diff --git a/capture/network.go b/capture/network.go index a981347..33cdedc 100644 --- a/capture/network.go +++ b/capture/network.go @@ -20,13 +20,13 @@ import ( var ( DeviceName string snifferPort int - inParallel bool + // inParallel bool ) func init() { flag.StringVar(&DeviceName, "interface", "eth0", "network device name. Default is eth0") flag.IntVar(&snifferPort, "port", 3306, "sniffer port. Default is 3306") - flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false") + // flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false") } // networkCard is network device @@ -93,21 +93,24 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) { } func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { - if inParallel { - nc.listenInParallel() - - } else { - nc.listenNormal() - } + // if inParallel { + // nc.listenInParallel() + // + // } else { + // nc.listenNormal() + // } + nc.listenNormal() return nc.receiver } + // Listen get a connection. func (nc *networkCard) listenNormal() { go func() { aliveCounter := 0 - handler := initEthernetHandlerFromPacpgo() + handler := initEthernetHandlerFromPacp() + for { var data []byte var ci gopacket.CaptureInfo @@ -123,8 +126,28 @@ func (nc *networkCard) listenNormal() { nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, capturePacketRate) } continue + } - } else if 0 < capturePacketRate && capturePacketRate < 1.0 { + data, ci, err = handler.ZeroCopyReadPacketData() + if err != nil { + log.Error(err.Error()) + time.Sleep(time.Second*3) + continue + } + + // packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) + packet := gopacket.NewPacket(data, handler.LinkType(), gopacket.NoCopy) + m := packet.Metadata() + m.CaptureInfo = ci + + // send FIN tcp packet to avoid not complete session cannot be released + tcpPkt := packet.TransportLayer().(*layers.TCP) + if tcpPkt.FIN { + nc.parseTCPPackage(packet) + continue + } + + if 0 < capturePacketRate && capturePacketRate < 1.0 { // fall into throw range rn := rand.Float64() if rn > capturePacketRate { @@ -133,17 +156,6 @@ func (nc *networkCard) listenNormal() { } aliveCounter = 0 - data, ci, err = handler.ZeroCopyReadPacketData() - if err != nil { - log.Error(err.Error()) - time.Sleep(time.Second*3) - continue - } - - packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy) - m := packet.Metadata() - m.CaptureInfo = ci - m.Truncated = m.Truncated || ci.CaptureLength < ci.Length nc.parseTCPPackage(packet) } }() diff --git a/communicator/config.go b/communicator/config.go index aebf222..73806a2 100644 --- a/communicator/config.go +++ b/communicator/config.go @@ -13,7 +13,6 @@ const ( var ( communicatePort int - // capturePacketRate float64 router = mux.NewRouter() ) @@ -21,15 +20,14 @@ var ( configMapLock sync.RWMutex configMap map[string]configItem catpurePacketRate *capturePacketRateConfig + catpurePacketRateVal float64 ) func init() { catpurePacketRate = newCapturePacketRateConfig() flag.IntVar(&communicatePort, "communicate_port", 8088, "http server port. Default is 8088") - var cpr float64 - flag.Float64Var(&cpr, CAPTURE_PACKET_RATE, 1, "capture packet rate. Default is 1.0") - _ = catpurePacketRate.setVal(cpr) + flag.Float64Var(&catpurePacketRateVal, CAPTURE_PACKET_RATE, 0.01, "capture packet rate. Default is 0.01") configMap = make(map[string]configItem) } diff --git a/communicator/controller.go b/communicator/controller.go index 5b97185..43a1e44 100644 --- a/communicator/controller.go +++ b/communicator/controller.go @@ -11,6 +11,8 @@ import ( ) func Server() { + initConfig() + server := &http.Server{ Addr: "0.0.0.0:" + strconv.Itoa(communicatePort), IdleTimeout: time.Second * 5, @@ -22,6 +24,10 @@ func Server() { } } +func initConfig() { + _ = catpurePacketRate.setVal(catpurePacketRateVal) +} + func outletCheckAlive(resp http.ResponseWriter, req *http.Request) { mp := hu.NewMouthpiece(resp) defer func() { diff --git a/communicator/model.go b/communicator/model.go index 44ad29e..ec420f3 100644 --- a/communicator/model.go +++ b/communicator/model.go @@ -32,6 +32,7 @@ func (cprc *capturePacketRateConfig) setVal (val interface{}) (err error){ return } + fmt.Printf("set config %s: %v\n", CAPTURE_PACKET_RATE, realVal) cprc.mysqlTPR = realVal cprc.tcpTPR = math.Sqrt(realVal) return diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index bebe694..6998835 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -9,9 +9,9 @@ import ( type MysqlQueryPiece struct { BaseQueryPiece - SessionID *string `json:"cid"` - ClientHost *string `json:"-"` - ClientPort int `json:"-"` + SessionID *string `json:"-"` + ClientHost *string `json:"cip"` + ClientPort int `json:"cport"` VisitUser *string `json:"user"` VisitDB *string `json:"db"` @@ -62,16 +62,21 @@ func (mqp *MysqlQueryPiece) Bytes() (content []byte) { return mqp.jsonContent } - mqp.jsonContent = marsharQueryPiece(mqp) + mqp.GenerateJsonBytes() return mqp.jsonContent } +func (mqp *MysqlQueryPiece) GenerateJsonBytes() { + mqp.jsonContent = marsharQueryPieceMonopolize(mqp) + return +} + func (mqp *MysqlQueryPiece) GetSQL() (str *string) { return mqp.QuerySQL } func (pmqp *PooledMysqlQueryPiece) Recovery() { - pmqp.recoverPool.Enqueue(pmqp) - pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) + // pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0]) pmqp.jsonContent = nil + pmqp.recoverPool.Enqueue(pmqp) } diff --git a/model/query_piece.go b/model/query_piece.go index 440a664..80f207d 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -1,13 +1,15 @@ package model import ( - // "github.com/json-iterator/go" "bytes" "encoding/json" + jsoniter "github.com/json-iterator/go" "github.com/pingcap/tidb/util/hack" "time" ) +var jsonIterator = jsoniter.ConfigCompatibleWithStandardLibrary + type QueryPiece interface { String() *string Bytes() []byte @@ -70,7 +72,7 @@ func (bqp *BaseQueryPiece) Bytes() (content []byte) { return bqp.jsonContent } - bqp.jsonContent = marsharQueryPiece(bqp) + bqp.jsonContent = marsharQueryPieceMonopolize(bqp) return bqp.jsonContent } @@ -81,8 +83,12 @@ func (bqp *BaseQueryPiece) GetSQL() (*string) { func (bqp *BaseQueryPiece) Recovery() { } -func marsharQueryPiece(qp interface{}) []byte { +func marsharQueryPieceShareMemory(qp interface{}) []byte { var cacheBuffer = localSliceBufferPool.Dequeue() + if len(cacheBuffer) > 0 { + panic("there already have bytes in buffer") + } + buffer := bytes.NewBuffer(cacheBuffer) err := json.NewEncoder(buffer).Encode(qp) if err != nil { @@ -91,3 +97,12 @@ func marsharQueryPiece(qp interface{}) []byte { return buffer.Bytes() } + +func marsharQueryPieceMonopolize(qp interface{}) (content []byte) { + content, err := jsonIterator.Marshal(qp) + if err != nil { + return []byte(err.Error()) + } + + return content +} \ No newline at end of file diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go index c3eaa0f..3155cd5 100644 --- a/model/query_piece_pool.go +++ b/model/query_piece_pool.go @@ -19,7 +19,6 @@ func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) { mqpp.lock.Lock() defer mqpp.lock.Unlock() - mqpp.queue <- pmqp } diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 99f6a96..26c91fa 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -12,17 +12,17 @@ import ( ) type MysqlSession struct { - connectionID *string - visitUser *string - visitDB *string - clientHost *string - clientPort int - serverIP *string - serverPort int - stmtBeginTime int64 + connectionID *string + visitUser *string + visitDB *string + clientHost *string + clientPort int + serverIP *string + serverPort int + stmtBeginTime int64 // packageOffset int64 - beginSeqID int64 - endSeqID int64 + beginSeqID int64 + endSeqID int64 coverRanges *coverRanges expectReceiveSize int expectSendSize int @@ -59,7 +59,7 @@ func NewMysqlSession( queryPieceReceiver: receiver, closeConn: make(chan bool, 1), expectReceiveSize: -1, - coverRanges: NewCoverRanges(), + coverRanges: NewCoverRanges(), ignoreAckID: -1, sendSize: 0, pkgCacheLock: sync.Mutex{}, @@ -100,7 +100,7 @@ func (ms *MysqlSession) resetBeginTime() { } func (ms *MysqlSession) readFromServer(bytes []byte) { - if ms.expectSendSize < 1 { + if ms.expectSendSize < 1 && len(bytes) > 4 { ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) contents := bytes[4:] if ms.prepareInfo != nil && contents[0] == 0 { @@ -115,7 +115,7 @@ func (ms *MysqlSession) checkFinish() bool { } checkNode := ms.coverRanges.head.next - if checkNode.end - checkNode.begin == int64(len(ms.cachedStmtBytes)) { + if checkNode.end-checkNode.begin == int64(len(ms.cachedStmtBytes)) { return true } @@ -143,6 +143,11 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { contentSize := int64(len(bytes)) if ms.expectReceiveSize == -1 { + // ignore invalid head package + if len(bytes) <= 4{ + return + } + ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4]) // ignore too big mysql packet if ms.expectReceiveSize >= MaxMysqlPacketLen { @@ -160,9 +165,8 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { ms.beginSeqID = seqID ms.endSeqID = seqID - if int64(ms.expectReceiveSize) < int64(len(contents)) { - log.Warnf("receive invalid mysql packet") + log.Debug("receive invalid mysql packet") return } @@ -177,7 +181,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { } if ms.beginSeqID == -1 { - log.Warnf("cover range is empty") + log.Debug("cover range is empty") return } @@ -192,7 +196,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { if seqOffset+contentSize > int64(len(ms.cachedStmtBytes)) { // not in a normal mysql packet log.Debugf("receive an unexpect packet") - ms.clear() + ms.clear() return } @@ -204,7 +208,6 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { // ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize) } - func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { defer ms.clear() @@ -298,10 +301,15 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { } } - return filterQueryPieceBySQL(mqp, querySQLInBytes) + mqp = filterQueryPieceBySQL(mqp, querySQLInBytes) + if mqp == nil { + return nil + } + mqp.GenerateJsonBytes() + return mqp } -func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (model.QueryPiece) { +func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (*model.PooledMysqlQueryPiece) { if mqp == nil || querySQL == nil { return nil diff --git a/vendor/github.com/zr-hebo/util-db/query_db.go b/vendor/github.com/zr-hebo/util-db/query_db.go index 574ae53..64b0f22 100644 --- a/vendor/github.com/zr-hebo/util-db/query_db.go +++ b/vendor/github.com/zr-hebo/util-db/query_db.go @@ -43,12 +43,14 @@ type MysqlDB struct { DatabaseType string DBName string ConnectTimeout int + QueryTimeout int } // NewMysqlDB 创建MySQL数据库 func NewMysqlDB() (md *MysqlDB) { md = new(MysqlDB) md.DatabaseType = dbTypeMysql + md.QueryTimeout = 5 return } @@ -352,7 +354,7 @@ func (md *MysqlDB) fillConnStr() string { md.UserName, md.Passwd, md.IP, md.Port, md.DBName) if md.ConnectTimeout > 0 { dbServerInfoStr = fmt.Sprintf("%s?timeout=%ds&readTimeout=%ds&writeTimeout=%ds", - dbServerInfoStr, md.ConnectTimeout, md.ConnectTimeout, md.ConnectTimeout) + dbServerInfoStr, md.ConnectTimeout, md.QueryTimeout, md.QueryTimeout) } return dbServerInfoStr diff --git a/vendor/github.com/zr-hebo/util-db/query_pooled_db.go b/vendor/github.com/zr-hebo/util-db/query_pooled_db.go index 051c35d..dbe820e 100644 --- a/vendor/github.com/zr-hebo/util-db/query_pooled_db.go +++ b/vendor/github.com/zr-hebo/util-db/query_pooled_db.go @@ -6,7 +6,6 @@ import ( "time" ) - // PooledMysqlDB Mysql主机实例 type PooledMysqlDB struct { MysqlDB @@ -46,6 +45,7 @@ func NewPooledMysqlDBWithAllParam( func NewPooledMysqlDB() (pmd *PooledMysqlDB) { pmd = new(PooledMysqlDB) pmd.DatabaseType = dbTypeMysql + pmd.QueryTimeout = 5 pmd.lock = new(sync.Mutex) return }