반응형

1. Confluent 선택한 이유

- 로컬 노트북에 카프카를 깔고 싶지 않았고..

- 그렇다고 AWS에서 직접 EC2 여러대 만들어서 카프카 클러스터 만들고 싶지 않았음.. 어려우니까ㅠ

- 무엇보다 metric을 확인하면서 병렬처리의 성능을 평가해야하는데 Confluent에서는 이걸 쉽게 확인 가능했음..

- 즉, 내 능력이 부족하므로 플랫폼을 활용하는 것이 효율적이라 생각했음..!

- 가장 중요했던 건... 무료 60일간 400$ 이라는 것.

- 초급자가 카프카 배우기에 안성맞춤 플랫폼

 

 

2. 컨플루언트 튜토리얼 찾았음.. 

자바, 파이썬, Go, .NET, Node.js, C/C++, RestAPI, SpringBoot 등 다양한 언어도 된 튜토리얼 있음.

https://docs.confluent.io/platform/current/clients/index.html

 

Build Client Applications for Confluent Platform | Confluent Documentation

Programming languages you use to build applications that interact with Kafka provide a set of APIs and libraries. A Kafka client library provides functions, classes, and utilities that allow developers to create Kafka producer clients (Producers) and consu

docs.confluent.io

 

 

- 여기서 파이썬으로 된 카프카 클라이언트 문서 따라서 실습해볼 예정.

 

Apache Kafka and Python - Getting Started Tutorial

How to run a Kafka client application written in Python that produces to and consumes messages from a Kafka cluster, complete with step-by-step instructions and examples.

developer.confluent.io

 

 


 

튜토리얼 따라하기 과정

 

1. 카프카 클러스터 생성

Confluent Cloud Console에서 클러스터 하나 만든다.

 

 

2. 파이썬 프로젝트 생성 및 라이브러리 설치

mkdir kafka-python-getting-started && cd kafka-python-getting-started

# Python 3.x (Install the Kafka library)
pip install confluent-kafka

 

 

 

3. 카프카 클러스터 정보 복사

Confluent Cloud Console 에서 클러스터의 Bootstrap server 의 endpoint 복사해두기

 

 

 

 

4. 글로벌 키 생성 & config 파일 생성

 

(1) Key와 Secret 복사하기 

 

(2) config 정보 담긴 파일 만들기

getting_started.ini 파일 만들어서 아래 코드 넣기

[default]
bootstrap.servers={{ kafka.broker.server }}
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=< CLUSTER API KEY >
sasl.password=< CLUSTER API SECRET >

[consumer]
group.id=python_example_group_1

# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest

 

(주의) 

bootstrap.servers={{ kafka.broker.server }}


# 에러코드
%3|1702203218.131|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://'pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/b]: sasl_ssl://'pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bootstrap: Failed to resolve ''pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092': 알려진 호스트가 없습니다. (after 8ms in state CONNECT)

여기서 { } 중괄호 없고, 따옴표도 없고 키 값만 넣어야 함.

안그러면 인식 제대로 못해서 에러 발생

 

sasl.username=< CLUSTER API KEY >
sasl.password=< CLUSTER API SECRET >

# 에러코드
%3|1702205150.434|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bo]: sasl_ssl://pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bootstrap: SASL authentication error: Authentication failed (after 5116ms in state AUTH_REQ, 5 identical error(s) suppressed)

 

여기도 마찬가지로 < > 괄호 없고, 따옴표도 없고 키 값만 넣어야 함.

안그러면 인식 제대로 못해서 에러 발생

 

 

 

5. 토픽 생성

토픽명은 purchases

파티션 개수는 1개

 

 

6. 카프카 프로듀서 만들기

producer.py 파일 만들어서 아래 코드 저장

더보기
#!/usr/bin/env python

import sys
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(10):

        user_id = choice(user_ids)
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()

 

 

 

7. 카프카 컨슈머 만들기

consumer.py 파일 만들어서 아래 코드 저장

더보기
#!/usr/bin/env python

import sys
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, OFFSET_BEGINNING

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    parser.add_argument('--reset', action='store_true')
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Set up a callback to handle the '--reset' flag.
    def reset_offset(consumer, partitions):
        if args.reset:
            for p in partitions:
                p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)

    # Subscribe to topic
    topic = "purchases"
    consumer.subscribe([topic], on_assign=reset_offset)

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: %s".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.

                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

 

 

 

8. 카프카 프로듀서 이벤트 생성하기

 

chmod 파일 권한변경 명령어

u+x : user에게 excution(실행) 권한 부여 후 파이썬 코드 실행

