Reliable Data Delivery In Kafka
by Jaesang Lim
Reliable Data Delivery In Kafka
다룰 내용들~
- Reliability Guarantees
- Replication
- Broker Configuration
- Using Producers in a Reliable System
- Using Consumers in a Reliable System
1. Reliability Guarantees
- Reliable System ?
- 데이터의 유실 없이 최소화되어
- 안정적인 전달이 보장되는 시스템
- Guarantee ?
- partition 내의 messages의 순서
- commit된 messages는 partition의 모든 in-sync replicas에 쓰여짐 -acks (producer, default = 1) -min.insync.replicas (broker/topic, default = 1) -하나 이상의 replica가 살아있는 한, messages는 손실되지 않음 -consumers는 commit된 message만 읽을 수 있음
- 이러한 기본적인 guarantees로 어느정도 reliable system을 구성할 수 있음
- 하지만, 완벽하지 않으며 configuration parameter를 이용하여 시스템에서 필요한 reliability를 결정해야 함
- reliability ↑ : durability ↑ throughput↓ latency↑ hardware costs↑
2. Replication
- Multiple Replicas를 가지는 구조는 Kafka의 일부에 장애가 발생했을 때도,
-
Reliability와 durability를 제공
- In-Sync Replica
- leader replica
- 아래의 조건을 만족하는 followers
- Zookeeper에 n초안에 heartbeat를 보냄 (zookeeper.session.timeout.ms = 6s)
- n초안에 messages를 leader로부터 fetch (replica.lag.time.max.ms = 10s)
- n초안에 leader에서 가장 최신 데이터를 fetch
-
In-Sync Replica와 성능의 관련성
- in-sync replicas가 조금씩 뒤쳐져 있다면?
- producer와 consumer를 느리게함
- in-sync replicas의 수가 적으면 ?
- partition의 장애나 데이터 손실에 대한 위험성이 더 높아짐
- 참고
- out-of-sync replica가 뒤쳐져 있다면 producer와 consumer의 performance에 영향이 없음
- producer와 consumer가 out-of-sync replica가 messages를 얻기까지 기다리지를 않기 때문
3. Broker Configuration
- Replication Factor
- Unclean Leader Election
- Minimum In-Sync Replicas
3_1. Replication Factor
- topic을 생성할 때 기본 replication 개수
- higher replication factor
- availability ↑ reliability ↑ disaster ↓ disk space ↑
- availability가 중요한 경우에는 3이상을 추천
- replicas의 위치
- 기본적으로 Kafka는 partition의 replicas가 각각 다른 broker에 있는지 확인
- replicas를 다수의 rack에 배치해야 하고 rack이름을 설정
- broker.rack (broker, default = null)
3_2. Unclean Leader Election
- Clean Leader Election
- partition의 leader가 더이상 available할 수 없을 때, in-sync replicas 중 하나가 leader로 선출되는 경우
- Unclean Leader Election
- 모든 replica가 죽으면?
- leader외에 in-sync replicas가 존재하지 않을 때, leader가 사용불가능 하게 되어 new leader를 선출해야 하는 경우에 Unclean Leader Election을 허용할지 말지를 결정할 수 있음
- out-of-sync replica를 leader로 선출할지의 여부
- Disable
- data의 quality와 consistency와 durability가 중요한 경우
- bank system
- Able
- availability가 더 중요한 경우
- real-time click analysis
3_3. Minimum In-Sync Replicas
- in-sync 되어야 하는 replica의 최소 개수
- 이를 충족시키지 못하면
- produce request를 거절 후 ,NotEnoughReplicasException을 수신
- consumer는 존재하는 data를 계속 읽을 수 있음
broker configuration parameter | topic configuration parameter | default value | |
---|---|---|---|
Replication Factor | default.replication.factor | replication.factor | 1 |
Unclean Leader Election | unclean.leader.election.enable | unclean.leader.election.enable | true (0.11.0이후는 false) |
Minimum In-Sync Replicas | min.insync.replicas | min.insync.replicas | 1 |
4. Using Producers in a Reliable System
- Send Acknowledgments
- Error Handling : Configuring Producer Retries
- Error Handling : Additional Error Handling
- Producer가 broker에게 Produce Request를 보내면 성공/Error가 반환
- Configuring Producer Retriesprducer가 자동적으로 retry함으로써 error handle
- Additional Error Handlingdeveloper가 producer library를 사용하여 처리
4_1. Send Acknowledgments
- acks = 0
- producer가 network를 통해 message를 보내기만 하면 성공으로 간주
- latency↓ throughput↑ reliability↓
- acks = 1
- message가 leader replica에 쓰이면 성공으로 간주
- message가 성공적으로 leader에 쓰여서 produce request가 성공으로 간주되었지만, follower에게 복제되기전에 leader에 장애가 발생하면 데이터가 유실됨
- acks = all
- min.insync.replica 값의 in-sync replicas에서 message를 복제하면 성공으로 간주
- latency↑ throughput↓ reliability↑
4_2. Configuring Producer Retries
- Produce Request를 하였을 때 Retriable Error가 발생한다면?
-
messages를 손실하지 않기 위해서 messages를 계속해서 재전송하도록 producer를 configure
- Retires
- producer configuration
- default value : 1
- Retriable Error가 발생했을 때, producer가 자동으로 retry하는 횟수
- Retry 횟수의 선정 방법
- producer가 n번 retry 후에 throw exception을 하였을 때, 수행할 계획에 따라 달라짐
- exception을 catch한 후에 또다시 retry할 것이라면 retries를 늘려야 함
4_3. Additional Error Handling
- Nonretriable broker errors
- Messages를 Broker에 보내기 전에 발생하는 errors
- Nonretriable broker errors (message size, authorization errros 등) broker에 mssages를 보내기 전에 발생하는 errors (serialization error 등)
- Producer가 Retry Attempt를 모두 소비했을 때, Available Memory가 Limit에 도달하는 경우
- developer가 직접 처리해야 함
- Messages를 Broker에 보내기 전에 발생하는 errors
- error handlers를 작성하여 application의 목표에 맞도록 error를 처리
- messages를 drop?
- error를 log?
- local disk의 directory에 message를 store?
- another application으로 callback을 trigger? 등
4_4. Message delivery guarantees
- At most once
- message가 손실되더라도 재배달되지 않음
- At least once
- message가 손실되지 않지만 재배달 될 수 있음
- Exactly once
- message는 딱 한번만 배달 됨
- Message Retry가 발생하는 경우
- broker에 message를 쓰지 못한 경우
- broker가 producer에게 응답하는 과정에서 실패한 경우
- ( = message가 여러번 broker에 쓰여짐 )
- broker에 message가 쓰여지는것이 실패한 경우 외에도 broker가 producer에 응답하는 과정에서 실패할 때도 message를 재전송하게 됨
- At least once (Kafka 0.11.0 version 이전)
- consumer에서 처리
- message를 idempotent(멱등원)로 생성
- Retries 및 careful error handling으로 각 message가 적어도 한 번 저장되는것은 guarantee되지만, message가 정확하게 한 번 저장되는 것을 보장할 수 는 없음
- message마다 unique한 identifier를 사용하여 messages를 consume할 때 중복을 감지하고 정리
- message를 idempotent로 만들어서 같은 message가 2번 보내지더라도 부정적인 영향이 없도록 함
- Exactly once (Kafka 0.11.0 version 이후)
- producer가 idempotent delivery option을 지원
- broker는 각 producer에게 ID를 assing하고, producer가 모든 message를 sequence number와 함께 전송하여 duplicate를 제거
- producer는 transcation과 같은 semantics를 이용하여 multiple topic partitions에 messages를 보내는 기능을 지원하므로, 모든 messages가 write되거나 write되지 않음
5. Using Consumer in a Reliable System
5_1. Commited Offsets
- 특정 partition의 특정 offset까지의 모든 messages를 받고 처리했음을 의미
- offsets을 commit하는 시기와 방법에 주의해야 함
- 읽고 아직 처리하지 않은 offsets을 commit하면 데이터 유실이 발생할 수 있음
5_2. Consumer Configuration
- group.id (“”)
- 해당 topic의 모든 messages는 전체 group에 의해 나뉘어져 읽어짐
- offset.reset (latest)
- earlist : 중복처리의 가능성↑ reliable↑
- latest : 중복처리의 가능성↓ reliable↓
- enable.auto.commit (true)
- autocommit을 사용하면 : 중복처리의 가능성 존재, 데이터 처리가 보장되지 않음
- consumer가 일부 records를 처리하고 중단이 되면 commit이 되기 전이기 때문에 처리된 일부 records가 재처리됨
- consumer poll loop안에서 consume된 모든 record를 처리하면 처리하지 않은 offset이 commit되지 않음
- auto.commit.interval.ms (5000)
- commit frequency가 증가하면 : overhead↑, 중복처리 데이터 개수↓
- Commit은 언제하나?
- 데이터가 처리된 후에 offsets을 commit
- Consumer가 종료하기 전에 partitions의 offset을 commit하고, 해당 partitions이 reassign될 때는 기존의 상태를 가져와야 함
- Commit의 빈도는?
- Commit의 빈도를 performance와 중복처리간의 trade-off를 고려하여 선정
- commit의 빈도가 높아지면 성능이 줄어들지만, 장애발생시 중복처리되는 데이터의 개수가 줄어듬
- commit의 빈도가 낮아지면 성능은 높아지지만, 장애발생시 중복처리되는 데이터의 개수가 늘어남
- poll loop안에서 여러번 commit하거나, 여러개의 loops에 대해 한번만 commit하도록 구성할 수 있음
- Consumer의 Retry가 필요한 경우
- buffer를 이용하여 retry
- retry topic을 이용하여 retry
- Consumer는 상태를 유지해야 할 수도 있음
- offset을 commit하는 동시에 state를 별도의 topic에 기록
- consumer가 재시작될 때 해당 tpic에서 state를 가져와서 중단된 부분부터 재시작
- Kafka 0.11.0 이전에서는 transaction이 제공되지 않기 때문에 데이터 중복처리 혹은 유실 가능성 존재
- (Kafka 0.11.0 이후에서는) transcation을 지원
- 긴 데이터 처리 시간이 걸리는 경우
- Kafka 0.10.1 이전에서는 broker에게 heartbeats를 보내기 위해 poll을 계속 해야함
- multiple threads로 병렬 처리
- worker threads로 data를 전달 한 후 work thread가 끝날때까지 pause, 끝나면 resume
- Exactly-once delivery
- Kafka 0.11.0 이전에서는 지원되지 않으므로 직접 처리
- Unique key를 지원하는 시스템에 결과를 저장
- transaction이 있는 시스템에서 records와 offsets을 저장
- 참고
- Kafka 0.10.1 이전 version에서는 추가적인 records를 처리하지 않으려고 하더라도, broker에게 heartbeats를 보내기 위해 poll을 계속 해야 함
-
데이터 처리시간이 너무 오래 걸리면 broker에게 heartbeats를 보낼 수 없어 rebalance가 trigger됨 - consumer를 pause하면 additional data의 fetch없이 poll이 유지됨
- Kafka 0.11.0 이전 version에서는, exactly-once를 지원을 제공하지는 않지만 Kafka가 정확하게 한 번 외부 시스템에 쓰여지도록 보장하는 few trick을 사용할 수 있음
- unique keys를 지원하는 system에 results를 write (idempotent(멱등원) write)
- record를 unique key와 함께 value로 쓰고, 나중에 실수로 같은 record를 다시 consume하면 정확하게 같은 key와 value를 write하게 됨.
- data store는 기존의 것이 override되고 중복처리되지 않은 경우와 동일한 결과를 얻음
- transactions이 있는 system에서 records와 offsets을 write
- 동일한 transcation에서 records와 offsets을 write하여 in-sync되도록 함
- 시작 시, external store에 write되어 있는 최신 records의 offsets를 retrieve(검색)한 후 consumer.seek()를 사용하여 해당 offset에서 consume을 시작
- unique keys를 지원하는 system에 results를 write (idempotent(멱등원) write)
Subscribe via RSS