Kafka Consumer
by Jaesang Lim
Consumer And Consumer Group
- Consumer Group ?
- Consumer Instance를 대표하는 그룹
- Consumer Instance ?
- 하나의 프로세스 또는 하나의 서버라고 볼 수 있음
- Offset?
- Partition 안에 데이터 위치를 Unique 한 숫자로 표시한 것
- Consumer는 자신이 어디까지 데이터를 읽었는지 offset으로 관리
Consumer Group이 나온 이유는 ?..
- High Avaliability
- Consumer에 대한 구분 및 Group 별 Offset 관리
Consumer Group 과 Topic의 Partition과의 관계
Partition Rebalance
- Rebalance
- Partition의 ownership이 다른 consumer로 바뀌는 것
- 발생 시점
- Consumer 추가 / 완료 / 죽음
- Partition 추가
- Rebalance 진행 시, 다른 Consumer들은 Block
- Group Coordination Protocol : Heartbeat Mechanism
- 모든 consumer는 group coordinator에게 heartbeat를 전송하여 자신이 살아있다는 것을 알림
- heartbeat을 특정 시간까지 받지 못한다면, group Coordinator는 consumer가 죽었다고 간주 후
- 특정 시간 ( session.timeout.ms : 30s (default ) )
- 살아있는 consumer 에게 다시 partition을 재배치함 ( = Rebalance )
- consumer가 Poll() 호출 시에만 , heartbeat 전송!
- group coordinator 란 ?
- broker 서버 중 하나이며, Consumer Group마다 Group coordinator가 다름
- group coordinator 란 ?
Createing a Kafka Consumer
1. Create KafkaConsumer Instance
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
2. Subscribe Topics (한개 이상)
consumer.subscribe(Collections.singletonList("customerCountries"));
3. Polling
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close();
}
Configuring Consumer
- fetch.min.bytes
- fetch.max.wait.ms
- max.partition.fetch.bytes
- session.timeout.ms
- auto.offset.reset
- enable.auto.commit
- partition.assignment.strategy
- client.id
- max.poll.records
- receive.buffer.bytes
- send.buffer.bytes
RangeAssignor VS RoundRobinAssignor
Commits and Offsets
- Poll() 호출 시, Consumer가 아직 처리하지 않은 레코드를 가져옴.
- 처리하지 않은 레코드를 추적하는 방법은 Offset
- 어디까지 처리했는지 기록하는 작업을 Commit
- offset를 commit 하는 일은 매우 중요함
- 데이터 중복 및 유실을 방지할 수 있음
방법론
- Automatic Commit
- Synchronous Commit
- Asynchronous Commit
- Commit Specified Offset
1. Automatic Commit
- enable.auto.commit= true ( default )
- 5초마다 poll() 해서 받은 레코드의 가장 큰 Offset를 Commit 함
- auto.commit.interval.ms 로 시간 조정 가능 ( default : 5s )
- poll할 때마다, Consumer는 지금 commit할 시간인가를 체크하고 맞다면, 최근의 poll에서 얻은 레코드의 가장 큰 offset를 commit 함
- 편함!
- 3초 후에 Rebalance 발생. 3초 간의 발생한 레코드에 대한 중복 처리 발생
- interval를 줄이더라도 데이터의 중복처리는 해결하기 어려움
2. Synchronous Commit
- 데이터 유실 및 중복처리를 막기 위해, 시간 기반의 Commit이 아닌,
- 현재 offset를 commit 할 수 있도록 기능을 제공
- 성공할 수 없거나 복구 할 수 없는 실패가 발생할 때까지 커밋을 다시 시도
- 단점
- commit request에 대한 broker의 response가 올 때까지 Blocking
- throughput 감소의 원인
- commitSync()
- auto.commit.offset = false
- 최근 Poll()에 의해 반환된 레코드의 가장 큰 offset을 commit, 실패 시, exception 발생
- 즉, Poll()에 의해 반환된 레코드를 다 처리한 후에, commitSync() 호출해야 데이터 유실 를 막을 수 있음
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
record.topic(), record.partition(),record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
3. Asynchronous Commit
- CommitAsync()
- Synchronous Commit 의 Blocking 단점을 극복하기 위함
- Broker에게 commit request 후, Blocking 하지 않음
- Callback 함수 제공
- 단점
- 실패하더라도 Retry 하지 않음
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
record.topic(), record.partition(),record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets, Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
4. Combining Commit
- CommitAsync() 와 CommitSync()를 동시에 사용
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
record.topic(), record.partition(),record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
5.Commit Specified Offset
- 현재까지 모든 데이터를 처리된 후에 Commit를 했지만 처리하는 과정에서도 Commit을 할 수 있음
- Poll에 의해 반환된 레코드 수가 커서, 중간 중간 커밋을 통해 안전성을 보장받기 위함
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
record.topic(), record.partition(),record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
}
}
Rebalance Listeners
- Consumer가 partition에 대한 ownership를 잃을 때 , 가장 최근에 처리한 레코드의 offset를 commit 하는 것이 가장 이상적
- partition이 추가되거나, 또는 삭제 되었을 때, ConsumerRebalanceListener 클래스의 2개의 함수 호출
- public void onPartitionsAssigned()
- Consumer가 consuming 하기 전에 호출
- public void onPartitionsRevoked()
- Rebalance 시작 전 과 consumer가 consuming을 멈춘 후에 호출
- 이 시점에 commit 하면 이상적
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
// Consumer가 Partition을 Consuming 하기 바로 직전에 호출!!
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
// Rabalance가 발생하기 바로 직전에 호출 !!
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
//위에 정의한 Class를 Subscribe의 parameter로 등록하면 끝!!
consumer.subscribe(topics, new HandleRebalance());
이 글은 Kafka: The Definitive Guide 원서를 읽고 정리한 내용입니다 :)
Subscribe via RSS