chmod u+x producer.py

./producer.py getting_started.ini

 

 

 

9. 카프카 컨슈머 이벤트 생성하기

chmod 파일 권한변경 명령어

u+x : user에게 excution(실행) 권한 부여 후 파이썬 코드 실행

chmod u+x consumer.py

./consumer.py getting_started.ini

 

 

 

 

 


 

최종 결과

 

 

생성파일

 

 

출력 화면

반응형

'백엔드 > Kafka' 카테고리의 다른 글

[Kafka] 심화 (병렬처리 / 스트림즈 / 커넥트 )  (0) 2023.12.10
[Kafka] 개념  (1) 2023.12.06
반응형

컨슈머

configs : 어느 카프카 브로커에서 데이터를 가져올지 선언.

consumer.subscribe() 메서드: 어느 토픽에서 데이터를 가져올지 설정

 

> 0.5초 동안 watit 

> records 반환 ( 이 데이터는 프로듀서가 카프카에 전송한 데이터)

이를 무한루프로 반복.

 

 

consumer.assign() 메서드: 토픽의 어느 파티션에서 데이터를 가져올지 설정

 

 

파티션 개수와 컨슈머 개수의 관계성

 

토픽 내 파티션 개수는 무조건 컨슈머 개수보다 많아야 함.

여러 파티션을 가진 토픽에 대해서, 컨슈머를 병렬처리 하고 싶다면 파티션 수보다 적은 개수로 실행해야 함.

 

 

 

컨슈머에서의 병렬처리

컨슈머 그룹별로, 토픽별로 offset을 저장하기 때문에 병렬처리가 가능하다.

 

 

프로듀서에서의 병렬처리

데이터를 해키(키)를 이용해 특정 파티션에만 저장할 수 있음. 

만약 파티션 추가하면 병렬처리가 깨짐

 

 

 

 





 

 

 

1. 카프카 스트림즈

대용량, 폭발적인 성능의 실시간 데이터 처리를 위한 기능의 컴포넌트

 

2. 카프카 커넥트

반복적으로 파이프라인을 배포하고 관리하는 기능의 컴포넌트

 

커넥트Connect:

커넥터가 동작하도록 실행해주는 프로세스. 먼저 실행되어야 커넥터를 실행할 수 있음.
json 형태로 토픽과 테이블명을 지정해 rest API호출하면 파이프라인이 만들어지는 과정.

1) 단일 실행 모드 커넥트
 간단한 데이터 파이프라인, 또는 개발용으로 사용된다.

2) 분산 모드 커넥트
실제 상용할때 쓰이는 모드. 여러 프로세스를 하나의 클러스터로 묶은 것. 두개 이상의 커넥트가 하나의 클러스터로 묶이는 것. 일부 클러스터에 장애발생에도 다른 커넥트 이용해 데이터 처리가능하다.

 

 

커넥터 Connector: 

실질적으로 데이터를 처리하는 코드가 담긴 jar 패키지. 
동작/설정/실행 메서드가 들어있음. 

1) 싱크 커넥터 (sink connector)
특정 토픽에 있는 데이터를 어딘가로 sink한다. 특정 저장소(오라클, elastic search 등)으로 저장하는 역할. (컨슈머 같은 역할)

2) 소스 커넥터 (source connector)
데이터베이스로부터 데이터를 가져와서 토픽에 저장하는 역할 ( 프로듀서 같은 역할)

 

 

 

 

반응형

'백엔드 > Kafka' 카테고리의 다른 글

[Kafka] Confluent 플랫폼 이용한 카프카 실습 (with python)  (0) 2023.12.10
[Kafka] 개념  (1) 2023.12.06
반응형

 

아파치 카프카

무엇
데이터를 전송하는 source와 데이터를 받는 target이 점점 많아지면 데이터 전송 관계들이 복잡해진다.
이 복잡함을 해결하기 위해 등장한 오픈소스
장점
낮은 지연과 높은 처리량으로 효과적으로 빅데이터 처리가 가능하다. 

 

 

 

카프카 클라이언트

 

카프카 프로듀서

- 토픽에 해당하는 메시지를 생성

- 특정 토픽으로 데이터를 publish

- 처리 실패/재시도 

 

카프카

- 데이터를 담는 큐와 같은 역할을 한다고 보면 된다.
- 데이터 흐름에 있어서 fault tolerant

- 가용성으로 서버이슈 발생해도 데이터를 복구할 수 있음

카프카 컨슈머

- 토픽 내 파티션의 데이터를 가져옴. (polling)

- 파티션의 오프셋 위치를 기록 (commit)

