1. 카프카 시작하기

  • publish/subscribe message

    • 전송자(발행하는 쪽)가 데이터(메시지)를 보낼 때 직접 수신자(구독하는 쪽)로 보내지 않는다.

    • 전송자를 어떤 형태로든 메시지를 분류해서 보내고, 수신자는 분류된 메시지를 구독한다.

    • 대개 발행된 메시지를 전달 받고 중계해주는 중간 지점 역할을 하는 브로커broker가 있다.

  • 아파치 카프카는 메시지 발행/구독 시스템이다.

    • '분산 커밋 로그' or '분산 스트리밍 플랫폼’이라고 불리기도 한다.

    • 파일시스템이나 데이터베이스 커밋 로그commit log는 모든 트랜잭션 기록을 지속성durable있게 보존함으로써 시스템의 상태를 일관성consistency 있게 복구할 수 있도록 고안되었다.

    • 카프카에 저장된 데이터는 순서를 유지한채로 지속성 있게 보관되면 결정적deterministic으로 읽을 수 있다.

    • 확장시 실패하더라도 데이터 사용에는 문제가 없도록 시스템 안에서 데이터를 분산시켜 저장할 수 있다.

  • 일정 기간 동안 메시지를 지속성durability 있게 보관하는 보존retention 기능이 있다.

    • 특정 기간동안 메시지를 보전하거나, 파티션의 크기가 특정 사이즈에 도달할 떄까지 데이터를 보존한다.

메시지message

  • 카프카에서 데이터의 기본 단위. 데이터베이스의 row나 record와 비슷해보일 수 있다. 카프카 입장에서 메시지는 단순히 바이트 배열일 뿐이므로 특정한 형식이나 의미가 없다.

  • 메시지는 key라 불리는 메타데이터를 포함할 수 있다. key는 메시지를 저장할 파티션을 결정하기 위해 사용된다.

  • 특정한 토픽에 쓰여진다.

배치batch

  • 카프카는 효율성을 위해 메시지를 배치 단위로 저장한다. 배치는 토픽의 파티션에 쓰여지는 메시지들의 집합이다.

  • 메시지를 배치 단위로 쓰면 네트워크상 오버헤드를 줄일 수 있다. 허나 지연latency과 처리량throughput 사이에 트레이드오프를 발생시킨다.

스키마

  • 카프카 입장에서 메시지는 단순한 바이트 배열일 뿐이지만, 내용일 이해하기 쉽도록 일정한 구조(혹은 스키마)를 부여하는 것이 권장된다.

  • JSON, XML, Avro(카프카 개발자들이 선호)이 있다.

토픽topic

  • 카프카에 저장되는 메시지는 토픽 단위로 분류된다. 토픽은 다시 여러 개의 파티션partition으로 나눠진다.

  • 파티션에 메시지가 쓰일 땐 append-only 형태로 쓰여진다. 읽을 땐 FIFO로 읽힌다.

  • 토픽에 여러 파티션이 있는 만큼 토픽 안의 메시지 전체에 대해 순서는 보장되지 않으며, 단일 파티션 안에서만 순서가 보장될 뿐이다.

  • 파티션은 복제될 수 있다. 서로 다른 서버들이 동일한 파티션의 복제본을 저장하고 있기 때문에 서버 중 하나에 장애가 발생해도 문제 없다.

  • 스트림stream은 (파티션의 갯수와 상관없이) 하나의 토픽에 저장되는 데이터로 간주되며, 프로듀서producer로부터 컨슈머consumer로의 하나의 데이터 흐름을 나타낸다.

    • 메시지 집합을 스크림이라는 용어로 부르는 것은 …​

    • 이러한 방식의 처리는 데이터를 시간이 흐른 뒤 한꺼번에 대량으로 처리하는 하둡과 같은 오프라인 프레임워크와 대비된다.

  • 토픽에는 로그 압착log compaction 기능을 설정할 수 있는데,같이 키를 갖는 메시지 중 가장 최신의 것만 보존된다.

프로듀서

  • 메시지 생성한다.

  • 발행자publisher 혹은 작성자writer라고도 부른다.

  • 프로듀서가 특정한 파티션을 지정해서 메시지를 쓸 수 있다. 메시지 키와 키값의 해시를 특정 파티션으로 대응시켜주는 파티셔너partitioner를 사용해서 구현한다.

