Kafka Produer
by Jaesang Lim
Producer
- 카프카 클러스터로 레코드들을 발행하는 카프카 클라이언트
- 프로듀서는 Thread-safe
- 일반적으로 여러개의 스레드가 프로듀서를 공유하는것이 더 빠르다.
메시지 전송 방식
- Fire-And-Forget
- Synchronous send
- Asynchronous send
Constructomg Kafka Producer
- bootstrap.servers
- key.serializer
- value.serializer
KafkaProducer<String, String> producer = null;
Map<String, Object> configMap = new HashMap<String, Object>();
configMap.put("bootstrap.servers", BROKER_LIST);
configMap.put("key.serializer", KEY_SERIALIZER_CLASS);
configMap.put("value.serializer", VALUE_SERIALIZER_CLASS);
producer = new KafkaProducer<String, String>(configMap);
Sending a Message to Kafka
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products","France");
// Topic : CustomerCountry
// Key : Precision Products
// Value : France
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
발생가능 예외
- SerializationException
- BufferExhaustedException
- TimeoutException
- InterruptException
Sending a Message to Synchronously
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products","France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
발생가능 예외
- nonretriable exceptions
- exhausted the available retries
Sending a Message to Asynchronously
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// Kafka가 성공하면 null / 아니면 Execption
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products","France");
try {
producer.send(record , new DemoProducerCallback());
} catch (Exception e) {
e.printStackTrace();
}
Configuring Producer
- acks
- 0, 1, all
- 1 = 리더 복제본이 데이터를 받으면 성공 응답 반환, 리더가 쓰기 불가능한경우 실패 반환
- buffer.memory
- 프로듀서에서 브로커로의 전송 대기중인 메시지를 버퍼링 하는 메모리양
- block.on.buffer.full 파라미터에 따라 차단되거나 예외가 발생될 수 있음.
- compression.type
- snappy, gzip, lz4
- 브로커로 전송전 압축
- 네트워크 ,스토리지 사용량을 줄일 수 있다. 가끔 병목구간이 되기도 함.
- retries
- 브로커로의 메세지 전송시 실패시 최대 재전송 시도 횟수
- retry.backoff.ms : 재전송 간 대기시간 (복구 시간 테스트 후 고려)
- 일부 오류는 재시도 하지 않음(전송 가능 메시지 용량 초과)
- 브로커로의 메세지 전송시 실패시 최대 재전송 시도 횟수
- max.in.flight.requests.per.connection
- 커넥션당 최대 동시 가능 요청 개수,
- 데이터의 정렬 필수인 경우 1 고려
- timeout.ms
- 동기화(ack config 만족) 확인 대기시간
- request.timeout.ms
- 전송 시 서버로의 응답 대기시간
- metadata.fetch.timeout.ms
- 파티션에 대한 메타데이터(리더정보 등) 요청 대기시간
- max.block.ms
- send(), partitionFor() (메타데이터) 송신 버퍼가 가득 차거나, 메타데이터 사용불가능한 상황을 처리
- max.request.size
- (프로듀서 쪽 설정) 전송가능 최대 메시지 크기
- message.max.bytes
- (브로커 쪽 설정) 두 값을 일치하는 것을 권장함
Paritions
- Key : 메시지의 추가 정보, 토픽의 파티션 결정 (nullable)
- 동일한 키는 동일한 파티션으로 이동 (읽기 정책으로 활용가능)
- Key == null && default partitioner : 무작위(random)의 파티션에 전송
- 파티션에 대한 메시지의 매핑은 파티션 수가 변경되면 달라질수 있다.
- partitioning key가 중요할때 충분한 수의 파티션을 만드는 것을 권함.
Subscribe via RSS