[HP-Spark] 파티셔너와 키/값 데이터
by Jaesang Lim
파티셔너와 키/값 데이터
- 스파크의 파티션 하나는 하나의 태스크와 연관되는 병렬 실행의 한 단위
- 명시적 파티셔너가 없는 RDD는 데이터 크기와 파티션 크기에 의해서만 데이터를 파티션들에 할당함
RDD 파티션되는 방식을 변경하는 메소드
- repartition / coalesce
- RDD의 값에 개의치 않고 파티션 개수를 바꾸는데 쓸 수 있음
- repartition은 RDD를 해시파티셔너와 주어진 파티션 개수와 함께 셔플함
- coalesce는 요구하는 파티션 개수보다 현재 파티션 개수가 적을 경우, 전체 셔플링을 회피하기 위한 repartition은 최적화 버전
- partitionBy
- 파티션 개수가 아니라, 파티셔너 객체를 받아 새로운 파티셔너로 RDD를 셔플
- 파티셔너는 키의 값을 기반으로 파티션을 레코드에 할당
- repartition / coalesce는 명시적으로 파티셔너를 RDD에 할당하지 않음
스파크 파티셔너 객체 사용하기
파티셔너란?
- 레코드를 어떻게 분산하고 그로 인해 각 태스크가 어떤 레코드들을 처리할 것인가를 정의하는 것
- numPartitions, getPartition 두 함수를 가지는 추상 클래스
- numPartitions
- 파티셔닝이 끝난 후 RDD의 파티션 개수를 정의
- getPartition(key)
- 키에 대해 키와 연계된 레코드가 전송되는 파티션의 정수 인덱스와의 매핑을 정의
- numPartitions
해시 파티셔닝
- 페어 RDD 연산의 기본 파티셔너, HashPartitioner
- 키의 해시값을 기반으로 자식 파티션의 인덱스를 결정
- 인자로 partitions 사용, RDD의 파티션 개수와 해싱 함수에 쓰일 버킷 개수를 결정
- partition를 정의안하면, spark.default.parallelism 값 사용
/**
* A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
* Java's `Object.hashCode`.
*
* Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
* so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
* produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
레인지 파티셔닝
- 동일한 범위의 키들에 대한 레코드들을 주어진 파티션에 할당
- 레코드들을 정렬해서 해당 파티션 안에 있는지 확인할 수 있도록 정렬을 요구하므로, 전체 RDD가 정렬될 것
- 샘플링과 전체 파티션들에 레코드를 균등하게 분배시키는 최적화에 의해 각 파티션에 지정될 범위를 결정
- 한 키에 대한 모든 레코드 중 중복 키가 너무 많으면 해시 파티셔닝 처럼 메모리 에러를 발생시킬 수 있음
- 인자로 partitions 뿐 아니라, 샘플을 위한 실제RDD를 필요로함
- RDD는 튜플이여야하며, 키들은 정의된 순서를 갖고 있어야함
- 샘플링은 실제로 RDD를 부분적으로 평가하며, 실행 그래프를 끊게 됨
- 그래서, 트랜스포메이션이면서 액션이기도 함
/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*
* @note The actual number of partitions created by the RangePartitioner might not be the same
* as the `partitions` parameter, in the case where the number of sampled records is less than
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// too long..
}
사용자 파티셔
- 키의 해시값, 순서에 의한 것이 아닌 데이터 파티셔닝을 위한 특별한 함수 정의할 떄 사용
다음과 같은 메소드를 구현해야함
- numPartitions
- 파티션의 개수를 돌려주는 메소드
- getPartition(key : Any)
- 키를 받아 키의 레코드가 속한 파티션의 인덱스 리턴
- equals
- 파티셔너 간의 동질성을 정의하는 메서드
- HashPartitioner는 파티션 개수가 같으면 true 리턴
- RangePartitioner는 범위 구분이 같다면 true 리턴
- 조인이나 cogroup에서 RDD가 어떤 파티셔너에 의해 파티셔닝이 되어있다면, 재파티셔닝을 하지 않기 때문에 중요함
- hashcode
- HashPartitioner은 파티션의 개수
- RangePartitioner은 각 범위 구분값의 타입에서 가져온 해시 함수를 그대로 사용
- 모든 범위 경계값에 대해 해시함수 호출하고 이들을 합쳐서 하나의 값으로 만듬
- 같은 범위들을 지닌다면 동일한 hashcode가 나오고, 범위가 하나라도 달라지면 역시 달라짐
파티셔닝을 유지하는 좁은 트랜스포메이션들
키/값 쌍의 값 부분만 바꾸는 종류가 아니라면, 결과 RDD는 파티셔너를 가지지 않음
- map/flatMap
- 키를 바꿀 가능성이 있으므로, 키를 변경하지 않더라고 결과 RDD는 파티셔너를 가지지 않ㄷ음
- mapValues
- 파티셔너 보존
- mapPartitions
- perservesPartitioning 플래그가 true로 설정하면 보존함
/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
val sortedData = data.sortByKey()
val mapValues: RDD[(Double,Strring)] = sortedData.mapValues(_.toString)
assert(mapValues.partitioner.isDefined, " Using Map Values perserves partitioning")
val map = sortedData.map(pair => (pair._1, pair._2.toString))
assert(map.partitioner.isEmpty, " Using map deoest not perserve partitioning")
Subscribe via RSS