본문 바로가기
Study/Data engineer

[Kafka] AWS EC2에 설치하기

by 리노 Linho 2022. 11. 18.

시작하며

본 내용은 2022년 11월 16일 Boaz 세션에서 발표된 내용입니다

글 목표

AWS EC2에 Kafka를 직접 설치하고 토픽, 프로듀서, 컨슈머를 설치하기

프레젠테이션

from datetime import datetime
import threading, time

from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic

#카프카 프로듀서 운영 객체
class Producer():
    bootstrap_servers='43.201.95.87:9092'
    def sendMessage(self,topic_name:str, message:str):
        '''Set Message to Producer
            :param str topic_name: Name of the topic
            :param str message: a message you want to send
            :returns: producer send result
        '''
        producer = KafkaProducer(bootstrap_servers=Producer.bootstrap_servers)
        producer.send(topic_name, message.encode('utf-8'))
        producer.close()

#카프카 컨슈머 객체
class Consumer():
    bootstrap_servers = '43.201.95.87:9092'
    def getMessage(self, topic_name:str):
      '''Get Consumer All Message
          :param str topic_name: Name of the topic
          :returns: consmer message list
      '''
      consumer = KafkaConsumer(bootstrap_servers=Consumer.bootstrap_servers,
                                auto_offset_reset='earliest',
                                consumer_timeout_ms=1000)
      consumer.subscribe([topic_name])
      for message in consumer:
        print("Topic: ",message.topic,"Message: ",message.value)
      consumer.close()

#카프카 토픽 운영 객체
class KafkaTopicAdministrator:
    kafkaAdmin = KafkaAdminClient(bootstrap_servers='43.201.95.87:9092')

    def setKafkaTopic(self, topic_name: str): # Set topic
        '''Set topic list
            :param str topic_name: Name of the topic you want to create
            :returns: create result
        '''
        if not self.isKafkaTopic(topic_name):
            topic = NewTopic(name=topic_name,
                             num_partitions=1,
                             replication_factor=1)
            return KafkaTopicAdministrator.kafkaAdmin.create_topics([topic])
        raise Exception("already exist")

    def getKafkaTopicList(self) -> list:
        '''Get topic list
            :returns: topic List
        '''
        return KafkaTopicAdministrator.kafkaAdmin.list_topics()

    def deleteKafkaTopic(self, topic_name: str) -> list:
        '''Check if topic exists
            :param str topic_name: Name of the topic you want to remove
            :returns: delete result
        '''   
        if self.isKafkaTopic(topic_name):
            return KafkaTopicAdministrator.kafkaAdmin.delete_topics([topic_name])
        raise Exception("no topic")

    def deleteKafkaTopicAll(self):
        '''Delete All Topic
            :returns: void
        '''
        for topic in self.getKafkaTopicList():
          KafkaTopicAdministrator.kafkaAdmin.delete_topics([topic])

    def isKafkaTopic(self, topic_name: str):
        '''Check if topic exists
            :param str topic_name: The topic name you want to check
            :returns: topic lists
        '''  
        if topic_name in KafkaTopicAdministrator.kafkaAdmin.list_topics():
            return True
        return False


if __name__ == "__main__":
    #Kafka Topic Manager
    kafkaAdmin = KafkaTopicAdministrator() # Kafka 관리자 객체 생성
    kafkaAdmin.setKafkaTopic("my-kafka-example") # Kafka 토픽 생성
    kafkaAdmin.deleteKafkaTopic("my-kafka-example") # Kafka 토픽 제거
    print(kafkaAdmin.getKafkaTopicList()) #Kafka 토픽 리스트

    # Kafka Producer & Consumer
    producer=Producer()
    producer.sendMessage("my-kafka","Kafka")
    consumer=Consumer()
    consumer.getMessage("my-kafka")