4. 키/값 페어로 작업하기

배경

KeyValueRDD = 페어RDD

각 키에 대해 병렬로 처리 OR 네트워크상에서 데이터 그룹핑역할 RDD에서 필드값을 뽑아 내어 그 것을 패어 RDD 연산을 위한 키로 사용

e.g )

reduceByKey() : 각 키로 구분하여 집합연산 가능 Join() : 동일 키에 대한 데이터 끼리 분류해서 두 RDD를 합쳐줌

페어 RDD 생성

스칼라 ( map() )

키를 가지는 데이터를 위한 함수를 위해 Tuple를 리턴해야함 ` val pairs = lines.map( x=>( x.split(“”)(0), x) )`

자바 ( mapToPair() )

내장 튜플타입이 없음 Scala.Tuple2 클래스 사용하여 튜플 생성

페어RDD의 트랜스포메이션

페어RDD 은Tuple을 가지므로 개별 데이터를 다루는 함수 대신 ! ==튜플을 처리하는 함수== 사용 개별 데이터를 다루는 함수 또한 적용 가능

{ (1,2),(3,4),(3,6) }

함수명 설명 코드 결과
reduceByKey(Func) 동일 Key로 값을 합침 rdd.reduceByKey ( (x,y) => (x+y) { (1,2), (3,10) }
groupByKey() 동일 Key로 값을 그룹화 rdd.groupByKey() { (1,[2]), (3, [4,6])}
CombineByKey      
mapValues(Func) Key 변경없이 페어RDD값 변경 rdd.mapValues( x => x+1 ) { (1,3), (3,5), (3,7)}
flatMapValues(Func)      
keys() Key들 리턴 rdd.keys() { 1, 3, 3 }
values() Value 리턴 rdd.values() { 2, 4, 6 }
sortByKey() Key 기준 정렬 rdd.sortByKey() { (1,2), (3,4), (3,6) }

rdd = { (1,2),(3,4),(3,6) } other = {(3,9)}

함수명 설명 코드 결과
subtractByKey RDD의 key 기준으로 다른 RDD의 데이터 삭제 rrdd.substractByKey(other) {(1,2)}
join Key값 기반으로 InnerJoin rdrdd.join(other) { (1,[2]), (3, [4,6])}
rightOuterJoin   rdd.rightOuterJoin(other) { (3, (Some(4),9)) , (3, (Some(6),9)) }
leftOuterJoin   rdd.leftOuterJoin(other) {(1, (2,None)), (3,(4, Some(9))), (3,(6, Some(9)))}
cogroup 동일 키에 대해 양쪽 RDD 그룹화 rdd.cogroup(other) {(1,([2],[])) , (3, ([4]),[6],[9])}

집합연산

  • key value에서 동일 key에 대한 통계산출 작업 빈번하다.
  • 일반 RDD를 이용하요 페어RDD를 생성하기 때문에 일반 RDD 연산도 지원한다.
  • fold() , aggregate(), reduce()
  • 연산이라곤 하지만, RDD를 리턴하므로 Transformation
reduceByKey()
  1. 여러번의 연산 수행
  2. 하나의 key에 대한 작업
  3. 각 작업은 동일 key에 대한 value를 하나로 합침
  4. 각 키와 합에 대해 합쳐진 값으로 구성된 RDD return
foldByKey()
  1. 데이터 타입과 동일한 제로 밸류 (e.g 0 or 1 ) 와 함께 병합
reduceByKey() 와 foldByKey() 공통사항

1.각 키에 대한 총합을 계산 하기 전에 각 머신에서 자동으로 병합(combining) 수행 combiner지정할 필요없음

CombineByKey()
  1. Key별 집합 연산 함수 중 일반적으로 많이 사용
  2. 하나의 파티션 내의 데이터들을 하나씩 처리
    • 새로운 데이터 ? createCombiner()
    • 존재하는 데이터 ? mergeValue()
    • 파티션 병합 ? mergeCombiners()

병렬화 수준 최적화

  • 모든 RDD는 고정된 개수의 파티션을 갖고 있음
  • 이 것이 RDD에서 연산이 처리 될 때 동시 작업의 수준을 결정 ( 특정 파티션 사용하도록 요청 가능 )
  • default : 클러스터의 사이즈에 맞는 적절한 파티션 개수

  • RDD의 파티셔닝을 바꾸고 싶다?
  • repartition()
    • 네트워크를 통한 데이터교환 ( 셔플링 == 비쌈.. )
  • 최적화 버전 : coalesce()
    • 개념은 merge과 같음
    • 하나의 합치는 것이 아닌 더 적은 개수로 합친다는 뜻 = RDD의 파티션 개수를 줄이는 경우에는 셔플링 발생 X )
    • rdd.partiitions.size() 로 확인 가능

그룹화

Key를 가진 데이터

  • groupByKey()

    (,V) -> groupByKey() -> ( K, iterable[V] ) 의 RDD로 리턴 )

  • cogroup()

    (K,V) -> [K, (Iterable[V] , Iterable[W])]


