반응형

 

 

안녕하세요. 빅분기 실기 독학을 위해 준비를 해야할 것이 바로  개발환경 준비하기 입니다.

 

파이썬을 이용하여 시험을 보시는 분들을 위해 2가지 방법 안내해드리겠습니다.

 

두가지 방법의 특징을 한마디로 요약하자면

내 컴퓨터에 설치하시겠어요?  구글 계정으로 설치없이 쓰시겠어요?

 

각 방법의 장단점으로는

컴퓨터에 설치를 하면, 인터넷없이 사용가능합니다. 단점으로는 컴퓨터 사양이 낮은 경우.. 컴퓨터가 느려진다는 점입니다.

 

반대로 구글 계정으로 파이썬 사용하는 경우

구글계정만 있으면 어떤 컴퓨터에서도 사용이 가능합니다. (기존에 작업하던 코드를 어디서든 볼 수 있음)

단점으로는 항상 인터넷이 연결되어야 한다는 점입니다.

 

 

 

두가지 방법을 안내해드릴테니 상황에 맞게 사용하시길 바랍니다.

 

1. 컴퓨터에 파이썬 개발환경 설치하기

https://wikidocs.net/254150

 

2장. 파이썬 개발 환경

다음은 이 장에서 배우게 될 내용이다. 2.1 파이썬 설치 2.2 파이썬 가상환경 2.3 Anaconda 설치 2.4 Jupyter Notebook 사용 이 장을 통해 파이썬…

wikidocs.net

 

2. 구글 colab 이용하기

https://wikidocs.net/141502

 

01. 구글 코랩 (colab)

[TOC] # 구글 코랩 (colab) 소개 그럼 이제 다운로드 받은 데이터 분석을 시작해볼까요!? 데이터를 분석하는 툴은 그 목적에 따라 매우 다양합니다. 프로그래밍을 기반으…

wikidocs.net

 

 

반응형
반응형

안녕하세요. 빅데이터 분석기사 실기를 준비하기 전

 

어떤 시험인지, 응시료는 얼마인지 등 정리를 하는 시간이 필요할것 같다고 생각해서 이 글을 작성합니다.

 

 

 

 

 

일정

25년도 빅분기 일정입니다. 필기 2회, 실기 2회가 진행됩니다.

필기를 합격하고 2년 이내에 실기를 따지 않으면...

필기 유효기간이 만료되어 처음부터 필기를 다시 준비해야하는 불상사가 일어납니다...!!

 

저는 24년상반기에 필기를 땄기 때문에 올해 안에 빅분기 실기를 따기 위해 10회차를 도전하고,,

혹시 떨어지게 된다면 11회차 필기도 준비하려 합니다.

사실 작년에 실기 한번 봤는데 너무 낮은 점수를 받아서 이번에는 부디 10회차에 붙기를...ㅎㅎ

 

 

 

아직 원서접수도 시작 안했지만,, 이렇게 일찍 시작한 이유는

6월에 정보보안기사 필기를 병행하기 위함입니다!

 

하나는 파이썬 실기 / 하나는 이론위주의 시험이기 떄문에 병행을 할 수 있지 않을까 생각하는 것은

굉장한 착각일수도 있지만... 직장인으로서  보안기사를 따기는 따야하는 상황이라 

 

 

합격 컷

아래 내용처럼 실기는 과락 없이 100점 만점에 60점만 맞으면 됩니다.

 

교재

교재는 작년에 샀던 24년 이기적 빅분기 실기 python 입니다.

현재는 절판이 되어 구매가 불가하지만,, 구버전임에도 내용은 비슷할거라 생각하고.. 이책으로 공부하려 합니다.

사실 책이 없어도 공부하는데 지장은 없지만.. 

 

 

 

공부법

1. 기본기 잡기

먼저 유튜브에 올라가 있는 2시간에 완성하는 빅분기 실기 강의를 1회독 하면서 

어떤 시험인지 감을 잡으려 합니다.

유튜브 최신 무료 인강

실제시험환경 체험하기 

 

2. 그리고 나서 개념을 한번 보려고 합니다.

평일에는 출퇴근 길에 유튜브를 통해 개념을 익히고, 회사에서 몰래몰래 실기 실습을 따라해보려 합니다.

 

이기적 무료인강 24년도 버전

1유형 문제풀이 강의

2유형 문제풀이 강의

3유형 문제풀이 강의

실습 링크 : 실기 실습은 아래 링크로 해볼까 합니다.

  - https://www.datamanim.com/dataset/99_pandas/pandasMain.html

 

