diff --git a/.gitignore b/.gitignore index 096e586..a63c235 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ one_key.sh sniffer-agent # vendor/github.com +profile* \ No newline at end of file diff --git a/capture/config.go b/capture/config.go index ecdbbec..c466107 100644 --- a/capture/config.go +++ b/capture/config.go @@ -6,17 +6,17 @@ import ( ) var ( - localIPAddr string + localIPAddr *string sessionPool = make(map[string]sd.ConnSession) ) func init() { - var err error - localIPAddr, err = getLocalIPAddr() + ipAddr, err := getLocalIPAddr() if err != nil { panic(err) } - log.Infof("parsed local ip address:%s", localIPAddr) + localIPAddr = &ipAddr + log.Infof("parsed local ip address:%s", *localIPAddr) } diff --git a/capture/network.go b/capture/network.go index 257be39..8f04f22 100644 --- a/capture/network.go +++ b/capture/network.go @@ -3,12 +3,12 @@ package capture import ( "flag" "fmt" - log "github.com/sirupsen/logrus" - sd "github.com/zr-hebo/sniffer-agent/session-dealer" - "github.com/zr-hebo/sniffer-agent/model" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" + log "github.com/sirupsen/logrus" + "github.com/zr-hebo/sniffer-agent/model" + sd "github.com/zr-hebo/sniffer-agent/session-dealer" ) var ( @@ -41,22 +41,29 @@ func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { close(receiver) }() - handle, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) + handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) if err != nil { panic(fmt.Sprintf("cannot open network interface %s <-- %s", nc.name, err.Error())) } + linkType := handler.LinkType() - packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) - for packet := range packetSource.Packets() { - if packet.NetworkLayer() == nil || packet.TransportLayer() == nil { - // log.Info("empty network layer") + err = handler.SetBPFFilter(fmt.Sprintf("tcp and (port %d)", snifferPort)) + if err != nil { + panic(err.Error()) + } + + for { + var data []byte + data, ci, err := handler.ZeroCopyReadPacketData() + if err != nil { + log.Error(err.Error()) continue } - if packet.TransportLayer().LayerType() != layers.LayerTypeTCP { - // log.Info("packet type is %s, not TCP", packet.TransportLayer().LayerType()) - continue - } + packet := gopacket.NewPacket(data, linkType, gopacket.NoCopy) + m := packet.Metadata() + m.CaptureInfo = ci + m.Truncated = m.Truncated || ci.CaptureLength < ci.Length qp := nc.parseTCPPackage(packet) if qp != nil { @@ -81,10 +88,6 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPi return } - if(int(tcpConn.DstPort) != nc.listenPort && int(tcpConn.SrcPort) != nc.listenPort) { - return - } - ipLayer := packet.Layer(layers.LayerTypeIPv4) if ipLayer == nil { err = fmt.Errorf("no ip layer found in package") @@ -102,16 +105,16 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPi dstIP := ipInfo.DstIP.String() srcPort := int(tcpConn.SrcPort) dstPort := int(tcpConn.DstPort) - if dstIP == localIPAddr && dstPort == nc.listenPort { + if dstIP == *localIPAddr && dstPort == nc.listenPort { // deal mysql server response - err = readToServerPackage(srcIP, srcPort, tcpConn) + err = readToServerPackage(&srcIP, srcPort, tcpConn) if err != nil { return } - } else if srcIP == localIPAddr && srcPort == nc.listenPort { + } else if srcIP == *localIPAddr && srcPort == nc.listenPort { // deal mysql client request - qp, err = readFromServerPackage(dstIP, dstPort, tcpConn) + qp, err = readFromServerPackage(&dstIP, dstPort, tcpConn) if err != nil { return } @@ -120,17 +123,17 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPi return } -func readFromServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (qp model.QueryPiece, err error) { +func readFromServerPackage(srcIP *string, srcPort int, tcpConn *layers.TCP) (qp model.QueryPiece, err error) { defer func() { if err != nil { log.Error("read Mysql package send from mysql server to client failed <-- %s", err.Error()) } }() - sessionKey := spliceSessionKey(srcIP, srcPort) if tcpConn.FIN { - delete(sessionPool, sessionKey) - // log.Debugf("close connection from %s", sessionKey) + sessionKey := spliceSessionKey(srcIP, srcPort) + delete(sessionPool, *sessionKey) + log.Debugf("close connection from %s", *sessionKey) return } @@ -139,7 +142,8 @@ func readFromServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (qp m return } - session := sessionPool[sessionKey] + sessionKey := spliceSessionKey(srcIP, srcPort) + session := sessionPool[*sessionKey] if session != nil { session.ReadFromServer(tcpPayload) qp = session.GenerateQueryPiece() @@ -148,18 +152,18 @@ func readFromServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (qp m return } -func readToServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (err error) { +func readToServerPackage(srcIP *string, srcPort int, tcpConn *layers.TCP) (err error) { defer func() { if err != nil { log.Error("read package send from client to mysql server failed <-- %s", err.Error()) } }() - sessionKey := spliceSessionKey(srcIP, srcPort) // when client try close connection remove session from session pool if tcpConn.FIN { - delete(sessionPool, sessionKey) - // log.Debugf("close connection from %s", sessionKey) + sessionKey := spliceSessionKey(srcIP, srcPort) + delete(sessionPool, *sessionKey) + log.Debugf("close connection from %s", *sessionKey) return } @@ -168,12 +172,14 @@ func readToServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (err er return } - session := sessionPool[sessionKey] + sessionKey := spliceSessionKey(srcIP, srcPort) + session := sessionPool[*sessionKey] if session == nil { session = sd.NewSession(sessionKey, srcIP, srcPort, localIPAddr, snifferPort) - sessionPool[sessionKey] = session + sessionPool[*sessionKey] = session } + session.ResetBeginTime() session.ReadFromClient(tcpPayload) return } diff --git a/capture/util.go b/capture/util.go index 9bc4d1a..f9edd24 100644 --- a/capture/util.go +++ b/capture/util.go @@ -34,7 +34,19 @@ func getLocalIPAddr() (ipAddr string, err error) { return } -func spliceSessionKey(srcIP string, srcPort int) (sessionKey string) { - sessionKey = fmt.Sprintf("%s:%d", srcIP, srcPort) - return +func spliceSessionKey(srcIP *string, srcPort int) (*string) { + // var buf strings.Builder + // _, err := fmt.Fprint(&buf, *srcIP, ":", srcPort) + // if err != nil { + // panic(err.Error()) + // } + // sessionKey := buf.String() + + // buf := new(bytes.Buffer) + // _ = templateSessionKey.ExecuteTemplate(buf, "IP", srcIP) + // _ = templateSessionKey.ExecuteTemplate(buf, "Port", srcPort) + // sessionKey := buf.String() + + sessionKey := fmt.Sprintf("%s:%d", *srcIP, srcPort) + return &sessionKey } diff --git a/exporter/kafka.go b/exporter/kafka.go index 0e7dddc..fd5992d 100644 --- a/exporter/kafka.go +++ b/exporter/kafka.go @@ -3,16 +3,14 @@ package exporter import ( "flag" "fmt" - "regexp" "strings" - log "github.com/sirupsen/logrus" "github.com/Shopify/sarama" + log "github.com/sirupsen/logrus" "github.com/zr-hebo/sniffer-agent/model" ) var ( - ddlPatern = regexp.MustCompile(`(?i)^\s*(create|alter|drop)`) kafkaServer string kafkaGroupID string asyncTopic string @@ -92,12 +90,18 @@ func NewKafkaExporter() (ke *kafkaExporter) { } func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){ - if ddlPatern.MatchString(qp.GetSQL()) { - log.Debugf("deal ddl: %s\n", qp.String()) + defer func() { + if err != nil { + log.Errorf("export with kafka failed <-- %s", err.Error()) + } + }() + + if qp.NeedSyncSend() { + // log.Debugf("deal ddl: %s\n", *qp.String()) msg := &sarama.ProducerMessage{ Topic: ke.syncTopic, - Value: sarama.StringEncoder(qp.String()), + Value: sarama.ByteEncoder(qp.Bytes()), } _, _, err = ke.syncProducer.SendMessage(msg) if err != nil { @@ -105,7 +109,7 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){ } } else { - log.Debugf("deal non ddl: %s", qp.String()) + // log.Debugf("deal non ddl: %s", *qp.String()) msg := &sarama.ProducerMessage{ Topic: ke.asyncTopic, Value: sarama.ByteEncoder(qp.Bytes()), @@ -113,5 +117,6 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){ ke.asyncProducer.Input() <- msg } + return } diff --git a/main.go b/main.go index ff38957..c8b3a8e 100644 --- a/main.go +++ b/main.go @@ -55,6 +55,7 @@ func mainServer() { if err != nil { log.Error(err.Error()) } + queryPiece.Recovery() } log.Errorf("cannot get network package from %s", capture.DeviceName) diff --git a/model/query_piece.go b/model/query_piece.go index f0946e9..2dc7636 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -2,34 +2,76 @@ package model import ( "encoding/json" + "time" + + "github.com/pingcap/tidb/util/hack" ) type QueryPiece interface { - String() string + String() *string Bytes() []byte - GetSQL() string + GetSQL() *string + NeedSyncSend() bool + Recovery() } // MysqlQueryPiece 查询信息 type MysqlQueryPiece struct { - SessionID string `json:"sid"` - ClientHost string `json:"-"` - ServerIP string `json:"sip"` + SessionID *string `json:"sid"` + ClientHost *string `json:"-"` + SyncSend bool `json:"-"` + ServerIP *string `json:"sip"` ServerPort int `json:"sport"` VisitUser *string `json:"user"` VisitDB *string `json:"db"` QuerySQL *string `json:"sql"` BeginTime string `json:"bt"` - CostTimeInMS int64 `json:"cms"` + CostTimeInMS int64 `json:"cms"` } -func (qp *MysqlQueryPiece) String() (str string) { - content, err := json.Marshal(qp) - if err != nil { - return err.Error() +type PooledMysqlQueryPiece struct { + MysqlQueryPiece + recoverPool *mysqlQueryPiecePool +} + +const ( + datetimeFormat = "2006-01-02 15:04:05" + millSecondUnit = int64(time.Millisecond) +) + +var ( + mqpp = NewMysqlQueryPiecePool() +) + +func NewPooledMysqlQueryPiece( + sessionID, visitUser, visitDB, clientHost, serverIP *string, serverPort int, stmtBeginTime int64) ( + mqp *PooledMysqlQueryPiece) { + mqp = mqpp.Dequeue() + if mqp == nil { + mqp = &PooledMysqlQueryPiece{ + MysqlQueryPiece: MysqlQueryPiece{}, + } } - return string(content) + nowInMS := time.Now().UnixNano() / millSecondUnit + mqp.SessionID = sessionID + mqp.ClientHost = clientHost + mqp.ServerIP = serverIP + mqp.ServerPort = serverPort + mqp.VisitUser = visitUser + mqp.VisitDB = visitDB + mqp.SyncSend = false + mqp.BeginTime = time.Unix(stmtBeginTime/1000, 0).Format(datetimeFormat) + mqp.CostTimeInMS = nowInMS - stmtBeginTime + mqp.recoverPool = mqpp + + return +} + +func (qp *MysqlQueryPiece) String() (*string) { + content := qp.Bytes() + contentStr := hack.String(content) + return &contentStr } func (qp *MysqlQueryPiece) Bytes() (bytes []byte) { @@ -41,9 +83,18 @@ func (qp *MysqlQueryPiece) Bytes() (bytes []byte) { return content } -func (qp *MysqlQueryPiece) GetSQL() (str string) { - if qp.QuerySQL != nil { - return *qp.QuerySQL - } - return "" +func (qp *MysqlQueryPiece) GetSQL() (str *string) { + return qp.QuerySQL +} + +func (qp *MysqlQueryPiece) NeedSyncSend() (bool) { + return qp.SyncSend +} + +func (qp *MysqlQueryPiece) SetNeedSyncSend(syncSend bool) { + qp.SyncSend = syncSend +} + +func (pmqp *PooledMysqlQueryPiece) Recovery() { + pmqp.recoverPool.Enqueue(pmqp) } diff --git a/model/query_piece_queue.go b/model/query_piece_queue.go new file mode 100644 index 0000000..a6382b6 --- /dev/null +++ b/model/query_piece_queue.go @@ -0,0 +1,36 @@ +package model + +import ( + "sync" +) + +type mysqlQueryPiecePool struct { + queue []*PooledMysqlQueryPiece + lock sync.Mutex +} + +func NewMysqlQueryPiecePool() (mqpp *mysqlQueryPiecePool) { + return &mysqlQueryPiecePool{ + queue: make([]*PooledMysqlQueryPiece, 0, 5000), + } +} + +func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) { + mqpp.lock.Lock() + defer mqpp.lock.Unlock() + + mqpp.queue = append(mqpp.queue, pmqp) +} + +func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) { + mqpp.lock.Lock() + defer mqpp.lock.Unlock() + + if len(mqpp.queue) < 1 { + return nil + } + + pmqp = mqpp.queue[0] + mqpp.queue = mqpp.queue[1:] + return +} \ No newline at end of file diff --git a/scripts/generate_mysql_select.sh b/scripts/generate_mysql_select.sh index 1e85a79..498cffa 100755 --- a/scripts/generate_mysql_select.sh +++ b/scripts/generate_mysql_select.sh @@ -6,22 +6,22 @@ function execute_real(){ user_name=user passwd=123456 - mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "select 1" + mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "select 1" sleep 1 - mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "use sniffer;show tables;create table haha(id int, name text)" + mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "use sniffer;show tables;create table haha(id int, name text)" sleep 1 - mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "" + mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "" sleep 1 - mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "" + mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "" sleep 1 insert_cmd="insert into unibase.haha(id, name) values(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" - mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "$insert_cmd" + mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "$insert_cmd" sleep 1 - mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "use unibase; select * from haha; drop table haha" + mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "use unibase; select * from haha; drop table haha" sleep 1 } diff --git a/session-dealer/controller.go b/session-dealer/controller.go index f1a41f7..963233c 100644 --- a/session-dealer/controller.go +++ b/session-dealer/controller.go @@ -4,7 +4,7 @@ import ( "github.com/zr-hebo/sniffer-agent/session-dealer/mysql" ) -func NewSession(sessionKey string, clientIP string, clientPort int, serverIP string, serverPort int) (session ConnSession) { +func NewSession(sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int) (session ConnSession) { switch serviceType { case ServiceTypeMysql: session = mysql.NewMysqlSession(sessionKey, clientIP, clientPort, serverIP, serverPort) diff --git a/session-dealer/model.go b/session-dealer/model.go index fb306d3..d8c242c 100644 --- a/session-dealer/model.go +++ b/session-dealer/model.go @@ -5,5 +5,6 @@ import "github.com/zr-hebo/sniffer-agent/model" type ConnSession interface { ReadFromClient(bytes []byte) ReadFromServer(bytes []byte) + ResetBeginTime() GenerateQueryPiece() (qp model.QueryPiece) } diff --git a/session-dealer/mysql/config.go b/session-dealer/mysql/config.go index c671275..46f4225 100644 --- a/session-dealer/mysql/config.go +++ b/session-dealer/mysql/config.go @@ -9,6 +9,7 @@ import ( var ( uselessSQLPattern = regexp.MustCompile(`(?i)^\s*(select 1|select @@version_comment limit 1|`+ `SELECT user, db FROM information_schema.processlist WHERE host=|commit|begin)`) + ddlPatern = regexp.MustCompile(`(?i)^\s*(create|alter|drop)`) ) var ( diff --git a/session-dealer/mysql/connections.go b/session-dealer/mysql/connections.go index 96cdcda..84b7cd0 100644 --- a/session-dealer/mysql/connections.go +++ b/session-dealer/mysql/connections.go @@ -5,7 +5,6 @@ import ( _ "github.com/go-sql-driver/mysql" du "github.com/zr-hebo/util-db" - // log "github.com/sirupsen/logrus" ) func expandLocalMysql(port int) (mysqlHost *du.MysqlDB) { @@ -20,7 +19,7 @@ func expandLocalMysql(port int) (mysqlHost *du.MysqlDB) { return } -func querySessionInfo(snifferPort int, clientHost string) (user, db *string, err error) { +func querySessionInfo(snifferPort int, clientHost *string) (user, db *string, err error) { mysqlServer := expandLocalMysql(snifferPort) querySQL := fmt.Sprintf( "SELECT user, db FROM information_schema.processlist WHERE host='%s'", clientHost) diff --git a/session-dealer/mysql/const.go b/session-dealer/mysql/const.go index cb4b30d..a3b87ac 100644 --- a/session-dealer/mysql/const.go +++ b/session-dealer/mysql/const.go @@ -1,6 +1,9 @@ package mysql -import "errors" +import ( + "errors" + "time" +) // Command information. const ( @@ -97,7 +100,7 @@ const ( ) const ( - datetimeFormat = "2006-01-02 15:04:05" + millSecondUnit = int64(time.Millisecond) ) var ( diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index d734a7c..833331d 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -2,6 +2,7 @@ package mysql import ( "fmt" + "github.com/siddontang/go/hack" "time" "github.com/zr-hebo/sniffer-agent/model" @@ -9,38 +10,42 @@ import ( ) type MysqlSession struct { - connectionID string - visitUser *string - visitDB *string - clientHost string - clientPort int - serverIP string - serverPort int - beginTime int64 - expectSize int - prepareInfo *prepareInfo + connectionID *string + visitUser *string + visitDB *string + clientHost *string + clientPort int + serverIP *string + serverPort int + stmtBeginTime int64 + expectSize int + prepareInfo *prepareInfo cachedPrepareStmt map[int]*string - tcpCache []byte - cachedStmtBytes []byte + tcpCache []byte + cachedStmtBytes []byte } type prepareInfo struct { prepareStmtID int } -func NewMysqlSession(sessionKey string, clientIP string, clientPort int, serverIP string, serverPort int) (ms *MysqlSession) { +func NewMysqlSession(sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int) (ms *MysqlSession) { ms = &MysqlSession{ - connectionID: sessionKey, - clientHost: clientIP, - clientPort: clientPort, - serverIP: serverIP, - serverPort: serverPort, - beginTime: time.Now().UnixNano() / int64(time.Millisecond), - cachedPrepareStmt: make(map[int]*string), + connectionID: sessionKey, + clientHost: clientIP, + clientPort: clientPort, + serverIP: serverIP, + serverPort: serverPort, + stmtBeginTime: time.Now().UnixNano() / millSecondUnit, + cachedPrepareStmt: make(map[int]*string, 8), } return } +func (ms *MysqlSession) ResetBeginTime() { + ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit +} + func (ms *MysqlSession) ReadFromServer(bytes []byte) { if ms.expectSize < 1 { ms.expectSize = extractMysqlPayloadSize(bytes) @@ -82,7 +87,8 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { return } - var mqp *model.MysqlQueryPiece = nil + var mqp *model.PooledMysqlQueryPiece + var querySQLInBytes []byte ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) switch ms.cachedStmtBytes[0] { case ComAuth: @@ -107,12 +113,16 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { case ComDropDB: case ComQuery: mqp = ms.composeQueryPiece() - querySQL := string(ms.cachedStmtBytes[1:]) + querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:])) + copy(querySQLInBytes, ms.cachedStmtBytes[1:]) + querySQL := hack.String(querySQLInBytes) mqp.QuerySQL = &querySQL case ComStmtPrepare: mqp = ms.composeQueryPiece() - querySQL := string(ms.cachedStmtBytes[1:]) + querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:])) + copy(querySQLInBytes, ms.cachedStmtBytes[1:]) + querySQL := hack.String(querySQLInBytes) mqp.QuerySQL = &querySQL ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = &querySQL log.Debugf("prepare statement %s, get id:%d", querySQL, ms.prepareInfo.prepareStmtID) @@ -145,31 +155,27 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { ms.cachedStmtBytes = ms.cachedStmtBytes[:0] ms.expectSize = 0 ms.prepareInfo = nil - return filterQueryPieceBySQL(mqp) + return filterQueryPieceBySQL(mqp, querySQLInBytes) } -func filterQueryPieceBySQL(mqp *model.MysqlQueryPiece) (model.QueryPiece) { - if mqp == nil || mqp.QuerySQL == nil { +func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (model.QueryPiece) { + if mqp == nil || querySQL == nil { return nil - } else if (uselessSQLPattern.MatchString(*mqp.QuerySQL)) { + } else if (uselessSQLPattern.Match(querySQL)) { return nil + } + if ddlPatern.Match(querySQL) { + mqp.SetNeedSyncSend(true) + } + + // log.Debug(mqp.String()) return mqp } -func (ms *MysqlSession) composeQueryPiece() (mqp *model.MysqlQueryPiece) { - nowInMS := time.Now().UnixNano() / int64(time.Millisecond) - mqp = &model.MysqlQueryPiece{ - SessionID: ms.connectionID, - ClientHost: ms.clientHost, - ServerIP: ms.serverIP, - ServerPort: ms.serverPort, - VisitUser: ms.visitUser, - VisitDB: ms.visitDB, - BeginTime: time.Unix(ms.beginTime/1000, 0).Format(datetimeFormat), - CostTimeInMS: nowInMS - ms.beginTime, - } - return mqp +func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) { + return model.NewPooledMysqlQueryPiece( + ms.connectionID, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP, ms.serverPort, ms.stmtBeginTime) }