효율적인 트랜스포메이션

  • 이 글에서 설명하는 트랜스포메이션은 스파크 코어에서 서용하는 RDD객체들의 것
  • 스파크 SQL은 카탈리스트 옵티마이져를 사용하기 때문에 이 글에서 다룰 개념들이 모두 스파크 SQL에 적용되는 것이 아님
  • 스파크의 성능의 상당 부분은 ‘트랜스포메이션’

좁은 트랜스포메이션 VS 넓은 트랜스포메이션

  • 2012년 스파크 평가 의미론을 다룬 논문에서의 정의
    • 자식 RDD보다는 ‘부모 RDD의 종속성 위주’로 좁고 넓음을 정의함

1. 좁은 종속성의 트랜스포메이션

  • 부모 RDD의 각 파티션이 자식 RDD의 최대 하나의 파티션에 의해 사용되는 것
  • 각 자식 파티션은 하나의 부모 파티션에만 의존

2. 넓은 종속성의 트랜스포메이션

  • 부모 RDD의 각 파티션마다, 여러 자식 파티션이 종속되어 있는 것
  • 자식 파티션들은 부모 RDD의 다수의 파티션에 기반
  • 우리가 개발할 때는 입력 데이터(부모RDD) ~ 출력 데이터(자식RDD) 방향으로 프로그램 설계
  • 하지만, 스파크 평가엔진은 DAG 출력(=마지막 액션)으로부터 입력 RDD로 실행 계획을 역으로 구축
  • 그래서, 스파크 제작자들은 ‘스파크가 평가되는 방식’을 반영하여 다음의 두가지 중요한 측면에서 볼 때 더욱 정확하다고 볼 수 있음
  1. 좁은 종속성에서 여러 자식을 가진 한 부모의 파티션의 경우는 제외
    • coalesce 같이 파티션을 줄일 때만 ‘좁은 종속성’, 늘릴 때는 제외되는 이유
  2. RDD를 평가할 때 연산을 완료하는 task개수가 왜 입력 파티션이 아니라 출력 파티션과 관계가 있는지 설명해
  • 데이터 reparitioning은 파티션들이 동일한 머신 안에 존재하므로 머신간의 데이터 이동을 필요하지 않음
  • 레코드의 파티션이 executor간의 데이터 이동이 필요하면 레코드들은 executor 사이에서 직접 이동이 아닌, driver를 통해 전달된다

성능에 대한 고려 상황

  • 좁은 종속성은 파티션 간에 데이터의 이동을 요구하지 않음
    • 이 뜻은, 좁은 트랜스포메이션은 driver가 보낸 명령어들을 몇 개의 레코드 집합 위에서 실행할 수 있음
  • 즉, 좁은 트랜스포메이션들의 각 모음이 질의 실행 계획에서 하나의 Stage 안에서 연산할 수 있다. 라고 정의할 수 있음

  • 하나의 넓은 종속성과 엮인 셔플은 RDD의 평가에 새로운 Stage로 표시된다
    • Task들은 하나의 파티션에서 계산해야함
    • 넓은 족송성의 각 파티션에서 계산해야하는 데이터들은 대부분 여러 머신에 나눠져 있으므로 데이터 이동이 필요
    • 즉, 이후에 오는 연산들은 셔플이 완료되기 전에는 시작되지 않음!
  • sortByKey 함수는 넓은 종석성으로, 정렬 이후에 오는 트랜스포메이션은 각 테이거가 변경될 수 있으므로 셔플 끝나기전에는 수행되지 않음

  • stage 경계는 성능에 있어 중요한 사항
  • join과 같은 다중 RDD 연산의 경우를 제외하면, 한 RDD와 연계된 Stage들은 일렬로 수행되어야함
  • 셔플은 데이터 이동과 잠재적인 디스크 입출력(파일 셔플링을 위해) 요구되므로 비용이 비쌀 뿐아니라, 병렬 수행도 제한된다

장애 내구성의 고려 사항

  • 넓은 종속성에서 한 파티션의 실패는 재연산이 필요한 개수가 훨씬 많으므로 좁은 종속성의 파티션 실패보다 더 많은 비용이 요구

e.g) 좁은 종속성에서의 장애 내구성 상황

  • mappedRDD의 부모 중 한개의 파티션이 실패했다면 자식들중에도 한 파티션만 재연산되어야고
  • 이 재연산 속도를 높이기 위해 해당 자식 파티션들의 Task가 executor에게 분산되어 실행됌

e.g) 넓은 종속성에서의 장애 내구성 상황

  • 정렬된 RDD의 부모가 파티션을 잃어버리게 된다며, 모든 자식 파티션을 재연산해야할 수 도 있음
  • 넓은 종속성의 트랜스포메이션들을 연속으로 놓고 실행하는 것은 굉장히 높은 부하의 재연산을 불러올 수 있는 위험성을 높이는 것
  • 특히 메모리 오류를 일으킬 확률이 높은 것들이 있다면 더더욱!!
  • 이 특정 경우에서는 RDD를 checkpointing함으로써 중간 결과를 저장하는 것이 충분히 가치가 있을 수 있음

coalesce의 특별한 경우

  • coalesce로 결과 파티션 개수를 줄이면 자식 파티션들이 여러 부모 파티션들을 통합하게 되어 각 부모 파티션은 정확히 하나의 자식 파티션을 위해 활용
  • coalesce로 파티션을 늘린다면 넓은 종속성의 트랜스포메이션
    • 사실상, coalesce(100) 으로 파티션을 늘린다면, 자식 RDD의 파티션은 100개가 아니다.
    • 그 이유는 coalesce함수 내부를 보면 확인할 수 있는데, default로 함수의 두번쨰 인자, shuffle=false로 설정되어 있기 때문
    • 늘릴라면 coalesce(100,true)로 줘야하, 이렇게 되면 repartition(100) 주는 것과 동일하다
  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }
  • coalesce함수는 자식 파티션에게 데이터를 공평하게 분산하는 것을 우선 순위로 둠
  • 따라서, 각 입력 파티션에 데이터가 얼마나 많이 들어있냐에 따라서, 데이터의 최종 위치가 달라지게 되므로, 초반에는 결과 레코드들의 위치를 알 수 없음
  • 데이터를 읽거나, 초반 트랜스포메이션을 평가하기 전에는 각 파티션의 래코드 개수 또한 알 방법이 없으므로, 더 큰 파티션 숫자로 coalesce나 repartition을 호출하는 것은 셔플이 필요함