Kafka Internal
by Jaesang Lim
Kafka Internal
다룰 내용들~
- Kafka Controller
- Kafka Replication work
- Request Processing
- Kafka Physical Storage
1. Kafka Controller
- 일반적인 broker 기능 + partition 관리자
- partition과 replica들의 상태 관리
- partition 재할당 관리
- zookeeper /controller, /controller_epoch
broker는 고유한 id을 가지고 있고, 이 id기반으로 주키퍼에 연결하여, ephemeral node로 연결
- emphemeral node여서 어떤 이유로 connection이 끊어지면 삭제되고, watch에 의해 broker가 사라진 것을 다른 브로커들도 알게됌
- 해당 broker을 다시 재시작하면, broker ephemeral node에느 사라지지만, isr같은 다른 자료구조에서 해당 노드가 기존에 있던 브로커임을 인지하고 재시작함
controller는 먼저 등록한 Broker로 할당
- 만약 broker가 사라지면, controller의 watch로 인지하고 다른 Broker이 controller로 연결
- 이미 할당되었다면 node already exist로 1대만 controller로 split brain을 막음
- 모든 broker는 controller epoch number로 관리
- partition 재할당 관리
- broker 삭제
- 삭제된 broker가 가지고 있던 leader partition들의 새로운 leader 필요
- 새로운 leader가 필요한 모든 partition 검사, 어떤 partition이 새로운 leader가 될 지 결정
- 모든 broker에 해당 정보를 포함한 request 요청
- 새로운 broker 추가
- 새로운 broker가 추가됐음을 기존의 broker들에게 알림
- broker id를 통해 broker에 replica가 있는지 check
- 만약 replica가 있다면 새로운 broker는 기존에 존재하는 leader로부터 message를 복제
- broker 삭제
2. Kafka Replication work
- two type of replicas가 존재함
- leader replica
- 모든 request는 leader를 통해 수행 (일관성 보장)
- follower replica
- leader가 아닌 모든 replica
- request 처리 x
- 오직 leader로부터 message 복사만 수행
- leader replica
- fetch request
- follower가 leader와 sync 상태를 유지하기 위해 보내는 request
- consumer request와 동일
- 다음에 받기를 원하는 message의 offset이 포함
- leader는 offset을 보고 follower가 어디까지 message를 받았는지 확인 가능
- replica가 isr인지 판단하는 것은 replica가 leader에게 요청함 Fetch request
- 이게 일정 시간동안 안오거나, 최신 Offset이 아닌 값에 대한 fetch request할 시, out-of-sync로 간주
- replica.lag.time.max.ms -> 10초
- preferred leader
- 토픽 만들고 처음에 할당한 leader partition
- 이 정보를 계속 가지고 있는 이유는, 초기에 토픽을 만들 때 브러커당 partition 분배를 균형있게 해놔서
- auto.leader.rebalance.enable = true
- prefered leader가 현재 리더가 아닌데, isr이면 prefered-leader가 current leader로 변경
- isr의 첫번째 partition
3. Request Processing
broker은 여러 thread와 두개의 queue를 가짐
- acceptor thread
-
processing thread
- request queue
- response queue
client의 acceptor thread을 통해 연결을 맺고, processor thread가 Request을 Request Queue에 저장
- 또 processor thread는 request queue에 있는 요청을 받아 처리하고 response queue에 저장
- Request Protocol
- Kafka request의 형식 및 응답 형식에 대한 binary protocol
- request includes standard header:
- Request Type
- Request version
- Correlation ID (=request 고유 번호)
- Client ID
-
대표적인 Request
- Producer Request
- producer가 broker에 보내는 요청
-
client가 작성한 message를 포함
- Fetch Request
- consumer와 follower replica가 보내는 요청
- broker의 message를 fetch
- Producer Request
3_1. Metadata Request
- client가 어디에 요청을 보낼 지 어떻게 알 수 있을까?
- metadata request는 client가 관심있는 topic list를 포함
- metadata request를 받은 서버는 그 topic에 해당하는 partition의 정보(leader 위치 등)를 반환
- client는 metadata를 cache, request 실패 시 갱신
- 모든 broker가 metadata를 cache, 어떤 broker로든 요청 보낼 수 있음
- cache refresh interval은 metadata.max.age.ms로 조정 가능
3_2. Producer Request
- validation check
- leader partition이 request를 받으면, request 전에 선행
- client가 해당 topic에 쓰기 권한이 있는가?
- request에 지정된 acks(ch3)가 유효한가?(0, 1, all만이 허락됨)
- message durability guarantee
- linux에서 kafka message는 filesystem cache에 쓰이며,
- disk에 쓰이는 것이 보장되지 않음
- 시스템 down 시 메시지 휘발
- Kafka는 replica를 통해 message durability 보장
- 참고
- leader partition에 message가 기록되면, broker는 acks configuration을 검사
- acks가 0또는 1로 설정되어있다면 broker는 즉시 응답
- all로 설정되어 있다면 request는 leader가 follower가 message를 복제한 것을 관찰할 때까지 purgatory(사전적 의미는 일시적 장소)라 불리는 buffer에 저장
- 든 follower가 message를 복제한 시점에 client에 응답
3_3. Fetch Request
- 특정 topic의 partition과 offset을 지정하여 message 요청
- validation check
- producer request validation check와 유사하게 처리
- offset이 특정 partition에 대해 존재하는가?
- 요청 message가 삭제될 만큼 너무 오래 되진 않았는가?
- zero-copy message return
- message를 file(linux file cache)에서 중간 buffer 없이 네트워크 채널로 직접 전송
- bytes 복사 및 buffer 관리 overhead 감소, 성능 향상
- message size upper/lower bound
- upper bound
- lower bound
- traffic이 적은 topic을 읽을 때 CPU와 네트워크 사용량 감소
- 다른 batch와 마찬가지로 size+time을 기준으로 전송
4. Physocal Storage
- kafka 기본 저장단위 : partition replication
- 한 partition은 여러 개의 broker로 쪼개질 수 없음
- 한 broker 내의 partition은 여러 디스크에 나눠 저장될 수 없음
4_1. Parition Allocation
- 목표
- broker 간의 균등한 replica 분산
- 한 partition의 replica들은 각각 다른 broker에 존재(같은 partition은 한 broker 내에 x)
- partition allocation process
- random하게 broker 시작, round-robin 방법으로 partition leader 할당
- replica는 leader가 있는 broker와 일정한 간격의 broker에 배치
- broker 추가 시, auto partition rebalancing 없음
- replica reassignment tool 사용
- bin/kafka-reassign-partitions.sh ( kafka manager로 편하게 할 수 잇음!)
- 참고
- 5개의 브로커에 레플리카 3의 파티션 5개를 을 할당한다고 할 때, 0번 리더가 2번에 할당되고, 1번 리더가 4번에 할당되고, 2번 리더가 0번 브로커에 할당되고, and so on … - 파티션 0의 리더가 브로커 4에 있으면 첫번째 팔로워는 브로커 0에 있고 두번째 팔로워는 브로커 1에 있고..and so on
4_2. File Management
- Retention
- Kafka는 message를 영구히 저장하지 않음
- consumer가 메시지를 읽을 때까지 기다리지 않음
- retention period에 따라 message 삭제
- log.retention.hours: message 수명
- log.retention.bytes: partition의 최대 물리적 크기
- Segment
- 대용량 파일에서 삭제할 메시지를 찾고 삭제하는 것은 overhead가 큼
- 각 partition을 segment로 분할
- segment가 한도에 도달하면 현재 쓰고있는 파일을 닫고, 새 파일을 열어 작업 수행
- 현재 작성중인 활성(active) 세그먼트는 절대 삭제되지 않음
- retention기간보다 오래된 데이터가 삭제되지 않을 수 있음에 주의
- Index
- 어떻게 broker는 유효한 offset의 message를 빠르게 찾을 수 있을까?
- 주어진 offset의 메시지를 빠르게 찾기 위해 각 partition에 index를 유지/관리
- 세그먼트 파일과 파일 내의 위치에 대한 offset을 mapping
- Retention Compaction
- 오래된 message를 삭제하는 것이 아닌, 가장 최신의 값만 저장
- key/value message를 생성하는 응용프로그램에서만 가능
- null key가 있다면 compaction 실패
- two log sectio:
- Clean Section
- 이전에 압축된 message
- 각 key에는 압출될 때 가장 최신이였던 하나의 값만이 포함
- Dirty Section
- 마지막 압축 이후 쓰여진 messages
- Clean Section
- Log Compaction Details
- 전체 partition에서 dirty message의 비율이 가장 큰 partition을 선택, 압축 수행
- dirty message들에 대해 offset map 작성
- map entry는 key에 16byte hash, message에 8byte offset으로 구성
- segment 압축에 오직 24byte per entry 사용
- clean message를 오래된 순서로 읽으며, offset map과 비교/검사
- clean message의 key가 map에 없다면, 그 message이후로 들어온 같은 key의 message가 없다는 것이므로 message를 replacement segment에 복사
- 반대로, key가 map에 있다면, 이후에 해당 key에 신규 message가 입력됐다는 것이므로 그 message는 무시
- message 복사가 모두 끝나면 원본 segment와 replace segment를 swap하고 다음 segment로 이동
Subscribe via RSS