Merge pull request #3 from zr-hebo/develop-hebo

Develop hebo
This commit is contained in:
河伯 2019-10-11 17:35:08 +08:00 committed by GitHub
commit 7d65a7b424
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 106 additions and 58 deletions

2
.gitignore vendored
View File

@ -6,3 +6,5 @@ sniffer-agent
# vendor/github.com # vendor/github.com
profile* profile*
*.svg *.svg
nohup.out
*/nohup.out

2
Godeps/Godeps.json generated
View File

@ -109,7 +109,7 @@
}, },
{ {
"ImportPath": "github.com/zr-hebo/util-db", "ImportPath": "github.com/zr-hebo/util-db",
"Rev": "3ff29f916f7b712b3adc53c4b9b19b13b8bbed87" "Rev": "06948bca5665b2d80078f9ff5c015eabb43c54f8"
}, },
{ {
"ImportPath": "github.com/zr-hebo/util-http", "ImportPath": "github.com/zr-hebo/util-http",

View File

@ -20,13 +20,13 @@ import (
var ( var (
DeviceName string DeviceName string
snifferPort int snifferPort int
inParallel bool // inParallel bool
) )
func init() { func init() {
flag.StringVar(&DeviceName, "interface", "eth0", "network device name. Default is eth0") flag.StringVar(&DeviceName, "interface", "eth0", "network device name. Default is eth0")
flag.IntVar(&snifferPort, "port", 3306, "sniffer port. Default is 3306") flag.IntVar(&snifferPort, "port", 3306, "sniffer port. Default is 3306")
flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false") // flag.BoolVar(&inParallel, "in_parallel", false, "if capture and deal package in parallel. Default is false")
} }
// networkCard is network device // networkCard is network device
@ -93,21 +93,24 @@ func initEthernetHandlerFromPacp() (handler *pcap.Handle) {
} }
func (nc *networkCard) Listen() (receiver chan model.QueryPiece) { func (nc *networkCard) Listen() (receiver chan model.QueryPiece) {
if inParallel { // if inParallel {
nc.listenInParallel() // nc.listenInParallel()
//
} else { // } else {
nc.listenNormal() // nc.listenNormal()
} // }
nc.listenNormal()
return nc.receiver return nc.receiver
} }
// Listen get a connection. // Listen get a connection.
func (nc *networkCard) listenNormal() { func (nc *networkCard) listenNormal() {
go func() { go func() {
aliveCounter := 0 aliveCounter := 0
handler := initEthernetHandlerFromPacpgo() handler := initEthernetHandlerFromPacp()
for { for {
var data []byte var data []byte
var ci gopacket.CaptureInfo var ci gopacket.CaptureInfo
@ -123,8 +126,28 @@ func (nc *networkCard) listenNormal() {
nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, capturePacketRate) nc.receiver <- model.NewBaseQueryPiece(localIPAddr, nc.listenPort, capturePacketRate)
} }
continue continue
}
} else if 0 < capturePacketRate && capturePacketRate < 1.0 { data, ci, err = handler.ZeroCopyReadPacketData()
if err != nil {
log.Error(err.Error())
time.Sleep(time.Second*3)
continue
}
// packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy)
packet := gopacket.NewPacket(data, handler.LinkType(), gopacket.NoCopy)
m := packet.Metadata()
m.CaptureInfo = ci
// send FIN tcp packet to avoid not complete session cannot be released
tcpPkt := packet.TransportLayer().(*layers.TCP)
if tcpPkt.FIN {
nc.parseTCPPackage(packet)
continue
}
if 0 < capturePacketRate && capturePacketRate < 1.0 {
// fall into throw range // fall into throw range
rn := rand.Float64() rn := rand.Float64()
if rn > capturePacketRate { if rn > capturePacketRate {
@ -133,17 +156,6 @@ func (nc *networkCard) listenNormal() {
} }
aliveCounter = 0 aliveCounter = 0
data, ci, err = handler.ZeroCopyReadPacketData()
if err != nil {
log.Error(err.Error())
time.Sleep(time.Second*3)
continue
}
packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy)
m := packet.Metadata()
m.CaptureInfo = ci
m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
nc.parseTCPPackage(packet) nc.parseTCPPackage(packet)
} }
}() }()

View File

@ -13,7 +13,6 @@ const (
var ( var (
communicatePort int communicatePort int
// capturePacketRate float64
router = mux.NewRouter() router = mux.NewRouter()
) )
@ -21,15 +20,14 @@ var (
configMapLock sync.RWMutex configMapLock sync.RWMutex
configMap map[string]configItem configMap map[string]configItem
catpurePacketRate *capturePacketRateConfig catpurePacketRate *capturePacketRateConfig
catpurePacketRateVal float64
) )
func init() { func init() {
catpurePacketRate = newCapturePacketRateConfig() catpurePacketRate = newCapturePacketRateConfig()
flag.IntVar(&communicatePort, "communicate_port", 8088, "http server port. Default is 8088") flag.IntVar(&communicatePort, "communicate_port", 8088, "http server port. Default is 8088")
var cpr float64 flag.Float64Var(&catpurePacketRateVal, CAPTURE_PACKET_RATE, 0.01, "capture packet rate. Default is 0.01")
flag.Float64Var(&cpr, CAPTURE_PACKET_RATE, 1, "capture packet rate. Default is 1.0")
_ = catpurePacketRate.setVal(cpr)
configMap = make(map[string]configItem) configMap = make(map[string]configItem)
} }

View File

@ -11,6 +11,8 @@ import (
) )
func Server() { func Server() {
initConfig()
server := &http.Server{ server := &http.Server{
Addr: "0.0.0.0:" + strconv.Itoa(communicatePort), Addr: "0.0.0.0:" + strconv.Itoa(communicatePort),
IdleTimeout: time.Second * 5, IdleTimeout: time.Second * 5,
@ -22,6 +24,10 @@ func Server() {
} }
} }
func initConfig() {
_ = catpurePacketRate.setVal(catpurePacketRateVal)
}
func outletCheckAlive(resp http.ResponseWriter, req *http.Request) { func outletCheckAlive(resp http.ResponseWriter, req *http.Request) {
mp := hu.NewMouthpiece(resp) mp := hu.NewMouthpiece(resp)
defer func() { defer func() {

View File

@ -32,6 +32,7 @@ func (cprc *capturePacketRateConfig) setVal (val interface{}) (err error){
return return
} }
fmt.Printf("set config %s: %v\n", CAPTURE_PACKET_RATE, realVal)
cprc.mysqlTPR = realVal cprc.mysqlTPR = realVal
cprc.tcpTPR = math.Sqrt(realVal) cprc.tcpTPR = math.Sqrt(realVal)
return return

View File

@ -9,9 +9,9 @@ import (
type MysqlQueryPiece struct { type MysqlQueryPiece struct {
BaseQueryPiece BaseQueryPiece
SessionID *string `json:"cid"` SessionID *string `json:"-"`
ClientHost *string `json:"-"` ClientHost *string `json:"cip"`
ClientPort int `json:"-"` ClientPort int `json:"cport"`
VisitUser *string `json:"user"` VisitUser *string `json:"user"`
VisitDB *string `json:"db"` VisitDB *string `json:"db"`
@ -62,16 +62,21 @@ func (mqp *MysqlQueryPiece) Bytes() (content []byte) {
return mqp.jsonContent return mqp.jsonContent
} }
mqp.jsonContent = marsharQueryPiece(mqp) mqp.GenerateJsonBytes()
return mqp.jsonContent return mqp.jsonContent
} }
func (mqp *MysqlQueryPiece) GenerateJsonBytes() {
mqp.jsonContent = marsharQueryPieceMonopolize(mqp)
return
}
func (mqp *MysqlQueryPiece) GetSQL() (str *string) { func (mqp *MysqlQueryPiece) GetSQL() (str *string) {
return mqp.QuerySQL return mqp.QuerySQL
} }
func (pmqp *PooledMysqlQueryPiece) Recovery() { func (pmqp *PooledMysqlQueryPiece) Recovery() {
pmqp.recoverPool.Enqueue(pmqp) // pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0])
pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0])
pmqp.jsonContent = nil pmqp.jsonContent = nil
pmqp.recoverPool.Enqueue(pmqp)
} }

View File

@ -1,13 +1,15 @@
package model package model
import ( import (
// "github.com/json-iterator/go"
"bytes" "bytes"
"encoding/json" "encoding/json"
jsoniter "github.com/json-iterator/go"
"github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/hack"
"time" "time"
) )
var jsonIterator = jsoniter.ConfigCompatibleWithStandardLibrary
type QueryPiece interface { type QueryPiece interface {
String() *string String() *string
Bytes() []byte Bytes() []byte
@ -70,7 +72,7 @@ func (bqp *BaseQueryPiece) Bytes() (content []byte) {
return bqp.jsonContent return bqp.jsonContent
} }
bqp.jsonContent = marsharQueryPiece(bqp) bqp.jsonContent = marsharQueryPieceMonopolize(bqp)
return bqp.jsonContent return bqp.jsonContent
} }
@ -81,8 +83,12 @@ func (bqp *BaseQueryPiece) GetSQL() (*string) {
func (bqp *BaseQueryPiece) Recovery() { func (bqp *BaseQueryPiece) Recovery() {
} }
func marsharQueryPiece(qp interface{}) []byte { func marsharQueryPieceShareMemory(qp interface{}) []byte {
var cacheBuffer = localSliceBufferPool.Dequeue() var cacheBuffer = localSliceBufferPool.Dequeue()
if len(cacheBuffer) > 0 {
panic("there already have bytes in buffer")
}
buffer := bytes.NewBuffer(cacheBuffer) buffer := bytes.NewBuffer(cacheBuffer)
err := json.NewEncoder(buffer).Encode(qp) err := json.NewEncoder(buffer).Encode(qp)
if err != nil { if err != nil {
@ -91,3 +97,12 @@ func marsharQueryPiece(qp interface{}) []byte {
return buffer.Bytes() return buffer.Bytes()
} }
func marsharQueryPieceMonopolize(qp interface{}) (content []byte) {
content, err := jsonIterator.Marshal(qp)
if err != nil {
return []byte(err.Error())
}
return content
}

View File

@ -19,7 +19,6 @@ func (mqpp *mysqlQueryPiecePool) Enqueue(pmqp *PooledMysqlQueryPiece) {
mqpp.lock.Lock() mqpp.lock.Lock()
defer mqpp.lock.Unlock() defer mqpp.lock.Unlock()
mqpp.queue <- pmqp mqpp.queue <- pmqp
} }

View File

@ -12,17 +12,17 @@ import (
) )
type MysqlSession struct { type MysqlSession struct {
connectionID *string connectionID *string
visitUser *string visitUser *string
visitDB *string visitDB *string
clientHost *string clientHost *string
clientPort int clientPort int
serverIP *string serverIP *string
serverPort int serverPort int
stmtBeginTime int64 stmtBeginTime int64
// packageOffset int64 // packageOffset int64
beginSeqID int64 beginSeqID int64
endSeqID int64 endSeqID int64
coverRanges *coverRanges coverRanges *coverRanges
expectReceiveSize int expectReceiveSize int
expectSendSize int expectSendSize int
@ -59,7 +59,7 @@ func NewMysqlSession(
queryPieceReceiver: receiver, queryPieceReceiver: receiver,
closeConn: make(chan bool, 1), closeConn: make(chan bool, 1),
expectReceiveSize: -1, expectReceiveSize: -1,
coverRanges: NewCoverRanges(), coverRanges: NewCoverRanges(),
ignoreAckID: -1, ignoreAckID: -1,
sendSize: 0, sendSize: 0,
pkgCacheLock: sync.Mutex{}, pkgCacheLock: sync.Mutex{},
@ -100,7 +100,7 @@ func (ms *MysqlSession) resetBeginTime() {
} }
func (ms *MysqlSession) readFromServer(bytes []byte) { func (ms *MysqlSession) readFromServer(bytes []byte) {
if ms.expectSendSize < 1 { if ms.expectSendSize < 1 && len(bytes) > 4 {
ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) ms.expectSendSize = extractMysqlPayloadSize(bytes[:4])
contents := bytes[4:] contents := bytes[4:]
if ms.prepareInfo != nil && contents[0] == 0 { if ms.prepareInfo != nil && contents[0] == 0 {
@ -115,7 +115,7 @@ func (ms *MysqlSession) checkFinish() bool {
} }
checkNode := ms.coverRanges.head.next checkNode := ms.coverRanges.head.next
if checkNode.end - checkNode.begin == int64(len(ms.cachedStmtBytes)) { if checkNode.end-checkNode.begin == int64(len(ms.cachedStmtBytes)) {
return true return true
} }
@ -143,6 +143,11 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
contentSize := int64(len(bytes)) contentSize := int64(len(bytes))
if ms.expectReceiveSize == -1 { if ms.expectReceiveSize == -1 {
// ignore invalid head package
if len(bytes) <= 4{
return
}
ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4]) ms.expectReceiveSize = extractMysqlPayloadSize(bytes[:4])
// ignore too big mysql packet // ignore too big mysql packet
if ms.expectReceiveSize >= MaxMysqlPacketLen { if ms.expectReceiveSize >= MaxMysqlPacketLen {
@ -160,9 +165,8 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
ms.beginSeqID = seqID ms.beginSeqID = seqID
ms.endSeqID = seqID ms.endSeqID = seqID
if int64(ms.expectReceiveSize) < int64(len(contents)) { if int64(ms.expectReceiveSize) < int64(len(contents)) {
log.Warnf("receive invalid mysql packet") log.Debug("receive invalid mysql packet")
return return
} }
@ -177,7 +181,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
} }
if ms.beginSeqID == -1 { if ms.beginSeqID == -1 {
log.Warnf("cover range is empty") log.Debug("cover range is empty")
return return
} }
@ -192,7 +196,7 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
if seqOffset+contentSize > int64(len(ms.cachedStmtBytes)) { if seqOffset+contentSize > int64(len(ms.cachedStmtBytes)) {
// not in a normal mysql packet // not in a normal mysql packet
log.Debugf("receive an unexpect packet") log.Debugf("receive an unexpect packet")
ms.clear() ms.clear()
return return
} }
@ -204,7 +208,6 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
// ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize) // ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize)
} }
func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
defer ms.clear() defer ms.clear()
@ -298,10 +301,15 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
} }
} }
return filterQueryPieceBySQL(mqp, querySQLInBytes) mqp = filterQueryPieceBySQL(mqp, querySQLInBytes)
if mqp == nil {
return nil
}
mqp.GenerateJsonBytes()
return mqp
} }
func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (model.QueryPiece) { func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (*model.PooledMysqlQueryPiece) {
if mqp == nil || querySQL == nil { if mqp == nil || querySQL == nil {
return nil return nil

View File

@ -43,12 +43,14 @@ type MysqlDB struct {
DatabaseType string DatabaseType string
DBName string DBName string
ConnectTimeout int ConnectTimeout int
QueryTimeout int
} }
// NewMysqlDB 创建MySQL数据库 // NewMysqlDB 创建MySQL数据库
func NewMysqlDB() (md *MysqlDB) { func NewMysqlDB() (md *MysqlDB) {
md = new(MysqlDB) md = new(MysqlDB)
md.DatabaseType = dbTypeMysql md.DatabaseType = dbTypeMysql
md.QueryTimeout = 5
return return
} }
@ -352,7 +354,7 @@ func (md *MysqlDB) fillConnStr() string {
md.UserName, md.Passwd, md.IP, md.Port, md.DBName) md.UserName, md.Passwd, md.IP, md.Port, md.DBName)
if md.ConnectTimeout > 0 { if md.ConnectTimeout > 0 {
dbServerInfoStr = fmt.Sprintf("%s?timeout=%ds&readTimeout=%ds&writeTimeout=%ds", dbServerInfoStr = fmt.Sprintf("%s?timeout=%ds&readTimeout=%ds&writeTimeout=%ds",
dbServerInfoStr, md.ConnectTimeout, md.ConnectTimeout, md.ConnectTimeout) dbServerInfoStr, md.ConnectTimeout, md.QueryTimeout, md.QueryTimeout)
} }
return dbServerInfoStr return dbServerInfoStr

View File

@ -6,7 +6,6 @@ import (
"time" "time"
) )
// PooledMysqlDB Mysql主机实例 // PooledMysqlDB Mysql主机实例
type PooledMysqlDB struct { type PooledMysqlDB struct {
MysqlDB MysqlDB
@ -46,6 +45,7 @@ func NewPooledMysqlDBWithAllParam(
func NewPooledMysqlDB() (pmd *PooledMysqlDB) { func NewPooledMysqlDB() (pmd *PooledMysqlDB) {
pmd = new(PooledMysqlDB) pmd = new(PooledMysqlDB)
pmd.DatabaseType = dbTypeMysql pmd.DatabaseType = dbTypeMysql
pmd.QueryTimeout = 5
pmd.lock = new(sync.Mutex) pmd.lock = new(sync.Mutex)
return return
} }