From 85bef28022927731f47cd313de38a5732eb71c3a Mon Sep 17 00:00:00 2001 From: hebo Date: Thu, 14 Nov 2019 21:52:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=AE=BE=E7=BD=AE=E6=8A=93?= =?UTF-8?q?=E5=8C=85=E7=8E=87=E5=BC=95=E8=B5=B7=E7=9A=84=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E5=BC=82=E5=B8=B8=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- session-dealer/mysql/cover_range.go | 30 ++++++++++++++--------------- session-dealer/mysql/session.go | 8 ++++++-- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/session-dealer/mysql/cover_range.go b/session-dealer/mysql/cover_range.go index da5b4bb..7e15703 100644 --- a/session-dealer/mysql/cover_range.go +++ b/session-dealer/mysql/cover_range.go @@ -6,17 +6,17 @@ type coverageNode struct { end int64 next *coverageNode - crp *coveragePool + crp *coveragePool } func newCoverage(begin, end int64) (*coverageNode) { return &coverageNode{ begin: begin, - end: end, + end: end, } } -func (crn *coverageNode) Recovery() { +func (crn *coverageNode) Recovery() { crn.crp.Enqueue(crn) } @@ -28,6 +28,7 @@ func NewCoverRanges() *coverRanges { return &coverRanges{ head: &coverageNode{ begin: -1, + end: -1, }, } } @@ -42,7 +43,7 @@ func (crs *coverRanges) clear() { crs.head.next = nil } -func (crs *coverRanges) addRange(node *coverageNode) { +func (crs *coverRanges) addRange(node *coverageNode) { // insert range in asc order var currRange = crs.head; for currRange != nil && currRange.next != nil { @@ -52,10 +53,9 @@ func (crs *coverRanges) addRange(node *coverageNode) { node.next = checkRange node = nil break - - } else { - currRange = checkRange } + + currRange = checkRange } if node != nil && currRange != nil { @@ -65,12 +65,12 @@ func (crs *coverRanges) addRange(node *coverageNode) { crs.mergeRanges() } -func (crs *coverRanges) mergeRanges() { +func (crs *coverRanges) mergeRanges() { // merge ranges currRange := crs.head.next for currRange != nil && currRange.next != nil { checkRange := currRange.next - if currRange.end >= checkRange.begin && currRange.end < checkRange.end { + if currRange.begin <= checkRange.begin && currRange.end >= checkRange.begin && currRange.end < checkRange.end { currRange.end = checkRange.end currRange.next = checkRange.next checkRange.Recovery() @@ -81,19 +81,17 @@ func (crs *coverRanges) mergeRanges() { } } - type coveragePool struct { - queue chan *coverageNode + queue chan *coverageNode } - func NewCoveragePool() (cp *coveragePool) { return &coveragePool{ queue: make(chan *coverageNode, 256), } } -func (crp *coveragePool) NewCoverage(begin, end int64)(cn *coverageNode) { +func (crp *coveragePool) NewCoverage(begin, end int64) (cn *coverageNode) { cn = crp.Dequeue() cn.begin = begin cn.end = end @@ -115,7 +113,7 @@ func (crp *coveragePool) Enqueue(cn *coverageNode) { } } -func (crp *coveragePool) Dequeue() (cn *coverageNode) { +func (crp *coveragePool) Dequeue() (cn *coverageNode) { // log.Debugf("coveragePool dequeue: %d", len(crp.queue)) defer func() { @@ -126,11 +124,11 @@ func (crp *coveragePool) Dequeue() (cn *coverageNode) { }() select { - case cn = <- crp.queue: + case cn = <-crp.queue: return default: cn = &coverageNode{} return } -} \ No newline at end of file +} diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 2b65f0c..876d4fe 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -87,7 +87,7 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) { ms.readFromClient(newPkt.Seq, newPkt.Payload) } else { - ms.readFromServer(newPkt.Payload) + ms.readFromServer(newPkt.Seq, newPkt.Payload) qp := ms.GenerateQueryPiece() if qp != nil { ms.queryPieceReceiver <- qp @@ -99,7 +99,7 @@ func (ms *MysqlSession) resetBeginTime() { ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit } -func (ms *MysqlSession) readFromServer(bytes []byte) { +func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) { if ms.expectSendSize < 1 && len(bytes) > 4 { ms.expectSendSize = extractMysqlPayloadSize(bytes[:4]) contents := bytes[4:] @@ -107,6 +107,10 @@ func (ms *MysqlSession) readFromServer(bytes []byte) { ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5]) } } + + if ms.coverRanges.head.next == nil || ms.coverRanges.head.next.end != respSeq { + ms.clear() + } } func (ms *MysqlSession) checkFinish() bool {