# -*- encoding:utf-8 -*-
|
||
|
||
'''
|
||
@Author : dingjiawen
|
||
@Date : 2023/5/16 20:15
|
||
@Usage : Obtain Kafka producer and consumer instances, and publish / subscribe to messages
|
||
@Desc : Utility helpers for creating Kafka clients
|
||
'''
|
||
|
||
from kafka import KafkaProducer, KafkaConsumer
|
||
from kafka.errors import kafka_errors
|
||
import traceback
|
||
import json
|
||
|
||
KAFKA_SERVER = ["Ding202:9092", "Ding203:9092", "Ding204:9092"]
|
||
|
||
|
||
def getKafkaProducer(key_serializer=None, value_serializer=None):
    """Create a KafkaProducer connected to the brokers listed in KAFKA_SERVER.

    The original author's note assumes produced messages are key/value pairs
    (not strictly required) serialized as JSON. For that case a caller can
    pass, for both arguments, something like
    ``lambda v: json.dumps(v).encode('utf-8')``.

    :param key_serializer: forwarded unchanged as the producer's
        ``key_serializer`` option (defaults to None).
    :param value_serializer: forwarded unchanged as the producer's
        ``value_serializer`` option (defaults to None).
    :return: the newly constructed KafkaProducer instance.
    """
    return KafkaProducer(
        bootstrap_servers=KAFKA_SERVER,
        key_serializer=key_serializer,
        value_serializer=value_serializer,
    )
|
||
|
||
def getKafkaProducerTest(key_serializer=None, value_serializer=None):
    """Create a KafkaProducer for the brokers in KAFKA_SERVER (test variant).

    As written, this builds the producer with exactly the same options as
    getKafkaProducer: the module-level broker list plus the two serializers
    supplied by the caller.

    :param key_serializer: forwarded unchanged as ``key_serializer``
        (defaults to None).
    :param value_serializer: forwarded unchanged as ``value_serializer``
        (defaults to None).
    :return: the newly constructed KafkaProducer instance.
    """
    # Collect the keyword options first so the constructor call stays short.
    options = {
        "bootstrap_servers": KAFKA_SERVER,
        "key_serializer": key_serializer,
        "value_serializer": value_serializer,
    }
    return KafkaProducer(**options)
|
||
|
||
def getKafkaConsumer(topic, group_id, key_serializer=None, value_serializer=None):
    """Create a KafkaConsumer subscribed to ``topic`` on the KAFKA_SERVER brokers.

    The consumer is constructed with ``auto_offset_reset='latest'``; see the
    kafka-python KafkaConsumer documentation for the exact semantics.

    :param topic: topic name passed positionally to KafkaConsumer.
    :param group_id: consumer group id forwarded as ``group_id``.
    :param key_serializer: accepted for signature compatibility; not used.
    :param value_serializer: accepted for signature compatibility; not used.
    :return: the newly constructed KafkaConsumer instance.
    """
    # NOTE(review): key_serializer / value_serializer are never forwarded to
    # the consumer. If decoding is wanted, KafkaConsumer takes
    # key_deserializer / value_deserializer instead - confirm intent with callers.
    settings = {
        "group_id": group_id,
        "bootstrap_servers": KAFKA_SERVER,
        "auto_offset_reset": 'latest',
    }
    return KafkaConsumer(topic, **settings)
|