优化性能

This commit is contained in:
hebo 2019-08-13 15:51:33 +08:00
parent c42113b8f8
commit f8a85e0af6
15 changed files with 234 additions and 112 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@
one_key.sh one_key.sh
sniffer-agent sniffer-agent
# vendor/github.com # vendor/github.com
profile*

View File

@ -6,17 +6,17 @@ import (
) )
var ( var (
localIPAddr string localIPAddr *string
sessionPool = make(map[string]sd.ConnSession) sessionPool = make(map[string]sd.ConnSession)
) )
func init() { func init() {
var err error ipAddr, err := getLocalIPAddr()
localIPAddr, err = getLocalIPAddr()
if err != nil { if err != nil {
panic(err) panic(err)
} }
log.Infof("parsed local ip address:%s", localIPAddr) localIPAddr = &ipAddr
log.Infof("parsed local ip address:%s", *localIPAddr)
} }

View File

@ -3,12 +3,12 @@ package capture
import ( import (
"flag" "flag"
"fmt" "fmt"
log "github.com/sirupsen/logrus"
sd "github.com/zr-hebo/sniffer-agent/session-dealer"
"github.com/zr-hebo/sniffer-agent/model"
"github.com/google/gopacket" "github.com/google/gopacket"
"github.com/google/gopacket/layers" "github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
log "github.com/sirupsen/logrus"
"github.com/zr-hebo/sniffer-agent/model"
sd "github.com/zr-hebo/sniffer-agent/session-dealer"
) )
var ( var (
@ -41,22 +41,29 @@ func (nc *networkCard) Listen() (receiver chan model.QueryPiece) {
close(receiver) close(receiver)
}() }()
handle, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever) handler, err := pcap.OpenLive(DeviceName, 65535, false, pcap.BlockForever)
if err != nil { if err != nil {
panic(fmt.Sprintf("cannot open network interface %s <-- %s", nc.name, err.Error())) panic(fmt.Sprintf("cannot open network interface %s <-- %s", nc.name, err.Error()))
} }
linkType := handler.LinkType()
packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) err = handler.SetBPFFilter(fmt.Sprintf("tcp and (port %d)", snifferPort))
for packet := range packetSource.Packets() { if err != nil {
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil { panic(err.Error())
// log.Info("empty network layer") }
for {
var data []byte
data, ci, err := handler.ZeroCopyReadPacketData()
if err != nil {
log.Error(err.Error())
continue continue
} }
if packet.TransportLayer().LayerType() != layers.LayerTypeTCP { packet := gopacket.NewPacket(data, linkType, gopacket.NoCopy)
// log.Info("packet type is %s, not TCP", packet.TransportLayer().LayerType()) m := packet.Metadata()
continue m.CaptureInfo = ci
} m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
qp := nc.parseTCPPackage(packet) qp := nc.parseTCPPackage(packet)
if qp != nil { if qp != nil {
@ -81,10 +88,6 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPi
return return
} }
if(int(tcpConn.DstPort) != nc.listenPort && int(tcpConn.SrcPort) != nc.listenPort) {
return
}
ipLayer := packet.Layer(layers.LayerTypeIPv4) ipLayer := packet.Layer(layers.LayerTypeIPv4)
if ipLayer == nil { if ipLayer == nil {
err = fmt.Errorf("no ip layer found in package") err = fmt.Errorf("no ip layer found in package")
@ -102,16 +105,16 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPi
dstIP := ipInfo.DstIP.String() dstIP := ipInfo.DstIP.String()
srcPort := int(tcpConn.SrcPort) srcPort := int(tcpConn.SrcPort)
dstPort := int(tcpConn.DstPort) dstPort := int(tcpConn.DstPort)
if dstIP == localIPAddr && dstPort == nc.listenPort { if dstIP == *localIPAddr && dstPort == nc.listenPort {
// deal mysql server response // deal mysql server response
err = readToServerPackage(srcIP, srcPort, tcpConn) err = readToServerPackage(&srcIP, srcPort, tcpConn)
if err != nil { if err != nil {
return return
} }
} else if srcIP == localIPAddr && srcPort == nc.listenPort { } else if srcIP == *localIPAddr && srcPort == nc.listenPort {
// deal mysql client request // deal mysql client request
qp, err = readFromServerPackage(dstIP, dstPort, tcpConn) qp, err = readFromServerPackage(&dstIP, dstPort, tcpConn)
if err != nil { if err != nil {
return return
} }
@ -120,17 +123,17 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) (qp model.QueryPi
return return
} }
func readFromServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (qp model.QueryPiece, err error) { func readFromServerPackage(srcIP *string, srcPort int, tcpConn *layers.TCP) (qp model.QueryPiece, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
log.Error("read Mysql package send from mysql server to client failed <-- %s", err.Error()) log.Error("read Mysql package send from mysql server to client failed <-- %s", err.Error())
} }
}() }()
sessionKey := spliceSessionKey(srcIP, srcPort)
if tcpConn.FIN { if tcpConn.FIN {
delete(sessionPool, sessionKey) sessionKey := spliceSessionKey(srcIP, srcPort)
// log.Debugf("close connection from %s", sessionKey) delete(sessionPool, *sessionKey)
log.Debugf("close connection from %s", *sessionKey)
return return
} }
@ -139,7 +142,8 @@ func readFromServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (qp m
return return
} }
session := sessionPool[sessionKey] sessionKey := spliceSessionKey(srcIP, srcPort)
session := sessionPool[*sessionKey]
if session != nil { if session != nil {
session.ReadFromServer(tcpPayload) session.ReadFromServer(tcpPayload)
qp = session.GenerateQueryPiece() qp = session.GenerateQueryPiece()
@ -148,18 +152,18 @@ func readFromServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (qp m
return return
} }
func readToServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (err error) { func readToServerPackage(srcIP *string, srcPort int, tcpConn *layers.TCP) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
log.Error("read package send from client to mysql server failed <-- %s", err.Error()) log.Error("read package send from client to mysql server failed <-- %s", err.Error())
} }
}() }()
sessionKey := spliceSessionKey(srcIP, srcPort)
// when client try close connection remove session from session pool // when client try close connection remove session from session pool
if tcpConn.FIN { if tcpConn.FIN {
delete(sessionPool, sessionKey) sessionKey := spliceSessionKey(srcIP, srcPort)
// log.Debugf("close connection from %s", sessionKey) delete(sessionPool, *sessionKey)
log.Debugf("close connection from %s", *sessionKey)
return return
} }
@ -168,12 +172,14 @@ func readToServerPackage(srcIP string, srcPort int, tcpConn *layers.TCP) (err er
return return
} }
session := sessionPool[sessionKey] sessionKey := spliceSessionKey(srcIP, srcPort)
session := sessionPool[*sessionKey]
if session == nil { if session == nil {
session = sd.NewSession(sessionKey, srcIP, srcPort, localIPAddr, snifferPort) session = sd.NewSession(sessionKey, srcIP, srcPort, localIPAddr, snifferPort)
sessionPool[sessionKey] = session sessionPool[*sessionKey] = session
} }
session.ResetBeginTime()
session.ReadFromClient(tcpPayload) session.ReadFromClient(tcpPayload)
return return
} }

