2024. 2. 5. 15:38ㆍBig Data
쓰면서 늘 헷갈리는게 foreachPartition인 것 같다.
이게 무슨 용도로 있는 함수인가...
그래서 이번 기회에 mapPartitions와 foreachPartition을 비교해 보기로 했다.
아래와 같이 코드를 작성한다.
num_rdd = sc.parallelize(range(100000), 10)
num_rdd.getNumPartitions()
getNumPartitions를 찍어보면 10개가 찍히는 걸 확인할 수 있다.
이제 map으로 돌릴 함수를 선언하자
def square(num_list):
for num in num_list:
yield num * num
mapPartitions의 경우 yield 값을 넘겨주어야 iterate 동작을 할 수가 있다.
이제 이 함수를 mapPartitions와 foreachPartition으로 각각 호출해 보자.
우선 mapPartitions로 호출한 결과이다.
num2_rdd = num_rdd.mapPartitions(lambda x: square(x))
num2_rdd.collect()
10개의 partition이 병렬적으로 동작하고 각 partition 내에 있는 item들이 제곱으로 곱해진 걸 확인 할 수가 있다.
yarn ui 창을 보면 10개의 executor가 생성되어 동시에 동작이 된 걸 확인해 볼 수 있다.
그리고 collect나 count 등의 실행 명령을 주어야 실행되는 걸 볼 수 있다.
이젠 foreachPartition을 실행해보자.
num3_rdd = num_rdd.foreachPartition(lambda x: square(x))
num3_rdd.collect()
num3_rdd.count()
mapPartitions를 호출한 것과 동일한 방식으로 호출해봤다.
처음에는 foreach기 때문에 partition이 병렬로 도는게 아니라 1개씩 순차적으로 도는 건가? 라고 생각을 했는데, yarn ui 화면을 보니 그렇지 않았다. foreachPartition도 mapPartitions와 마찬가지로 10개의 executor가 동시에 실행된 걸 확인할 수 있었다.
그리고 num3_rdd 결과가 None으로 찍히기 때문에, collect와 count 함수가 모두 에러가 발생했다.
그럼 foreachPartition의 경우 실행 시점이 언제지?? 라고 의문이 들었는데, 실행 시점은 foreachPartition을 호출한 바로 그 시점에서 함수가 실행이 된다.
구글링을 해보니 mapPartitions의 경우 map의 값을 변경 후 변경된 값을 쓰고 싶은 경우에 사용하고, foreachPartition의 경우 값 변경 없이 해당 map의 값을 실행시키는 용도일 경우 사용하는 것 같다.