컨슈머

  • 메시지를 읽는다.

  • 구독자subscriber 혹은 독자reader라고도 부른다.

  • 토픽을 구독해서 여기에 저장된 메시지들을 각 파티션에 쓰여진 순서대로 읽어 온다.

  • 컨슈머는 메시지의 오프셋offset을 기록함으로써 어느 메시지까지 읽었는지를 유지한다.

  • 컨슈머는 컨슈머 그룹의 일원consumer group으로 작동한다.

컨슈머 그룹

  • 컨슈머 그룹 단위로 offset을 관리한다.

  • 컨슈머 그룹에 속한 컨슈머들은 자신들이 구독하는 토픽의 파티션들에 대한 소유권을 공유함

  • 컨슈머에 파티션을 재할당reassignment하는 작업은 컨슈머 그룹에 읽고 있는 토픽이 변경되었을 때도 발생한다.

  • 컨슈머에 할당된 파티션을 다른 컨슈머에 할당해주는 작업을 '리밸런스rebalance'라고 함

  • 리밸런스는 컨슈머 그룹에 높은 가용성high availability과 규모 가변셩scalability을 제공하는 기능

브로커

  • 하나의 카프카 서버를 브로커라고 부른다.

  • 브로커는 프로듀서로부터 메시지를 전달받아 오프셋을 할당한 뒤 디스크 저장소에 쓴다.

  • 브로커는 컨슈머의 파티션 읽기fetch 요청 역시 처리하고 발행된 메시지를 보내준다.

클러스터

  • 카프카 브로커는 클러스터의 일부로서 작동하도록 설계되었다.

    • 하나의 클러스터 안에 여러 개의 브로커가 포함될 수 있다.

    • 그중 하나의 브로커가 클러스터 컨트롤러 역할을 하게 된다.

      • 컨트롤러는 파티션을 브로커에 할당해주거나 장애나 발생한 브로커를 모니터링하는 등의 관리 기능을 담당한다.

  • 파티션은 클러스터 안의 브로커 중 하나가 담당하며, 그 브로커는 파티션 리더partition leader라고 부른다.

  • 복제된 파티션이 여러 브로커에 할당될 수도 있는데 이것들은 파티션의 팔로워follewer라고 부른다.

    • 복제replication~~ 기능은 파티션의 메시지를 중복 저장함으로써 리더 브로커에 장애가 발생했을 때 팔로워 중 하나가 리더 역할을 이어받을 수 있도록 한다.

2. 카프카 설치하기

주키퍼

  • 아파치 카프카는 카프카 클러스터의 메타데이터와 컨슈터 클라이언트에 대한 정보를 저장하기 위해 아파키 주키퍼를 사용한다.

    • 클러스터 환경 관리를 위한 (분산 시스템) 코디네이터

  • 주키퍼는 고가용성을 보장하기 위해 앙상블ensemble이라 불리는 클러스터 단위로 작동하고록 설계되었다.

  • 주키퍼가 사용하느 부하 분산 알고리즘 때문에 앙상블은 홀수 개의 서버를 가지는 것이 권장된다.

    • 주키퍼가 요청에 응답하려면 앙상블 멤버(쿼럼quorum)의 과반 이상이 작동하고 있어야 하기 때문이다.

Actually, the problem is not with ZooKeeper itself but with the concept of external metadata management. (사실 문제는 ZooKeeper 자체가 아니라 외부 메타데이터 관리 개념에 있습니다.)

Quichstart

# step 1 - 다운로드
# https://kafka.apache.org/downloads
$ curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
$ tar -xzf kafka_2.13-3.5.1.tgz
$ cd kafka_2.13-3.5.1

# step 2 - 카프카 실행
# Kafka with KRaft
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
$ bin/kafka-server-start.sh config/kraft/server.properties

# step 3 - 토픽 생성/확인
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test

# step 4 - 메시지 생성
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

# step 5 - 메시지 읽기
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# kafka-ui - https://github.com/provectus/kafka-ui
# https://docs.kafka-ui.provectus.io/development/building/without-docker
# https://github.com/provectus/kafka-ui/releases
$ java -Dspring.config.additional-location=application.yml -jar kafka-ui-api-v0.7.1.jar