광고가 너무 많아서, 정신은 없지만 무료이기에 감사할 따름 

구글 코랩에 옮겨 적으면서 실습해보려고 합니다!

 

 

3.  주말에는 모의고사를 1회차씩 풀어제낄 것입니다.

모의고사는 이기적 책에 총3회분이 있고, 그거 외에도 인터넷에서 좀 찾아보고 자료를 수집할 예정입니다.

 

모의고사 1회차

모의고사 2회차

 

 

 

다짐

빅분기 실기는 코드를 최대한 암기하는 방식으로 진행하려 합니다.

문제가 다 비슷하게 나오고, 필기처럼 이거 같은데?? 하면서 찍을 수 있는 문제가 아니기 때문에

반드시 코드를 스스로 칠 수 있을만큼 암기가 필요합니다.

 

진짜 이렇게 계획대로만 성실하게 한다면 실기를 딸수 있을거 같죠? ㅋㅋㅋ

아자아자 힘내봅시다

 

 

 

반응형
반응형


사전 점수결과가 발표되었습니다.

시험공부를 얼마 못해서 떨어질수도 있었겠다고 생각했는데
운이 좋게도 합격점수입니다.

맞는 것은? 틀린 것은? 에 따라 실수한 것은 없는지 마지막까지 확인했고

공부를 하지 않은 부분에서 출제된 문제는
상식적으로 틀린 보기들을 소거해나가며 풀어서

합격 점수를 얻은게 아닌가 싶습니다.



2025년 다음 자격증 후보로는 아래 리스트 중
2개~3개만 도전해볼 예정입니다.

1. ISRM (4월)
2. 정보보안기사필기 (5월)
3. 빅데이터분석기사 실기 (6월)
4. 정보보안필기 (7월)
5. SQLP (8월)
6. DAP (9월)
7. 공인중개사 1,2차 (10월)


반응형
반응형

오늘은 월요일, 퇴근후 자격증 공부를 합니다.

 

과목1. 전사 아키텍처에 대해 공부할 예정

 

시작 전, 나의 목표는?

적은 시간 투자해서 최대 효율을 내자.

그러기 위해서는 키워드 위주로 빠르게 1회독을 한다!

 

 

전사?

회사 전체? 인 것 같다. 영어로는 Enterprise

 

아키텍처?

구조, 아마도 시스템 구조, 데이터 구조 등등을 의미하겠지. 영어로는 Architecture

 

오늘의 공부 끝...!!!! 이면 좋겠지만 아직 한참 남았다.

 

 

 

 

여기서 부터는 키워드 위주로 빠르게 스캔합니다.

 

전사 아키텍처란, 기업의 정보화 관련 설계도를 뜻한다. 약자로 EA

 

사실 스타트업이야 하나의 사업이 곧 회사 전체시스템이고 그게 바로 EA 겠지만

 

회사 규모가 커지면 사업부서가 늘어나고, 이것 저것 확장하면서 여러개의 EA가 생길 것이다.

 

즉, 다시말해 

 

전사라는 개념은 하나의 시스템일수도, 여러 개의 시스템으로 구성된 하나의 단위 시스템일수도 있다.

 

따라서 프로젝트 초기에 '전사 아키텍처'가 무엇인지 정의하고 시작해야 헷갈리지 않을 것이다.

 

 

 

아키텍처 3요소

1. 모델 : 비즈니스 모델

2. 규칙 : 일관성을 위해

3. 계획 : 현재 → 목표 아키텍처로 나아가기 위해

(사람은 아키텍처 요소가 아님)

 

EA 프레임워크 3요소

1. 정책 : 아키텍처 정책은, 이상적인 목표를 세우는 것

2. 정보 : 어떤 관점(시스템 담당자, 기획자 등등)에서 도메인 4요소(비즈니스, 데이터, 어플리케이션, 기술)를 바라보는 것. 

3. 관리 : 현재 아키텍처를 어떻게 효율적으로 관리하느냐

 

도메인 4요소

1. BA 비즈니스 관점 : 기업의 경영목표

2. AA 어플리케이션 관점 : 업무 관련 어플리케이션 체계화

3. DA 데이터 관점 : 데이터 구조를 체계화

4. TA 기술 관점 : 기술 인프라를 체계화

 

참조모델 6요소

1. 성과 참조모델

2. 업무 참조모델

3. 서비스 참조모델

4. 데이터 참조모델

