self_example/phm_rotate/PHMWarehouse/utils/KafkaUtils.py

60 lines
1.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
'''
@Author : dingjiawen
@Date : 2023/5/16 20:15
@Usage : 获取kakfa的producer和Consumer以及发布信息和订阅信息
@Desc : 创建kafka的工具类
'''
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
KAFKA_SERVER = ["Ding202:9092", "Ding203:9092", "Ding204:9092"]
def getKafkaProducer(key_serializer=None, value_serializer=None):
# 假设生产的消息为键值对不是一定要键值对且序列化方式为json
# producer = KafkaProducer(
# bootstrap_servers=['192.168.118.202:9092'],
# key_serializer=lambda v: json.dumps(v).encode('utf-8'),
# value_serializer=lambda v: json.dumps(v).encode('utf-8'),
# )
producer = KafkaProducer(
bootstrap_servers=KAFKA_SERVER,
key_serializer=key_serializer,
value_serializer=value_serializer
)
return producer
def getKafkaProducerTest( key_serializer=None, value_serializer=None):
# 假设生产的消息为键值对不是一定要键值对且序列化方式为json
# producer = KafkaProducer(
# bootstrap_servers=['192.168.118.202:9092'],
# key_serializer=lambda v: json.dumps(v).encode('utf-8'),
# value_serializer=lambda v: json.dumps(v).encode('utf-8'),
# )
producer = KafkaProducer(
bootstrap_servers=KAFKA_SERVER,
key_serializer=key_serializer,
value_serializer=value_serializer
)
return producer
def getKafkaConsumer(topic,group_id ,key_serializer=None, value_serializer=None):
# 假设生产的消息为键值对不是一定要键值对且序列化方式为json
consumer = KafkaConsumer(
topic,
group_id=group_id,
bootstrap_servers=KAFKA_SERVER,
auto_offset_reset='latest'
)
return consumer