fix bug of execute prepare stmt
This commit is contained in:
parent
18845f8278
commit
66dc02ebcb
|
@ -99,7 +99,7 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){
|
||||||
if qp.NeedSyncSend() {
|
if qp.NeedSyncSend() {
|
||||||
// log.Debugf("deal ddl: %s\n", *qp.String())
|
// log.Debugf("deal ddl: %s\n", *qp.String())
|
||||||
|
|
||||||
msg := &sarama.ProducerMessage{
|
msg := &sarama.ProducerMessage {
|
||||||
Topic: ke.syncTopic,
|
Topic: ke.syncTopic,
|
||||||
Value: sarama.ByteEncoder(qp.Bytes()),
|
Value: sarama.ByteEncoder(qp.Bytes()),
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// log.Debugf("deal non ddl: %s", *qp.String())
|
// log.Debugf("deal non ddl: %s", *qp.String())
|
||||||
msg := &sarama.ProducerMessage{
|
msg := &sarama.ProducerMessage {
|
||||||
Topic: ke.asyncTopic,
|
Topic: ke.asyncTopic,
|
||||||
Value: sarama.ByteEncoder(qp.Bytes()),
|
Value: sarama.ByteEncoder(qp.Bytes()),
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ type MysqlSession struct {
|
||||||
coverRanges []*jigsaw
|
coverRanges []*jigsaw
|
||||||
expectSendSize int
|
expectSendSize int
|
||||||
prepareInfo *prepareInfo
|
prepareInfo *prepareInfo
|
||||||
cachedPrepareStmt map[int]*string
|
cachedPrepareStmt map[int][]byte
|
||||||
cachedStmtBytes []byte
|
cachedStmtBytes []byte
|
||||||
computeWindowSizeCounter int
|
computeWindowSizeCounter int
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ func NewMysqlSession(
|
||||||
serverIP: serverIP,
|
serverIP: serverIP,
|
||||||
serverPort: serverPort,
|
serverPort: serverPort,
|
||||||
stmtBeginTime: time.Now().UnixNano() / millSecondUnit,
|
stmtBeginTime: time.Now().UnixNano() / millSecondUnit,
|
||||||
cachedPrepareStmt: make(map[int]*string, 8),
|
cachedPrepareStmt: make(map[int][]byte, 8),
|
||||||
coverRanges: make([]*jigsaw, 0, 4),
|
coverRanges: make([]*jigsaw, 0, 4),
|
||||||
queryPieceReceiver: receiver,
|
queryPieceReceiver: receiver,
|
||||||
closeConn: make(chan bool, 1),
|
closeConn: make(chan bool, 1),
|
||||||
|
@ -76,55 +76,6 @@ func NewMysqlSession(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (ms *MysqlSession) dealTCPPacket() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case closeConn := <-ms.closeConn:
|
|
||||||
|
|
||||||
if closeConn {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
if len(ms.tcpPacketCache) > 0 {
|
|
||||||
ms.parseTCPPacket()
|
|
||||||
|
|
||||||
} else {
|
|
||||||
log.Debugf("no package need deal in session:%s, so sleep", *ms.connectionID)
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ms *MysqlSession) parseTCPPacket() {
|
|
||||||
ms.pkgCacheLock.Lock()
|
|
||||||
var pkg *model.TCPPacket
|
|
||||||
if len(ms.tcpPacketCache) < 1 {
|
|
||||||
ms.pkgCacheLock.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
pkg = ms.tcpPacketCache[0]
|
|
||||||
ms.tcpPacketCache = ms.tcpPacketCache[1:]
|
|
||||||
ms.pkgCacheLock.Unlock()
|
|
||||||
|
|
||||||
if pkg.ToServer {
|
|
||||||
ms.resetBeginTime()
|
|
||||||
ms.readFromClient(pkg.Seq, pkg.Payload)
|
|
||||||
|
|
||||||
} else {
|
|
||||||
ms.readFromServer(pkg.Payload)
|
|
||||||
qp := ms.GenerateQueryPiece()
|
|
||||||
if qp != nil {
|
|
||||||
ms.queryPieceReceiver <- qp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
|
func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
|
||||||
if newPkt == nil {
|
if newPkt == nil {
|
||||||
return
|
return
|
||||||
|
@ -150,27 +101,6 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
|
||||||
ms.queryPieceReceiver <- qp
|
ms.queryPieceReceiver <- qp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ms.pkgCacheLock.Lock()
|
|
||||||
// defer ms.pkgCacheLock.Unlock()
|
|
||||||
//
|
|
||||||
// insertIdx := len(ms.tcpPacketCache)
|
|
||||||
// for idx, pkt := range ms.tcpPacketCache {
|
|
||||||
// if pkt.Seq > newPkt.Seq {
|
|
||||||
// insertIdx = idx
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if insertIdx == len(ms.tcpPacketCache) {
|
|
||||||
// ms.tcpPacketCache = append(ms.tcpPacketCache, newPkt)
|
|
||||||
// } else {
|
|
||||||
// newCache := make([]*model.TCPPacket, len(ms.tcpPacketCache)+1)
|
|
||||||
// copy(newCache[:insertIdx], ms.tcpPacketCache[:insertIdx])
|
|
||||||
// newCache[insertIdx] = newPkt
|
|
||||||
// copy(newCache[insertIdx+1:], ms.tcpPacketCache[insertIdx:])
|
|
||||||
// ms.tcpPacketCache = newCache
|
|
||||||
// }
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MysqlSession) resetBeginTime() {
|
func (ms *MysqlSession) resetBeginTime() {
|
||||||
|
@ -375,13 +305,15 @@ func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
|
||||||
querySQLInBytes = ms.cachedStmtBytes[1:]
|
querySQLInBytes = ms.cachedStmtBytes[1:]
|
||||||
querySQL := hack.String(querySQLInBytes)
|
querySQL := hack.String(querySQLInBytes)
|
||||||
mqp.QuerySQL = &querySQL
|
mqp.QuerySQL = &querySQL
|
||||||
ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = &querySQL
|
ms.cachedPrepareStmt[ms.prepareInfo.prepareStmtID] = querySQLInBytes
|
||||||
log.Debugf("prepare statement %s, get id:%d", querySQL, ms.prepareInfo.prepareStmtID)
|
log.Debugf("prepare statement %s, get id:%d", querySQL, ms.prepareInfo.prepareStmtID)
|
||||||
|
|
||||||
case ComStmtExecute:
|
case ComStmtExecute:
|
||||||
prepareStmtID := bytesToInt(ms.cachedStmtBytes[1:5])
|
prepareStmtID := bytesToInt(ms.cachedStmtBytes[1:5])
|
||||||
mqp = ms.composeQueryPiece()
|
mqp = ms.composeQueryPiece()
|
||||||
mqp.QuerySQL = ms.cachedPrepareStmt[prepareStmtID]
|
querySQLInBytes = ms.cachedPrepareStmt[prepareStmtID]
|
||||||
|
querySQL := hack.String(querySQLInBytes)
|
||||||
|
mqp.QuerySQL = &querySQL
|
||||||
log.Debugf("execute prepare statement:%d", prepareStmtID)
|
log.Debugf("execute prepare statement:%d", prepareStmtID)
|
||||||
|
|
||||||
case ComStmtClose:
|
case ComStmtClose:
|
||||||
|
|
Loading…
Reference in New Issue