5. 기술 참조모델

 

참조모델에 대해서는 다음에 알아봐야겠다;;

 

 

마지막이다.

 

전사 아키텍처 프로세스

 

1. 비전 수립

나아가야할 방향성 정하기 위해 환경분석이 필요하다. EA정보를 정의하기 위해, 아키텍처 매트릭스를 만든다.

아래처럼 의사결정유형(관점)과 정보유형(뷰) 두 축을 기준으로 2차원 매트릭스를 만든다.

  업무 데이터 응용 기술
계획자 조직구성도/정의서
업무구성도/정의서
데이터구성도/정의서 응용시스템구성도/정의서 표준프로파일
기반구조 구성도/정의서
기술지원목록
책임자 업무관계도/기술서
업무기능 분할도/기술서
개념데이터 관계도
데이터 교환 기술서
응용시스템 관계도
응용기능 분할도
기반 구조 관계도
설계자 업무절차 설계서 논리데이터 설계서
데이터 교환 설계서
응용 기능 설계서 기반 구조 설계서
개발자 업무매뉴얼 물리데이터 모델 응용프로그램 목록 제품목록

 

2. 구축 : 

자료 수집 → 현행 아키텍처 정보구축 → 목표 아키텍처 정보구축

 

3. 관리 : 

도입한 다음에 얼마나 잘 관리하고 활용하기 위해

관리 조직 / 관리 프로세스 / 관리 인력을 포함한다.

 

4. 활용 :

구축된 전사 아키텍처 정보를 바탕으로,  의사 결정이 이루어지고 IT자원관리가 이를 기준으로 이루어질 수 있도록

전사 아키텍처 활용을 활성화하려는 적극적인 노력이 필요하다.

 

 

 

 

오늘은 여기까지입니다. 남은 화수목금 화이팅.

 

반응형
반응형

목적:
회사에서 데이터관리시스템 구축 예정인데, 업무상 도움이 될거 같아 준비하게 되었음

준비물:
책 2권 구매


비용:
응시료 5만원


자격증 개요


문항수

50개

목차

과목1. 전사아키텍처 (10문항)
전사, 아키텍처 각각 뭔지 개념만 알면 된다
분량 몇페이지 안됨. 몇번 읽고 문제풀기

과목2. 데이터 요건분석 (10문항)
어떤 데이터가 필요하지? 그럼 어떻게 수집하지?를 배운다
표가 많아서 암기양은 적음. 2-3번 훑어보기

과목3. 데이터표준화 (10문항)
데이터를 일단 모았다면? 팀/부서별로 단어, 코드, 기준 등 다르게 사용하지 않도록 통일된 기준이 필요하다

과목4. 데이터 모델링(20문항)
대망의 모델링 파트! SQLD 랑 유사한거 같은데
개논물( 개념모델링/논리적 모델링/ 물리적 모델링)에 대해 배운다


과락 기준

100점 만점에 60점 이상 (50문제 중 30개는 맞아야지)

과목별 40점 이상 과락개수
1~3과목 4/10개 아래로, 4과목 8/20개 아래로 틀리면 과락


반응형
반응형

 

진행기간 : 2024. 01. 22. ~ 202.02.14

 

목표: 독거노인 1인 가구의 사회복지사 방문 외 시간 때 응급상황을 알릴 수 있도록 위급 행동 구별

 

팀: 4명

 

 


개요

2021년 고독사 사망자 수는 총 3,378명으로 최근 5년간 증가 추세

 

고령 사회의 빠른 도래와 독거노인 가구의 급증으로 인해 고령자의 삶의 질 저하와 고령자 지원을 위한 사회 공공지출의 급격한 증가가 예상되고 있다. 이러한 사회 문제에 대한 해결책의 하나로 로봇이 고령자와 함께 생활하면서 고령자를 이해하고 정서적으로 교류하면서 상황에 맞는 건강, 생활, 인지, 정서 서비스를 제공하기 위해 필요한 로봇 지능기술의 개발이 요구되고 있다.


로봇이 휴먼에게 적절한 휴먼케어 서비스를 제공하기 위해서는 시시각각 변하는 휴먼에 대한 정보를 높은 신뢰도로 인식할 수 있는 능력이 기본적으로 필요하다. 휴먼 정보 인식 기술 중에서 휴먼이 행하고 있는 동작이 어떤 행동인지를 파악하는 행동 인식 기술은 고령자가 일생 생활에서 행하는 행위의 의도를 이해하고 고령자의 생활 패턴을 파악하기 위한 필수 기술이다.