View File

@ -34,7 +34,19 @@ func getLocalIPAddr() (ipAddr string, err error) {
return return
} }
func spliceSessionKey(srcIP string, srcPort int) (sessionKey string) { func spliceSessionKey(srcIP *string, srcPort int) (*string) {
sessionKey = fmt.Sprintf("%s:%d", srcIP, srcPort) // var buf strings.Builder
return // _, err := fmt.Fprint(&buf, *srcIP, ":", srcPort)
// if err != nil {
// panic(err.Error())
// }
// sessionKey := buf.String()
// buf := new(bytes.Buffer)
// _ = templateSessionKey.ExecuteTemplate(buf, "IP", srcIP)
// _ = templateSessionKey.ExecuteTemplate(buf, "Port", srcPort)
// sessionKey := buf.String()
sessionKey := fmt.Sprintf("%s:%d", *srcIP, srcPort)
return &sessionKey
} }

View File

@ -3,16 +3,14 @@ package exporter
import ( import (
"flag" "flag"
"fmt" "fmt"
"regexp"
"strings" "strings"
log "github.com/sirupsen/logrus"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
"github.com/zr-hebo/sniffer-agent/model" "github.com/zr-hebo/sniffer-agent/model"
) )
var ( var (
ddlPatern = regexp.MustCompile(`(?i)^\s*(create|alter|drop)`)
kafkaServer string kafkaServer string
kafkaGroupID string kafkaGroupID string
asyncTopic string asyncTopic string
@ -92,12 +90,18 @@ func NewKafkaExporter() (ke *kafkaExporter) {
} }
func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){
if ddlPatern.MatchString(qp.GetSQL()) { defer func() {
log.Debugf("deal ddl: %s\n", qp.String()) if err != nil {
log.Errorf("export with kafka failed <-- %s", err.Error())
}
}()
if qp.NeedSyncSend() {
// log.Debugf("deal ddl: %s\n", *qp.String())
msg := &sarama.ProducerMessage{ msg := &sarama.ProducerMessage{
Topic: ke.syncTopic, Topic: ke.syncTopic,
Value: sarama.StringEncoder(qp.String()), Value: sarama.ByteEncoder(qp.Bytes()),
} }
_, _, err = ke.syncProducer.SendMessage(msg) _, _, err = ke.syncProducer.SendMessage(msg)
if err != nil { if err != nil {
@ -105,7 +109,7 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){
} }
} else { } else {
log.Debugf("deal non ddl: %s", qp.String()) // log.Debugf("deal non ddl: %s", *qp.String())
msg := &sarama.ProducerMessage{ msg := &sarama.ProducerMessage{
Topic: ke.asyncTopic, Topic: ke.asyncTopic,
Value: sarama.ByteEncoder(qp.Bytes()), Value: sarama.ByteEncoder(qp.Bytes()),
@ -113,5 +117,6 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){
ke.asyncProducer.Input() <- msg ke.asyncProducer.Input() <- msg
} }
return return
} }

View File

@ -55,6 +55,7 @@ func mainServer() {
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
queryPiece.Recovery()
} }
log.Errorf("cannot get network package from %s", capture.DeviceName) log.Errorf("cannot get network package from %s", capture.DeviceName)

View File

@ -2,19 +2,25 @@ package model
import ( import (
"encoding/json" "encoding/json"
"time"
"github.com/pingcap/tidb/util/hack"
) )
type QueryPiece interface { type QueryPiece interface {
String() string String() *string
Bytes() []byte Bytes() []byte
GetSQL() string GetSQL() *string
NeedSyncSend() bool
Recovery()
} }
// MysqlQueryPiece 查询信息 // MysqlQueryPiece 查询信息
type MysqlQueryPiece struct { type MysqlQueryPiece struct {
SessionID string `json:"sid"` SessionID *string `json:"sid"`
ClientHost string `json:"-"` ClientHost *string `json:"-"`
ServerIP string `json:"sip"` SyncSend bool `json:"-"`
ServerIP *string `json:"sip"`
ServerPort int `json:"sport"` ServerPort int `json:"sport"`
VisitUser *string `json:"user"` VisitUser *string `json:"user"`
VisitDB *string `json:"db"` VisitDB *string `json:"db"`
@ -23,13 +29,49 @@ type MysqlQueryPiece struct {
CostTimeInMS int64 `json:"cms"` CostTimeInMS int64 `json:"cms"`
} }
func (qp *MysqlQueryPiece) String() (str string) { type PooledMysqlQueryPiece struct {
content, err := json.Marshal(qp) MysqlQueryPiece
if err != nil { recoverPool *mysqlQueryPiecePool
return err.Error()
} }
return string(content) const (
datetimeFormat = "2006-01-02 15:04:05"
millSecondUnit = int64(time.Millisecond)
)
var (
mqpp = NewMysqlQueryPiecePool()
)
func NewPooledMysqlQueryPiece(
sessionID, visitUser, visitDB, clientHost, serverIP *string, serverPort int, stmtBeginTime int64) (
mqp *PooledMysqlQueryPiece) {
mqp = mqpp.Dequeue()
if mqp == nil {
mqp = &PooledMysqlQueryPiece{
MysqlQueryPiece: MysqlQueryPiece{},
}
}
nowInMS := time.Now().UnixNano() / millSecondUnit
mqp.SessionID = sessionID
mqp.ClientHost = clientHost
mqp.ServerIP = serverIP
mqp.ServerPort = serverPort
mqp.VisitUser = visitUser
mqp.VisitDB = visitDB
mqp.SyncSend = false
mqp.BeginTime = time.Unix(stmtBeginTime/1000, 0).Format(datetimeFormat)
mqp.CostTimeInMS = nowInMS - stmtBeginTime
mqp.recoverPool = mqpp
return
}
func (qp *MysqlQueryPiece) String() (*string) {
content := qp.Bytes()
contentStr := hack.String(content)
return &contentStr
} }
func (qp *MysqlQueryPiece) Bytes() (bytes []byte) { func (qp *MysqlQueryPiece) Bytes() (bytes []byte) {
@ -41,9 +83,18 @@ func (qp *MysqlQueryPiece) Bytes() (bytes []byte) {
return content return content
} }
func (qp *MysqlQueryPiece) GetSQL() (str string) { func (qp *MysqlQueryPiece) GetSQL() (str *string) {
if qp.QuerySQL != nil { return qp.QuerySQL
return *qp.QuerySQL
} }
return ""
func (qp *MysqlQueryPiece) NeedSyncSend() (bool) {
return qp.SyncSend
}
func (qp *MysqlQueryPiece) SetNeedSyncSend(syncSend bool) {
qp.SyncSend = syncSend
}
func (pmqp *PooledMysqlQueryPiece) Recovery() {
pmqp.recoverPool.Enqueue(pmqp)
} }

View File

@ -0,0 +1,36 @@
package model
import (
"sync"
)
type mysqlQueryPiecePool struct {
queue []*PooledMysqlQueryPiece
lock sync.Mutex
}
func NewMysqlQueryPiecePool() (mqpp *mysqlQueryPiecePool) {
return &mysqlQueryPiecePool{
queue: make([]*PooledMysqlQueryPiece, 0, 5000),
}
}
func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) {
mqpp.lock.Lock()
defer mqpp.lock.Unlock()
mqpp.queue = append(mqpp.queue, pmqp)
}
func (mqpp *mysqlQueryPiecePool) Dequeue() (pmqp *PooledMysqlQueryPiece) {
mqpp.lock.Lock()
defer mqpp.lock.Unlock()
if len(mqpp.queue) < 1 {
return nil
}
pmqp = mqpp.queue[0]
mqpp.queue = mqpp.queue[1:]
return
}

View File

@ -6,22 +6,22 @@ function execute_real(){
user_name=user user_name=user
passwd=123456 passwd=123456
mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "select 1" mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "select 1"
sleep 1 sleep 1
mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "use sniffer;show tables;create table haha(id int, name text)" mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "use sniffer;show tables;create table haha(id int, name text)"
sleep 1 sleep 1
mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "" mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e ""
sleep 1 sleep 1
mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "" mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e ""
sleep 1 sleep 1
insert_cmd="insert into unibase.haha(id, name) values(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="insert into unibase.haha(id, name) values(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')"
insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')"
insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')"
insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')"
insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')" insert_cmd="$insert_cmd,(10, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')"
mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "$insert_cmd" mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "$insert_cmd"
sleep 1 sleep 1
mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd jmms -e "use unibase; select * from haha; drop table haha" mysql -h$mysql_host -P$mysql_port -u$user_name -p$passwd sniffer -e "use unibase; select * from haha; drop table haha"
sleep 1 sleep 1
} }

