6. 고급 스파크 프로그래밍

1. 소개

여기서는 두가지 공유 변수RDD 트랜스메이션들을 구축하는 고비용 연산들에 대한 batch 작업에 대해 설명할 것이다 !

  1. 어큐뮬레이터 ( Accumulator )

    정보들을 누산해주는 것 ( 누산은 ‘누적’ + ‘합산’ 이라고 생각하자)

  2. 브로드캐스트 변수 ( Broadcast Variable)

    많은 값들을 분산 시켜준다.

그런데.. 공유변수란?

  • 데이터베이스 연결 (경험 상 JDBC Connection를 무분별하게 하면 상당한 CPU와 메모리에 영향을 주는 것을 확인했다.)
  • 난수 생성기 같은 초기화, 셋업 시간이 걸리는 작업에 대해서는 한번만(setup) 하고 공유하는 것이 효율적이다.
  • 스파크에 직접 지원하는 언어들 ( Python, Scala, Java )외의 언어들로 쓰여진 프로그램도 스파크에서 호출해서 사용할 수 있다.
  • 스파크가 언어와 별개로 동작하는 pipe() 메소드를 사용한다면 Stdin/stdout을 통한 다른 프로그램 R과 연동할 수 있다.

2. 어큐뮬레이터

공부하자마자 든 생각은, Hadoop MR에서의 Counter와 동일한 역활을 한다는 느낌을 강하게 받았다.

  • map(), Filter()는 드라이버 프로그램에 정의된 변수를 사용할 수 있다.
  • 하지만 ! 클러스터에서 실행 중인 각각 작업들은 변수의 복사본을 받아 작업하므로.. 업데이터 된 내용을.. 다시 드라이버 프로그램을 보낼 수 없다.

  • 그래서 공유 변수를 사용하여, 집합 연산(Aggregation)과 브로드캐스팅을 사용하여 위의 문제를 해결하자

어큐뮬레이터란?

작업 노드에서 드라이버 프로그램으로 보내는 값의 집한 연산에 대해 간단한 문법 제공 작업 수행 중에서 발생하는 일에 대한 개수를 디버깅 목적으로 사용

val file = sc.textFile("file.txt")

val blankLines = sc.accumulator(0)
// 0으로 초기화한 Accumulator[Int] 생성

