1. 스파크 잡에 대해 알아보자

  • 이 내용은 High Performace Spark 챕터 2. 스파크느는 어떻게 동작하는가? 를 정리한 내용

1-1. 스파크 잡 스케줄링

  • 스파크 애플리케이션은 2가지 프로세스로 구성되어 있음
    1. 고수준 스파크 로직이 작성되어 있는 드라이버 프로세스
    2. 클러스터의 노드들에 나위어 분포된 이그제큐터 프로세스
  • 하나의 스파크 클러스터에 여러개의 스파크 애플리케이션을 동시에 실행할 수 있음
  • 이 애플리케이션들은 클러스터 매니저에 의해 스케줄링되고, 각각 하나의 SparkContext를 가짐
  • 스파크 애플리케이션들은 공존하는 여러개의 잡을 차례로 실행할 수 있음
    • 여기서의 ‘잡’은 한 RDD가 호출하는 각 액션에 대응

1-2. 애플리케이션 간의 자원 할당

  • 스파크는 각 애플리케이션에 정적 할당/ 동적 할당 두가지 방법으로 자원 할당
  • 정적할당
    • 각 애플리케이션이 클러스터의 자원을 최대한 할당하고 실행되는 동안 그 리소스를 예약
  • 동적할당 ( DynamicAllocation)
    • 이그제큐터가 애플리케이션의 필요에 따라 필요 자원량의 대략적인 예측치에 기초하여 자원을 추가, 줄임

1-3. 스파크 애플리케이션

  • 스파크 애플리케이션, 이와 연계된 스파크 잡들은 드라이버 프로그램의 SparkContext에 정의되어 있음
  • 즉, SparkContext가 생기면 스파크 애플리케이션이 구동
  • SparkContext를 실행하면 드라이바와 이그제큐터들이 클러스터의 각 작업 노드에서 구동
  • 각 이그제큐터는 각자의 JVM을 가지고, 한 노드에 여러개의 이그제큐터가 존재할 수 있음, 하지만 하나의 이그제큐터가 여러 노드에 걸쳐서 존재하지 않음
  • SparkContext는 각 이그제큐터에게 얼마나 많은 자원을 할당할지 결정
  • 스파크 잡이 실행될 때, 각 이그제큐터는 RDD를 계산할 Task 실행을 위한 slot를 가짐
  • 이를 통해 하나의 SparkContext는 스파크 잡 실행을 위한 설정 변수들의 한 집합으로 생각할 수 있음
  • RDD는 애플리케이션끼리 공유할 수 없으므로, Join과 같은 여러 RDD를 쓰는 트랜스포메이션들은 동일한 SparkContext를 가져야함

SparkContext 시작할 때 생기는 일

  1. 드라이버 프로그램은 클러스터 매니저에게 신호를 보냄 (ping)
  2. 클러스터 매니저는 워커 노드에 다수의 이그제큐터를 띄운다
  3. 하나의 RDD는 파티션들이 존재하는 이그제큐터에 의해 평가될 것
    • 각 이그제규터는 여러개의 파티션을 가질 수 있지만, 하나의 파티션을 여러 이그제큐터로 분산 시킬 수 없음

2. 스파크 잡 해부

  • 스파크의 지연 평가 패러다임에서 스파크 애플리케이션은 드라이버 프로그램이 액선을 호출하기 전까지 아무것도 안함
  • 각 액션마다 스파크 스케줄러는 실행 그래프를 만들고 스파크 잡(Job)을 시작
  • 각 잡은 최종 RDD를 만들어내는데 필요한 변환의 각 단계, Stage들로 구성
  • 각 Stage는 각 병렬 연산의 한 단위이며, 이그제큐터들 위에서 실행하는 다수의 Task로 구성

  • 각 애플리케이션은 하나의 SparkContext/SparkSession을 시작하는 것과 대응
  • 각 애플리케이션은 하나의 RDD 액션과 대응하는 Job을 여러개 가짐
  • 각 Job은 각각 넓은 트랜스포메이션에 대응되는 Stage를 여러개 가짐
  • 각 Stage는 Stage에서 수행되는 병렬 연산의 한 단위 Task를 여러개 가짐
  • Stage의 결과 RDD에 있는 하나의 파티션마다 하나의 Task가 존재

2-1. DAG

  • 클러스터 접속, 설정 변수, 스파크 잡 시작과 과련된 에러들은 DAG 스케줄러 에러로 나타남.
    • 왜? 스파크의 Job의 실행이 DAG에 의해서 이루어지기 때문!
  • DAG는 각 Job의 Stage의 그래프를 만들고, 각 Task가 실행될 위치를 결정하며, 클러스터에서 Task 실행을 관리하는 TaskScheduler에게 정보 전달
  • TaskScheduler는 파티션들간의 종속성 그래프를 만든다.

  • 스파크의 고수준 스케줄링 레이어는 각 스파크 Job의 Stage의 지향성 비순한 그래프(DAG)를 만들기 위해 RDD 종속성 정보를 사용
  • 스파크 API에서 이는 DAG 스케줄러로 불림

