From 66dc02ebcb0394ac61d6b456be91cd25dd99f4d2 Mon Sep 17 00:00:00 2001 From: hebo Date: Thu, 29 Aug 2019 20:37:07 +0800 Subject: [PATCH] fix bug of execute prepare stmt --- exporter/kafka.go | 4 +- session-dealer/mysql/session.go | 80 +++------------------------------ 2 files changed, 8 insertions(+), 76 deletions(-) diff --git a/exporter/kafka.go b/exporter/kafka.go index fd5992d..0475241 100644 --- a/exporter/kafka.go +++ b/exporter/kafka.go @@ -99,7 +99,7 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){ if qp.NeedSyncSend() { // log.Debugf("deal ddl: %s\n", *qp.String()) - msg := &sarama.ProducerMessage{ + msg := &sarama.ProducerMessage { Topic: ke.syncTopic, Value: sarama.ByteEncoder(qp.Bytes()), } @@ -110,7 +110,7 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){ } else { // log.Debugf("deal non ddl: %s", *qp.String()) - msg := &sarama.ProducerMessage{ + msg := &sarama.ProducerMessage { Topic: ke.asyncTopic, Value: sarama.ByteEncoder(qp.Bytes()), } diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 8e342b7..8ad7571 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -26,7 +26,7 @@ type MysqlSession struct { coverRanges []*jigsaw expectSendSize int prepareInfo *prepareInfo - cachedPrepareStmt map[int]*string + cachedPrepareStmt map[int][]byte cachedStmtBytes []byte computeWindowSizeCounter int @@ -63,7 +63,7 @@ func NewMysqlSession( serverIP: serverIP, serverPort: serverPort, stmtBeginTime: time.Now().UnixNano() / millSecondUnit, - cachedPrepareStmt: make(map[int]*string, 8), + cachedPrepareStmt: make(map[int][]byte, 8), coverRanges: make([]*jigsaw, 0, 4), queryPieceReceiver: receiver, closeConn: make(chan bool, 1), @@ -76,55 +76,6 @@ func NewMysqlSession( return } - -func (ms *MysqlSession) dealTCPPacket() { - for { - select { - case closeConn := <-ms.closeConn: - - if closeConn { - return - } - - default: - if len(ms.tcpPacketCache) > 0 { - ms.parseTCPPacket() - - } else { - log.Debugf("no package need deal in session:%s, so sleep", *ms.connectionID) - time.Sleep(time.Second) - } - } - } -} - -func (ms *MysqlSession) parseTCPPacket() { - ms.pkgCacheLock.Lock() - var pkg *model.TCPPacket - if len(ms.tcpPacketCache) < 1 { - ms.pkgCacheLock.Unlock() - return - } - - pkg = ms.tcpPacketCache[0] - ms.tcpPacketCache = ms.tcpPacketCache[1:] - ms.pkgCacheLock.Unlock() - - if pkg.ToServer { - ms.resetBeginTime() - ms.readFromClient(pkg.Seq, pkg.Payload) - - } else { - ms.readFromServer(pkg.Payload) - qp := ms.GenerateQueryPiece() - if qp != nil { - ms.queryPieceReceiver <- qp - } - } - - return -} - func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { if newPkt == nil { return @@ -150,27 +101,6 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { ms.queryPieceReceiver <- qp } } - - // ms.pkgCacheLock.Lock() - // defer ms.pkgCacheLock.Unlock() - // - // insertIdx := len(ms.tcpPacketCache) - // for idx, pkt := range ms.tcpPacketCache { - // if pkt.Seq > newPkt.Seq { - // insertIdx = idx - // } - // } - // - // if insertIdx == len(ms.tcpPacketCache) { - // ms.tcpPacketCache = append(ms.tcpPacketCache, newPkt) - // } else { - // newCache := make([]*model.TCPPacket, len(ms.tcpPacketCache)+1) - // copy(newCache[:insertIdx], ms.tcpPacketCache[:insertIdx]) - // newCache[insertIdx] = newPkt - // copy(newCache[insertIdx+1:], ms.tcpPacketCache[insertIdx:]) - // ms.tcpPacketCache = newCache - // } - } func (ms *MysqlSession) resetBeginTime() { @@ -375,13 +305,15 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) { querySQLInBytes = ms.cachedStmtBytes[1:] querySQL := hack.String(querySQLInBytes) mqp.QuerySQL = &querySQL - ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = &querySQL + ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = querySQLInBytes log.Debugf("prepare statement %s, get id:%d", querySQL, ms.prepareInfo.prepareStmtID) case ComStmtExecute: prepareStmtID := bytesToInt(ms.cachedStmtBytes[1:5]) mqp = ms.composeQueryPiece() - mqp.QuerySQL = ms.cachedPrepareStmt[prepareStmtID] + querySQLInBytes = ms.cachedPrepareStmt[prepareStmtID] + querySQL := hack.String(querySQLInBytes) + mqp.QuerySQL = &querySQL log.Debugf("execute prepare statement:%d", prepareStmtID) case ComStmtClose: