Mastering Spark in Scalable Algorithms
by Jaesang Lim
Mastering Spark in Scalable Algorithms
- Spark의 아키텍쳐와, 문제상황에서 어떻게 해결할까에 대한 디자인패턴에 대해 다룰 것~~
Spark job을 어떻게 만들 것인가에 대한 기본 원칙
- 가능한 data locality를 지킬 것
- Spark에서 기본적으로 해주므로 걱정할 필요 없지만 확인할 것
- 각 stage에서 불필요하게 데이터를 이동시키지 않는가?
- 데이터의 균등 분배를 보장할 것
- 놀고 있는 executor 없도록 적절하게 데이터 분배
- 더 빠른 저장소를 선호
- L1 cache > L2 cache > Main memory > Disk(random seek)
- Spark에서 in-memory 처리 기능도 제공
- 불필요하게 main memory나 disk 쓰지말고 빠른 cache 활용해도 좋음(Project Tungsten)
- 지켜본 후 최적화할 것
- Donald Knuth왈, “어설픈 최적화는 모든 악의 근원이다.”
- evidence-based로 접근하라
- 작게 시작해서 크게 늘릴 것
Spark Arcitecture
- Driver
- Spark의 main entry point
- single JVM
- job 시작 및 모든 operation 제어
- 너무 큰 데이터셋을 driver로 가져오지 말것 (rdd.collect) -> OOM
- 필요하다면 JVM heap 사이즈를 늘려줄 것 (–driver-memory)
- SparkSession
- driver가 시작할 때 SparkSession class가 초기화됨
- SQLContext, SparkContext, StreamContext class 등을 통해 모든 Spark service들에 접근 제공
- Spark runtime performance-related 설정 튜닝하는 위치
- Resilient distributed datasets (RDDs)
- 동종 데이터의 분산 셋에 대한 추상화된 표현
- 실제 데이터는 클러스터 내 여러 노드에 저장되지만 분석 시 실제 위치를 알 필요는 없음. RDD활용하면 됨
- RDD는 케이크 조각처럼 partition들로 구성되어 있고, 각 partition들은 복제됨(Spark가 data locality를 고려하여 결정), -newHadoopRDD Github
- RDD는 HDFS같은 block storage로 부터 데이터가 제대로 cache되는 것을 보장할 책임이 있음
- Executor
- 일꾼 노드
- 각 executor는 driver와 연결되어 있고 데이터에 대한 작업을 실행하기 위한 지침을 기다림
- Shuffle operation
- executor간 물리적 데이터 전송
- 주로 같은 키로 이루어진 데이터 그룹 이동
- 전략적으로 더 높은 병렬도를 위한 data repartition
- 네트워크를 통한 데이터 이동, 데이터를 디스크에 저장해야 하므로 매우 느림
- scalability에 매우 중요한 부분이라 할 수 있음
- Cluster manager
- Spark 외부에서 cluster의 리소스 교섭자(negotiator) 역할을 함
- executor의 코어 개수나 메모리 할당
- cluster manager는 여러가지 있으나 algorithmic 성능에 크게 영향을 줄 가능성은 낮음
- Task
- Data의 single partition에 대한 일련의 작업을 실행하기 위한 instruction
- Processing을 데이터로 이동시킨 것
- DAG
- Action에 대한 모든 transformations의 논리적 실행 계획(execution plan)
- SparkSQL, Datasets의 최적화는 catalyst optimizer가 대신함
- DAG scheduler
- physical plan 만듦
- DAG에서 stage로 나누고, 각 stage는 그와 대응되는 일련의 task(각 partition당 하나)를 생성함
- Transformation
- 기존의 RDD data를 변경하여 새로운 RDD data를 생성해내는 것.
- filter와 같이 특정 data만 뽑아 내거나 map 함수 처럼, data를 분산 배치 하는 것 등을 들 수 있다. 참고
- Action
- RDD 값을 기반으로 무엇인가를 계산해서(computation) 결과를 (셋이 아닌) 생성해 내는 것
- count()와 같은 operation들이 있음
- Stage
- 물리적으로 task가 매핑될 수 있는 작업의 그룹 (partition 당 하나)
- 연속된 narrow transformation은 single stage. 동일한 executor에서 작업이 가능하므로 shuffle이 필요없음
- wide transformation을 만나면 stage boundary가 발생하고, 2개의 stage가 존재할 때 첫번째 stage가 완료될 때까지 두번째 stage는 시작될 수 없음
- Task scheduler
- 일련의 task(DAG scheduler에 의해 결정된)를 전달받고(partition당 하나)
- data locality와 함께 적절한 executor에서 각각 실행되도록 스케줄함
Tune your analytic
- Spark UI 확인할 것
- 리소스 병목 확인, 코드에서 어느 부분이 시간을 많이 잡아먹고 있는가
- Input Size or Shuffle Read Size/Records
- task별 data read 양 (remote나 local 관계없이)
- 너무 많다면 executor를 늘리거나 repartition 고려할 것
- Duration
- task가 실행된 시간
- 만약 input size가 적은데 duration이 길다면, CPU-bound일 수 있음
- Thread dump 활용해서 어디서 시간 잡아먹는 지 확인
- Spark UI에서 보면 min, 25%, median, max 등등 수치 있는데 그 수치의 variance 확인할 것
- CPU-bound, I/O-bound, Memory-Bound 에 대한 설명
- Shuffle Write Size/Records
- 가능한 적게
- Locality Level
- Stage 페이지에 있는 data locality 수치
- PROCESS_LOCAL 제일 좋음
- 보통은 도움 안되지만 narrow transformation에서 NODE_LOCAL 이나 RACK_LOCAL이 많다면 executor의 수를 늘려봐라
- GC Time
- 각 task 별로 GC에 소요되는 시간
- 전체 시간에 10프로 미만 권장
- 너무 높다면 뭔가 근본적인 문제가 있다는 것임
- GC문제라기 보다는 data distribution과 관련된 부분을 review해 볼 것
- executor 수, JVM heap size, 파티션 수, 병렬도, 데이터 치우침(skew) 등등
- Thread dump (per executor)
- Executor 페이지 확인
- executor 내부 작업 엿보기
- 정렬, interested thread를 상단에 리스팅(Executor task launch worker)
- Skipped Stages
- 실행할 필요없는 stage
- RDD가 cache되어 있어서 재 연산할 필요없다는 의미임
- 일반적으로 good caching strategy의 sign
- Event Timeline
- Stages 페이지에서 실행중인 task의 timeline 제공
- 병렬도 수준 확인, 주어진 시간에 executor당 몇개의 task가 실행되는 지 확인
Design patterns and techniques
1. Spark APIs
- Problem
- API와 function들이 너무 많아서 어떤 것이 가장 성능이 좋은 지 알기 어려움
- Solution
- Off-heap explicit memory management, cache-miss improvement, dynamic stage generation 같은 최근 최적화(project tungsten) 기술은 DataFrame과 Dataset만 지원됨.
- RDD는 안됨
- 새롭게 소개된 Encoder 역시 Kryo Serialization이나 Java serialization 보다 빠르고 공간 효율성도 좋음
personDS.groupBy($"name").count.sort($"count".desc).show
// DS 36 seconds
personRDD.map(p => (p.person,1)).reduceByKey(_+_).sortBy(_._2.false)
// RDD 99 seconds
2. Summary pattern
- Problem
- 엄격한 SLA(service level agreements)가 있을 경우 전체 데이터를 가지고 계산할 시간적 여유가 없음
- Solution
- Summary pattern은 two-pass algorithms
- 전체 데이터를 직접 처리하지 않지만, aggregation의 결과가 전체 데이터에서 실행했을 때와 같도록 만듦
- 적절한 구간의 summary를 계산(분 당, 일 당, 주 당 등등)
- 나중에 사용하기 위해 summary데이터 저장
- 더 큰 구간에 대한 aggregate를 계산
- 스트리밍 분석을 위한 incremental 혹은 online algorithm에 유용함
4. Expand and Conquer pattern
- Problem
- 상대적으로 적은 수의 task, 각 task는 높은 input/Shuffle Size
- task 완료시간이 오래걸리며, 때로는 idle executor가 생기는 경우도 있음
- Solution
- flatMap을 써서 shuffle시킴
- 각 row당 많은 record가 생기므로(tokenized) 시간 복잡도 증가할 수 있음
- repartition을 써서 task 수 늘리고 task당 처리하는 data 양을 줄임
- OOM도 줄일 수 있음
- flatMap을 써서 shuffle시킴
5. Lightweight shuffle
- Problem
- 전체 시간 중 Shuffle Read Blocked Time의 비중이 높을 때(>5%)
- 어떻게 shuffle 완료를 기다리는 것을 피할 수 있나?
- Solution
- Spark에서 data compression, merge file consolidation 등 많은 테크닉을 쓰지만 그래도 성능 병목이 되는 2가지 근본적인 문제가 있음
- I/O intensive함 (집중적으로 사용함)
- 네트워크 타고 데이터 이동, target machine에 데이터 쓰기
- cache나 local partition에 비해 120배 느림
- 동시성을 위한 동기화 포인트임
- stage 내 각 task가 모두 끝나야 다음 stage로 넘어갈 수 있음
- I/O intensive함 (집중적으로 사용함)
- shuffle은 가능한 피하거나 줄이거나
- Spark에서 data compression, merge file consolidation 등 많은 테크닉을 쓰지만 그래도 성능 병목이 되는 2가지 근본적인 문제가 있음
- Solution
- Dataset이나 DataFrame API를 쓴다면 실행 계획 만들 때 50가지 이상의 최적화 기술 들어감
- 안쓰는 컬럼 이나 partition 자동 pruning
- Spark Sql Catalyst Optimizer github
- RDD일 경우 몇가지 테크닉
- shuffle 하기 전에 map을 이용해서 안쓰는 데이터 제거
- key-value pair를 가지고 있다면, rdd대신 rdd.keys 활용 검토
- count나 membership 테스트일 경우 key만 있어도 충분함
- stage의 순서 조정
- join한 뒤 group by 할 것인가 group by한 뒤 join할 것인가
- transformation 전 후의 레코드 수 비교하여 비용산정해봐야 함
- Filter first
- shuffle전에 filter할 거 있으면 먼저 해야함
- CoGroup 사용
- 두개 이상의 RDD있을 때, 같은 키끼리 모으고 싶다면 CoGroup 활용
- 같은 타입의 K를 키로 사용하는 RDD[(K,V)]를 HashPartitioner를 이용하여 group지어서 항상 같은 노드에 가도록 함
- 다른 codec 활용
- lz4, lzf, snappy 사용해보고 어떤 게 좋은지 결정
- Dataset이나 DataFrame API를 쓴다면 실행 계획 만들 때 50가지 이상의 최적화 기술 들어감
6. Broadcast variables pattern
- Problem
- 분석에 작은 크기의 reference dataset들과 dimension table들이 값비싼 shuffle을 유발함
- Solution
- transaction log나 트윗같이 무한히 증가하는 데이터 말고 UK postcode 같은 유한한 데이터(bounded datasets) - 종종 바뀌지만 유한한 크기
- bounded dataset일 경우 join하지 말고 broadcast variable을 활용해서 모든 executor에 다 쏴라
7. Optimized cluster
- Problem
- 어떻게 설정을 해야 클러스터의 전체 리소스를 사용할 수 있는지 잘 모르겠음
- Solution
- 일반적으로 큰 executor보다는 많은 executor 선호
- YARN-기반 클러스터에서 리소스 분배 측정법
- number of executors = (total cores – cluster overhead) / cores per executor
- ((T – (2*N + 6)) / 5)
- T = 클러스터의 총 코어 수, N = 클러스터의 총 노드 수
- 2 = HDFS와 YARN의 오버헤드 (노드에 Datanode, NodeManager있다고 가정)
- 6 = HDFS와 YARN의 master process
- NameNode, ResourceManager, SecondaryNameNode, ProxyServer, HistoryServer 등 (주키퍼, HA등 때문에 더 추가될 수 있음)
- 5 = executor 당 코어수, I/O 경합없는 최적의 task concurrency (입증할 수 없지만)
- ((T – (2*N + 6)) / 5)
- mem per executor = (mem per node / number of executors per node) * safety fraction
- (64 / E)* 0.9 => 57.6 / E
- E = 노드 당 executor 수
- 0.9 = off-heap overhead(기본 10%)를 제외한 heap에 할당된 실제 메모리 비율
- 일반적으로 executor에 많은 메모리 할당하는 것이 좋음(cache, sort space)
- GC오래 걸릴 수 있음 / Spark UI보면서 조정할 것
- number of executors = (total cores – cluster overhead) / cores per executor
8. Redistribution Pattern
- Problem
- 항상 적은 수의 executor만 실행됨
- Solution
- repartition 혹은 파티션 수 명시하면 re-partition함
- 반대로 너무 많은 task(10000+)가 뜬다면 coalesce
9. Salting Key Pattern
- Problem
- Task의 대부분은 괜찮은 시간에 끝나는 데 한 두개가 오래걸림. repartition해봐도 별 효과없음
- Solution
- 데이터 분포가 한 쪽으로 치우침(skew)
- 몇몇 task가 너무 오래 걸리거나 input 혹은 output이 너무 많음
- rdd.keys.count 해서 RDD의 키가 executor의 수 보다 너무 적다면 키 전략 다시 고려해 볼 것
- salting key한 후 예전 key로 re aggregation
rdd filter { case (k,v) => isPopular(k) } .map { case (k,v) => (k+ r.nextInt(n),v) }
10. Secondary sort Pattern
- Problem
- 키 별로 grouping하고 sorting할 때 OOM
rdd.reduceByKey(_+_).sort(_._2,false) // inefficient for large groups
- 키 별로 grouping하고 sorting할 때 OOM
- Solution
- Composition key
- 그룹화할 요소와 정렬할 요소 포함
- Grouping partitioner
- Composition key의 어느 부분이 grouping과 관련있는 지 이해
- Composite key ordering
- Composition key의 어느 부분이 sorting과 관련있는 지 이해
- Composition key
- Example
case class Mention(name:String, ariticle:String,Published: Long) //Composite key : cas class SortKey(name:String,pulished :Long)
최근에 멘션 날린 이름 순으로 정렬하고 싶을 때 이름과 멘션 날짜로 composite key 생성
- 이름 별로 grouping하는 Grouping partitioner 구현
class GroupingPartitioner(paritions: Int) extends Pratitioner {
override def numParitions:Int = partitions
override def getParition(key: Any): Int = {
val groupBy = key.asInstanceOf[SortKey]
groupBy.name.hashcode() % numParitions
}
}
- 발행시점으로 정렬하는 Composite key ordering 구현
- 주어진 partitioner로 repartition 하면서 repartition 결과의 키가 정렬 됨
- 지금 예제에서는 implicit에서 sortBy를 published로 바꾼 것임
- Keyby 는 map의 특별한 case로 Composite key 생성한 것을 키로 하고 m을 value로 만듦
implicit val sortBy: Ordering[SortKey] = Ordering.by(m => m.published
val pais = mentions.rdd.keyBy(m => SortKey(m.name,m.published))
pairs.repartiionAndSortWithinParitions(new GroupingParitioner(n))
11. Probability Algorithms
- Problem
- 데이터셋 너무 커서 통계량 구하는 데 너무 오래 걸림
- 조금 틀려도 좀 더 빨리 받고 싶음
- Solution
- 시간이 문제될 때
- rdd.countApprox(), rdd.countByValueApprox(), rdd.countApproxDistinct()
- org.apache.spark.sql.DataFrameStatFunction의 df.stst
- 공간이 문제될 때
- Bloom Filter
- membership test
- 있는 건 없다고 안하지만 없는 건 있다고 할 수 있음
- HyperLogLog
- 컬럼의 distinct 값
- CountMinSketch
- 데이터 스트림에서 이벤트 발생 빈도 테이블 제공(Spark Stream에서 유용)
- Bloom Filter
- 시간이 문제될 때
12 .Selective caching
- Problem
- cache사용하니까 이전보다 느려짐
- Solution
- RDD를 한번 이상 쓸 때 cache가 효율적임
- 데이터가 여러 stage 교차적으로 사용될 때
- 데이터가 다수의 child 데이터셋의 lineage에 있을 때
- iterative process일 때 (stochastic gradient descent)
- 재활용안하면 문제됨
- cache overhead(만들고 업데이트 하고 flush하고 안쓰면 GC하고)
- 너무 크면 문제됨
- OOM 혹은 디스크로 swap data
-
ds.cache -> ds.count 한 후 spark UI의 Storage에서 확인
- Solution
- OOM날 경우 MEMORY_AND_DISK 활용 검토
- GC시간 길 경우 MEMORY_AND_SER 검토
- Fraction Cached가 100%이면서 Size on Disk가 최대한 작게 만드는 것이 목표
- RDD를 한번 이상 쓸 때 cache가 효율적임
13. Garbage collection
- Problem
- GC time이 전체 processing time에서 상당한 비율을 차지할 때(> 15%)
- Solution
- Spark의 GC는 자체적으로 꽤 효율적으로 작동하므로, 확실할때만 조정해야함 (내 잘못)
- 과도한/비정상적인 메모리 소비의 근원이 아닌지 확인
- cache 전략 확인
- 안쓰는 RDD는 명시적으로 unpersist
- 객체 할당 너무 많이 하는지 확인
- domain model 단순화
- 객체 재사용
- 가능한 primitive Type선호
- map -> mapPartitions
- Solution
- G1 GC사용 권장
- XX:UseG1GC
- InitiatingHeapOccupancyPercent
- 45% 미만이면 pause 횟수 좀 줄일 수 있음
- -XX:InitiatingHeapOccupancyPercent - ConcGCThread
- GC Thread 많을 수록 GC 빨리함
- CPU 많이 씀
- XX:ConcGCThread
Subscribe via RSS