143 lines
3.1 KiB
Go
143 lines
3.1 KiB
Go
package exporter
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/Shopify/sarama"
|
|
log "github.com/golang/glog"
|
|
"github.com/zr-hebo/sniffer-agent/model"
|
|
)
|
|
|
|
var (
|
|
kafkaServer string
|
|
kafkaGroupID string
|
|
asyncTopic string
|
|
syncTopic string
|
|
compress string
|
|
compressType sarama.CompressionCodec
|
|
)
|
|
|
|
func init() {
|
|
flag.StringVar(
|
|
&kafkaServer, "kafka-server", "", "kafka server address. No default value")
|
|
flag.StringVar(
|
|
&kafkaGroupID,
|
|
"kafka-group-id", "", "kafka service group. No default value")
|
|
flag.StringVar(
|
|
&asyncTopic,
|
|
"kafka-async-topic", "", "kafka async send topic. No default value")
|
|
flag.StringVar(
|
|
&syncTopic,
|
|
"kafka-sync-topic", "", "kafka sync send topic. No default value")
|
|
flag.StringVar(
|
|
&compress,
|
|
"compress-type", "", "kafka message compress type. Default value is no compress")
|
|
}
|
|
|
|
type kafkaExporter struct {
|
|
asyncProducer sarama.AsyncProducer
|
|
syncProducer sarama.SyncProducer
|
|
asyncTopic string
|
|
syncTopic string
|
|
}
|
|
|
|
func checkParams() {
|
|
switch compress {
|
|
case "":
|
|
compressType = sarama.CompressionNone
|
|
case "gzip":
|
|
compressType = sarama.CompressionGZIP
|
|
case "snappy":
|
|
compressType = sarama.CompressionSnappy
|
|
case "lz4":
|
|
compressType = sarama.CompressionLZ4
|
|
default:
|
|
panic(fmt.Sprintf("cannot support kafka compress type: %s", compress))
|
|
}
|
|
|
|
fmt.Printf("kafka message compress type: %s", compress)
|
|
params := make(map[string]string)
|
|
params["kafka-server"] = kafkaServer
|
|
params["kafka-group-id"] = kafkaGroupID
|
|
params["kafka-async-topic"] = asyncTopic
|
|
params["kafka-sync-topic"] = syncTopic
|
|
for param := range params {
|
|
if len(params[param]) < 1{
|
|
panic(fmt.Sprintf("%s cannot be empty", param))
|
|
}
|
|
}
|
|
}
|
|
|
|
func NewKafkaExporter() (ke *kafkaExporter) {
|
|
checkParams()
|
|
ke = &kafkaExporter{}
|
|
conf := sarama.NewConfig()
|
|
conf.Producer.Return.Successes = true
|
|
conf.ClientID = kafkaGroupID
|
|
conf.Producer.Compression = compressType
|
|
addrs := strings.Split(kafkaServer, ",")
|
|
syncProducer, err := sarama.NewSyncProducer(addrs, conf)
|
|
if err != nil {
|
|
panic(err.Error())
|
|
}
|
|
ke.syncProducer = syncProducer
|
|
|
|
asyncProducer, err := sarama.NewAsyncProducer(addrs, conf)
|
|
if err != nil {
|
|
panic(err.Error())
|
|
}
|
|
ke.asyncProducer = asyncProducer
|
|
ke.asyncTopic = asyncTopic
|
|
ke.syncTopic = syncTopic
|
|
|
|
go func() {
|
|
errors := ke.asyncProducer.Errors()
|
|
success := ke.asyncProducer.Successes()
|
|
for {
|
|
select {
|
|
case err := <-errors:
|
|
if err != nil {
|
|
log.Error(err.Error())
|
|
}
|
|
|
|
case <-success:
|
|
}
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
func (ke *kafkaExporter) Export (qp model.QueryPiece) (err error){
|
|
defer func() {
|
|
if err != nil {
|
|
log.Errorf("export with kafka failed <-- %s", err.Error())
|
|
}
|
|
}()
|
|
|
|
if qp.NeedSyncSend() {
|
|
// log.Debugf("deal ddl: %s\n", *qp.String())
|
|
|
|
msg := &sarama.ProducerMessage {
|
|
Topic: ke.syncTopic,
|
|
Value: sarama.ByteEncoder(qp.Bytes()),
|
|
}
|
|
_, _, err = ke.syncProducer.SendMessage(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
} else {
|
|
// log.Debugf("deal non ddl: %s", *qp.String())
|
|
msg := &sarama.ProducerMessage {
|
|
Topic: ke.asyncTopic,
|
|
Value: sarama.ByteEncoder(qp.Bytes()),
|
|
}
|
|
|
|
ke.asyncProducer.Input() <- msg
|
|
}
|
|
|
|
return
|
|
}
|