View File

@ -4,7 +4,7 @@ import (
"github.com/zr-hebo/sniffer-agent/session-dealer/mysql" "github.com/zr-hebo/sniffer-agent/session-dealer/mysql"
) )
func NewSession(sessionKey string, clientIP string, clientPort int, serverIP string, serverPort int) (session ConnSession) { func NewSession(sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int) (session ConnSession) {
switch serviceType { switch serviceType {
case ServiceTypeMysql: case ServiceTypeMysql:
session = mysql.NewMysqlSession(sessionKey, clientIP, clientPort, serverIP, serverPort) session = mysql.NewMysqlSession(sessionKey, clientIP, clientPort, serverIP, serverPort)

View File

@ -5,5 +5,6 @@ import "github.com/zr-hebo/sniffer-agent/model"
type ConnSession interface { type ConnSession interface {
ReadFromClient(bytes []byte) ReadFromClient(bytes []byte)
ReadFromServer(bytes []byte) ReadFromServer(bytes []byte)
ResetBeginTime()
GenerateQueryPiece() (qp model.QueryPiece) GenerateQueryPiece() (qp model.QueryPiece)
} }

View File

@ -9,6 +9,7 @@ import (
var ( var (
uselessSQLPattern = regexp.MustCompile(`(?i)^\s*(select 1|select @@version_comment limit 1|`+ uselessSQLPattern = regexp.MustCompile(`(?i)^\s*(select 1|select @@version_comment limit 1|`+
`SELECT user, db FROM information_schema.processlist WHERE host=|commit|begin)`) `SELECT user, db FROM information_schema.processlist WHERE host=|commit|begin)`)
ddlPatern = regexp.MustCompile(`(?i)^\s*(create|alter|drop)`)
) )
var ( var (

View File

@ -5,7 +5,6 @@ import (
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
du "github.com/zr-hebo/util-db" du "github.com/zr-hebo/util-db"
// log "github.com/sirupsen/logrus"
) )
func expandLocalMysql(port int) (mysqlHost *du.MysqlDB) { func expandLocalMysql(port int) (mysqlHost *du.MysqlDB) {
@ -20,7 +19,7 @@ func expandLocalMysql(port int) (mysqlHost *du.MysqlDB) {
return return
} }
func querySessionInfo(snifferPort int, clientHost string) (user, db *string, err error) { func querySessionInfo(snifferPort int, clientHost *string) (user, db *string, err error) {
mysqlServer := expandLocalMysql(snifferPort) mysqlServer := expandLocalMysql(snifferPort)
querySQL := fmt.Sprintf( querySQL := fmt.Sprintf(
"SELECT user, db FROM information_schema.processlist WHERE host='%s'", clientHost) "SELECT user, db FROM information_schema.processlist WHERE host='%s'", clientHost)

View File

@ -1,6 +1,9 @@
package mysql package mysql
import "errors" import (
"errors"
"time"
)
// Command information. // Command information.
const ( const (
@ -97,7 +100,7 @@ const (
) )
const ( const (
datetimeFormat = "2006-01-02 15:04:05" millSecondUnit = int64(time.Millisecond)
) )
var ( var (

View File

@ -2,6 +2,7 @@ package mysql
import ( import (
"fmt" "fmt"
"github.com/siddontang/go/hack"
"time" "time"
"github.com/zr-hebo/sniffer-agent/model" "github.com/zr-hebo/sniffer-agent/model"
@ -9,14 +10,14 @@ import (
) )
type MysqlSession struct { type MysqlSession struct {
connectionID string connectionID *string
visitUser *string visitUser *string
visitDB *string visitDB *string
clientHost string clientHost *string
clientPort int clientPort int
serverIP string serverIP *string
serverPort int serverPort int
beginTime int64 stmtBeginTime int64
expectSize int expectSize int
prepareInfo *prepareInfo prepareInfo *prepareInfo
cachedPrepareStmt map[int]*string cachedPrepareStmt map[int]*string
@ -28,19 +29,23 @@ type prepareInfo struct {
prepareStmtID int prepareStmtID int
} }
func NewMysqlSession(sessionKey string, clientIP string, clientPort int, serverIP string, serverPort int) (ms *MysqlSession) { func NewMysqlSession(sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int) (ms *MysqlSession) {
ms = &MysqlSession{ ms = &MysqlSession{
connectionID: sessionKey, connectionID: sessionKey,
clientHost: clientIP, clientHost: clientIP,
clientPort: clientPort, clientPort: clientPort,
serverIP: serverIP, serverIP: serverIP,
serverPort: serverPort, serverPort: serverPort,
beginTime: time.Now().UnixNano() / int64(time.Millisecond), stmtBeginTime: time.Now().UnixNano() / millSecondUnit,
cachedPrepareStmt: make(map[int]*string), cachedPrepareStmt: make(map[int]*string, 8),
} }
return return
} }
func (ms *MysqlSession) ResetBeginTime() {
ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit
}
func (ms *MysqlSession) ReadFromServer(bytes []byte) { func (ms *MysqlSession) ReadFromServer(bytes []byte) {
if ms.expectSize < 1 { if ms.expectSize < 1 {
ms.expectSize = extractMysqlPayloadSize(bytes) ms.expectSize = extractMysqlPayloadSize(bytes)
@ -82,7 +87,8 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
return return
} }
var mqp *model.MysqlQueryPiece = nil var mqp *model.PooledMysqlQueryPiece
var querySQLInBytes []byte
ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...) ms.cachedStmtBytes = append(ms.cachedStmtBytes, ms.tcpCache...)
switch ms.cachedStmtBytes[0] { switch ms.cachedStmtBytes[0] {
case ComAuth: case ComAuth:
@ -107,12 +113,16 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
case ComDropDB: case ComDropDB:
case ComQuery: case ComQuery:
mqp = ms.composeQueryPiece() mqp = ms.composeQueryPiece()
querySQL := string(ms.cachedStmtBytes[1:]) querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:]))
copy(querySQLInBytes, ms.cachedStmtBytes[1:])
querySQL := hack.String(querySQLInBytes)
mqp.QuerySQL = &querySQL mqp.QuerySQL = &querySQL
case ComStmtPrepare: case ComStmtPrepare:
mqp = ms.composeQueryPiece() mqp = ms.composeQueryPiece()
querySQL := string(ms.cachedStmtBytes[1:]) querySQLInBytes = make([]byte, len(ms.cachedStmtBytes[1:]))
copy(querySQLInBytes, ms.cachedStmtBytes[1:])
querySQL := hack.String(querySQLInBytes)
mqp.QuerySQL = &querySQL mqp.QuerySQL = &querySQL
ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = &querySQL ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = &querySQL
log.Debugf("prepare statement %s, get id:%d", querySQL, ms.prepareInfo.prepareStmtID) log.Debugf("prepare statement %s, get id:%d", querySQL, ms.prepareInfo.prepareStmtID)
@ -145,31 +155,27 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
ms.cachedStmtBytes = ms.cachedStmtBytes[:0] ms.cachedStmtBytes = ms.cachedStmtBytes[:0]
ms.expectSize = 0 ms.expectSize = 0
ms.prepareInfo = nil ms.prepareInfo = nil
return filterQueryPieceBySQL(mqp) return filterQueryPieceBySQL(mqp, querySQLInBytes)
} }
func filterQueryPieceBySQL(mqp *model.MysqlQueryPiece) (model.QueryPiece) { func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (model.QueryPiece) {
if mqp == nil || mqp.QuerySQL == nil { if mqp == nil || querySQL == nil {
return nil return nil
} else if (uselessSQLPattern.MatchString(*mqp.QuerySQL)) { } else if (uselessSQLPattern.Match(querySQL)) {
return nil return nil
} }
if ddlPatern.Match(querySQL) {
mqp.SetNeedSyncSend(true)
}
// log.Debug(mqp.String())
return mqp return mqp
} }
func (ms *MysqlSession) composeQueryPiece() (mqp *model.MysqlQueryPiece) { func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) {
nowInMS := time.Now().UnixNano() / int64(time.Millisecond) return model.NewPooledMysqlQueryPiece(
mqp = &model.MysqlQueryPiece{ ms.connectionID, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP, ms.serverPort, ms.stmtBeginTime)
SessionID: ms.connectionID,
ClientHost: ms.clientHost,
ServerIP: ms.serverIP,
ServerPort: ms.serverPort,
VisitUser: ms.visitUser,
VisitDB: ms.visitDB,
BeginTime: time.Unix(ms.beginTime/1000, 0).Format(datetimeFormat),
CostTimeInMS: nowInMS - ms.beginTime,
}
return mqp
} }