Consumer And Consumer Group

  • Consumer Group ?
    • Consumer Instance를 대표하는 그룹
  • Consumer Instance ?
    • 하나의 프로세스 또는 하나의 서버라고 볼 수 있음
  • Offset?
    • Partition 안에 데이터 위치를 Unique 한 숫자로 표시한 것
    • Consumer는 자신이 어디까지 데이터를 읽었는지 offset으로 관리

Consumer Group이 나온 이유는 ?..

  1. High Avaliability
  2. Consumer에 대한 구분 및 Group 별 Offset 관리

Consumer Group 과 Topic의 Partition과의 관계

Partition Rebalance

  • Rebalance
    • Partition의 ownership이 다른 consumer로 바뀌는 것
    • 발생 시점
      • Consumer 추가 / 완료 / 죽음
      • Partition 추가
    • Rebalance 진행 시, 다른 Consumer들은 Block
  • Group Coordination Protocol : Heartbeat Mechanism
    • 모든 consumer는 group coordinator에게 heartbeat를 전송하여 자신이 살아있다는 것을 알림
    • heartbeat을 특정 시간까지 받지 못한다면, group Coordinator는 consumer가 죽었다고 간주 후
      • 특정 시간 ( session.timeout.ms : 30s (default ) )
    • 살아있는 consumer 에게 다시 partition을 재배치함 ( = Rebalance )
    • consumer가 Poll() 호출 시에만 , heartbeat 전송!
      • group coordinator 란 ?
        • broker 서버 중 하나이며, Consumer Group마다 Group coordinator가 다름

Createing a Kafka Consumer

1. Create KafkaConsumer Instance
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
2. Subscribe Topics (한개 이상)
consumer.subscribe(Collections.singletonList("customerCountries"));
3. Polling
try {
	while (true) {
		ConsumerRecords<String, String> records = consumer.poll(100);
		for (ConsumerRecord<String, String> record : records)
		{
			log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
						record.topic(), record.partition(), record.offset(),record.key(), record.value());
			
            int updatedCount = 1;
			if (custCountryMap.countainsValue(record.value())) {
				updatedCount = custCountryMap.get(record.value()) + 1;
			}
			
            custCountryMap.put(record.value(), updatedCount)
			JSONObject json = new JSONObject(custCountryMap);
			System.out.println(json.toString(4))
		}
	}
} finally {
	consumer.close();
}

Configuring Consumer

  • fetch.min.bytes
  • fetch.max.wait.ms
  • max.partition.fetch.bytes
  • session.timeout.ms
  • auto.offset.reset
  • enable.auto.commit
  • partition.assignment.strategy
  • client.id
  • max.poll.records
  • receive.buffer.bytes
  • send.buffer.bytes

RangeAssignor VS RoundRobinAssignor


Commits and Offsets

  • Poll() 호출 시, Consumer가 아직 처리하지 않은 레코드를 가져옴.
  • 처리하지 않은 레코드를 추적하는 방법은 Offset
  • 어디까지 처리했는지 기록하는 작업을 Commit
  • offset를 commit 하는 일은 매우 중요함
  • 데이터 중복 및 유실을 방지할 수 있음
방법론
  1. Automatic Commit
  2. Synchronous Commit
  3. Asynchronous Commit
  4. Commit Specified Offset
1. Automatic Commit
  • enable.auto.commit= true ( default )
  • 5초마다 poll() 해서 받은 레코드의 가장 큰 Offset를 Commit 함
  • auto.commit.interval.ms 로 시간 조정 가능 ( default : 5s )
  • poll할 때마다, Consumer는 지금 commit할 시간인가를 체크하고 맞다면, 최근의 poll에서 얻은 레코드의 가장 큰 offset를 commit 함
  • 편함!
  • 3초 후에 Rebalance 발생. 3초 간의 발생한 레코드에 대한 중복 처리 발생
  • interval를 줄이더라도 데이터의 중복처리는 해결하기 어려움
