Spark Join에 대한 정리
by Jaesang Lim
Spark Join
1. 조인 표현식
- 스파크는 왼쪽과, 오른쪽 데이터셋에 있는 하나 이상의 키값을 비교함
- 왼쪽 데이터셋과 오른쪽 데이터셋의 결합 여부를 결정하는 조인 표현식 ( join Expression)의 평가 결과에 따라 두개의 데이터셋을 조인함
2. 조인 타입
- 조인 표현식은 두 로우의 조인 여부를 결정
- But, 조인 타입은 결과 데이터셋에 어떤 데이터가 잇어야 하는지 결정
- 내부 조인 ( Inner Join )
- 왼쪽과 오른쪽 데이터셋에 키가 있는 로우 유지
- 외부 조인 ( Outer Join )
- 왼쪽이나, 오른쪽 데이터셋에 키가 있는 로우 유지
- 왼쪽 외부 조인 ( Left Outer Join )
- 왼쪽 데이터셋에 키가 있는 로우 유지
- 오른쪽 외부 조인 ( Right Outer Join )
- 오른쪽 데이터셋에 키가 있는 로우 유지
- 왼쪽 세미 조인 ( Left Semi Join )
- 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에는 키가 일치하는 왼쪽 데이터셋만 유지
- 왼쪽 안티 조인 ( Left Anti Join )
- 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않은 왼쪽 데이터셋만 유지
- 자연 조인 ( Natural Join )
- 두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적으로 결합하는 조인
- 교차 조인 ( Cross Join / Cartesian Join )
- 왼쪽 데이터셋의 모든 로우와 오른쪽 데이터셋의 모든 로우를 조합
// 임시 데이터 셋
val person = Seq(
(0, "Bill Chambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")
val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley"))
.toDF("id", "degree", "department", "school")
val sparkStatus = Seq(
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor"))
.toDF("id", "status")
3. 내부 조인
- Dataframe이나 테이블에 존재하는 키를 평가하고, True로 평가되는 로우만 결합
val joinExpression = person.col("graduate_program")
=== graduateProgram.col("id")
val innerjoinResult = person.join(graduateProgram, joinExpression)
innerJoinResult.show()
val joinType = "inner"
val innerjoinResult = person.join(graduateProgram, joinExpression, joinType)
innerJoinResult.show()
4. 외부 조인 / 왼쪽 / 오른쪽
- Dataframe이나 테이블에 존재하는 키를 평가 ( true Or false )한 로우를 포함시킴
- 왼쪽 외부 조인일 경우, 왼쪽의 모든 로우와, 왼쪽 DataFrame과 오른쪽 DataFrame의 로우를 함께 포함
- 오른쪽 외부 조인의 경우, 오른쪽의 모든 로우와, 오른쪽 DataFrame과 왼쪽 DataFrame의 로우를 함께 포함
- 일치하는 로우가 없다면 null
val joinExpression = person.col("graduate_program")
=== graduateProgram.col("id")
val joinType = "outer" OR "left_outer" OR "right_outer"
val outerJoinResult = person.join(graduateProgram, joinExpression, joinType)
innerJoinResult.show()
5. 왼쪽 세미 조인
- 오른쪽 Dataframe의 어떤 값도 포함하지 않기 때문에, 다른 조인타입과는 약간 다름
- 단지 오른쪽 Dataframe은 값이 존재하는지 확인하귀 위해 값 비교 용도로 사용
- 기존 조인과 달리 Dataframe의 필터 정도로 볼 수 있음
6. 왼쪽 안티 조인
- 왼쪽 세미 조인과 반대, 비교키가 없는 로우만 반환
- SQL의 NOT IN
7. 조인 사용 시 문제점
7.1. 복합 데이터 타입의 조인
- Boolean을 변환하는 모든 표현식은 조인 표현식으로 간주
- 배열 값이 포함되는가?
person.withColumnRenamed("id","persionId") .join(sparkStatus, expr("array_cotains(spark_status,id)")) .show()
7.2. 중복 컬럼명 처리
- Dataframe의 각 컬럼은 스파크 SQL 엔진인 카탈리스트 내에 고유 ID가 있음
- 고유 ID는 카탈리스트 내부에서만 사용할 수 있으며, 직접 참조할 수 는 없음
문제상황
- 조인에 사용할 DataFrame의 특정 키가 동일한 이름을 가지며, 키가 제거되지 않도록 조인 표현식에 명시하는 것
- 조인 대상이 아닌 두 개의 컬럼이 동일한 이름을 가진 경우
val gradProgramDupe = graduateProgram
.withColumnRenamed("id", "graduate_program")
val joinExpr = gradProgramDupe.col("graduate_program")
=== person.col("graduate_program")
person.join(gradProgramDupe, joinExpr)
.select("graduate_program")
.show()
// 오류 발생 - Reference 'graduate_program' is ambigous ...
해결방안
- 다른 조인 표현식 사용
- 불리언 형태의 조인 표현식 대신, 문자열이나 시권스 형태로 바꾸는 일
- 이렇게 하면 조인을 할 때, 두 컬럼 중 하나는 자동으로 제거
- 조인 후 컬럼 제거
- 조인 전 컬럼명 변경
// 다른 조인 표현식 사용 예제 코드
val joinExpr = gradProgramDupe.col("graduate_program")
=== person.col("graduate_program")
person.join(gradProgramDupe, joinExpr).show()
// 위의 joinExpr 대신 문자열로 넣자
person.join(gradProgramDupe, "graduate_program")
.select("graduate_program")
.show()
// 조인 후 컬럼 제거 예제 코드
person.join(gradProgramDupe, joinExpr)
.drop(person.col("graduate_program"))
.select("graudate_program").show()
8. 스파크의 조인 수행 방식
- 스파크가 조인을 수행하는 방식을 이해하기 위해서 실행에 필용한 두가지 핵심 전략
- 노드간 네트워크 통신 전략
- 노드별 연산 전략
8.1. 네트워크 통신 전략
- 스파크는 조인 시 두가지 클러스터 통신 방식을 활용
- 셔플 조인 ( Shuffle Join )
- 전체 노드 간 통신을 유발
- 브로드캐스트 조인 ( Broadcast Join )
- 통신을 유발하지 않음
- 셔플 조인 ( Shuffle Join )
8.1 큰 테이블과 큰 테이블 조인 ( Shuffle Join )
- 셔플 조인은 전테 노드 간 통신이 발생
- 그리고 조인에 사용하는 특정 키나, 키 집합을 어떤 노드가 가졌는지에 따라 해당 노드와 데이터를 공유
- 네트워크 복잡해지고 많은 자원 사용
- 즉, 전체 조인 프로세스가 진행되는 동안 모든 워커 노드에서 통신이 발생
8.2 큰 데이블과 작은 테이블 조인 ( Broadcast Join )
- 테이블이 단일 워커 노드의 메모리 크기에 적합할 정도로 충분히 작다면, 조인 연산 최적화가능
- 브로드케이스 조인으로, 작은 Dataframe을 클러스터의 전체 워커노드에 복제
- 자원을 많이 사용하는 것 처럼 보일 수 있지만, 조인 프로세스 내내 전체 노드가 통신하는 현상을 방지 할 수 있음
- 조인 시작 시, 단 한번만 복제가 수행되고, 그 이후로는 개별 워커가 다른 워커를 기다리거나 통신할 필요없이 작업 수행
- 브로드케이스 조인은 이전 조인과 마찬가지로 대규모 노드 간 통신 발생
- 하지만 그 이후로는 노드사이의 추가적인 통신 발생하지 않음
- 따라서 모든 단일 노드에서 개별적으로 조인이 수행되므로, CPU가 큰 병목구간이 됌
- DataFrame API 수행하면 옵티마이저에서 브로드케이스트 조인을 할 수 있게 Hint 줄 수 있음
import org.apache.spark.sql.functions.broadcast
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(gradudateProgram), joinExpr).show()
정리
- 조인 전에 데이터를 적절하게 분할하면 셔플이 계획되어 있더라도 동일한 머신에 두 DataFrame이 있을 수 잇음
- 따라서 Shuffle을 피할 수 있음
Subscribe via RSS