54 lines
1.2 KiB
Python
54 lines
1.2 KiB
Python
import json
|
|
from kafka import KafkaConsumer, KafkaProducer
|
|
|
|
|
|
|
|
kafka_server = '192.168.XX.XX:9091'
|
|
|
|
group_id = 'sniffer'
|
|
topic = 'non_ddl_sql_collector'
|
|
|
|
|
|
def check_consume():
|
|
conf = {
|
|
'bootstrap_servers': kafka_server,
|
|
'client_id': group_id,
|
|
'group_id': group_id,
|
|
'auto_offset_reset': 'earliest',
|
|
'session_timeout_ms': 60000,
|
|
'api_version': (0, 9, 0, 1)
|
|
}
|
|
consumer = KafkaConsumer(topic, **conf)
|
|
print('ready to consume')
|
|
for msg in consumer:
|
|
event = json.loads(bytes.decode(msg.value))
|
|
print(event)
|
|
|
|
|
|
def check_produce():
|
|
conf = {
|
|
'bootstrap_servers': kafka_server,
|
|
'client_id': group_id
|
|
}
|
|
# 'api_version': (0, 9, 0, 1)
|
|
producer = KafkaProducer(**conf)
|
|
try:
|
|
future = producer.send(topic, 'haha')
|
|
result = future.get(timeout=3)
|
|
print('send OK')
|
|
print(result)
|
|
|
|
except BaseException as e:
|
|
print('send failed')
|
|
# 发送失败时,用户需根据业务逻辑做异常处理,否则消息可能会丢失
|
|
print(str(e))
|
|
|
|
|
|
def _real_main():
|
|
# check_produce()
|
|
check_consume()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
_real_main()
|