1. 카프카 시작하기
-
publish/subscribe message
-
전송자(발행하는 쪽)가 데이터(메시지)를 보낼 때 직접 수신자(구독하는 쪽)로 보내지 않는다.
-
전송자를 어떤 형태로든 메시지를 분류해서 보내고, 수신자는 분류된 메시지를 구독한다.
-
대개 발행된 메시지를 전달 받고 중계해주는 중간 지점 역할을 하는 브로커broker가 있다.
-
-
아파치 카프카는 메시지 발행/구독 시스템이다.
-
'분산 커밋 로그' or '분산 스트리밍 플랫폼’이라고 불리기도 한다.
-
파일시스템이나 데이터베이스 커밋 로그commit log는 모든 트랜잭션 기록을 지속성durable있게 보존함으로써 시스템의 상태를 일관성consistency 있게 복구할 수 있도록 고안되었다.
-
카프카에 저장된 데이터는 순서를 유지한채로 지속성 있게 보관되면 결정적deterministic으로 읽을 수 있다.
-
확장시 실패하더라도 데이터 사용에는 문제가 없도록 시스템 안에서 데이터를 분산시켜 저장할 수 있다.
-
-
일정 기간 동안 메시지를 지속성durability 있게 보관하는 보존retention 기능이 있다.
-
특정 기간동안 메시지를 보전하거나, 파티션의 크기가 특정 사이즈에 도달할 떄까지 데이터를 보존한다.
-
메시지message
-
카프카에서 데이터의 기본 단위. 데이터베이스의 row나 record와 비슷해보일 수 있다. 카프카 입장에서 메시지는 단순히 바이트 배열일 뿐이므로 특정한 형식이나 의미가 없다.
-
메시지는 key라 불리는 메타데이터를 포함할 수 있다. key는 메시지를 저장할 파티션을 결정하기 위해 사용된다.
-
특정한 토픽에 쓰여진다.
배치batch
-
카프카는 효율성을 위해 메시지를 배치 단위로 저장한다. 배치는 토픽의 파티션에 쓰여지는 메시지들의 집합이다.
-
메시지를 배치 단위로 쓰면 네트워크상 오버헤드를 줄일 수 있다. 허나 지연latency과 처리량throughput 사이에 트레이드오프를 발생시킨다.
스키마
-
카프카 입장에서 메시지는 단순한 바이트 배열일 뿐이지만, 내용일 이해하기 쉽도록 일정한 구조(혹은 스키마)를 부여하는 것이 권장된다.
-
JSON, XML, Avro(카프카 개발자들이 선호)이 있다.
토픽topic
-
카프카에 저장되는 메시지는 토픽 단위로 분류된다. 토픽은 다시 여러 개의 파티션partition으로 나눠진다.
-
파티션에 메시지가 쓰일 땐 append-only 형태로 쓰여진다. 읽을 땐 FIFO로 읽힌다.
-
토픽에 여러 파티션이 있는 만큼 토픽 안의 메시지 전체에 대해 순서는 보장되지 않으며, 단일 파티션 안에서만 순서가 보장될 뿐이다.
-
파티션은 복제될 수 있다. 서로 다른 서버들이 동일한 파티션의 복제본을 저장하고 있기 때문에 서버 중 하나에 장애가 발생해도 문제 없다.
-
스트림stream은 (파티션의 갯수와 상관없이) 하나의 토픽에 저장되는 데이터로 간주되며, 프로듀서producer로부터 컨슈머consumer로의 하나의 데이터 흐름을 나타낸다.
-
메시지 집합을 스크림이라는 용어로 부르는 것은 …
-
이러한 방식의 처리는 데이터를 시간이 흐른 뒤 한꺼번에 대량으로 처리하는 하둡과 같은 오프라인 프레임워크와 대비된다.
-
-
토픽에는 로그 압착log compaction 기능을 설정할 수 있는데,같이 키를 갖는 메시지 중 가장 최신의 것만 보존된다.
프로듀서
-
메시지 생성한다.
-
발행자publisher 혹은 작성자writer라고도 부른다.
-
프로듀서가 특정한 파티션을 지정해서 메시지를 쓸 수 있다. 메시지 키와 키값의 해시를 특정 파티션으로 대응시켜주는 파티셔너partitioner를 사용해서 구현한다.
컨슈머
-
메시지를 읽는다.
-
구독자subscriber 혹은 독자reader라고도 부른다.
-
토픽을 구독해서 여기에 저장된 메시지들을 각 파티션에 쓰여진 순서대로 읽어 온다.
-
컨슈머는 메시지의 오프셋offset을 기록함으로써 어느 메시지까지 읽었는지를 유지한다.
-
컨슈머는 컨슈머 그룹의 일원consumer group으로 작동한다.
컨슈머 그룹
-
컨슈머 그룹 단위로 offset을 관리한다.
-
컨슈머 그룹에 속한 컨슈머들은 자신들이 구독하는 토픽의 파티션들에 대한 소유권을 공유함
-
컨슈머에 파티션을 재할당reassignment하는 작업은 컨슈머 그룹에 읽고 있는 토픽이 변경되었을 때도 발생한다.
-
컨슈머에 할당된 파티션을 다른 컨슈머에 할당해주는 작업을 '리밸런스rebalance'라고 함
-
리밸런스는 컨슈머 그룹에 높은 가용성high availability과 규모 가변셩scalability을 제공하는 기능
브로커
-
하나의 카프카 서버를 브로커라고 부른다.
-
브로커는 프로듀서로부터 메시지를 전달받아 오프셋을 할당한 뒤 디스크 저장소에 쓴다.
-
브로커는 컨슈머의 파티션 읽기fetch 요청 역시 처리하고 발행된 메시지를 보내준다.
클러스터
-
카프카 브로커는 클러스터의 일부로서 작동하도록 설계되었다.
-
하나의 클러스터 안에 여러 개의 브로커가 포함될 수 있다.
-
그중 하나의 브로커가 클러스터 컨트롤러 역할을 하게 된다.
-
컨트롤러는 파티션을 브로커에 할당해주거나 장애나 발생한 브로커를 모니터링하는 등의 관리 기능을 담당한다.
-
-
-
파티션은 클러스터 안의 브로커 중 하나가 담당하며, 그 브로커는 파티션 리더partition leader라고 부른다.
-
복제된 파티션이 여러 브로커에 할당될 수도 있는데 이것들은 파티션의 팔로워follewer라고 부른다.
-
복제replication~~ 기능은 파티션의 메시지를 중복 저장함으로써 리더 브로커에 장애가 발생했을 때 팔로워 중 하나가 리더 역할을 이어받을 수 있도록 한다.
-
2. 카프카 설치하기
주키퍼
-
아파치 카프카는 카프카 클러스터의 메타데이터와 컨슈터 클라이언트에 대한 정보를 저장하기 위해 아파키 주키퍼를 사용한다.
-
클러스터 환경 관리를 위한 (분산 시스템) 코디네이터
-
-
주키퍼는 고가용성을 보장하기 위해 앙상블ensemble이라 불리는 클러스터 단위로 작동하고록 설계되었다.
-
주키퍼가 사용하느 부하 분산 알고리즘 때문에 앙상블은 홀수 개의 서버를 가지는 것이 권장된다.
-
주키퍼가 요청에 응답하려면 앙상블 멤버(쿼럼quorum)의 과반 이상이 작동하고 있어야 하기 때문이다.
-
Actually, the problem is not with ZooKeeper itself but with the concept of external metadata management. (사실 문제는 ZooKeeper 자체가 아니라 외부 메타데이터 관리 개념에 있습니다.)
-
KRaft
-
새로운 메타데이터 관리를 위함
-
Raft 합의 프로토콜의 이벤트 기반 변형을 사용하는 kafka의 새로원 쿼럼 컨트롤러 서비스
-
Quichstart
# step 1 - 다운로드
# https://kafka.apache.org/downloads
$ curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
$ tar -xzf kafka_2.13-3.5.1.tgz
$ cd kafka_2.13-3.5.1
# step 2 - 카프카 실행
# Kafka with KRaft
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
$ bin/kafka-server-start.sh config/kraft/server.properties
# step 3 - 토픽 생성/확인
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
# step 4 - 메시지 생성
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# step 5 - 메시지 읽기
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# kafka-ui - https://github.com/provectus/kafka-ui
# https://docs.kafka-ui.provectus.io/development/building/without-docker
# https://github.com/provectus/kafka-ui/releases
$ java -Dspring.config.additional-location=application.yml -jar kafka-ui-api-v0.7.1.jar
브로커 설정
-
broker.id
-
모든 카프카 브로커는 정숫값 식별자를 갖는다.
-
기본값은 0
-
-
listeners
-
쉼표로 구분된 리스너 이름과 URI 목록
-
1024 미만의 포트 번호를 사용할 경우 루트 권한으로 카프카를 실행시켜야 하며, 이는 바람직하지 않다.
-
-
zookeeper.connect
-
브로커의 메타데이터가 저장되는 주키퍼의 위치
-
-
log.dirs
-
카프카는 모든 메시지를 로그 세그먼트log segment 단위로 묶어서 지정된 디스크에 저장한다.
-
-
num.recovery.threads.per.data.dir
-
카프카는 설정 가능한 스레드 풀을 사용해서 로그 세그먼트를 관리한다.
-
기본적으로 로그 디렉토리에 대해 하나의 스레드만이 사용된다.
-
이 설정에 따라 언클린 셧다운unclean shutdown 이후 복구를 위한 재시작 시간이 차이날 수 있다.
-
-
auto.create.topics.enable
-
브로커가 토픽을 자동으로 생성하도록 하는 상황
-
프로듀서가 토픽에 메시지를 쓰기 시작할 때
-
컨슈머가 토틱으로부터 메시지를 읽기 시작할 떄
-
클라이언트가 토픽에 대한 메타데이터를 요청할 떄
-
-
이런 자동 생성을 제어하는 설정
-
-
auto.leader.rebalendce.enable
-
모든 토픽의 리더 역할이 하나의 브로커에 집중됨으로써 카프카 클러스터의 균형이 때지는 수가 있음
-
이를 균등하게 분산되도록하는 설정. 파티션의 분포 상태를 주기적으로 확인하느 백그라운드 스레드가 시작됨
-
-
delete.topic.enable
-
토픽을 임의로 삭제 못하게끔 하는 설정
-
토픽 설정
-
num.partitions
-
새로운 토픽이 생성될 떄 몇 개의 파티션을 갖게 되는지 결정
-
기본값 1
-
토픽의 파티션 갯수는 늘릴 수만 있지 줄일 수는 없다.
-
일단 작은 크기로 시작해서 나중에 필요할 떄 확장하는 것이 낫다.
-
파티션 수는 어떻게 결정?
-
토픽에 대해 달성하고자 하는 처리량
-
단일 파티션에 대해 달성하고자 하는 최대 읽기 처리량
-
-
-
default.replication.factor
-
복제 팩터 값은 min.insync.replicas 설정값보다 최소한 1 이상 크게 잡아줄 것을 강력히 권장
-
좀 더 내고장성fault tolerance 있는 설정을 바란다면, 설정값보다 2 큰 값으로 복제 팩터를 설정하는 것이 좋다. 이것을 보통 RF++로 줄여 쓴다.
-
-
log.retention.ms
-
얼마나 오랫동안 메시지를 보존해야하는지 지정할 떄 가장 많이 사용되는 설정이 시간 기준 보존 주기 설정
-
log.retension.ms 사용을 권장. 1개 이상 설정되었을 때 더 작은 단위 설정값이 우선권을 가짐
-
-
log.retention.bytes
-
메시지 만료의 또 다른 기준은 보존되는 메시지의 용량
-
파티션 단위로 적용
-
-1로 설정하면 영구히 보존
-
-
log.segment.bytes
-
log.roll.ms
-
min.insync.replicas
-
데이터 지속성 위주로 클러스터를 설정할 때 이 값을 2로 잡아주면 최소한 2개의 레플리카가 최산 상태로 프로듀서와 동기화되도록 할 수 있다.
-
-
message.max.bytes
-
카프카 브로커는 쓸 수 있는 메시지의 최대 크기를 제한
-
기본값은 1,000,000(1MB)
-
이 값을 증가시키는 겂은 성능에 큰 영향을 미친다.
-
메시지가 커지는 만큼 네트워크 연결과 요청을 처리하는 브로커 스레드의 요청당 작업 시간도 증가하기 때문
-
디스크에 써야하는 크기 역시 증가하는데, 이는 I/O 처리량에 영향을 미친다.
-
-
하드웨어
-
디스크 처리량
-
로그 세그먼트를 저장하는 브로커 디스크의 처리량은 프로듀서 클라이언트 성능에 가장 큰 영향을 미친다.
-
디스크 쓰기 속도가 빨라진다는 것은 곧 쓰기 지연이 줄어드는 것
-
경험적으로, 대체로 많은 수의 클라이언트 연결을 받아내야 하는 경우에는 SSD, 자주 쓸 일이 없는 데이터를 굉장히 많이 저장해야 하는 클러스터의 경우 HDD
-
-
디스크 용량
-
메모리
-
시스템 페이지 캐시로 사용할 수 있느 ㄴ메모리를 더 할당해 줌으로써 컨슈머 클라이언트 성능을 향상시킬 수 있음
-
카프카 그 자체는 JVM에 많은 힙 메모리를 필요로 하지 않음
-
-
네트워크
-
사용 가능한 네트워크 대역폭은 카프카가 처리할 수 있는 트래픽의 처리량을 결정
-
-
CPU
-
카프카 클러스터를 매우 크게 확장하지 않는 한 그렇게 중요하진 않음
-
브로커는 작업이 끝나고 디스크에 저장하기 위해 메시지를 다시 압축하는데 이 부분이 처리 능력이 중요해지는 지점
-
작업 - 카프카 브로커가 각 메시지의 체크섬을 확인하고 오프셋을 부여하기 위해 모든 메시지 배치의 압축을 해제
-
-
카프카 클러스터 설정
-
브로커 갯수
-
카프카 클러스터의 적절한 크기롤 결정하는 요소들
-
디스크 용량
-
브로커당 레플리카 용량
-
CPU 용량
-
네트워크 용량
-
-
가장 먼저 고려할 요소는 필요한 메시지를 저장하는 데 필요한 디스크 용량과 단일 브로커가 사용할수 있는 저장소 용량
-
클러스터가 처리 가능한 요청량
-
-
운영체제 튜닝
-
주로 가상 메모리와 네트워크 서브시스템, 로그 세그먼트를 저장하기 위해 사용되는 디스크의 마운트 등
-
가상메모리
-
카프카의 부하 특정에 맞게 스왑 공간이나 더티 메모리 페이지dirty memory page가 사용되는 방식을 조절해줄 수 있음
-
-
네트워킹
-
각 소켓의 송신. 수신 버퍼에 할당되는 기본/최대 메모리의 양
-
-
프로덕션 환경에서의 고려 사항
-
카프카 브로커는 힙 메모리를 상당히 효율적으로 사용할 뿐 아니라 GC의 대상이 되는 객체 역시 가능한 한 적게 생성하기 때문에 아래 설정들을 낮게 잡아줘도 괜찮다.
-
MaxGCPauseMillis
-
InitiaingHeapOccupancyPercent
-
3. 카프카 프로듀서
4. 카프카 컨슈머
4.6. 오프셋과 커밋
-
카프카는 컨슈머의 응답을 받지 않는다.
-
컨슈머가 카프카를 사용해 각 파티션에서의 위치를 추적한다.
-
특수 토픽인
__consumer_offsets
에 각 파티션별로 커밋된 오프셋을 업데이트
-
-
파티션의 현재 위치를 업데이트하는 작업을 ‘오프셋 커밋’이라 함
-
카프카는 레코드를 개별적으로 커밋하지 않음
-
마지막 메시지를 커밋함으로써 그 앞의 모든 메시지는 성공적으로 처리했음을 암묵적으로 나타냄
-
-
컨슈머서 크래시되거나 새로운 컨슈머가 그룹에 추가될 때 리밸런스 발생
-
리밸런스 이후 어디서부터 처리해야할지 알기 위해 각 파티션의 마지막으로 커밋된 메시지를 일어온 뒤 처리 진행
-
자동 커밋
-
enable.auto.commit
값을 `true`로 잡아두면 컨슈머가 5초에 한 번, `poll()`을 통해 받은 메시지 중 마지막 메시지의 오프셋을 커밋함-
auto.commit.interval.ms
값으로 커밋 간격을 변경할 수 있음
-
-
컨슈머의 모든 다른 것들과 마찬가지로, 자동 커밋은 폴링 루프에 의해 실행됨 (‘4.4 폴링 루프’ 참고)
-
컨슈머 API의 핵심은 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프
-
새 컨슈머에서 처음으로
poll()
호출하면 컨슈머는 `GroupCoordnator`를 찾아서 컨슈머 그룹에 참가, 파티션을 할당 받음
-
-
자동 커밋을 사용할때 발생할 수 있는 결과
-
마지막 커밋한지 3초 위에 컨슈머 크래시
-
리밸런싱 이후 다시 처리하는데, 마지막 커밋 이후부터 처리하므로 3초간 읽혔던 이벤트들이 두 번 처리된다.
-
-
커밋할 때가 되면 이후 호출될 `poll()`이 이전 호출에서 리턴된 마지막 오프셋을 커밋함.
-
-
편하다. 허나 중복 메시지를 방지하기엔 충분하지 않다.
현재 오프셋 커밋하기
-
메시지 유실 가능성을 제거, 리밸런스 이후 중복 메시지를 줄이기 위해 오프셋이 커밋되는 시각을 제거하고자 함 **
enable.auto.commit=false
설정 -
가장 간단하고 신뢰성 있는 커밋 API는
commitSync()
-
poll()
이 리턴한 마지막 오프셋을 커밋한 뒤 커밋이 성공적으로 완료되면 리턴, 혹은 예외 던짐 -
poll()
에 의해 리턴된 마지막 로프셋을 커밋한다는 점에 유의 -
poll()
에서 리턴된 모든 레코드의 처리가 완료되기 전commitSync()
호출할 경우 이후에 어플리케이션 크래시되면 아직 처리되지 않은 메시지들이 누락될 수 있음 -
레코드를 처리하는 도중 크래시날 경우 마지막 매치의 맨 앞 레코드에서부터 리밸런스 시작 지점까지의 모든 레코드들은 두 번 처리됨
-
비동기적 커밋
-
스동 커밋의 단점중 하나는 브로커가 커밋 요청에 응답할 떄까지 어플리케이션이 블락된다는 것
-
어플리케이션의 처리량을 제한하게 됨
-
-
비동기적 커밋 API:
commitAsync()
-
요청만 보내고 처리를 계속하는 것
-
-
`commitSync()`는 문제가 있을 경우 재시도하는데, `commitAsync()`는 재시도 하지 않음
-
오프셋의 순서를 올바르게 하는 것이 중요
특정 오프셋 커밋하기
-
큰 배치를 처리할 때, 리밸런스 되서 다시 배치가 동작하는 것을 피하고자 할 경우 어떻게?
-
commitSync()
,commitAsync()
메서드들은 아직 처리하지 않은, 리턴된 마지막 오프셋을 커밋한다. -
위 메서드에 파티션과 오프셋의 맵을 전달할 수 있다.
4.7. 리밸런스 리스너
-
컨슈머는 종료하기 전이나 리밸런싱 시작 전에 cleanup 작업을 해줘야 할 것
-
onPartitionsAssigned()
-
리밸런싱 발생할 때마다 호출
-
-
onPartitionsRevoked()
-
리밸런스 상황에서 호출되지만, 파티션이 특정 컨슈머에서 해제될 때만 호출
-
-
onPartitionsLost()
-
예외적인 리밸런스 상황에서 호출됨
-
4.8. 특정 오프셋의 레코드 읽어오기
-
seekToBeginning()
- 파티션의 맨 앞에서부터 모든 메시지를 읽고자 할 경우 -
seekToEnd()
- 앞의 메시지를 건너뛰로 파티션에 새로 들어온 메시지부터 읽고자 할 경우 -
seek()
- 오프셋 지정
4.9. 폴링 루프를 벗어나는 방법
-
즉시 루프를 탈출하고 싶다면 다른 스레드에서
consumer.wakeup()
호출-
다른 스레드에 있을 때만 안전하게 작동한는 유일한 컨슈머 메서드
-
다음
poll()
호출될 때WakeupException
예외 발생
-
-
메인스레드에서 컨슈머 루프가 돌다면
ShutdownHook
활용
4.10. 디시리얼라이저
-
카프카 프로듀서는 카프카에 데이터를 쓰기 전 커스컴 객체를 바이트 배열로 변환하기 위해 시리얼라이저가 필요함.
-
카프카 컨슈머는 카프카로부터 받은 바이트 매열을 가바 객체로 변환하기 위해 디시리얼라이저가 필요함