고독사, 가족, 친척 등 주변사람들과 단절된 채 홀로 사는 사람

자살·병사 등으로 혼자 임종을 맞음

시신이 일정한 시간이 흐른 뒤에 발견되는 죽음

 

매년 남성 고독사가 여성 고독사에 비해 4배 이상 많으며,

가장 많은 비중을 차지하는 연령은 50∼60대 (매년 50% 이상)로 확인

 

※ 최근 5년 연평균 증가율 : 남성 고독사 10.0%, 여성 고독사 5.6%

 

 


 

 

데이터셋

[ETRI 나눔] 로봇환경에서 고령자의 일상행동 인식을 위한 3D 영상 데이터셋 - Skeleton, BodyIndex

사이트 연결

 

 

1. RGB video 동영상 데이터

 

2. Depth map frame 이미지 (불필요)

 

3. body index frames 이미지 (불필요)

 

4. 3D skeletal data csv파일

 

 

데이터셋 상세 정보

 

 

데이터셋 label 정보

70세 이상의 고령자 53명의 자택을 방문하여 기상부터 취침까지의 하루 행동을 직접 관찰하고 기록하였더니 총 245개의 일상 활동 유형으로 압축되었다. 이들 행동 중에 빈번하게 나타나는 행동으로 TV 시청, 식사관련 활동, 화장실 사용, 식사 준비, 전화 통화, 약 복용, 요리, 청소 등이 있었으며 이러한 다빈도 활동들을 기준으로 55종의 행동을 인식 대상으로 선정하였다.

 

반응형
반응형

 

 

Chat GPT에게 물어본 결과

 

  1. 다수의 파티션 활용:
    • Kafka 토픽은 여러 파티션으로 나눠져 있습니다. 컨슈머 그룹을 사용하여 각 컨슈머 인스턴스가 특정 파티션을 병렬로 소비하도록 설정함으로써 데이터를 효과적으로 분산하고 병렬로 처리할 수 있습니다.
  2. 여러 프로듀서 및 컨슈머 활용:
    • 여러 프로듀서가 동시에 메시지를 생성하고 여러 컨슈머가 동시에 메시지를 소비함으로써 Kafka의 스루풋을 증가시킬 수 있습니다. 이를 통해 대량의 데이터를 빠르게 처리할 수 있습니다.
  3. 병렬 처리 프레임워크 사용:
    • Kafka의 데이터를 처리할 때, Spark, Flink, Storm 등과 같은 병렬 처리 프레임워크를 사용하여 데이터를 분산하고 처리할 수 있습니다. 이러한 프레임워크는 다양한 데이터 처리 작업에 특화되어 있어, 대용량 데이터를 효과적으로 다룰 수 있습니다.
  4. 멀티스레딩 및 멀티프로세싱 활용:
    • 컨슈머 또는 프로듀서 코드 내에서 멀티스레딩 또는 멀티프로세싱을 사용하여 여러 작업을 동시에 수행하도록 구성할 수 있습니다. 이는 특히 데이터 전처리나 추가 비즈니스 로직을 병렬로 처리할 때 유용합니다.
  5. 카프카 스트림스 활용:
    • Kafka Streams는 Kafka를 기반으로 하는 스트리밍 처리 라이브러리로, 데이터를 실시간으로 처리하는 데 사용됩니다. Kafka Streams를 활용하면 데이터를 효과적으로 처리하고 원하는 형태로 변환할 수 있습니다.
  6. 컨슈머 그룹 및 파티션 확장:
    • 병렬 처리를 높이기 위해 컨슈머 그룹을 확장하거나 토픽의 파티션 수를 조절하는 방법도 있습니다. 이를 통해 높은 처리량과 빠른 응답 시간을 달성할 수 있습니다.

 

 

이 중에서 

1번, 2번, 4번, 6번 직접 테스트 가능


직렬

병렬처리 하지 않았을 때, 하나의 프로듀서가 하나의 파티션에 데이터 저장 (직렬 전송)

 

[프로듀서 실행시간 측정]

producer.py

더보기
#!/usr/bin/env python

import sys
import time
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(100):  # Change the range to 100 for 100 messages
        user_id = choice(user_ids)
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)

        # Introduce a delay of 0.1 seconds
        time.sleep(0.1)
        count += 1

    # Block until the messages are sent.
    producer.flush()

    # Record the end time
    end_time = time.time()

    # Calculate and print the total execution time
    total_time = end_time - start_time
    print(f'Total execution time: {total_time:.2f} seconds')

 

