From 6c2fcd13b28fbb00b2c896fbc8742ef81cee39c1 Mon Sep 17 00:00:00 2001 From: hebo Date: Tue, 24 Dec 2019 22:39:30 +0800 Subject: [PATCH] add response parse module --- model/mysql_query_piece.go | 7 +++ model/query_piece.go | 4 ++ model/query_piece_pool.go | 2 + session-dealer/mysql/resp.go | 75 +++++++++++++++++++++++++++++++++ session-dealer/mysql/session.go | 49 ++++++++++++++------- session-dealer/mysql/util.go | 22 +++++++++- 6 files changed, 141 insertions(+), 18 deletions(-) create mode 100644 session-dealer/mysql/resp.go diff --git a/model/mysql_query_piece.go b/model/mysql_query_piece.go index fe19aaf..3b10580 100644 --- a/model/mysql_query_piece.go +++ b/model/mysql_query_piece.go @@ -16,6 +16,8 @@ type MysqlQueryPiece struct { VisitDB *string `json:"db"` QuerySQL *string `json:"sql"` CostTimeInMS int64 `json:"cms"` + OK int64 `json:"ok"` + ResponseVal int64 `json:"rv"` } func (mqp *MysqlQueryPiece) String() (*string) { @@ -42,3 +44,8 @@ func (mqp *MysqlQueryPiece) GenerateJsonBytes() { func (mqp *MysqlQueryPiece) GetSQL() (str *string) { return mqp.QuerySQL } + +func (mqp *MysqlQueryPiece) SetResponse(ok, respVal int64) { + mqp.OK = ok + mqp.ResponseVal = respVal +} diff --git a/model/query_piece.go b/model/query_piece.go index 1b2334f..d796370 100644 --- a/model/query_piece.go +++ b/model/query_piece.go @@ -14,6 +14,7 @@ type QueryPiece interface { GetSQL() *string NeedSyncSend() bool Recovery() + SetResponse(int64, int64) } // BaseQueryPiece 查询信息 @@ -80,6 +81,9 @@ func (bqp *BaseQueryPiece) GetSQL() (*string) { func (bqp *BaseQueryPiece) Recovery() { } +func (bqp *BaseQueryPiece) SetResponse(ok, respVal int64) { +} + /** func marsharQueryPieceShareMemory(qp interface{}, cacheBuffer []byte) []byte { buffer := bytes.NewBuffer(cacheBuffer) diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go index 358c29f..3ad6f13 100644 --- a/model/query_piece_pool.go +++ b/model/query_piece_pool.go @@ -29,6 +29,8 @@ func NewPooledMysqlQueryPiece( pmqp.EventTime = stmtBeginTime pmqp.CostTimeInMS = nowInMS - stmtBeginTime pmqp.recoverPool = mqpp + pmqp.OK = 0 + pmqp.ResponseVal = -1 return } diff --git a/session-dealer/mysql/resp.go b/session-dealer/mysql/resp.go new file mode 100644 index 0000000..9fffb58 --- /dev/null +++ b/session-dealer/mysql/resp.go @@ -0,0 +1,75 @@ +package mysql + +import ( + "fmt" +) + +const ( + PACKET_OK = 0 + PACKET_EOF = 254 + PACKET_ERR = 255 +) + +func lenencInt(bytesVal []byte) (val int64) { + if bytesVal == nil || len(bytesVal) < 1 { + val = -1 + return + } + + fb := bytesVal[0] + var offset int64 + switch { + case fb < 251: + val = int64(fb) + + case fb == 252: + numLen := int64(2) + offset = 1+numLen + val = int64(bytesToInt(bytesVal[1:offset])) + + case fb == 253: + numLen := int64(3) + offset = 1+numLen + val = int64(bytesToInt(bytesVal[1:offset])) + + case fb == 254: + numLen := int64(8) + offset = 1+numLen + val = int64(bytesToInt(bytesVal[1:offset])) + + default: + val = -1 + } + return +} + +func parseResponseHeader(payload []byte) (ok, val int64, err error) { + if payload == nil || len(payload) < 1 { + err = fmt.Errorf("no bytes to parse") + return + } + + fmt.Printf("%#v\n", payload) + defer func() { + fmt.Printf("%#v\n", ok) + fmt.Printf("%#v\n", val) + }() + + switch { + case payload[0] == PACKET_OK && len(payload)>=7: + case payload[0] == PACKET_EOF && len(payload)<=9: + // set ok and mysql affected rows number + ok = 1 + val = lenencInt(payload) + + case payload[0] == PACKET_ERR && len(payload)>3: + // set not ok and mysql execute error-code + ok = 0 + val = int64(bytesToIntSmallEndian(payload[1:3])) + + default: + err = fmt.Errorf("invalid response packet") + } + return +} + diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 7016f00..eb40037 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -91,9 +91,15 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { ms.readFromClient(newPkt.Seq, newPkt.Payload) } else { - ms.readFromServer(newPkt.Seq, newPkt.Payload) + ok, respVal, err := ms.readFromServer(newPkt.Seq, newPkt.Payload) + if err != nil { + log.Debug(err.Error()) + return + } + qp := ms.GenerateQueryPiece() if qp != nil { + qp.SetResponse(ok, respVal) ms.queryPieceReceiver <- qp } } @@ -103,20 +109,6 @@ func (ms *MysqlSession) resetBeginTime() { ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit } -func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) { - if ms.expectSendSize < 1 && len(bytes) > 4 { - ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) - contents := bytes[4:] - if ms.prepareInfo != nil && contents[0] == 0 { - ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5]) - } - } - - if ms.coverRanges.head.next == nil || ms.coverRanges.head.next.end != respSeq { - ms.clear() - } -} - func (ms *MysqlSession) checkFinish() bool { if ms.coverRanges.head == nil || ms.coverRanges.head.next == nil { return false @@ -147,6 +139,32 @@ func (ms *MysqlSession) clear() { ms.coverRanges.clear() } +func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) (ok, val int64, err error) { + defer func() { + // 检查返回包的seqid和请求需要的id是否连续 + if ms.coverRanges.head.next == nil || ms.coverRanges.head.next.end != respSeq { + ms.clear() + } + + if err != nil { + ms.clear() + } + }() + + if ms.expectSendSize < 1 && len(bytes) > 4 { + ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) + contents := bytes[4:] + if ms.prepareInfo != nil && contents[0] == 0 { + ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5]) + } + + return parseResponseHeader(contents) + } + + err = fmt.Errorf("not need packet") + return +} + func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) { contentSize := int64(len(bytes)) @@ -318,7 +336,6 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { if mqp == nil { return nil } - mqp.GenerateJsonBytes() return mqp } diff --git a/session-dealer/mysql/util.go b/session-dealer/mysql/util.go index 53c1752..d81bac6 100644 --- a/session-dealer/mysql/util.go +++ b/session-dealer/mysql/util.go @@ -125,6 +125,24 @@ func extractMysqlPayloadSize(header []byte) int { return int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) } -func bytesToInt(contents []byte) int { - return int(uint32(contents[0]) | uint32(contents[1])<<8 | uint32(contents[2])<<16 | uint32(contents[3])<<24) +func bytesToInt(contents []byte) (val int) { + if contents == nil || len(contents) < 1 { + return 0 + } + + for _, byteVal := range contents { + val = val * 256 + int(byteVal) + } + return +} + +func bytesToIntSmallEndian(contents []byte) (val int) { + if contents == nil || len(contents) < 1 { + return 0 + } + + for idx := len(contents)-1; idx >= 0; idx -= 1 { + val = val * 256 + int(contents[idx]) + } + return } \ No newline at end of file