2. Synchronous Commit
  • 데이터 유실 및 중복처리를 막기 위해, 시간 기반의 Commit이 아닌,
  • 현재 offset를 commit 할 수 있도록 기능을 제공
  • 성공할 수 없거나 복구 할 수 없는 실패가 발생할 때까지 커밋을 다시 시도
  • 단점
    • commit request에 대한 broker의 response가 올 때까지 Blocking
    • throughput 감소의 원인
  • commitSync()
    • auto.commit.offset = false
    • 최근 Poll()에 의해 반환된 레코드의 가장 큰 offset을 commit, 실패 시, exception 발생
    • 즉, Poll()에 의해 반환된 레코드를 다 처리한 후에, commitSync() 호출해야 데이터 유실 를 막을 수 있음
while (true) {
	ConsumerRecords<String, String> records = consumer.poll(100);
	for (ConsumerRecord<String, String> record : records)
	{
		System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
				  	 	record.topic(), record.partition(),record.offset(), record.key(), record.value());
	}

	try {
		consumer.commitSync();
	} catch (CommitFailedException e) {
		log.error("commit failed", e)
	}
}
3. Asynchronous Commit
  • CommitAsync()
    • Synchronous Commit 의 Blocking 단점을 극복하기 위함
    • Broker에게 commit request 후, Blocking 하지 않음
    • Callback 함수 제공
  • 단점
    • 실패하더라도 Retry 하지 않음
while (true) {
	ConsumerRecords<String, String> records = consumer.poll(100);
	for (ConsumerRecord<String, String> record : records)
	{
		System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
				  	 	record.topic(), record.partition(),record.offset(), record.key(), record.value());
	}

	consumer.commitAsync(new OffsetCommitCallback() {
			public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets, Exception exception) {
				if (e != null)
					log.error("Commit failed for offsets {}", offsets, e);
			}
	});
}
4. Combining Commit
  • CommitAsync() 와 CommitSync()를 동시에 사용
try {
	while (true) {
		ConsumerRecords<String, String> records = consumer.poll(100);
		for (ConsumerRecord<String, String> record : records)
		{
			System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
				  	 	record.topic(), record.partition(),record.offset(), record.key(), record.value());
		}
		consumer.commitAsync();
	}
} catch (Exception e) {
	log.error("Unexpected error", e);
} finally {
	try {
		consumer.commitSync();
	} finally {
	consumer.close();
	}
}
5.Commit Specified Offset
  • 현재까지 모든 데이터를 처리된 후에 Commit를 했지만 처리하는 과정에서도 Commit을 할 수 있음
  • Poll에 의해 반환된 레코드 수가 커서, 중간 중간 커밋을 통해 안전성을 보장받기 위함
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
....
while (true) {
	ConsumerRecords<String, String> records = consumer.poll(100);
	for (ConsumerRecord<String, String> record : records)
	{
		System.out.printf("topic = %s, partition = %s, offset =%d, customer = %s, country = %s\n",
				  	 	record.topic(), record.partition(),record.offset(), record.key(), record.value());
	
    currentOffsets.put(new TopicPartition(record.topic(),
							record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
	if (count % 1000 == 0)
		consumer.commitAsync(currentOffsets, null);
	count++;
	}
}

Rebalance Listeners

  • Consumer가 partition에 대한 ownership를 잃을 때 , 가장 최근에 처리한 레코드의 offset를 commit 하는 것이 가장 이상적
  • partition이 추가되거나, 또는 삭제 되었을 때, ConsumerRebalanceListener 클래스의 2개의 함수 호출
  • public void onPartitionsAssigned()
    • Consumer가 consuming 하기 전에 호출
  • public void onPartitionsRevoked()
    • Rebalance 시작 전 과 consumer가 consuming을 멈춘 후에 호출
    • 이 시점에 commit 하면 이상적
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
	// Consumer가 Partition을 Consuming 하기 바로 직전에 호출!!
	public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
	
    }

	// Rabalance가 발생하기 바로 직전에 호출 !!
	public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
		System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
		consumer.commitSync(currentOffsets);
	}
}

//위에 정의한 Class를 Subscribe의 parameter로 등록하면 끝!!
consumer.subscribe(topics, new HandleRebalance());

이 글은 Kafka: The Definitive Guide 원서를 읽고 정리한 내용입니다 :)