하나의 프로듀서가 0.1 초가 걸리는 데이터 전송을 100개 전송했을 때 10.95 seconds 소요됨.

 

 

 

 

[컨슈머 실행시간 측정 ]

consumer.py 

더보기
#!/usr/bin/env python

import sys
import time
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, OFFSET_BEGINNING

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()
    start_receive_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    parser.add_argument('--reset', action='store_true')
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Set up a callback to handle the '--reset' flag.
    def reset_offset(consumer, partitions):
        if args.reset:
            for p in partitions:
                p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)

    # Subscribe to topic
    topic = "purchases"
    consumer.subscribe([topic], on_assign=reset_offset)
    wait = False

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")

                if not wait:
                    # Calculate and print the waiting time
                    end_receive_time = time.time()
                    transfer_duration = end_receive_time - start_receive_time
                    print(f'Receive duration: {transfer_duration-1:.2f} seconds')
                    wait = True
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                start_waiting_time = time.time()  # Record the start time of waiting
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
                time.sleep(0.1)
                wait = False

    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

        # Record the end time
        end_time = time.time()

        # Calculate and print the total execution time
        total_time = end_time - start_time
        print(f'Total execution time: {total_time:.2f} seconds')

하나의 컨슈머가 0.1 초가 걸리는 데이터 전송을 100개 수신했을 때 11.65 seconds 소요됨.

 

 

 

 

병렬 시스템

 

1번. 하나의 파티션을 두개의 consumer에서 소비하는 병렬방식

consumer_1.py 

(config 파일인 "getting_started.ini" 안에는 group.id = group_1 가 지정되어 있음)

더보기
#!/usr/bin/env python

