Producer

  • 카프카 클러스터로 레코드들을 발행하는 카프카 클라이언트
  • 프로듀서는 Thread-safe
  • 일반적으로 여러개의 스레드가 프로듀서를 공유하는것이 더 빠르다.

메시지 전송 방식

  1. Fire-And-Forget
  2. Synchronous send
  3. Asynchronous send

Constructomg Kafka Producer

  • bootstrap.servers
  • key.serializer
  • value.serializer
KafkaProducer<String, String> producer = null;

Map<String, Object> configMap = new HashMap<String, Object>();
configMap.put("bootstrap.servers", BROKER_LIST);
configMap.put("key.serializer", KEY_SERIALIZER_CLASS);
configMap.put("value.serializer", VALUE_SERIALIZER_CLASS);
producer = new KafkaProducer<String, String>(configMap);

Sending a Message to Kafka

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Precision Products","France");
    // Topic : CustomerCountry
    // Key : Precision Products
    // Value : France
try {
	producer.send(record);
} catch (Exception e) {
	e.printStackTrace();
}

발생가능 예외

  • SerializationException
  • BufferExhaustedException
  • TimeoutException
  • InterruptException

Sending a Message to Synchronously

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Precision Products","France");
try {
	producer.send(record).get();
} catch (Exception e) {
	e.printStackTrace();
}

발생가능 예외

  • nonretriable exceptions
  • exhausted the available retries

Sending a Message to Asynchronously


private class DemoProducerCallback implements Callback {
	@Override
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    // Kafka가 성공하면 null / 아니면 Execption
	if (e != null) {
		e.printStackTrace();
	}
	}
}

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Precision Products","France");
try {
	producer.send(record , new DemoProducerCallback());
} catch (Exception e) {
	e.printStackTrace();
}

Configuring Producer

  • acks
    • 0, 1, all
    • 1 = 리더 복제본이 데이터를 받으면 성공 응답 반환, 리더가 쓰기 불가능한경우 실패 반환
  • buffer.memory
    • 프로듀서에서 브로커로의 전송 대기중인 메시지를 버퍼링 하는 메모리양
    • block.on.buffer.full 파라미터에 따라 차단되거나 예외가 발생될 수 있음.
  • compression.type
    • snappy, gzip, lz4
    • 브로커로 전송전 압축
    • 네트워크 ,스토리지 사용량을 줄일 수 있다. 가끔 병목구간이 되기도 함.
  • retries
    • 브로커로의 메세지 전송시 실패시 최대 재전송 시도 횟수
      • retry.backoff.ms : 재전송 간 대기시간 (복구 시간 테스트 후 고려)
    • 일부 오류는 재시도 하지 않음(전송 가능 메시지 용량 초과)
  • max.in.flight.requests.per.connection
    • 커넥션당 최대 동시 가능 요청 개수,
    • 데이터의 정렬 필수인 경우 1 고려
  • timeout.ms
    • 동기화(ack config 만족) 확인 대기시간
  • request.timeout.ms
    • 전송 시 서버로의 응답 대기시간
  • metadata.fetch.timeout.ms
    • 파티션에 대한 메타데이터(리더정보 등) 요청 대기시간
  • max.block.ms
    • send(), partitionFor() (메타데이터) 송신 버퍼가 가득 차거나, 메타데이터 사용불가능한 상황을 처리
  • max.request.size
    • (프로듀서 쪽 설정) 전송가능 최대 메시지 크기
  • message.max.bytes
    • (브로커 쪽 설정) 두 값을 일치하는 것을 권장함

Paritions

  • Key : 메시지의 추가 정보, 토픽의 파티션 결정 (nullable)
    • 동일한 키는 동일한 파티션으로 이동 (읽기 정책으로 활용가능)
    • Key == null && default partitioner : 무작위(random)의 파티션에 전송
  • 파티션에 대한 메시지의 매핑은 파티션 수가 변경되면 달라질수 있다.
    • partitioning key가 중요할때 충분한 수의 파티션을 만드는 것을 권함.