브로커 설정

  • broker.id

    • 모든 카프카 브로커는 정숫값 식별자를 갖는다.

    • 기본값은 0

  • listeners

    • 쉼표로 구분된 리스너 이름과 URI 목록

    • 1024 미만의 포트 번호를 사용할 경우 루트 권한으로 카프카를 실행시켜야 하며, 이는 바람직하지 않다.

  • zookeeper.connect

    • 브로커의 메타데이터가 저장되는 주키퍼의 위치

  • log.dirs

  • num.recovery.threads.per.data.dir

    • 카프카는 설정 가능한 스레드 풀을 사용해서 로그 세그먼트를 관리한다.

    • 기본적으로 로그 디렉토리에 대해 하나의 스레드만이 사용된다.

    • 이 설정에 따라 언클린 셧다운unclean shutdown 이후 복구를 위한 재시작 시간이 차이날 수 있다.

  • auto.create.topics.enable

    • 브로커가 토픽을 자동으로 생성하도록 하는 상황

      • 프로듀서가 토픽에 메시지를 쓰기 시작할 때

      • 컨슈머가 토틱으로부터 메시지를 읽기 시작할 떄

      • 클라이언트가 토픽에 대한 메타데이터를 요청할 떄

    • 이런 자동 생성을 제어하는 설정

  • auto.leader.rebalendce.enable

    • 모든 토픽의 리더 역할이 하나의 브로커에 집중됨으로써 카프카 클러스터의 균형이 때지는 수가 있음

    • 이를 균등하게 분산되도록하는 설정. 파티션의 분포 상태를 주기적으로 확인하느 백그라운드 스레드가 시작됨

  • delete.topic.enable

    • 토픽을 임의로 삭제 못하게끔 하는 설정

토픽 설정

  • num.partitions

    • 새로운 토픽이 생성될 떄 몇 개의 파티션을 갖게 되는지 결정

    • 기본값 1

    • 토픽의 파티션 갯수는 늘릴 수만 있지 줄일 수는 없다.

    • 일단 작은 크기로 시작해서 나중에 필요할 떄 확장하는 것이 낫다.

    • 파티션 수는 어떻게 결정?

      • 토픽에 대해 달성하고자 하는 처리량

      • 단일 파티션에 대해 달성하고자 하는 최대 읽기 처리량

  • default.replication.factor

    • 복제 팩터 값은 min.insync.replicas 설정값보다 최소한 1 이상 크게 잡아줄 것을 강력히 권장

    • 좀 더 내고장성fault tolerance 있는 설정을 바란다면, 설정값보다 2 큰 값으로 복제 팩터를 설정하는 것이 좋다. 이것을 보통 RF++로 줄여 쓴다.

  • log.retention.ms

    • 얼마나 오랫동안 메시지를 보존해야하는지 지정할 떄 가장 많이 사용되는 설정이 시간 기준 보존 주기 설정

    • log.retension.ms 사용을 권장. 1개 이상 설정되었을 때 더 작은 단위 설정값이 우선권을 가짐

  • log.retention.bytes

    • 메시지 만료의 또 다른 기준은 보존되는 메시지의 용량

    • 파티션 단위로 적용

    • -1로 설정하면 영구히 보존

  • log.segment.bytes

  • log.roll.ms

  • min.insync.replicas

    • 데이터 지속성 위주로 클러스터를 설정할 때 이 값을 2로 잡아주면 최소한 2개의 레플리카가 최산 상태로 프로듀서와 동기화되도록 할 수 있다.

  • message.max.bytes

    • 카프카 브로커는 쓸 수 있는 메시지의 최대 크기를 제한

    • 기본값은 1,000,000(1MB)

    • 이 값을 증가시키는 겂은 성능에 큰 영향을 미친다.

      • 메시지가 커지는 만큼 네트워크 연결과 요청을 처리하는 브로커 스레드의 요청당 작업 시간도 증가하기 때문

      • 디스크에 써야하는 크기 역시 증가하는데, 이는 I/O 처리량에 영향을 미친다.

