From 03c752a209114fbc8410793f026aa13be3f43d20 Mon Sep 17 00:00:00 2001
From: hebo
Date: Fri, 13 Dec 2019 11:38:53 +0800
Subject: [PATCH 1/2] Add compression settings for kafka messages
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 exporter/kafka.go | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/exporter/kafka.go b/exporter/kafka.go
index 0475241..c1c32f1 100644
--- a/exporter/kafka.go
+++ b/exporter/kafka.go
@@ -15,6 +15,8 @@ var (
     kafkaGroupID string
     asyncTopic   string
     syncTopic    string
+    compress     string
+    compressType sarama.CompressionCodec
 )

 func init() {
@@ -27,8 +29,11 @@ func init() {
         &asyncTopic,
         "kafka-async-topic", "", "kafka async send topic. No default value")
     flag.StringVar(
-        &syncTopic, 
+        &syncTopic,
         "kafka-sync-topic", "", "kafka sync send topic. No default value")
+    flag.StringVar(
+        &compress,
+        "compress-type", "", "kafka message compress type. Default value is no compress")
 }

 type kafkaExporter struct {
@@ -39,6 +44,20 @@ type kafkaExporter struct {
 }

 func checkParams() {
+    switch compress {
+    case "":
+        compressType = sarama.CompressionNone
+    case "gzip":
+        compressType = sarama.CompressionGZIP
+    case "snappy":
+        compressType = sarama.CompressionSnappy
+    case "lz4":
+        compressType = sarama.CompressionLZ4
+    default:
+        panic(fmt.Sprintf("cannot support compress type: %s", compress))
+    }
+
+    fmt.Printf("kafka message compress type: %s", compress)
     params := make(map[string]string)
     params["kafka-server"] = kafkaServer
     params["kafka-group-id"] = kafkaGroupID
@@ -57,6 +76,7 @@ func NewKafkaExporter() (ke *kafkaExporter) {
     conf := sarama.NewConfig()
     conf.Producer.Return.Successes = true
     conf.ClientID = kafkaGroupID
+    conf.Producer.Compression = compressType
     addrs := strings.Split(kafkaServer, ",")
     syncProducer, err := sarama.NewSyncProducer(addrs, conf)
     if err != nil {

From a223ce5dab175e32d69b3ad80ecbbdac6b8a78e5 Mon Sep 17 00:00:00 2001
From: hebo
Date: Tue, 17 Dec 2019 20:38:04 +0800
Subject: [PATCH 2/2] Fix shared memory bug

---
 model/query_piece.go      |  5 ++---
 model/query_piece_pool.go | 22 +---------------------
 2 files changed, 3 insertions(+), 24 deletions(-)

diff --git a/model/query_piece.go b/model/query_piece.go
index a99d446..1b2334f 100644
--- a/model/query_piece.go
+++ b/model/query_piece.go
@@ -1,8 +1,6 @@
 package model

 import (
-    "bytes"
-    "encoding/json"
     jsoniter "github.com/json-iterator/go"
     "github.com/pingcap/tidb/util/hack"
     "time"
@@ -82,8 +80,8 @@ func (bqp *BaseQueryPiece) GetSQL() (*string) {
 func (bqp *BaseQueryPiece) Recovery() {
 }

+/**
 func marsharQueryPieceShareMemory(qp interface{}, cacheBuffer []byte) []byte {
-
     buffer := bytes.NewBuffer(cacheBuffer)
     err := json.NewEncoder(buffer).Encode(qp)
     if err != nil {
@@ -92,6 +90,7 @@

     return buffer.Bytes()
 }
+*/

 func marsharQueryPieceMonopolize(qp interface{}) (content []byte) {
     content, err := jsonIterator.Marshal(qp)
diff --git a/model/query_piece_pool.go b/model/query_piece_pool.go
index 7a58d67..358c29f 100644
--- a/model/query_piece_pool.go
+++ b/model/query_piece_pool.go
@@ -1,19 +1,13 @@
 package model

 import (
-    "github.com/zr-hebo/sniffer-agent/util"
     "sync"
     "time"
 )

-var (
-    localSliceBufferPool = util.NewSliceBufferPool("json cache", (128+1)*1024)
-)
-
 type PooledMysqlQueryPiece struct {
     MysqlQueryPiece
     recoverPool     *mysqlQueryPiecePool
-    sliceBufferPool *util.SliceBufferPool
 }

 func NewPooledMysqlQueryPiece(
@@ -22,7 +16,6 @@
     pmqp *PooledMysqlQueryPiece) {
     pmqp = mqpp.Dequeue()
-    pmqp.sliceBufferPool = localSliceBufferPool

     nowInMS := time.Now().UnixNano() / millSecondUnit
     pmqp.SessionID = sessionID
     pmqp.ClientHost = clientIP
@@ -41,9 +34,6 @@
 }

 func (pmqp *PooledMysqlQueryPiece) Recovery() {
-    if pmqp.sliceBufferPool != nil {
-        pmqp.sliceBufferPool.Enqueue(pmqp.jsonContent[:0])
-    }
     pmqp.jsonContent = nil
     pmqp.recoverPool.Enqueue(pmqp)
 }
@@ -59,17 +49,7 @@ func (pmqp *PooledMysqlQueryPiece) Bytes() (content []byte) {
 }

 func (pmqp *PooledMysqlQueryPiece) GenerateJsonBytes() {
-    if pmqp.sliceBufferPool == nil {
-        pmqp.jsonContent = marsharQueryPieceMonopolize(pmqp)
-        return
-    }
-
-    var cacheBuffer = pmqp.sliceBufferPool.Dequeue()
-    if len(cacheBuffer) > 0 {
-        panic("there already have bytes in buffer")
-    }
-
-    pmqp.jsonContent = marsharQueryPieceShareMemory(pmqp, cacheBuffer)
+    pmqp.jsonContent = marsharQueryPieceMonopolize(pmqp)
     return
 }
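
Note (not part of the patches above): a minimal, self-contained sketch of how the new -compress-type flag is intended to drive the sarama producer configuration added in PATCH 1/2. The broker address, topic name, the hard-coded "gzip" value, and the compressionFromFlag helper are illustrative assumptions for this sketch, not code from the repository.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

// compressionFromFlag maps a -compress-type flag value to a sarama codec.
// An empty string means no compression, mirroring the switch in checkParams().
func compressionFromFlag(compress string) (sarama.CompressionCodec, error) {
	switch compress {
	case "":
		return sarama.CompressionNone, nil
	case "gzip":
		return sarama.CompressionGZIP, nil
	case "snappy":
		return sarama.CompressionSnappy, nil
	case "lz4":
		return sarama.CompressionLZ4, nil
	default:
		return sarama.CompressionNone, fmt.Errorf("unsupported compress type: %s", compress)
	}
}

func main() {
	// "gzip" stands in for the parsed flag value in this sketch.
	codec, err := compressionFromFlag("gzip")
	if err != nil {
		log.Fatal(err)
	}

	conf := sarama.NewConfig()
	conf.Producer.Return.Successes = true // required by sarama's SyncProducer
	conf.Producer.Compression = codec

	// Assumed broker address and topic, for illustration only.
	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, conf)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "sniffer-test",
		Value: sarama.StringEncoder("hello, compressed kafka"),
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Fatal(err)
	}
}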