Kafka로 메시지와 이벤트 처리하기 - (2) Python으로 consumer, producer 만들기
새로운 Topic을 생성하자. blog-post 라는 Topic을 생성 해서 예제로 사용 해 보고자 한다.
bin/kafka-topics.sh --create --topic blog-post --bootstrap-server localhost:9092
virtualenv venv soruce venv/bin/activate
(venv) $ pip3 install kafka-python
10000개의 데이터를 Queue에 보내는데 얼마나 걸리는지 측정 해 보기 위해 time을 사용 해 보았다.
import time from kafka import KafkaProducer # producer 객체 생성 # acks 0 -> 빠른 전송우선, acks 1 -> 데이터 정확성 우선 producer = KafkaProducer(acks=0, compression_type='gzip',bootstrap_servers=['localhost:9092']) start = time.time() for i in range(10000): producer.send('blog-post',b'Kafka Blog Post Event Message') producer.flush() #queue에 있는 데이터를 보냄 end = time.time() - start print(end)
from kafka import KafkaConsumer, consumer # consumer 객체 생성 consumer = KafkaConsumer( 'blog-post', bootstrap_servers=['127.0.0.1:9092'], auto_offset_reset='earliest', enable_auto_commit=True, consumer_timeout_ms=1000 ) while True: for message in consumer: print(message.topic, message.partition, message.offset, message.key, message.value)
> python producer.py 3.8476390838623047
> python consumer.py ... blog-post 0 100 None b'Kafka Blog Post Event Message' blog-post 0 101 None b'Kafka Blog Post Event Message' blog-post 0 102 None b'Kafka Blog Post Event Message' blog-post 0 103 None b'Kafka Blog Post Event Message' ...