하드웨어

  • 디스크 처리량

    • 로그 세그먼트를 저장하는 브로커 디스크의 처리량은 프로듀서 클라이언트 성능에 가장 큰 영향을 미친다.

    • 디스크 쓰기 속도가 빨라진다는 것은 곧 쓰기 지연이 줄어드는 것

    • 경험적으로, 대체로 많은 수의 클라이언트 연결을 받아내야 하는 경우에는 SSD, 자주 쓸 일이 없는 데이터를 굉장히 많이 저장해야 하는 클러스터의 경우 HDD

  • 디스크 용량

  • 메모리

    • 시스템 페이지 캐시로 사용할 수 있느 ㄴ메모리를 더 할당해 줌으로써 컨슈머 클라이언트 성능을 향상시킬 수 있음

    • 카프카 그 자체는 JVM에 많은 힙 메모리를 필요로 하지 않음

  • 네트워크

    • 사용 가능한 네트워크 대역폭은 카프카가 처리할 수 있는 트래픽의 처리량을 결정

  • CPU

    • 카프카 클러스터를 매우 크게 확장하지 않는 한 그렇게 중요하진 않음

    • 브로커는 작업이 끝나고 디스크에 저장하기 위해 메시지를 다시 압축하는데 이 부분이 처리 능력이 중요해지는 지점

      • 작업 - 카프카 브로커가 각 메시지의 체크섬을 확인하고 오프셋을 부여하기 위해 모든 메시지 배치의 압축을 해제

카프카 클러스터 설정

  • 브로커 갯수

    • 카프카 클러스터의 적절한 크기롤 결정하는 요소들

      • 디스크 용량

      • 브로커당 레플리카 용량

      • CPU 용량

      • 네트워크 용량

    • 가장 먼저 고려할 요소는 필요한 메시지를 저장하는 데 필요한 디스크 용량과 단일 브로커가 사용할수 있는 저장소 용량

    • 클러스터가 처리 가능한 요청량

  • 운영체제 튜닝

    • 주로 가상 메모리와 네트워크 서브시스템, 로그 세그먼트를 저장하기 위해 사용되는 디스크의 마운트 등

    • 가상메모리

      • 카프카의 부하 특정에 맞게 스왑 공간이나 더티 메모리 페이지dirty memory page가 사용되는 방식을 조절해줄 수 있음

    • 네트워킹

      • 각 소켓의 송신. 수신 버퍼에 할당되는 기본/최대 메모리의 양

프로덕션 환경에서의 고려 사항

  • 카프카 브로커는 힙 메모리를 상당히 효율적으로 사용할 뿐 아니라 GC의 대상이 되는 객체 역시 가능한 한 적게 생성하기 때문에 아래 설정들을 낮게 잡아줘도 괜찮다.

    • MaxGCPauseMillis

    • InitiaingHeapOccupancyPercent

3. 카프카 프로듀서

4. 카프카 컨슈머

4.6. 오프셋과 커밋

  • 카프카는 컨슈머의 응답을 받지 않는다.

  • 컨슈머가 카프카를 사용해 각 파티션에서의 위치를 추적한다.

    • 특수 토픽인 __consumer_offsets 에 각 파티션별로 커밋된 오프셋을 업데이트

  • 파티션의 현재 위치를 업데이트하는 작업을 ‘오프셋 커밋’이라 함

    • 카프카는 레코드를 개별적으로 커밋하지 않음

    • 마지막 메시지를 커밋함으로써 그 앞의 모든 메시지는 성공적으로 처리했음을 암묵적으로 나타냄

  • 컨슈머서 크래시되거나 새로운 컨슈머가 그룹에 추가될 때 리밸런스 발생

    • 리밸런스 이후 어디서부터 처리해야할지 알기 위해 각 파티션의 마지막으로 커밋된 메시지를 일어온 뒤 처리 진행

