优草派  >   Python

python如何连接kafka?这篇文章教会你连接方法

陈立鑫            来源:优草派

很多小伙伴想知道在python中如何连接kafka?今天实现方法它来了!小编就通过这篇文章来给大家分享一个python连接kafka的操作方法,如果有兴趣的小伙伴一定要耐心阅读完这篇文章。

python如何连接kafka?这篇文章教会你连接方法

python连接kafka的详细方法如下:

(1)首先我们需要kafka-python安装:

# PyPI安装
 pip install kafka-python
  
 # conda安装
 conda install -c conda-forge kafka-python
  
 # anaconda自带pip安装
 /root/anaconda3/bin/pip install kafka-python

(2)kafka-python生产者

producer.py

#!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import datetime
 import json
 import time
 import uuid
 from kafka import KafkaProducer
 from kafka.errors import KafkaError
 producer = KafkaProducer(bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')
 topic = 'test_20181105'
 def test():
     print('begin')
     try:
         n = 0
         while True:
             dic = {}
             dic['id'] = n
             n = n + 1
             dic['myuuid'] = str(uuid.uuid4().hex)
             dic['time'] = datetime.datetime.now().strftime("%Y%m%d %H:%M:%S")
             producer.send(topic, json.dumps(dic).encode())
             print("send:" + json.dumps(dic))
             time.sleep(0.5)
     except KafkaError as e:
         print(e)
     finally:
         producer.close()
         print('done')
 if __name__ == '__main__':
     test()

服务器集群中配置好Kafka, 修改上面程序中的ip地址和端口号, 执行python脚本就可以成功将消息发送到 topic: test_20181105

send:{"id": 1411, "myuuid": "a25a3d0361f94d3b8fffd5967ab5df01", "time": "20181105 16:11:14"}
 send:{"id": 1412, "myuuid": "784efd5389564194941240dca66233b6", "time": "20181105 16:11:14"}
 send:{"id": 1413, "myuuid": "6a211195319e447aa559614662f70590", "time": "20181105 16:11:15"}
 send:{"id": 1414, "myuuid": "2cc45bd82baf4a1cb41ea4786e50a0df", "time": "20181105 16:11:15"}
 send:{"id": 1415, "myuuid": "b7dfed4919c74164b83cf3ec28e257b6", "time": "20181105 16:11:16"}
 send:{"id": 1416, "myuuid": "9218eceb17834c228f5ab01ca7595272", "time": "20181105 16:11:16"}
 send:{"id": 1417, "myuuid": "c2751c54c390453f9eedd417fb1e5a31", "time": "20181105 16:11:17"}
 send:{"id": 1418, "myuuid": "9bbc4ef2cfbb42148332eb979b1142cb", "time": "20181105 16:11:17"}
 send:{"id": 1419, "myuuid": "f4998a862494445c976137793b55ed73", "time": "20181105 16:11:18"}

(3)kafka-python消费者

consumer.py

#!/bin/env python
 from kafka import KafkaConsumer
 # connect to Kafka server and pass the topic we want to consume
 consumer = KafkaConsumer('test_20181105',group_id = 'test_group2', bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')
 try:
     for msg in consumer:
         print(msg)
         # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
 except KeyboardInterrupt as e:
     print(e)

同样修改上面的Ip地址和端口号,就可以接收 topic: test_20181105上的消息:

ConsumerRecord(topic='test_20181105', partition=1, offset=951, timestamp=1541405600340, timestamp_type=0, key=None, value=b'{"id": 1663, "myuuid": "0f744021b2d9468886908ee6685a0fdb", "time": "20181105 16:13:20"}', checksum=1357895145, serialized_key_size=-1, serialized_value_size=87)
 ConsumerRecord(topic='test_20181105', partition=0, offset=935, timestamp=1541405600841, timestamp_type=0, key=None, value=b'{"id": 1664, "myuuid": "9379f68f656644bdb2d30911f06240e4", "time": "20181105 16:13:20"}', checksum=-715594646, serialized_key_size=-1, serialized_value_size=87)
 ConsumerRecord(topic='test_20181105', partition=1, offset=952, timestamp=1541405601341, timestamp_type=0, key=None, value=b'{"id": 1665, "myuuid": "f4a5fa5b32cd4b7991612b626bea4b0e", "time": "20181105 16:13:21"}', checksum=-2068072013, serialized_key_size=-1, serialized_value_size=87)

可以通过设置不同的group_id 来实现消息队列或消息订阅:

如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.

如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.

以上就是小编给大家带来的在python连接kafka的操作方法,希望大家通过阅读小编的文章之后能够有所收获!如果大家觉得小编的文章不错的话,可以多多分享给有需要的人。

【原创声明】凡注明“来源:优草派”的文章,系本站原创,任何单位或个人未经本站书面授权不得转载、链接、转贴或以其他方式复制发表。否则,本站将依法追究其法律责任。
TOP 10
  • 周排行
  • 月排行