import sys
import time
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, KafkaError

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()
    start_receive_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Set unique group.id for each consumer group
    config['group.id'] = f"{config['group.id']}_{time.time()}"

    # Create Consumer instance
    consumer = Consumer(config)

    # Subscribe to the topic
    topic = "purchases"
    consumer.subscribe([topic])
    wait = False

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # No new messages within the timeout
                print("Waiting...")

                if not wait:
                    # Calculate and print the waiting time
                    end_receive_time = time.time()
                    transfer_duration = end_receive_time - start_receive_time
                    print(f'Receive duration: {transfer_duration - 1:.2f} seconds')
                    wait = True
            elif msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event (not an error)
                    continue
                else:
                    print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                print("[Consumer 1] Consumed event from topic {topic}, partition {partition}, offset {offset}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), partition=msg.partition(), offset=msg.offset(),
                    key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
                time.sleep(0.1)
                if wait:
                    wait = False
                    start_receive_time = time.time()

    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

        # Record the end time
        end_time = time.time()

        # Calculate and print the total execution time
        total_time = end_time - start_time
        print(f'Total execution time: {total_time:.2f} seconds')

consumer_2.py

(config 파일인 "getting_started.ini" 안에는 group.id = group_2 가 지정되어 있음)

더보기
#!/usr/bin/env python

import sys
import time
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, KafkaError

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()
    start_receive_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Set unique group.id for each consumer group
    config['group.id'] = f"{config['group.id']}_{time.time()}"

    # Create Consumer instance
    consumer = Consumer(config)

    # Subscribe to the topic
    topic = "purchases"
    consumer.subscribe([topic])
    wait = False

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # No new messages within the timeout
                print("Waiting...")

                if not wait:
                    # Calculate and print the waiting time
                    end_receive_time = time.time()
                    transfer_duration = end_receive_time - start_receive_time
                    print(f'Receive duration: {transfer_duration - 1:.2f} seconds')
                    wait = True
            elif msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event (not an error)
                    continue
                else:
                    print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                print("[Consumer 2] Consumed event from topic {topic}, partition {partition}, offset {offset}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), partition=msg.partition(), offset=msg.offset(),
                    key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
                time.sleep(0.1)
                if wait:
                    wait = False
                    start_receive_time = time.time()
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

        # Record the end time
        end_time = time.time()

        # Calculate and print the total execution time
        total_time = end_time - start_time
        print(f'Total execution time: {total_time:.2f} seconds')

 

 

[ 컨슈머에서의 병렬처리 ]

2개의 컨슈머가 0.1 초가 걸리는 데이터 전송을 100개 수신했을 때 각각 10.73 seconds 소요됨. 

 

 

 

2번. 여러 프로듀서가 동시에 메시지 생성 및 여러 파티션에 저장 / 여러 컨슈머가 병렬로 메시지 소비

producer_parallel.py

더보기
#!/usr/bin/env python

import sys
import time
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer
import threading

def produce_messages(producer, topic, user_ids, products, count):
    for _ in range(count):
        user_id = user_ids
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        time.sleep(0.1)

def delivery_callback(err, msg):
    if err:
        print('ERROR: Message failed delivery: {}'.format(err))
    else:
        print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
            topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instances
    producer1 = Producer(config)
    producer2 = Producer(config)

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    data_1 = ['APPLE', 'BANANA', 'COCONUT']
    data_2 = ['dog', 'cat', 'giraffe', 'pig']

    count = 50  # Each producer will produce 50 messages

    # Create threads for each producer
    thread1 = threading.Thread(target=produce_messages, args=(producer1, topic, 'producer_1', data_1, count))
    thread2 = threading.Thread(target=produce_messages, args=(producer2, topic, 'producer_2', data_2, count))

    # Start the threads
    thread1.start()
    thread2.start()

    # Wait for the threads to finish
    thread1.join()
    thread2.join()

    # Block until the messages are sent.
    producer1.flush()
    producer2.flush()

    # Record the end time
    end_time = time.time()

    # Calculate and print the total execution time
    total_time = end_time - start_time
    print(f'Total execution time: {total_time:.2f} seconds')

 

[ 프로듀서에서의 병렬처리하여 하나의 파티션에 데이터 저장 ]

멀티스레딩 이용하여 프로듀서를 병렬처리

 

2개의 프로듀서가 0.1 초가 걸리는 데이터 전송을 카프카에 100개 저장했을 때 5.48 seconds 소요됨. 

병렬처리 했기 떄문에 프로듀서 1개일때보다 소요시간이 1/2 단축되었음.

 

 

단, 2개의 프로듀서가 하나의 파티션에 저장하기 때문에, 라운드로빈 형태로 저장되는 것을 아래에서 확인할 수 있다.

 

 

 

 

[ 프로듀서에서의 병렬처리하여 2개의 파티션에 데이터 저장 ]

producer_parallel_partition.py

더보기
#!/usr/bin/env python

import sys
import time
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer
import threading

def produce_messages(producer, topic, user_ids, products, count, partition):
    for _ in range(count):
        user_id = user_ids
        product = choice(products)
        producer.produce(topic, key=user_id, value=product, partition=partition, callback=delivery_callback)
        time.sleep(0.1)

def delivery_callback(err, msg):
    if err:
        print('ERROR: Message failed delivery: {}'.format(err))
    else:
        print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
            topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instances
    producer1 = Producer(config)
    producer2 = Producer(config)

    # Produce data by selecting random values from these lists.
    topic = "purchases_partition"

    data_1 = ['APPLE', 'BANANA', 'COCONUT']
    data_2 = ['dog', 'cat', 'giraffe', 'pig']

    count = 50  # Each producer will produce 50 messages

    # Create threads for each producer
    thread1 = threading.Thread(target=produce_messages, args=(producer1, topic, 'producer_1', data_1, count, 0))
    thread2 = threading.Thread(target=produce_messages, args=(producer2, topic, 'producer_2', data_2, count, 1))

    # Start the threads
    thread1.start()
    thread2.start()

    # Wait for the threads to finish
    thread1.join()
    thread2.join()

    # Block until the messages are sent.
    producer1.flush()
    producer2.flush()

    # Record the end time
    end_time = time.time()

    # Calculate and print the total execution time
    total_time = end_time - start_time
    print(f'Total execution time: {total_time:.2f} seconds')

consumer1_partition.py

더보기
#!/usr/bin/env python

import sys
import time
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, KafkaError, TopicPartition

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()
    start_receive_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Set unique group.id for each consumer group
    config['group.id'] = f"{config['group.id']}_{time.time()}"

    # Create Consumer instance
    consumer = Consumer(config)

    # Specify the topic and partition to consume from
    topic = "purchases_partition"
    partition = 0  # Replace with the desired partition

    # Assign the consumer to the specified topic and partition
    consumer.assign([TopicPartition(topic, partition)])

    wait = False

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # No new messages within the timeout
                print("Waiting...")

                if not wait:
                    # Calculate and print the waiting time
                    end_receive_time = time.time()
                    transfer_duration = end_receive_time - start_receive_time
                    print(f'Receive duration: {transfer_duration - 1:.2f} seconds')
                    wait = True
            elif msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event (not an error)
                    continue
                else:
                    print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                print("[Consumer 1] Consumed event from topic {topic}, partition {partition}, offset {offset}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), partition=msg.partition(), offset=msg.offset(),
                    key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
                time.sleep(0.1)
                if wait:
                    wait = False
                    start_receive_time = time.time()

    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

        # Record the end time
        end_time = time.time()

        # Calculate and print the total execution time
        total_time = end_time - start_time
        print(f'Total execution time: {total_time:.2f} seconds')

consumer2_partition.py

더보기
#!/usr/bin/env python

import sys
import time
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, KafkaError, TopicPartition

if __name__ == '__main__':
    # Record the start time
    start_time = time.time()
    start_receive_time = time.time()

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Set unique group.id for each consumer group
    config['group.id'] = f"{config['group.id']}_{time.time()}"

    # Create Consumer instance
    consumer = Consumer(config)

    # Specify the topic and partition to consume from
    topic = "purchases_partition"
    partition = 1  # Replace with the desired partition

    # Assign the consumer to the specified topic and partition
    consumer.assign([TopicPartition(topic, partition)])

    wait = False

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # No new messages within the timeout
                print("Waiting...")

                if not wait:
                    # Calculate and print the waiting time
                    end_receive_time = time.time()
                    transfer_duration = end_receive_time - start_receive_time
                    print(f'Receive duration: {transfer_duration - 1:.2f} seconds')
                    wait = True
            elif msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event (not an error)
                    continue
                else:
                    print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                print("[Consumer 2] Consumed event from topic {topic}, partition {partition}, offset {offset}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), partition=msg.partition(), offset=msg.offset(),
                    key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
                time.sleep(0.1)
                if wait:
                    wait = False
                    start_receive_time = time.time()

    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

        # Record the end time
        end_time = time.time()

        # Calculate and print the total execution time
        total_time = end_time - start_time
        print(f'Total execution time: {total_time:.2f} seconds')

프로듀서1, 프로듀서2 데이터를 각각 컨슈머1, 컨슈머2에게 전달하고자 할 때, 파티션을 나눈다.

그리고 이를 병렬처리하여 전달하면 throughput을 향상시킬 수 있다.

 

2개의 프로듀서가 0.1 초가 걸리는 데이터 전송을 파티션 2개에 각각 50개씩 저장했을 때 5.48 seconds 소요됨. 

2개의 컨슈머가 0.1초 걸리는 데이터 수신을 파티션 2개에 각각 50개씩 가져왔을 때 5.33 seconds 소요됨. 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

https://developer.confluent.io/tutorials/

 

Apache Kafka® Tutorials and Recipes by Confluent

Apache Kafka Tutorials: Discover recipes and tutorials that bring your idea to proof-of-concept. Learn stream processing the simple way.

developer.confluent.io

 

 

참고 자료)

파이썬 이용한 카프카 스트림??까지 있는 듯?

https://www.confluent.io/blog/getting-started-with-apache-kafka-in-python/

 

Get Started with Apache Kafka in Python

In this Kafka-Python tutorial, learn basic concepts, how to produce and consume data, and use stream processing functions to enable real-time data streaming and analytics with examples.

www.confluent.io

 

 

Apache Kafka 성능에 대한 벤치마크 테스트 및 결과

https://developer.confluent.io/learn/kafka-performance/

 

Apache Kafka® Performance, Latency, Throughout, and Test Results

Benchmark Apache Kafka performance, throughput, and latency in a repeatable, automated way with OpenMessaging benchmarking on the latest cloud hardware.

developer.confluent.io

 

반응형
반응형

1. Confluent 선택한 이유

- 로컬 노트북에 카프카를 깔고 싶지 않았고..

- 그렇다고 AWS에서 직접 EC2 여러대 만들어서 카프카 클러스터 만들고 싶지 않았음.. 어려우니까ㅠ

- 무엇보다 metric을 확인하면서 병렬처리의 성능을 평가해야하는데 Confluent에서는 이걸 쉽게 확인 가능했음..

- 즉, 내 능력이 부족하므로 플랫폼을 활용하는 것이 효율적이라 생각했음..!

- 가장 중요했던 건... 무료 60일간 400$ 이라는 것.

- 초급자가 카프카 배우기에 안성맞춤 플랫폼

 

 

2. 컨플루언트 튜토리얼 찾았음.. 

자바, 파이썬, Go, .NET, Node.js, C/C++, RestAPI, SpringBoot 등 다양한 언어도 된 튜토리얼 있음.

https://docs.confluent.io/platform/current/clients/index.html

 

Build Client Applications for Confluent Platform | Confluent Documentation

Programming languages you use to build applications that interact with Kafka provide a set of APIs and libraries. A Kafka client library provides functions, classes, and utilities that allow developers to create Kafka producer clients (Producers) and consu

docs.confluent.io

 

 

- 여기서 파이썬으로 된 카프카 클라이언트 문서 따라서 실습해볼 예정.

 

Apache Kafka and Python - Getting Started Tutorial

How to run a Kafka client application written in Python that produces to and consumes messages from a Kafka cluster, complete with step-by-step instructions and examples.

developer.confluent.io

 

 


 

튜토리얼 따라하기 과정

 

1. 카프카 클러스터 생성

Confluent Cloud Console에서 클러스터 하나 만든다.

 

 

2. 파이썬 프로젝트 생성 및 라이브러리 설치

mkdir kafka-python-getting-started && cd kafka-python-getting-started

# Python 3.x (Install the Kafka library)
pip install confluent-kafka

 

 

 

3. 카프카 클러스터 정보 복사

Confluent Cloud Console 에서 클러스터의 Bootstrap server 의 endpoint 복사해두기

 

 

 

 

4. 글로벌 키 생성 & config 파일 생성

 

(1) Key와 Secret 복사하기 

 

(2) config 정보 담긴 파일 만들기

getting_started.ini 파일 만들어서 아래 코드 넣기

[default]
bootstrap.servers={{ kafka.broker.server }}
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=< CLUSTER API KEY >
sasl.password=< CLUSTER API SECRET >

[consumer]
group.id=python_example_group_1

# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest

 

(주의) 

bootstrap.servers={{ kafka.broker.server }}


# 에러코드
%3|1702203218.131|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://'pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/b]: sasl_ssl://'pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bootstrap: Failed to resolve ''pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092': 알려진 호스트가 없습니다. (after 8ms in state CONNECT)

여기서 { } 중괄호 없고, 따옴표도 없고 키 값만 넣어야 함.

안그러면 인식 제대로 못해서 에러 발생

 

sasl.username=< CLUSTER API KEY >
sasl.password=< CLUSTER API SECRET >

# 에러코드
%3|1702205150.434|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bo]: sasl_ssl://pkc-e82om.ap-northeast-2.aws.confluent.cloud:9092/bootstrap: SASL authentication error: Authentication failed (after 5116ms in state AUTH_REQ, 5 identical error(s) suppressed)

 

여기도 마찬가지로 < > 괄호 없고, 따옴표도 없고 키 값만 넣어야 함.

안그러면 인식 제대로 못해서 에러 발생

 

 

 

5. 토픽 생성

토픽명은 purchases

파티션 개수는 1개

 

 

6. 카프카 프로듀서 만들기

producer.py 파일 만들어서 아래 코드 저장

더보기
#!/usr/bin/env python

import sys
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(10):

        user_id = choice(user_ids)
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()

 

 

 

7. 카프카 컨슈머 만들기

consumer.py 파일 만들어서 아래 코드 저장

더보기
#!/usr/bin/env python

import sys
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, OFFSET_BEGINNING

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    parser.add_argument('--reset', action='store_true')
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Set up a callback to handle the '--reset' flag.
    def reset_offset(consumer, partitions):
        if args.reset:
            for p in partitions:
                p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)

    # Subscribe to topic
    topic = "purchases"
    consumer.subscribe([topic], on_assign=reset_offset)

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: %s".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.

                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

 

 

 

8. 카프카 프로듀서 이벤트 생성하기

 

chmod 파일 권한변경 명령어

u+x : user에게 excution(실행) 권한 부여 후 파이썬 코드 실행

chmod u+x producer.py

./producer.py getting_started.ini

 

 

 

9. 카프카 컨슈머 이벤트 생성하기

chmod 파일 권한변경 명령어

u+x : user에게 excution(실행) 권한 부여 후 파이썬 코드 실행

chmod u+x consumer.py

./consumer.py getting_started.ini

 

 

 

 

 


 

최종 결과

 

 

생성파일

 

 

출력 화면

반응형

'백엔드 > Kafka' 카테고리의 다른 글

[Kafka] 심화 (병렬처리 / 스트림즈 / 커넥트 )  (0) 2023.12.10
[Kafka] 개념  (1) 2023.12.06

+ Recent posts