为kafka消息添加压缩设置

This commit is contained in:
hebo 2019-12-13 11:38:53 +08:00
parent 8db6c6f4fa
commit 03c752a209
1 changed files with 21 additions and 1 deletions

View File

@ -15,6 +15,8 @@ var (
kafkaGroupID string
asyncTopic string
syncTopic string
compress string
compressType sarama.CompressionCodec
)
func init() {
@ -27,8 +29,11 @@ func init() {
&asyncTopic,
"kafka-async-topic", "", "kafka async send topic. No default value")
flag.StringVar(
&syncTopic,
&syncTopic,
"kafka-sync-topic", "", "kafka sync send topic. No default value")
flag.StringVar(
&compress,
"compress-type", "", "kafka message compress type. Default value is no compress")
}
type kafkaExporter struct {
@ -39,6 +44,20 @@ type kafkaExporter struct {
}
func checkParams() {
switch compress {
case "":
compressType = sarama.CompressionNone
case "gzip":
compressType = sarama.CompressionGZIP
case "snappy":
compressType = sarama.CompressionSnappy
case "lz4":
compressType = sarama.CompressionLZ4
default:
panic(fmt.Sprintf("cannot support compress type: %s", compress))
}
fmt.Printf("kafka message compress type: %s", compress)
params := make(map[string]string)
params["kafka-server"] = kafkaServer
params["kafka-group-id"] = kafkaGroupID
@ -57,6 +76,7 @@ func NewKafkaExporter() (ke *kafkaExporter) {
conf := sarama.NewConfig()
conf.Producer.Return.Successes = true
conf.ClientID = kafkaGroupID
conf.Producer.Compression = compressType
addrs := strings.Split(kafkaServer, ",")
syncProducer, err := sarama.NewSyncProducer(addrs, conf)
if err != nil {