add response parse module

This commit is contained in:
hebo
2019-12-24 22:39:30 +08:00
parent 0ed1b77305
commit 6c2fcd13b2
6 changed files with 141 additions and 18 deletions

View File

@@ -16,6 +16,8 @@ type MysqlQueryPiece struct {
VisitDB *string `json:"db"` VisitDB *string `json:"db"`
QuerySQL *string `json:"sql"` QuerySQL *string `json:"sql"`
CostTimeInMS int64 `json:"cms"` CostTimeInMS int64 `json:"cms"`
OK int64 `json:"ok"`
ResponseVal int64 `json:"rv"`
} }
func (mqp *MysqlQueryPiece) String() (*string) { func (mqp *MysqlQueryPiece) String() (*string) {
@@ -42,3 +44,8 @@ func (mqp *MysqlQueryPiece) GenerateJsonBytes() {
func (mqp *MysqlQueryPiece) GetSQL() (str *string) { func (mqp *MysqlQueryPiece) GetSQL() (str *string) {
return mqp.QuerySQL return mqp.QuerySQL
} }
func (mqp *MysqlQueryPiece) SetResponse(ok, respVal int64) {
mqp.OK = ok
mqp.ResponseVal = respVal
}

View File

@@ -14,6 +14,7 @@ type QueryPiece interface {
GetSQL() *string GetSQL() *string
NeedSyncSend() bool NeedSyncSend() bool
Recovery() Recovery()
SetResponse(int64, int64)
} }
// BaseQueryPiece 查询信息 // BaseQueryPiece 查询信息
@@ -80,6 +81,9 @@ func (bqp *BaseQueryPiece) GetSQL() (*string) {
func (bqp *BaseQueryPiece) Recovery() { func (bqp *BaseQueryPiece) Recovery() {
} }
func (bqp *BaseQueryPiece) SetResponse(ok, respVal int64) {
}
/** /**
func marsharQueryPieceShareMemory(qp interface{}, cacheBuffer []byte) []byte { func marsharQueryPieceShareMemory(qp interface{}, cacheBuffer []byte) []byte {
buffer := bytes.NewBuffer(cacheBuffer) buffer := bytes.NewBuffer(cacheBuffer)

View File

@@ -29,6 +29,8 @@ func NewPooledMysqlQueryPiece(
pmqp.EventTime = stmtBeginTime pmqp.EventTime = stmtBeginTime
pmqp.CostTimeInMS = nowInMS - stmtBeginTime pmqp.CostTimeInMS = nowInMS - stmtBeginTime
pmqp.recoverPool = mqpp pmqp.recoverPool = mqpp
pmqp.OK = 0
pmqp.ResponseVal = -1
return return
} }

View File

@@ -0,0 +1,75 @@
package mysql
import (
"fmt"
)
const (
PACKET_OK = 0
PACKET_EOF = 254
PACKET_ERR = 255
)
func lenencInt(bytesVal []byte) (val int64) {
if bytesVal == nil || len(bytesVal) < 1 {
val = -1
return
}
fb := bytesVal[0]
var offset int64
switch {
case fb < 251:
val = int64(fb)
case fb == 252:
numLen := int64(2)
offset = 1+numLen
val = int64(bytesToInt(bytesVal[1:offset]))
case fb == 253:
numLen := int64(3)
offset = 1+numLen
val = int64(bytesToInt(bytesVal[1:offset]))
case fb == 254:
numLen := int64(8)
offset = 1+numLen
val = int64(bytesToInt(bytesVal[1:offset]))
default:
val = -1
}
return
}
func parseResponseHeader(payload []byte) (ok, val int64, err error) {
if payload == nil || len(payload) < 1 {
err = fmt.Errorf("no bytes to parse")
return
}
fmt.Printf("%#v\n", payload)
defer func() {
fmt.Printf("%#v\n", ok)
fmt.Printf("%#v\n", val)
}()
switch {
case payload[0] == PACKET_OK && len(payload)>=7:
case payload[0] == PACKET_EOF && len(payload)<=9:
// set ok and mysql affected rows number
ok = 1
val = lenencInt(payload)
case payload[0] == PACKET_ERR && len(payload)>3:
// set not ok and mysql execute error-code
ok = 0
val = int64(bytesToIntSmallEndian(payload[1:3]))
default:
err = fmt.Errorf("invalid response packet")
}
return
}

View File

@@ -91,9 +91,15 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
ms.readFromClient(newPkt.Seq, newPkt.Payload) ms.readFromClient(newPkt.Seq, newPkt.Payload)
} else { } else {
ms.readFromServer(newPkt.Seq, newPkt.Payload) ok, respVal, err := ms.readFromServer(newPkt.Seq, newPkt.Payload)
if err != nil {
log.Debug(err.Error())
return
}
qp := ms.GenerateQueryPiece() qp := ms.GenerateQueryPiece()
if qp != nil { if qp != nil {
qp.SetResponse(ok, respVal)
ms.queryPieceReceiver <- qp ms.queryPieceReceiver <- qp
} }
} }
@@ -103,20 +109,6 @@ func (ms *MysqlSession) resetBeginTime() {
ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit
} }
func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) {
if ms.expectSendSize < 1 && len(bytes) > 4 {
ms.expectSendSize = extractMysqlPayloadSize(bytes[:4])
contents := bytes[4:]
if ms.prepareInfo != nil && contents[0] == 0 {
ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5])
}
}
if ms.coverRanges.head.next == nil || ms.coverRanges.head.next.end != respSeq {
ms.clear()
}
}
func (ms *MysqlSession) checkFinish() bool { func (ms *MysqlSession) checkFinish() bool {
if ms.coverRanges.head == nil || ms.coverRanges.head.next == nil { if ms.coverRanges.head == nil || ms.coverRanges.head.next == nil {
return false return false
@@ -147,6 +139,32 @@ func (ms *MysqlSession) clear() {
ms.coverRanges.clear() ms.coverRanges.clear()
} }
func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) (ok, val int64, err error) {
defer func() {
// 检查返回包的seqid和请求需要的id是否连续
if ms.coverRanges.head.next == nil || ms.coverRanges.head.next.end != respSeq {
ms.clear()
}
if err != nil {
ms.clear()
}
}()
if ms.expectSendSize < 1 && len(bytes) > 4 {
ms.expectSendSize = extractMysqlPayloadSize(bytes[:4])
contents := bytes[4:]
if ms.prepareInfo != nil && contents[0] == 0 {
ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5])
}
return parseResponseHeader(contents)
}
err = fmt.Errorf("not need packet")
return
}
func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
contentSize := int64(len(bytes)) contentSize := int64(len(bytes))
@@ -318,7 +336,6 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
if mqp == nil { if mqp == nil {
return nil return nil
} }
mqp.GenerateJsonBytes()
return mqp return mqp
} }

View File

@@ -125,6 +125,24 @@ func extractMysqlPayloadSize(header []byte) int {
return int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) return int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
} }
func bytesToInt(contents []byte) int { func bytesToInt(contents []byte) (val int) {
return int(uint32(contents[0]) | uint32(contents[1])<<8 | uint32(contents[2])<<16 | uint32(contents[3])<<24) if contents == nil || len(contents) < 1 {
return 0
}
for _, byteVal := range contents {
val = val * 256 + int(byteVal)
}
return
}
func bytesToIntSmallEndian(contents []byte) (val int) {
if contents == nil || len(contents) < 1 {
return 0
}
for idx := len(contents)-1; idx >= 0; idx -= 1 {
val = val * 256 + int(contents[idx])
}
return
} }