반응형

1. Confluent 선택한 이유

- 로컬 노트북에 카프카를 깔고 싶지 않았고..

- 그렇다고 AWS에서 직접 EC2 여러대 만들어서 카프카 클러스터 만들고 싶지 않았음.. 어려우니까ㅠ

- 무엇보다 metric을 확인하면서 병렬처리의 성능을 평가해야하는데 Confluent에서는 이걸 쉽게 확인 가능했음..

- 즉, 내 능력이 부족하므로 플랫폼을 활용하는 것이 효율적이라 생각했음..!

- 가장 중요했던 건... 무료 60일간 400$ 이라는 것.

- 초급자가 카프카 배우기에 안성맞춤 플랫폼

 

 

2. 컨플루언트 튜토리얼 찾았음.. 

자바, 파이썬, Go, .NET, Node.js, C/C++, RestAPI, SpringBoot 등 다양한 언어도 된 튜토리얼 있음.

https://docs.confluent.io/platform/current/clients/index.html

 

Build Client Applications for Confluent Platform | Confluent Documentation

Programming languages you use to build applications that interact with Kafka provide a set of APIs and libraries. A Kafka client library provides functions, classes, and utilities that allow developers to create Kafka producer clients (Producers) and consu

docs.confluent.io

 

 

- 여기서 파이썬으로 된 카프카 클라이언트 문서 따라서 실습해볼 예정.

 

Apache Kafka and Python - Getting Started Tutorial

How to run a Kafka client application written in Python that produces to and consumes messages from a Kafka cluster, complete with step-by-step instructions and examples.

developer.confluent.io

 

 


 

튜토리얼 따라하기 과정

 

1. 카프카 클러스터 생성

Confluent Cloud Console에서 클러스터 하나 만든다.

 

 

2. 파이썬 프로젝트 생성 및 라이브러리 설치

mkdir kafka-python-getting-started && cd kafka-python-getting-started

# Python 3.x (Install the Kafka library)
pip install confluent-kafka

 

 

 

3. 카프카 클러스터 정보 복사

Confluent Cloud Console 에서 클러스터의 Bootstrap server 의 endpoint 복사해두기

 

 

 

 

4. 글로벌 키 생성 & config 파일 생성

 

(1) Key와 Secret 복사하기 

 

(2) config 정보 담긴 파일 만들기

getting_started.ini 파일 만들어서 아래 코드 넣기

[default]
bootstrap.servers={{ kafka.broker.server }}
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=< CLUSTER API KEY >
sasl.password=< CLUSTER API SECRET >

[consumer]
group.id=python_example_group_1

# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest

 

(주의) 

bootstrap.servers={{ kafka.broker.server }}


# 에러코드
%3|1702203218.131|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://'pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/b]: sasl_ssl://'pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bootstrap: Failed to resolve ''pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092': 알려진 호스트가 없습니다. (after 8ms in state CONNECT)

여기서 { } 중괄호 없고, 따옴표도 없고 키 값만 넣어야 함.

안그러면 인식 제대로 못해서 에러 발생

 

sasl.username=< CLUSTER API KEY >
sasl.password=< CLUSTER API SECRET >

# 에러코드
%3|1702205150.434|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bo]: sasl_ssl://pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bootstrap: SASL authentication error: Authentication failed (after 5116ms in state AUTH_REQ, 5 identical error(s) suppressed)

 

여기도 마찬가지로 < > 괄호 없고, 따옴표도 없고 키 값만 넣어야 함.

안그러면 인식 제대로 못해서 에러 발생

 

 

 

5. 토픽 생성

토픽명은 purchases

파티션 개수는 1개

 

 

6. 카프카 프로듀서 만들기

producer.py 파일 만들어서 아래 코드 저장

더보기
#!/usr/bin/env python

import sys
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(10):

        user_id = choice(user_ids)
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()

 

 

 

7. 카프카 컨슈머 만들기

consumer.py 파일 만들어서 아래 코드 저장

더보기
#!/usr/bin/env python

import sys
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, OFFSET_BEGINNING

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    parser.add_argument('--reset', action='store_true')
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Set up a callback to handle the '--reset' flag.
    def reset_offset(consumer, partitions):
        if args.reset:
            for p in partitions:
                p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)

    # Subscribe to topic
    topic = "purchases"
    consumer.subscribe([topic], on_assign=reset_offset)

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: %s".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.

                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

 

 

 

8. 카프카 프로듀서 이벤트 생성하기

 

chmod 파일 권한변경 명령어

u+x : user에게 excution(실행) 권한 부여 후 파이썬 코드 실행

chmod u+x producer.py

./producer.py getting_started.ini

 

 

 

9. 카프카 컨슈머 이벤트 생성하기

chmod 파일 권한변경 명령어

u+x : user에게 excution(실행) 권한 부여 후 파이썬 코드 실행

chmod u+x consumer.py

./consumer.py getting_started.ini

 

 

 

 

 


 

최종 결과

 

 

생성파일

 

 

출력 화면

반응형

'백엔드 > Kafka' 카테고리의 다른 글

[Kafka] 심화 (병렬처리 / 스트림즈 / 커넥트 )  (0) 2023.12.10
[Kafka] 개념  (1) 2023.12.06

+ Recent posts