키/페어로 작업하기
by Jaesang Lim
4. 키/값 페어로 작업하기
배경
KeyValueRDD = 페어RDD
각 키에 대해 병렬로 처리 OR 네트워크상에서 데이터 그룹핑역할 RDD에서 필드값을 뽑아 내어 그 것을 패어 RDD 연산을 위한 키로 사용
e.g )
reduceByKey() : 각 키로 구분하여 집합연산 가능 Join() : 동일 키에 대한 데이터 끼리 분류해서 두 RDD를 합쳐줌
페어 RDD 생성
스칼라 ( map() )
키를 가지는 데이터를 위한 함수를 위해 Tuple를 리턴해야함 ` val pairs = lines.map( x=>( x.split(“”)(0), x) )`
자바 ( mapToPair() )
내장 튜플타입이 없음 Scala.Tuple2 클래스 사용하여 튜플 생성
페어RDD의 트랜스포메이션
페어RDD 은Tuple을 가지므로 개별 데이터를 다루는 함수 대신 ! ==튜플을 처리하는 함수== 사용 개별 데이터를 다루는 함수 또한 적용 가능
{ (1,2),(3,4),(3,6) }
함수명 | 설명 | 코드 | 결과 |
---|---|---|---|
reduceByKey(Func) | 동일 Key로 값을 합침 | rdd.reduceByKey ( (x,y) => (x+y) | { (1,2), (3,10) } |
groupByKey() | 동일 Key로 값을 그룹화 | rdd.groupByKey() | { (1,[2]), (3, [4,6])} |
CombineByKey | |||
mapValues(Func) | Key 변경없이 페어RDD값 변경 | rdd.mapValues( x => x+1 ) | { (1,3), (3,5), (3,7)} |
flatMapValues(Func) | |||
keys() | Key들 리턴 | rdd.keys() | { 1, 3, 3 } |
values() | Value 리턴 | rdd.values() | { 2, 4, 6 } |
sortByKey() | Key 기준 정렬 | rdd.sortByKey() | { (1,2), (3,4), (3,6) } |
rdd = { (1,2),(3,4),(3,6) } other = {(3,9)}
함수명 | 설명 | 코드 | 결과 |
---|---|---|---|
subtractByKey | RDD의 key 기준으로 다른 RDD의 데이터 삭제 | rrdd.substractByKey(other) | {(1,2)} |
join | Key값 기반으로 InnerJoin | rdrdd.join(other) | { (1,[2]), (3, [4,6])} |
rightOuterJoin | rdd.rightOuterJoin(other) | { (3, (Some(4),9)) , (3, (Some(6),9)) } | |
leftOuterJoin | rdd.leftOuterJoin(other) | {(1, (2,None)), (3,(4, Some(9))), (3,(6, Some(9)))} | |
cogroup | 동일 키에 대해 양쪽 RDD 그룹화 | rdd.cogroup(other) | {(1,([2],[])) , (3, ([4]),[6],[9])} |
집합연산
- key value에서 동일 key에 대한 통계산출 작업 빈번하다.
- 일반 RDD를 이용하요 페어RDD를 생성하기 때문에 일반 RDD 연산도 지원한다.
- fold() , aggregate(), reduce()
- 연산이라곤 하지만, RDD를 리턴하므로 Transformation
reduceByKey()
- 여러번의 연산 수행
- 하나의 key에 대한 작업
- 각 작업은 동일 key에 대한 value를 하나로 합침
- 각 키와 합에 대해 합쳐진 값으로 구성된 RDD return
foldByKey()
- 데이터 타입과 동일한 제로 밸류 (e.g 0 or 1 ) 와 함께 병합
reduceByKey() 와 foldByKey() 공통사항
1.각 키에 대한 총합을 계산 하기 전에 각 머신에서 자동으로 병합(combining) 수행 combiner지정할 필요없음
CombineByKey()
- Key별 집합 연산 함수 중 일반적으로 많이 사용
- 하나의 파티션 내의 데이터들을 하나씩 처리
- 새로운 데이터 ? createCombiner()
- 존재하는 데이터 ? mergeValue()
- 파티션 병합 ? mergeCombiners()
병렬화 수준 최적화
- 모든 RDD는 고정된 개수의 파티션을 갖고 있음
- 이 것이 RDD에서 연산이 처리 될 때 동시 작업의 수준을 결정 ( 특정 파티션 사용하도록 요청 가능 )
-
default : 클러스터의 사이즈에 맞는 적절한 파티션 개수
- RDD의 파티셔닝을 바꾸고 싶다?
- repartition()
- 네트워크를 통한 데이터교환 ( 셔플링 == 비쌈.. )
- 최적화 버전 : coalesce()
- 개념은 merge과 같음
- 하나의 합치는 것이 아닌 더 적은 개수로 합친다는 뜻 = RDD의 파티션 개수를 줄이는 경우에는 셔플링 발생 X )
- rdd.partiitions.size() 로 확인 가능
그룹화
Key를 가진 데이터
- groupByKey()
(,V) -> groupByKey() -> ( K, iterable[V] ) 의 RDD로 리턴 )
- cogroup()
(K,V) -> [K, (Iterable[V] , Iterable[W])]
조인
- 다른 Key값을 가진 데이터와 함께 사용 ( inner / left or right outer / cross )
-
outer 조인을 했을 때, key값이 없을 시에는 Option / Optional / None로 표시
- Optional?
- 구글의 구아바 라이브러리의 클래스 (= 존재하지 않을 수도 있는 값을 표시)
- isPresent() 값이 존재하는지? get() : 없음
정렬
sortByKey() default : 오름차순
RDD 연산
{ (1,2), (3,4), (3,6) }
countByKey()
각 키에 대한 값의 개수를 센다 rdd.countByCount() = { (1,1) , (3,2) }
collectAsMap()
검색을 위해 Map형태로 모음 rdd.collectAsMap() = Map{ (1,2), (3,4) , (3,6)}
lookup(key)
키에 대한 모든 값 return rdd.lookup(3) = [4,6]
RDD 파티셔닝
- 노드 간 데이터세트의 파티셔닝을 어떻게 제어하는가? 왜?
- 분산시스템은 통신비용 즉, 네트워크 부하를 최소화해야 성능이 좋아짐
- 네트워크 비용을 줄이기 위해 RDD의 파티셔닝 제어 방법 선택해야한다.
- 물론 항상 고려할 필요는 없음. 단 한번만 scan한다면 굳이.. 할 필요없지
- 파티셔닝은 조인 같은 키 중심의 연산에서 데이터세트가 여러번 재활용될 때 유용
-
각 키에 제공된 함수에 따라 값들을 그룹화
- 정확히 어느 노드에서 작업하라는 명시적인 제어는 제공하지 않음 왜?
- 특정 노드가 장애가 나더라도 전체적으로는 동작할 수 있도록 설계되어 있음
- 하지만!!!! 어떤 키의 모음들이 임의의 노드에 함께 모여있다는 것을 보장한다.
val userData = sc.textFile("hdfs://..").partitionBy( new HashPartitioner(100) ).persist()
- partitionBy()
- 트랜스포메이션
- 항상 새로운 RDD를 리턴, 기존의 RDD는 변경하지 않음
- 그래서 textFile이 아닌 partitionBy()에 대해 persist() 제공
- 100은 파티션 개수
- 얼마나 많은 병렬 작업들이 RDD에서 이후에 이루워질지 연산 작업을 수행하는 지 표현
- 통상적으로 CPU 코어 개수 이상으로 선정
- 자동적으로 partition을 포함하는 것도 있음
- sortByKey() - Hash partition
- groupByKey() - Range partition
- map() 종류의 연산들은 Key값의 변형할 여지가 있기 때문에 어떻게 파티션 되는지 결정해야한다.
RDD의 파티셔너 정하기
- RDD가 어떻게 파티션될지 partitioner 속성을 써서 결정
- partitioner은 scala.Option object return ; 특정 정보를 담기위한 컨테이너 역할
- isDefined() ? ( spark.Partitioner object = get() : 없음 )
pair.partitioner
res0 : Option [spark.partitioner] = None
import org.apache.spark.HashPartitioner
pairs.partitionBy( new HashPartitioner(2) )
pair.partitioner
res0 : Option [spark.partitioner] = Some (spark.HashPartitioner@134131d)
파티셔닝이 도움이 되는 연산들
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- CombineByKey()
- lookup()
- reduceByKey()
- cogroup() / join()
- reduceByKey()
이미 파티션되어 있는 데이터를 단일 머신에서 처리한 결과를 셔플링
- cogroup() / join()
2개 이상의 RDD 중 하나라도 파티셔닝을 한면 이득 둘다 동일한 파티션을 가지고 동일 노드에 캐시되어 있다면 셔플링 발생하지 않음
파티셔닝에 영향을 주는 연산들
- 스파크는 내부적으로 각각의 연산이 파티셔닝에 어떤 영향을 주는지 파악하고 있음
- join() 시, 동일 키의 데이터들은 동일 머신에 해시되어 모음
- (= 파티셔닝 = reduceByKey빠르게 작동가능)
- key/value를 가진 Hash된 RDD에 map()을 호출하면 , map()은 킷값을 변경할 수 있다는 가능성이 있으므로 partitioner를 가지지 않음
- 그러나 key값을 동일하게 유지하는 것을 보장하는 mapValue() , flatMapValue() 연산 제공
- RDD에 파티셔너가 지정되는 모든 연산들
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- CombineByKey()
- partitionBy()
- sort()
- mapValues()
- flatMapValues()
- filter()
- mapValues()
부모가 파티셔너를 가진 경우
- flatMapValues()
부모가 파티셔너를 가진 경우
- filter()
부모가 파티셔너를 가진 경우
- 결과 RDD는 부모 RDD의 파티셔너에 따라 결정!!
- 기본은 Hash, 개수는 연산의 병렬화 수준으로 세팅 ( default 200)
- 부모가 여러개 일때나면 첫 번째 부모를 기준으로..
Subscribe via RSS