Spark Streaming With Kafka Direct API
by Jaesang Lim
Spark Streaming With Kafka Direct API
로직 설명
KafkaUtils.createDirectStream[String, String](
sparkStreamContext,
PreferConsistent,
Subscribe[String, String](ESTAT_TOPIC, kafkaParams) )
1. createDirectStream
- Return Type : InputDStream[ConsumerRecord[K, V]] using DirectKafkaInputDStream
new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
2. LocationStragegy
Kafka에서 받아온 파티션을 Exeuctor에게 어떻게 규칙으로 할당할 것인가에 대한 전략 !
- PreferBrokers
- Kakfa 브로커와 동일한 노드에 Executor를 실행할 때
- 나는 Kafka 클러스터가 따로 구성되어 있기 때문에, 이 옵션을 쓸 일은 없다
- PreferConsistent
- Executor에게 일관되게 Parition을 분배
- 현재 내가 사요하는 전략이며 대부분 이렇게 한다고 코드 주석에도 적혀 있다.
- PreferFixed
- 파티션을 특정 Executor에가 할당하는 법
코드 주석에 이미 잘 설명되어 있음
* Choice of how to schedule consumers for a given TopicPartition on an executor.
* See [[LocationStrategies]] to obtain instances.
* Kafka 0.10 consumers prefetch messages, so it's important for performance
* to keep cached consumers on appropriate executors, not recreate them for every partition.
* Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
@Experimental
object LocationStrategies {
/**
* :: Experimental ::
* Use this only if your executors are on the same nodes as your Kafka brokers.
*/
@Experimental
def PreferBrokers: LocationStrategy =
org.apache.spark.streaming.kafka010.PreferBrokers
/**
* :: Experimental ::
* Use this in most cases, it will consistently distribute partitions across all executors.
*/
@Experimental
def PreferConsistent: LocationStrategy =
org.apache.spark.streaming.kafka010.PreferConsistent
/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(hostMap)
}
3. ConsumerStrategy[K,V] ( subscribe / subscribepattern / Assgin)
- Choice of how to create and configure underlying Kafka Consumers on driver and executors.
Subscribe[String, String](TOPIC_NAME, kafkaParams) )
Optimizing Spark Streaming applications reading data from Apache Kafka
-
Spark Streaming , Flink, Storm , Kafka Stream과 같이 널리 사용되는 framework in real time processing
-
하지만 성능 문제, 이벤트 마다 처리가 아닌 Time Window로 처리 = 그로 인한 딜레이 존재
Kafka direct implementation
- Recevier Based
- Receiver에 기반한 구현은 병렬화가 덜되고 TLS 보안과 호환되지 않음
- 프로세스를 병렬화하려면 여러 주제를 읽는 여러 개의 DStream을 만들어야함
- kafka의 데이터가 오직 하나의 executor에 의해서만 수신된다면,이 데이터는 Spark의 Block Manager에 저장 될 것이고 executor가 한 Transformation에서 그 데이터를 사용할 것..
- 스트리밍 응용 프로그램의 HA를 얻으려면 Checkpointing을 활성화해야함
- Direct API
- 모든 Spark Executor는 Kafka로부터 데이터를받을 수 있기 때문에 직접 구현은 완전히 병렬화가 가능!
- 정보가 하나 이상의 Topic에서 오는 것인지는 중요하지 않음
- Kafka에 자체적인 오프셋 관리하기 때문에 ,Checkpointing을 활성화 할 필요가 없음
Subscribe via RSS