diff --git a/exporter/kafka.go b/exporter/kafka.go index 0475241..c1c32f1 100644 --- a/exporter/kafka.go +++ b/exporter/kafka.go @@ -15,6 +15,8 @@ var ( kafkaGroupID string asyncTopic string syncTopic string + compress string + compressType sarama.CompressionCodec ) func init() { @@ -27,8 +29,11 @@ func init() { &asyncTopic, "kafka-async-topic", "", "kafka async send topic. No default value") flag.StringVar( - &syncTopic, + &syncTopic, "kafka-sync-topic", "", "kafka sync send topic. No default value") + flag.StringVar( + &compress, + "compress-type", "", "kafka message compress type. Default value is no compress") } type kafkaExporter struct { @@ -39,6 +44,20 @@ type kafkaExporter struct { } func checkParams() { + switch compress { + case "": + compressType = sarama.CompressionNone + case "gzip": + compressType = sarama.CompressionGZIP + case "snappy": + compressType = sarama.CompressionSnappy + case "lz4": + compressType = sarama.CompressionLZ4 + default: + panic(fmt.Sprintf("cannot support compress type: %s", compress)) + } + + fmt.Printf("kafka message compress type: %s", compress) params := make(map[string]string) params["kafka-server"] = kafkaServer params["kafka-group-id"] = kafkaGroupID @@ -57,6 +76,7 @@ func NewKafkaExporter() (ke *kafkaExporter) { conf := sarama.NewConfig() conf.Producer.Return.Successes = true conf.ClientID = kafkaGroupID + conf.Producer.Compression = compressType addrs := strings.Split(kafkaServer, ",") syncProducer, err := sarama.NewSyncProducer(addrs, conf) if err != nil {