시작하며
본 내용은 2022년 11월 16일 Boaz 세션에서 발표된 내용입니다
글 목표
AWS EC2에 Kafka를 직접 설치하고 토픽, 프로듀서, 컨슈머를 설치하기
프레젠테이션
from datetime import datetime
import threading, time
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
#카프카 프로듀서 운영 객체
class Producer():
bootstrap_servers='43.201.95.87:9092'
def sendMessage(self,topic_name:str, message:str):
'''Set Message to Producer
:param str topic_name: Name of the topic
:param str message: a message you want to send
:returns: producer send result
'''
producer = KafkaProducer(bootstrap_servers=Producer.bootstrap_servers)
producer.send(topic_name, message.encode('utf-8'))
producer.close()
#카프카 컨슈머 객체
class Consumer():
bootstrap_servers = '43.201.95.87:9092'
def getMessage(self, topic_name:str):
'''Get Consumer All Message
:param str topic_name: Name of the topic
:returns: consmer message list
'''
consumer = KafkaConsumer(bootstrap_servers=Consumer.bootstrap_servers,
auto_offset_reset='earliest',
consumer_timeout_ms=1000)
consumer.subscribe([topic_name])
for message in consumer:
print("Topic: ",message.topic,"Message: ",message.value)
consumer.close()
#카프카 토픽 운영 객체
class KafkaTopicAdministrator:
kafkaAdmin = KafkaAdminClient(bootstrap_servers='43.201.95.87:9092')
def setKafkaTopic(self, topic_name: str): # Set topic
'''Set topic list
:param str topic_name: Name of the topic you want to create
:returns: create result
'''
if not self.isKafkaTopic(topic_name):
topic = NewTopic(name=topic_name,
num_partitions=1,
replication_factor=1)
return KafkaTopicAdministrator.kafkaAdmin.create_topics([topic])
raise Exception("already exist")
def getKafkaTopicList(self) -> list:
'''Get topic list
:returns: topic List
'''
return KafkaTopicAdministrator.kafkaAdmin.list_topics()
def deleteKafkaTopic(self, topic_name: str) -> list:
'''Check if topic exists
:param str topic_name: Name of the topic you want to remove
:returns: delete result
'''
if self.isKafkaTopic(topic_name):
return KafkaTopicAdministrator.kafkaAdmin.delete_topics([topic_name])
raise Exception("no topic")
def deleteKafkaTopicAll(self):
'''Delete All Topic
:returns: void
'''
for topic in self.getKafkaTopicList():
KafkaTopicAdministrator.kafkaAdmin.delete_topics([topic])
def isKafkaTopic(self, topic_name: str):
'''Check if topic exists
:param str topic_name: The topic name you want to check
:returns: topic lists
'''
if topic_name in KafkaTopicAdministrator.kafkaAdmin.list_topics():
return True
return False
if __name__ == "__main__":
#Kafka Topic Manager
kafkaAdmin = KafkaTopicAdministrator() # Kafka 관리자 객체 생성
kafkaAdmin.setKafkaTopic("my-kafka-example") # Kafka 토픽 생성
kafkaAdmin.deleteKafkaTopic("my-kafka-example") # Kafka 토픽 제거
print(kafkaAdmin.getKafkaTopicList()) #Kafka 토픽 리스트
# Kafka Producer & Consumer
producer=Producer()
producer.sendMessage("my-kafka","Kafka")
consumer=Consumer()
consumer.getMessage("my-kafka")