- 컨슈머 그룹을 통해 병렬처리 가능 (파티션의 개수에 따라 컨슈머를 여러개 만들면 병렬처리가 가능해진다.)

 

 

 

토픽

데이터를 담는 공간을 토픽이라고 한다.

토픽은 여러개 생성할 수 있다.  DB의 테이블/ 파일시스템의 폴더 같은 역할을 한다. 

Producer는 토픽에 데이터를 저장하고, Consumer는 토픽에서 데이터를 가져간다. 

토픽 이름은 데이터의 특징을 반영하게 짓는 것이 유지보수에 유리하다. (click_log 이런식)

 

파티션

토픽 내에 여러개 파티션으로 구성할 수 있다.
하나의 파티션은 큐와 같이 데이터가 차곡차곡 쌓이고, Consumer는 가장 오래된 데이터부터 가져간다.  
(Consumer가 record를 가져가도 데이터는 삭제되지 않음. 새로운 Consumer 들어오면 처음부터 가져간다)

만약 파티션이 2개라면, 데이터를 저장할 때 어느 파티션에 저장할지 키를 지정할 수 있다.
파티션을 늘리면 Consumer를 늘려서 분산처리를 할 수 있다.  (파티션은 늘리는 것이 가능하지만, 줄일 수는 없다.)
파티션 데이터의 삭제는 보존기간과 보존크기를 미리 정하면 일정 기간/ 용량동안 저장되었다가 삭제될 수 있다.

 

 

브로커

카프카가 저장되어 있는 서버를 의미. 보통 3개 이상의 브로커로 구성함.

만약 브로커가 3대라면, 그 중 한대에 토픽정보가 저장된다.

복제 (Replication)
replication이 3이라면, 파티션은 원본 1대와 복제본 2대로 구성된다.  (leader partition 1, follower partition 2)
만약 브로커가 사용불가하게 되면, 리더 파티션은 못쓰게 되도 팔로워 파티션으로 복구 가능해진다.
ack는 0, 1, all  셋 중 하나를 사용할 수 있다. 
0 - 리더파티션에 데이터 전송 후, 잘 전송됐는지 확인하지는 않는다. 데이터 전달 속도 빠르지만 유실 가능성 있음.
1 - 리더파티션에 데이터 전송 후, 잘 전송됐는지 확인한다. 팔로워 파티션에 복제됐는지 확인하지 않음.
all - 리더파티션에 데이터 전송 후, 잘 전송됐는지 확인한다. 복제 잘 됐는지 응답까지 확인 (너무 느림..)
ISR (In-Sync-Replication)
원본 + 복제본 합쳐서 ISR 이라고 지칭한다.

 

 

파티셔너

파티션을 더 효과적으로 쓰기 위해 파티셔너 존재함.

데이터를 토픽의 어떤 파티션에 넣을지 결정하는 역할을 한다.

메시지의 키에 따라 특정 해시값이 결정되고, 이를 기반으로 저장될 파티션이 결정된다. 

 

 

컨슈머 랙 (consumer lag)

 

카프카 운영함에 있어 모니터링 해야하는 지표 중 하나.

프로듀서가 마지막으로 저장한 오프셋과 컨슈머가 마지막으로 읽은 오프셋의 차이

 

카프카 프로듀서 : 토픽의 파티션에 데이터를 저장하는데 오프셋 0부터 시작한다.

카프카 컨슈머 : 데이터를 오프셋0번부터 가져간다. 

 

 

컨슈머그룹이 1개이고, 만약 파티션이 2개라면 lag은 2개가 측정된다.

그중 큰 값이 records lag max 값이다.

 

 

 

카프카 버로우 (Burrow)

컨슈머 랙 모니터링 애플리케이션

 

1. 버로우 1대 이용하여, 멀티 카프카 클러스터 지원

2. 윈도우 wanrning, error

3. HTTP api를 제공한다.

 

 

 

카프카 /  레빗엠큐RabbitMQ / 레디스 큐의 차이

메시지 브로커: RedisQ, RabbitMQ

대규모 메시지 기반 미들웨어, 전송이 되고 나면 삭제된다.

 

이벤트 브로커: 카프카, AWS의 키네시스

이벤트를 보존한다. 서비스에서 나오는 이벤트를 DB에 저장하듯이 큐에 저장한다.

1. 한번 일어난 이벤트도 저장하여 "단일진실공급원"으로서 사용이 가능하다.

2. 장애가 발생했을때 그 이전부터 재처리 할 수 있음.

3. 많은 양의 스트림 데이터를 실시간으로 처리할 수 있다.

 

반응형

+ Recent posts