2-2. Job

  • RDD 트랜스포메이션을 계산하는 프로세스
  • = 하나의 최종 결과를 연산해 해는 데 필요한 RDD 트랜스포메이션들의 집합

  • 스파크의 실행 구성에서 가장 높은 단계의 요소
  • 각 스파크 잡은 하나의 액션에 해당되며, 각 액션은 스파크 애플리케이션의 드라이버 프로그램에서 호출
  • 액션은 ‘데이터를 스파크 RDD 세상 바깥으로 갖다 놓는 그 무엇’로 정의할 수 있음
  • 스파크 실행 그래프의 간선(edge)들은 RDD 트랜스포메이션에서 파티션들 간의 종속성을 기반으로 생성함
  • 그러므로 RDD가 아닌 무언가를 되돌려 주는 연산은 자식을 가질 수 없다
  • 그래프 이론으로 말하면 액션은 DAG에서 Leaf 노드 형태
  • 수 많은 트랜스포메이션의 한 집합은 하나의 실행 그래프로 구성할 수 있음
  • 하지만 액션이 한번 호출된 후, 더 이상 그래프에 추가를 할 수 없음
  • 애플리케이션은 액션을 호출한 최종 RDD를 평가하기 위해 필요한 트랜스포메이션들 포함한 Job을 바로 실행

2-3. Stage

  • 드라이버의 도움 없이 해당 부분을 완료할 수 있음
  • 하나의 Stage는 파티션끼리 데이터가 전송되는 일 없이 연산이 가능한 단위!
  • 하나의 잡은 하나의 액션을 호출하는 것을 정의되며, 액션은 하나의 이상의 트랜스포메이션을 가지며, 넓은 트랜스포메이션은 잡의 부분들을 Stage로 정의
  • 각 Stage는 넓은 트랜스포메이션에 의해 생성되는 셔플 의존성에 대응
  • 하나의 Stage는 다른 이그제큐터나 드라이버와 통신없이 하나의 이그제규터에서 계산이 가능한 Task들의 집합
  • 즉, 하나의 새로운 Stage는 언제든지 작업 노드들 사이에서 셔플이 될 때 마다 시작된다. (?)

  • Stage의 경계를 구분하는 종속성 : ShuffleDependencies
  • 셔플은 sort, groupByKey 같은 넓은 트랜스포메이션에 의해 발생하고, 이는 데이터를 파티션들에 걸쳐 재분배한다는 것을 의미
  • 스파크는 동일 RDD가 동일 파티셔너에 의해 두번 이상 파티션되지 않도록, 어떻게 RDD가 파티션되었는지에 대한 정보를 유지함
  • 파티셔너를 가진 RDD와 그렇지 않은 RDD에 대한 동일한 연산을 하더라도, 알려진 쪽은 셔플할 필요없음
  • Stage 경계에서는 드라이버와 통신이 요구되기 때문에, 하나의 잡과 연계된 Stage들은 병렬적으로 실행되기보다는 순차적으로 실행되어야함
  • Join 처럼 이후의 트랜스포메이션에서 연결이 되는 서로 다른 RDD들을 연산하는 경우에는 병렬적으로 처리 가능
  • But, 하나의 RDD를 연산하기 위한 넓은 트랜스포메이션들은 순차적으로 실행되어야함
  • 당연한 이야기이지만, 최소한 셔플로 수행되게 설계하는 것이 중요

2-4. Task

  • 데이터의 각 파티션에 대한 작업을 수행하는 단위들
  • Task는 실행 계층에서 가장 작은 단위이며 하나의 local 연산을 표현
  • 한 Stage의 모든 Task들은 서로 다른 데이터를 대상으로 동일한 코드를 실행
  • 하나의 Task는 둘 이상의 이그제큐터에서 실행될 수 없음
  • But, 각 이그제큐터는 Task 실행을 위해 동적으로 할당된 여러개의 Slot을 가지며, 이그제큐터가 떠 있는 동안 여러 Task들을 Slot만큼 동시 실행할 수 있음
  • Stage 당 Task 개수는 해당 Stage의 결과 RDD의 파티션 개수와 대응
  • 클러스터는 각 Stage마다 필요한 모든 Task를 동시에 실행할 수 없음
  • 각 이그제큐터가 사용하는 core 개수에 대응되며, 스파크는 할당된 이그제큐터 코어 개수의 총합보다 더 많은 Task를 동시에 실행할 수 없음
    • 병렬처리가능한 task 수 = ( num of executor * num of core per executor )
    • Task 실행을 위한 Slot보다 많은 파티션, 즉 Task가 있다면, 추가적인 Task는 최초 실행된 Task가 끝난 다음 자원 사용이 가능할 때 실행
  • 하나의 Stage가 시작하기 전에, 이전 Stage의 모든 Task가 완료되어야만하고, 이런 Task를 분산해주는 과정은 TaskScheduler가 담당
    • TaskScheduler가 FIFO인지는 확인해봐야함