자동 커밋

  • enable.auto.commit 값을 `true`로 잡아두면 컨슈머가 5초에 한 번, `poll()`을 통해 받은 메시지 중 마지막 메시지의 오프셋을 커밋함

    • auto.commit.interval.ms 값으로 커밋 간격을 변경할 수 있음

  • 컨슈머의 모든 다른 것들과 마찬가지로, 자동 커밋은 폴링 루프에 의해 실행됨 (‘4.4 폴링 루프’ 참고)

    • 컨슈머 API의 핵심은 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프

    • 새 컨슈머에서 처음으로 poll() 호출하면 컨슈머는 `GroupCoordnator`를 찾아서 컨슈머 그룹에 참가, 파티션을 할당 받음

  • 자동 커밋을 사용할때 발생할 수 있는 결과

    • 마지막 커밋한지 3초 위에 컨슈머 크래시

      • 리밸런싱 이후 다시 처리하는데, 마지막 커밋 이후부터 처리하므로 3초간 읽혔던 이벤트들이 두 번 처리된다.

    • 커밋할 때가 되면 이후 호출될 `poll()`이 이전 호출에서 리턴된 마지막 오프셋을 커밋함.

  • 편하다. 허나 중복 메시지를 방지하기엔 충분하지 않다.

현재 오프셋 커밋하기

  • 메시지 유실 가능성을 제거, 리밸런스 이후 중복 메시지를 줄이기 위해 오프셋이 커밋되는 시각을 제거하고자 함 **enable.auto.commit=false 설정

  • 가장 간단하고 신뢰성 있는 커밋 API는 commitSync()

    • poll() 이 리턴한 마지막 오프셋을 커밋한 뒤 커밋이 성공적으로 완료되면 리턴, 혹은 예외 던짐

    • poll() 에 의해 리턴된 마지막 로프셋을 커밋한다는 점에 유의

    • poll() 에서 리턴된 모든 레코드의 처리가 완료되기 전 commitSync() 호출할 경우 이후에 어플리케이션 크래시되면 아직 처리되지 않은 메시지들이 누락될 수 있음

    • 레코드를 처리하는 도중 크래시날 경우 마지막 매치의 맨 앞 레코드에서부터 리밸런스 시작 지점까지의 모든 레코드들은 두 번 처리됨

비동기적 커밋

  • 스동 커밋의 단점중 하나는 브로커가 커밋 요청에 응답할 떄까지 어플리케이션이 블락된다는 것

    • 어플리케이션의 처리량을 제한하게 됨

  • 비동기적 커밋 API: commitAsync()

    • 요청만 보내고 처리를 계속하는 것

  • `commitSync()`는 문제가 있을 경우 재시도하는데, `commitAsync()`는 재시도 하지 않음

  • 오프셋의 순서를 올바르게 하는 것이 중요

특정 오프셋 커밋하기

  • 큰 배치를 처리할 때, 리밸런스 되서 다시 배치가 동작하는 것을 피하고자 할 경우 어떻게?

  • commitSync(), commitAsync() 메서드들은 아직 처리하지 않은, 리턴된 마지막 오프셋을 커밋한다.

  • 위 메서드에 파티션과 오프셋의 맵을 전달할 수 있다.

4.7. 리밸런스 리스너

  • 컨슈머는 종료하기 전이나 리밸런싱 시작 전에 cleanup 작업을 해줘야 할 것

  • onPartitionsAssigned()

    • 리밸런싱 발생할 때마다 호출

  • onPartitionsRevoked()

    • 리밸런스 상황에서 호출되지만, 파티션이 특정 컨슈머에서 해제될 때만 호출

  • onPartitionsLost()

    • 예외적인 리밸런스 상황에서 호출됨

4.8. 특정 오프셋의 레코드 읽어오기

  • seekToBeginning() - 파티션의 맨 앞에서부터 모든 메시지를 읽고자 할 경우

  • seekToEnd() - 앞의 메시지를 건너뛰로 파티션에 새로 들어온 메시지부터 읽고자 할 경우

  • seek() - 오프셋 지정

4.9. 폴링 루프를 벗어나는 방법

  • 즉시 루프를 탈출하고 싶다면 다른 스레드에서 consumer.wakeup() 호출

    • 다른 스레드에 있을 때만 안전하게 작동한는 유일한 컨슈머 메서드

    • 다음 poll() 호출될 때 WakeupException 예외 발생

  • 메인스레드에서 컨슈머 루프가 돌다면 ShutdownHook 활용

4.10. 디시리얼라이저

  • 카프카 프로듀서는 카프카에 데이터를 쓰기 전 커스컴 객체를 바이트 배열로 변환하기 위해 시리얼라이저가 필요함.

  • 카프카 컨슈머는 카프카로부터 받은 바이트 매열을 가바 객체로 변환하기 위해 디시리얼라이저가 필요함