val callSigns = file.flatMap ( line => {
 if (line == "" ){
   blankLines += 1
 }
 line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blanck Lines : " + blankLines.value)
  • accumulator업데이트는 오직 action내부에서만 수행된다.
  • 스파크는 이러한 태스크들의 업데이트들이 단 한번만 이루어 지는것을 보장한다.
  • 태스크를 재시작할 경우 value값을 업데이트 하지 않을 것이다.
  • transformation에서 사용자들은 각 태스크들의 업데이트를 수행할때 주의해야한다.
  • 만약 태스크나 잡 스테이지가 재 실행되는 경우에는 한번 이상 이루어질 수 있다.

여기서 조심하자

  • 스파크는 Lazy Execution을 사용하기 때문에 어큐뮬레이터 값 증가는 반드시 map()이 실행되는 시점인 ‘savaAsTextFile() Action’이 발동된 이후이다.
  • 또한 작업노드의 Task에서는 value에 접근할 수 없다
  • 그래서 blankLines에 대한 값은 이후에 확인이 가능하다.

3. 브로드캐스트 변수

개념상으로 봤을 때, 하둡의 ‘ 분산캐시’와 비슷한 것 같다

브로드캐스트 변수?

  • 스파크 연산에 쓸 읽기 전용인 값을 모든 작업 노드에 효과적으로 전송하는 데 사용
  • 스파크가 클로저에서 쓰이던 모든 변수들을 작업 노드에 자동을 보내던 것을 생각해보자
  • 편리하다 .. 하지만!
    1. 작은 작업 사이즈에 최적화되어 있다..
    2. 병렬 작업에서 동일 변수를 사용할 수도 있으므로 효과적이지 못하다.
  • 단 한번만 Cluster에 보내진다.
  • 다른 protocol을 사용해서 보내진다. ( Torrent 방식인 듯 하다 )
  1. T 타입의 객체에 SparkContext.broadcast를 호출하여 Broadcast[T] 생성 (Serializable이라면 어떤 객체든 가능)
  2. value속성으로 값 접근
  3. 변수는 각 노드에 한번만 보내지며, 읽기 전용 취급 (변경하더라도 다른 노드들에는 영향을 주지 않음)
    • 읽기 전용이기 때문에 Immutable한 객체타입을 브로드캐스트하는 것이 안전
    • 드라이버 코드 밖에서는 브로드캐스트 변수의 값을 수정할 수 없다.

** 추가 상세 설명 **

  • Broadcast변수는 프로그래머가 읽기 전용의 변수값을 각 머신에 캐시할 수 있도록 해준다.
  • 각 노드에 효과적인 처리를 위해서 큰 입력 데이터셋의 복제본을 제공하는 것이다.
  • 또한 스파크는 broadcast 변수들을 효과적인 브로드 캐스트 알고리즘을 이용하여 커뮤니케이션의 비용을 줄이는 방향으로 분배를 시도한다.
  • 스파크 Action들은 stages의 집합을 통해서 수행된다. 이는 shuffle Operation을 통해서 분산되어 수행된다.
  • 스파크는 자동적으로 브로드캐스트를 통해 각 태스크들이 각 스테이지마다 필요한 공통 데이터를 분배한다.
  • 데이터 브로드캐스트는 각 태스크가 실행되기 이전에 Serialization된 형태의 캐시된 데이터를 Deserialization 한다.
    • 이는 명시적으로 브로드캐스트 변수들을 생성하는 것은 동일한 데이터에 대해서 복수개의 스테이지를 통해서 태스크가 수행될때만 유용하거나 혹은 Deserialization된 데이터 형태로 캐시될때 중요하다.
  • 브로드캐스트 변수들은 변수 v로 부터 SparkContext.broadcast(v)를 이용하여 생성된다.
  • 브로드캐스트 변수는 v를 감싸고 있으며, 변수는 value 메소드를 통해서 접근이 가능하다.
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]

브로드 캐스트 최적화

  • 바이트 사이즈가 큰 값들을 브로드캐스팅할 때 값을 Serialization하거나 Serialization된 값을 네트워크로 보내는 시간이 오래걸린다면 병목현상 발생한다 ㅠㅠ
  • 즉, 빠르고 작은 단위의 데이터 직렬화 포멧을 선택하자
  • 기본적으로 쓰는 Java Serialization보다는 Kyro Serialization 사용하자

4. 파티션별로 작업하기

  • 각 데이터 셋에 대해 Setup 절차의 반복을 피하게 된다
    • JDBC connection 같은 것들
  • 파티션 기반 버전의 map과 foreach를 제공하여 RDD의 각 파티션에서 한번만 코드를 실행하게 해준다.

val = contactsContactLists = validSigns.distinct().mapPartitions{
 signs =>
 val mapper = createMapper()
 val client = new HttpClient()
 client.start()
 
 signs.map{
  sign -> createExchangeForSign(sign)
 }.map{
  case ( sign,exchange ) => (sign,readExchangeCalllog(mapper,exchange))
 }.filter( x=> x._2 != null)
}
함수 이름 호출 시 주어지는 것 리턴할 것 RDD[T]에 대한 함수 원형
mapPartitions() 각 파티션에 있는 데이터의 반복자 결과 값에 대한 반복자 f : (Iterator[T]) -> Iterator[U]
mapPartitionsWithIndex() 정수로 된 파티션 번호와 데이터의 반복자 결과 값에 대한 반복자 f : (Int, Iterator[T]) -> Iterator[U]
foreachPartitions() 데이터의 반복자 없음 f : (Iterator[T]) -> Unit

5. 외부 프로그램과 파이프로 연결하기

  • 스파크는 Scala, Python, Java를 제외하고 다른 옵션으로도 원하는 작업을 수행할 수 잇도록 파이프 메커니즘을 제공한다.
  • 대표적으로 R 스크립트

  • pipe() 메소드를 통해 Unix 표준 입출력 스트림으로 읽고 쓸 수 있는 모든 언어로 작업의 일부 작성 가능
  • pipe()를 쓰면 각 RDD의 데이터를 STDIN으로 부터 String으로 읽어 들이는 Transformation을 만들 수 있고 그 결과를 String으로 써줄 수 있다.
  • 대부분은 RDD의 데이터를 파이프를 통해 외부 프로그램 또는 스크립트로 전달할 때 사용
val distScript="./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = ....().pipe(Seq(SparkFiles.get(distScriptName)))
println(distances.collect().toList)

Sparkcontext.addFile(path)

  • 스파크 작업 중에 각 작업 노드에서 다운로드받을 파일 리스트 생성
  • 해당 작업은 Action이 실행되면 각 노드에서 다운로드 된다.
  • sc.getRootDirectory OR sc.get(fileName)으로 찾는다
  • pipe()가 각 작업 노드에서 스크립트를 찾을 수 있는 방법
  • RDD.pipe(Seq(SparkFiles.get(“finddistance.R”),”,”)) = 구분자

6. 수치 RDD 연산들

  • 수치 데이터를 가지고 있는 RDD에 대해 직관적인 통계 연사 제공

수치 연산

  • 한 번에 하나씩 처리하는 방식으로 수치 모델을 처리하도록 하는 Streaming 방식 알고리즘으로 구현
  • 데이터 위에서 한 단계 처리된 후 state() 메소드 호출에 의해 StatsCounter 객체 반환
  1. count()
  2. mean()
  3. sum()
  4. max()
  5. min()
  6. variance()
  7. sampleVariance()
  8. stdev()
  9. sampleStdev()

참고 사항


분산캐시란?

  • 사이드 데이터 : Job이 주요 데이터셋을 처리하는데 필요한 별도의 읽기 전용 데이터
  • 잡 환경 설정 ( Configuration ) 에서 사이드 데이터를 직렬화하는 방법 보다, 분산 캐시 기법으로 분산하는 것이 더 좋다
  • 분산 캐시는 실행 시점에 파일과 아카이브의 사본을 Task 노드에 복사하여 이용하는 서비스
  • 네트워크 대역폭을 줄이기 위해, 파일은 잡 단위로 특정 노드에 복사한다.
  • GenericOptionsParser
    • files
    • archives (.jar , .tar, .zip )
    • libjars jar파일을 매퍼, 리듀서 태스크에 추가
  • 일반적으로 위의 모든 속성들은 파일을 HDFS에 복사한다
  • 그리고 태스크가 실행되기 전에 파일에 접근할 수 있도록 nodeManger가 HDFS에 있는 파일을 localDisk(캐시)에 복사한다 (= 로컬화)
  • NodeManger는 파일을 사용하는 Task수를 파악하기 위해 ‘참조 빈도수’ 관리
  • 참조 빈도수가 0이면 파일 삭제
  • 노드의 전체 캐시 크기가 10GB 공간 확보 = LRU 정책으로 삭제

** 클로저?**

  • 스파크가 어떤 단계를 실행하면 그 단계에서 태스크를 싱행하는데 필요한 모든 정보를 바이너리 형태로 만든다.
  • 이 바이너리를 실행할 함수의 ‘클로저(closure)’라고 한다
  • 클로저는 함수가 참조하는 모든 자료구조를 포함하고, 스파크는 클로저를 클러스터 상의 모든 Executor에 배포한다
  • spark.broadcast.BraoadCast[T] 타입의 객체
  • 작업 수행 시 Broadcast 변수에 value를 호출해서 값을 가져온다
  • 이 객체는 노드에 딱 한번만 보내진다.
  • Broadcast Variables
  • 일반적으로 Spak에서
    • 모든 function 변수들은 그들을 사용하는 execcutor로 보내진다.
    • 다른 operation에서 다시 사용된다면 같은 변수가 다시 보내진다.