8.스파크 최적화 및 디버깅

이번 장에서는 스파크 어플리케이션이 어떻게 설정하고 실제 작업 환경을 어떻게 최적화하고 디버깅하는지에 대해 알아볼 것이다.

1. SparkConf로 스파크 설정하기

스파크의 최적화는 ‘스파크 애플리케이션의 실행 설정을 바꾸는 것 이라고 생각해도 된다.

주된 설정 메커니즘은 SparkConf 클래스 이다.

SparkConf 객체

  • Key,Value 형태의 쌍을 이루고 있다
  • SparkConf 값들은 애플리케이션 코드 안에 프로그래밍으로 정의

val conf = new SparkConf().setMaster(local).setAppName(Spark SCALA)
val sc = new SparkContext(conf)

spark-submit

  • 동적으로 설정값 지정할 수 있음
  • 새로운 SparkConf가 만들어질 때 자동으로 감지되고 값을 채워지게 된다.
  • 즉, spark-commit을 사용한다며, 사용자 어플리케이션은 그냥 ‘텅 빈’SparkConf를 만들어지고, –conf 플래그를 이용하여 설정값을 설정한다.

bin/spark-submmit 
  -- class com.example.MyApp
  -- master local[4]
  -- name "My Spark App"
  -- conf spark.ui.port=36000
  maApp.jar

conf/spark-defaults.conf

  • spark-commit으로 설정값을 파일에서 읽어 올 수 있음
  • 기본으로 설정파일을 찾은 뒤, 파일에서 공백으로 구분된 Key,valye 쌍을 찾는다.

bin/spark-submmit 
  -- class com.example.MyApp
  -- properties-file my-config.conf
  maApp.jar

참고!

  • 실행하는 애플리케이션의 SparkConf는 한번 SparkContext의 Contructor에 넘겨지고나면 수정이 불가능하다
  • 즉, SparkContext가 초기화되기 전에 모든 설정값이 결정되어야한다. ==

그렇다면.. 위의 3가지 방법으로 같은 설정값을 주었을 때, Spark 어플리케이션은 어떤 설정값을 물고 실행할까???

  1. 코드상의 SparkConf.set()

  2. spark-submit의 flag

  3. 설정 파일 ( default )

이렇게 정해졌으며, 실제 적용됬는지는 웹UI를 통해 확인할 수 있다.

  • 데이터를 셔플하는 데에 쓸 local 저장 디렉터리를 지정하기 위한 SPARK_LOCAL_DIRS 환경변수를 conf/spark-env.sh 안에 쉼표로 구분된 경로들로 export 해줘야한다.

    -물리적인 서버들 간에 위치가 달라지므로…


2. 실행을 구성하는 것 : 작업 (job) , 태스크 (task), 작업 단계 (stage)

이제부터 스파크 시스템의 내부 설계를 좀 더 자세히 이해해보자

스파크는 RDD의 논리적 표현들을 여러 개의 연산들을 태스크로 합쳐 물리적 실행 계획으로 바꾼다.

  1. 논리적 표현( Transformation ) : DAG 생성

  2. 모여서 Stage를 만든다

  3. 물리적 실행 계획 생성 ( Action )

  4. DAG 추적해 Action을 수행

RDD의 가계도 출력하는 toDebugString() 메소드 제공한다.

  • 가계도란? 현재 RDD가 Transforamtion 을 통해 지금의 결과가 나오기 까지의 RDD의 history라고 생각하자.
  • 이런 가계도 ( lineage )가 있기 때문에 RDD( Resilient )는 문제가 발생해도 가계도를 보면서 원래의 RDD 형태로 복원이 가능한 것이다.

  • Job은 여러개의 stage로 구성되며 , 이 stage는 여러개의 Task로 구성된다. 즉, Task 는 Spark 에서 수행되는 가장 작은 단위이다.

  • 각 파티션에 있는 데이터는 다 다르지만, 같은 일을 수행하는 태스크를 실행시킨다.

  • 태스크 실행 순서

    1. 저장 장치나 존재하는RDD ( caching 된 것) 나 셔플 결과물로 부터 입력 데이터를 가져온다

    2. RDD를 계산하기 위해 필요한 연산들을 수행 (filter() map())

    3. 결과를 셔플 or 외부 저장장치 쓰거나 or 드라이버 ( count() 같은 action 값) 에게 돌려준다


스파크 실행 순서

  1. 사용자 코드가 RDD의 DAG를 정의한다.

    • RDD의 연산들은 새로운 RDD를 만들고, 이 것은 부모를 참조해가면서 그래프가 만들어진다.
  2. DAG가 Action의 실행 계획으로 변환하게 된다.

    • RDD에서 Action을 호출하면 반드시 연산을 수행해야한다.
    • 스파크의 스케쥴러는 RDD들이 필요한 모든 연산을 수행하도록 Job을 제출
    • 이 Job은 하나 이상의 Stage를 가지며, 아는 태스크들로 구성된 병렬 집단 연산들이다.
    • 각 Stage는 DAG에서 하나의 이상의 RDD와 연계된다.
  3. Task들이 스케쥴링되고 클러스터에서 실행된다.

    • Stage은 순서대로 실행되면 RDD의 조각들( 파티션 )을 연산하기 위한 Task를 싱행한다.

3. 정보찾기 ( = 스파크 웹 UI )

  1. Job
    • 진행 상황과 작업 단계, 태스크 등에 대한 수치들
  2. Storage
    • 드라이버와 익스큐터 로그 영속화된 RDD의 정보 ( cache() or persist() )
  3. Executor
    • 애플리케이션에 존재하는 Executor 목록
  4. Environment
    • 스파크 설정 디버깅
  5. 드라이버와 익스큐터 로그

4. 성능에 관한 핵심 고려사항

  • 성능으로 올리기 위해 코드레벨 ( 위의 3개 ) & 클러스터와 환경 최적화 (1개)

병렬화 수준

RDD의 논리적 표현은 객체들의 모음이다.

물리적 실행 간에 RDD는 여러개의 Partition으로 나뉘고, 각 Parition은 전체 데이터의 일부분만 가지고 있다.

스파크는 각 Partition 당 저장된 데이터를 처리할 Task를 하나씩 만든다.

Task는 기본적으로 한의 Core를 요청한다.

파티션?

HDFS로 읽은 RDD는 블럭(128MB)를 하나의 Partition을 가진다.

다른 RDD에서 읽은 파일은 부모RDD의 크기에 맞춰 병렬화 수준이 정해진다.

병렬화 개수가 적다. ( = 스파크가 리소스들을 놀게 만든다. ) ex) 하나의 application에 1000개의 코어를 설정했는데 , 30개의 Task만 실행하는 작업이라면.. 늘리자..

병렬화 수준 조정 방법

  1. 병렬화 정도 인자로 전달

  2. 이미 존재하는 RDD에 대한 파티션 재배치

    repartition() / coalesce()

직렬화 포맷

네트워크로 데이터를 전송하거나 디스크에 쓸 떄, 갹채를 직렬화해 바이너리포맷으로 변환시켜야한다.

자바의 내장 직렬화 / Kryro

메모리 관리

Executor 내부의 메모리의 목적

  1. RDD 저장용

    RDD.persist() / RDD.cache() 호출할 떄 그 RDD를 메모리 버퍼에 저장

  2. 셔플 및 집합 연산 버퍼

    셔플 연산

하드웨어 프로비저닝