조인

  • 다른 Key값을 가진 데이터와 함께 사용 ( inner / left or right outer / cross )
  • outer 조인을 했을 때, key값이 없을 시에는 Option / Optional / None로 표시

  • Optional?
    • 구글의 구아바 라이브러리의 클래스 (= 존재하지 않을 수도 있는 값을 표시)
    • isPresent() 값이 존재하는지? get() : 없음

정렬

sortByKey() default : 오름차순

RDD 연산

{ (1,2), (3,4), (3,6) }

countByKey()

각 키에 대한 값의 개수를 센다 rdd.countByCount() = { (1,1) , (3,2) }

collectAsMap()

검색을 위해 Map형태로 모음 rdd.collectAsMap() = Map{ (1,2), (3,4) , (3,6)}

lookup(key)

키에 대한 모든 값 return rdd.lookup(3) = [4,6]

RDD 파티셔닝

  • 노드 간 데이터세트의 파티셔닝을 어떻게 제어하는가? 왜?
    • 분산시스템은 통신비용 즉, 네트워크 부하를 최소화해야 성능이 좋아짐
    • 네트워크 비용을 줄이기 위해 RDD의 파티셔닝 제어 방법 선택해야한다.
    • 물론 항상 고려할 필요는 없음. 단 한번만 scan한다면 굳이.. 할 필요없지
  • 파티셔닝은 조인 같은 키 중심의 연산에서 데이터세트가 여러번 재활용될 때 유용
  • 각 키에 제공된 함수에 따라 값들을 그룹화

  • 정확히 어느 노드에서 작업하라는 명시적인 제어는 제공하지 않음 왜?
    • 특정 노드가 장애가 나더라도 전체적으로는 동작할 수 있도록 설계되어 있음
    • 하지만!!!! 어떤 키의 모음들이 임의의 노드에 함께 모여있다는 것을 보장한다.
val userData = sc.textFile("hdfs://..").partitionBy( new HashPartitioner(100) ).persist()

  • partitionBy()
    • 트랜스포메이션
    • 항상 새로운 RDD를 리턴, 기존의 RDD는 변경하지 않음
    • 그래서 textFile이 아닌 partitionBy()에 대해 persist() 제공
    • 100은 파티션 개수
    • 얼마나 많은 병렬 작업들이 RDD에서 이후에 이루워질지 연산 작업을 수행하는 지 표현
    • 통상적으로 CPU 코어 개수 이상으로 선정
  • 자동적으로 partition을 포함하는 것도 있음
    • sortByKey() - Hash partition
    • groupByKey() - Range partition
  • map() 종류의 연산들은 Key값의 변형할 여지가 있기 때문에 어떻게 파티션 되는지 결정해야한다.
RDD의 파티셔너 정하기
  • RDD가 어떻게 파티션될지 partitioner 속성을 써서 결정
  • partitioner은 scala.Option object return ; 특정 정보를 담기위한 컨테이너 역할
  • isDefined() ? ( spark.Partitioner object = get() : 없음 )
pair.partitioner
res0 : Option [spark.partitioner] = None
 
import org.apache.spark.HashPartitioner
pairs.partitionBy( new HashPartitioner(2) )
 
pair.partitioner
res0 : Option [spark.partitioner] = Some (spark.HashPartitioner@134131d)

파티셔닝이 도움이 되는 연산들
  • cogroup()
  • groupWith()
  • join()
  • leftOuterJoin()
  • rightOuterJoin()
  • groupByKey()
  • reduceByKey()
  • CombineByKey()
  • lookup()
  • reduceByKey()
  • cogroup() / join()

  • reduceByKey()

    이미 파티션되어 있는 데이터를 단일 머신에서 처리한 결과를 셔플링

  • cogroup() / join()

    2개 이상의 RDD 중 하나라도 파티셔닝을 한면 이득 둘다 동일한 파티션을 가지고 동일 노드에 캐시되어 있다면 셔플링 발생하지 않음


파티셔닝에 영향을 주는 연산들
  • 스파크는 내부적으로 각각의 연산이 파티셔닝에 어떤 영향을 주는지 파악하고 있음
  • join() 시, 동일 키의 데이터들은 동일 머신에 해시되어 모음
  • (= 파티셔닝 = reduceByKey빠르게 작동가능)
  • key/value를 가진 Hash된 RDD에 map()을 호출하면 , map()은 킷값을 변경할 수 있다는 가능성이 있으므로 partitioner를 가지지 않음
  • 그러나 key값을 동일하게 유지하는 것을 보장하는 mapValue() , flatMapValue() 연산 제공
  • RDD에 파티셔너가 지정되는 모든 연산들
    • cogroup()
    • groupWith()
    • join()
    • leftOuterJoin()
    • rightOuterJoin()
    • groupByKey()
    • reduceByKey()
    • CombineByKey()
    • partitionBy()
    • sort()
    • mapValues()
    • flatMapValues()
    • filter()
  • mapValues()

    부모가 파티셔너를 가진 경우

  • flatMapValues()

    부모가 파티셔너를 가진 경우

  • filter()

    부모가 파티셔너를 가진 경우

  • 결과 RDD는 부모 RDD의 파티셔너에 따라 결정!!
  • 기본은 Hash, 개수는 연산의 병렬화 수준으로 세팅 ( default 200)
  • 부모가 여러개 일때나면 첫 번째 부모를 기준으로..