Merge pull request #8 from zr-hebo/develop-hebo

修复设置抓包率引起的查询时间异常的问题
This commit is contained in:
河伯 2019-11-15 14:51:03 +08:00 committed by GitHub
commit 3a555db2d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 28 deletions

View File

@ -89,9 +89,9 @@ func outletSetConfig(resp http.ResponseWriter, req *http.Request) {
}
func GetTCPCapturePacketRate() float64 {
return catpurePacketRate.tcpTPR
return catpurePacketRate.tcpCPR
}
func GetMysqlCapturePacketRate() float64 {
return catpurePacketRate.mysqlTPR
return catpurePacketRate.mysqlCPR
}

View File

@ -11,16 +11,16 @@ type configItem interface {
}
type capturePacketRateConfig struct {
name string
tcpTPR float64
mysqlTPR float64
name string
tcpCPR float64
mysqlCPR float64
}
func newCapturePacketRateConfig() (cprc *capturePacketRateConfig) {
cprc = &capturePacketRateConfig{
name: CAPTURE_PACKET_RATE,
tcpTPR: 1.0,
mysqlTPR: 1.0,
tcpCPR: 1.0,
mysqlCPR: 1.0,
}
return
}
@ -33,11 +33,11 @@ func (cprc *capturePacketRateConfig) setVal (val interface{}) (err error){
}
fmt.Printf("set config %s: %v\n", CAPTURE_PACKET_RATE, realVal)
cprc.mysqlTPR = realVal
cprc.tcpTPR = math.Sqrt(realVal)
cprc.mysqlCPR = realVal
cprc.tcpCPR = math.Sqrt(realVal)
return
}
func (cprc *capturePacketRateConfig) getVal () (val interface{}){
return cprc.mysqlTPR
return cprc.mysqlCPR
}

View File

@ -6,17 +6,17 @@ type coverageNode struct {
end int64
next *coverageNode
crp *coveragePool
crp *coveragePool
}
func newCoverage(begin, end int64) (*coverageNode) {
return &coverageNode{
begin: begin,
end: end,
end: end,
}
}
func (crn *coverageNode) Recovery() {
func (crn *coverageNode) Recovery() {
crn.crp.Enqueue(crn)
}
@ -28,6 +28,7 @@ func NewCoverRanges() *coverRanges {
return &coverRanges{
head: &coverageNode{
begin: -1,
end: -1,
},
}
}
@ -42,7 +43,7 @@ func (crs *coverRanges) clear() {
crs.head.next = nil
}
func (crs *coverRanges) addRange(node *coverageNode) {
func (crs *coverRanges) addRange(node *coverageNode) {
// insert range in asc order
var currRange = crs.head;
for currRange != nil && currRange.next != nil {
@ -52,10 +53,9 @@ func (crs *coverRanges) addRange(node *coverageNode) {
node.next = checkRange
node = nil
break
} else {
currRange = checkRange
}
currRange = checkRange
}
if node != nil && currRange != nil {
@ -65,12 +65,12 @@ func (crs *coverRanges) addRange(node *coverageNode) {
crs.mergeRanges()
}
func (crs *coverRanges) mergeRanges() {
func (crs *coverRanges) mergeRanges() {
// merge ranges
currRange := crs.head.next
for currRange != nil && currRange.next != nil {
checkRange := currRange.next
if currRange.end >= checkRange.begin && currRange.end < checkRange.end {
if currRange.begin <= checkRange.begin && currRange.end >= checkRange.begin && currRange.end < checkRange.end {
currRange.end = checkRange.end
currRange.next = checkRange.next
checkRange.Recovery()
@ -81,19 +81,17 @@ func (crs *coverRanges) mergeRanges() {
}
}
type coveragePool struct {
queue chan *coverageNode
queue chan *coverageNode
}
func NewCoveragePool() (cp *coveragePool) {
return &coveragePool{
queue: make(chan *coverageNode, 256),
}
}
func (crp *coveragePool) NewCoverage(begin, end int64)(cn *coverageNode) {
func (crp *coveragePool) NewCoverage(begin, end int64) (cn *coverageNode) {
cn = crp.Dequeue()
cn.begin = begin
cn.end = end
@ -115,7 +113,7 @@ func (crp *coveragePool) Enqueue(cn *coverageNode) {
}
}
func (crp *coveragePool) Dequeue() (cn *coverageNode) {
func (crp *coveragePool) Dequeue() (cn *coverageNode) {
// log.Debugf("coveragePool dequeue: %d", len(crp.queue))
defer func() {
@ -126,11 +124,11 @@ func (crp *coveragePool) Dequeue() (cn *coverageNode) {
}()
select {
case cn = <- crp.queue:
case cn = <-crp.queue:
return
default:
cn = &coverageNode{}
return
}
}
}

View File

@ -87,7 +87,7 @@ func (ms *MysqlSession) ReceiveTCPPacket(newPkt *model.TCPPacket) {
ms.readFromClient(newPkt.Seq, newPkt.Payload)
} else {
ms.readFromServer(newPkt.Payload)
ms.readFromServer(newPkt.Seq, newPkt.Payload)
qp := ms.GenerateQueryPiece()
if qp != nil {
ms.queryPieceReceiver <- qp
@ -99,7 +99,7 @@ func (ms *MysqlSession) resetBeginTime() {
ms.stmtBeginTime = time.Now().UnixNano() / millSecondUnit
}
func (ms *MysqlSession) readFromServer(bytes []byte) {
func (ms *MysqlSession) readFromServer(respSeq int64, bytes []byte) {
if ms.expectSendSize < 1 && len(bytes) > 4 {
ms.expectSendSize = extractMysqlPayloadSize(bytes[:4])
contents := bytes[4:]
@ -107,6 +107,10 @@ func (ms *MysqlSession) readFromServer(bytes []byte) {
ms.prepareInfo.prepareStmtID = bytesToInt(contents[1:5])
}
}
if ms.coverRanges.head.next == nil || ms.coverRanges.head.next.end != respSeq {
ms.clear()
}
}
func (ms *MysqlSession) checkFinish() bool {