merge ranges

This commit is contained in:
hebo
2019-09-07 19:07:49 +08:00
parent c7833a9df7
commit 07d0c60086
113 changed files with 11865 additions and 95 deletions

View File

@@ -1,9 +1,7 @@
package mysql
import (
"container/list"
"fmt"
// "strings"
"sync"
"time"
@@ -25,7 +23,7 @@ type MysqlSession struct {
// packageOffset int64
beginSeqID int64
endSeqID int64
coverRanges *list.List
coverRanges *coverRanges
expectReceiveSize int
expectSendSize int
prepareInfo *prepareInfo
@@ -61,7 +59,7 @@ func NewMysqlSession(
queryPieceReceiver: receiver,
closeConn: make(chan bool, 1),
expectReceiveSize: -1,
coverRanges: list.New(),
coverRanges: NewCoverRanges(),
ignoreAckID: -1,
sendSize: 0,
pkgCacheLock: sync.Mutex{},
@@ -95,7 +93,6 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
ms.queryPieceReceiver <- qp
}
}
}
func (ms *MysqlSession) resetBeginTime() {
@@ -112,39 +109,12 @@ func (ms *MysqlSession) readFromServer(bytes []byte) {
}
}
func mergeRanges(currRange *jigsaw, pkgRanges []*jigsaw) (mergedRange *jigsaw, newPkgRanges []*jigsaw) {
var nextRange *jigsaw
newPkgRanges = make([]*jigsaw, 0, 4)
if len(pkgRanges) < 1 {
return currRange, newPkgRanges
} else if len(pkgRanges) == 1 {
nextRange = pkgRanges[0]
} else {
nextRange, newPkgRanges = mergeRanges(pkgRanges[0], pkgRanges[1:])
}
if currRange.end >= nextRange.begin {
mergedRange = &jigsaw{begin: currRange.begin, end: nextRange.end}
} else {
tmpRanges := make([]*jigsaw, len(newPkgRanges)+1)
tmpRanges[0] = nextRange
if len(newPkgRanges) > 0 {
copy(tmpRanges[1:], newPkgRanges)
}
newPkgRanges = tmpRanges
mergedRange = currRange
}
return
}
func (ms *MysqlSession) checkFinish() bool {
ms.mergeRange()
if ms.endSeqID - ms.beginSeqID == int64(len(ms.cachedStmtBytes)) && ms.expectReceiveSize == 0 {
if ms.coverRanges.head == nil {
return false
}
if ms.coverRanges.head.end - ms.coverRanges.head.begin == int64(len(ms.cachedStmtBytes)) {
return true
}
@@ -160,7 +130,7 @@ func (ms *MysqlSession) clear() {
ms.endSeqID = -1
ms.ignoreAckID = -1
ms.sendSize = 0
ms.coverRanges = list.New()
ms.coverRanges.clear()
}
func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
@@ -230,52 +200,10 @@ func (ms *MysqlSession) readFromClient(seqID int64, bytes []byte) {
copy(ms.cachedStmtBytes[seqOffset:seqOffset+contentSize], bytes)
}
ms.addRange(newJigsaw(seqID, seqID+contentSize))
ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize)
ms.coverRanges.addRange(coverRangePool.NewCoverage(seqID, seqID+contentSize))
// ms.expectReceiveSize = ms.expectReceiveSize - int(contentSize)
}
func (ms *MysqlSession) addRange(js *jigsaw) {
head := ms.coverRanges.Front()
// empty list
if head == nil {
ms.coverRanges.PushBack(js)
return
}
// find insert position
var node = head
for ; node != nil; node = node.Next() {
nodeVal := node.Value.(*jigsaw)
if nodeVal.begin > js.begin {
break
}
}
// insert element
if node != nil {
ms.coverRanges.InsertBefore(js, node)
} else {
ms.coverRanges.PushBack(js)
}
}
func (ms *MysqlSession) mergeRange() {
head := ms.coverRanges.Front()
// empty list
if head == nil {
return
}
// find insert position
var node = head
for ; node != nil; node = node.Next() {
nodeVal := node.Value.(*jigsaw)
if nodeVal.begin <= ms.endSeqID && nodeVal.end > ms.endSeqID {
ms.endSeqID = nodeVal.end
}
}
}
func (ms *MysqlSession) GenerateQueryPiece() (qp model.QueryPiece) {
defer ms.clear()
@@ -385,6 +313,5 @@ func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (m
func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) {
return model.NewPooledMysqlQueryPiece(
ms.connectionID, ms.clientHost, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP,
ms.clientPort, ms.serverPort, communicator.GetConfig(communicator.THROW_PACKET_RATE).(float64),
ms.stmtBeginTime)
ms.clientPort, ms.serverPort, communicator.GetThrowPacketRate(), ms.stmtBeginTime)
}