본문 바로가기
Big Data

[SPARK] tutorial (pyspark)

by 유주원 2015. 11. 24.

Apache Spark with the Python


http://nbviewer.ipython.org/github/spark-mooc/mooc-setup/blob/master/spark_tutorial_student.ipynb 사이트에 있는 내용을 번역함.


Spark Context


- spark에서 통신은 driver와 executor 사이에서 발생한다. driver는 실행에 필요한 spark job들을 가지고 있으며, 이러한 spark job들은 executor에서 실행되기 위해 task 단위로 쪼개진다.


- spark와 API를 사용하기 위해서는 SparkContext 사용이 필요하다. SparkContext가 생성되면, 마스터에게 동작 가능한 core들을 요청한다. 마스터가 동작 가능한 core들을 설정하면 설정된 core들은 다른 application에서는 사용되지 않는다.


- Spark Context는 Spark에 있어서 중요한 entry point이며 Resilient Distributed Datasets(RDD)를 생성하는데 사용될 수 있다.

- SparkContext는 아래의 코드를 이용하여 가져올 수 있다. (pyspark 이용)


from pyspark import SparkContext


sc = SparkContext()


- 그 밖의 기본 명령어


type(sc)                       # sparkcontext 타입 확인하기

dir(sc)                                # sparkcontext 함수와 attribute 리스트 확인하기

help(sc)                             # sparkcontext 도움말 보기

sc.version                          # sparkcontext 버전 확인하기




Using RDDs


- spark 사용에 있어 첫 번째 단계는 immutable 형태의 base RDD를 만드는 것이다. 그 후에 base RDD에 여러 형태의 transformation을 적용해 볼 수가 있다. 생성된 RDD는 immutable이기 때문에 딱 한번만 생성되고 변경될 수가 없다. 결과적으로 base RDD에 적용된 각각의 transformation은 각각의 새로운 RDD를 생성하게 된다.


- spark에서는 lazy evaluation을 사용하기 때문에, transformation은 실제 action이 발현되기 전까지는 실행되지 않는다.


- 다음 예제에서는 range와 xrange를 이용해서 integer list를 만들것이다. (실제로는 python3를 사용하기 때문에 range가 곧 xrange임). xrange와 range의 차이를 설명하자면, xrange는 generator를 생성하고 range는 범위에 포함되는 모든 숫자 리스트를 생성한다.

  ex) xrange => stream(x){list[x , stream(x+1)]}

       range => [1,2,3,4,5,6,7, ......]


- spark에서 dataset은 집합 리스트로 표현될 수가 있는데, 이 집합 리스트는 각기 다른 머신 상에 저장되기 위해 각각의 파티션으로 분할이 된다.


- 각각의 파티션은 리스트 내의 유일한 집합 서브셋을 가진다. (중복이 안된다는 말)


- spark에서는 이렇게 저장되는 dataset을 "Resillient Distributed Datasets" 이라고 한다.


- spark 특징 중 하나가 (Hadoop과 비교해서 설명하자면) disk가 아닌 메모리 상에 데이터가 저장된다는 것이며, 이 점은 spark를 더 빠르게 실행될 수 있음을 설명해 준다.


- RDD를 생성하기 위해서는 sc.parallelize()를 사용해야 한다. parallelize()를 사용하는 것은 spark에서 input data의 새로운 집합을 생성하라고 말하는 것과 같은데 예를 들어 sc.parallelize(data, 8)이라고 하면, 데이터가 메모리에 저장이 될 때 데이터를 8조각으로 쪼개서 메모리에 저장하라는 뜻이랑 같다.


- RDD 사용 시에는 xrange가 range보다 더 효율이 좋다. (아무래도 메모리라는 한정적인 공간이다보니 generator를 올리는게 더 효율적일듯)


data = range(1, 10001)                                                           1부터 10000까지의 숫자 리스트 생성.

rangeRDD = sc.parallelize(data, 8)                                         # 해당 데이터를 8개의 파티션으로 나눔

print('type of rangeRDD: {0}'.format(type(rangeRDD)))          # 해당 RDD 타입 확인

help(sc.parallelize)                         

rangeRDD.getNumPartitions()                                                 # 해당 RDD의 파티션 숫자 확인

print(rangeRDD.toDebugString())                                            

print('rangeRDD id: {0}'.format(rangeRDD.id()))                     # RDD id 확인

rangeRDD.setName('My first RDD')                                       # RDD 에 이름 지정

print(rangeRDD.toDebugString())

help(rangeRDD)


- 지금까지 하나의 데이터를 여러 개의 파티션으로 분할하는 dataset을 생성해 봤다. 이제는 각각의 파티션에서 해당 dataset에 연산 작용을 할때 어떤 상황이 발생하는 지를 살펴보자.


- 가장 보편적인 transformation인 map(f)를 예로 들자면, dataset에 있는 각각의 item에 함수 f가 적용이 되고 task 단계가 시작이 된다. task는 각각의 파티션에서 시작이 되고 서로 다른 input data지만 동일한 로직을 실행하게 된다. task가 완료되면 새로운 파티션이 출력된다.



- map transformation이 적용되면 base RDD에 있는 각각의 item은 새로운 RDD로 재탄생한다. 가령 base RDD가 20개의 elements를 가지고 있다면 새로운 RDD 역시 20 개의 elements를 가지게 된다.


Collect()


- spark는 lazy evaluation 방식이라 transformation이 발생해도 실제 데이터는 변경되지 않는다고 위에서 언급하였다. 그렇다면 어떻게 해야 실제 데이터가 적용이 될까?


- collect() 함수를 통해서 분배된 데이터들을 새로운 list로 합치고 transformation도 적용시킬 수가 있다.


- 하지만 collect() 함수는 적은 양의 데이터 일 경우에만 사용하도록 하자!


- collect() 함수가 바로 action이며, 실제로 collect() 함수가 호출이 되면 RDD가 메모리에 올려져서 계산이 이루어 진다.


- 아래의 그림을 보면 dataset이 4개의 파티션으로 분할되었고 4개의 collection() task가 시작되었다. 각각의 task는 자신이 속한 파티션의 entry들을 수집하고 그 결과를 SparkContext에 전송한다.

def sub(value):

    return (value - 1)


subRDD = rangeRDD.map(sub)              # RDD에 sub 함수를 적용시킨다.

subRDD.collect()                                      # 실제 action이 이루어지고 sub 함수가 적용된 숫자 리스트가 리턴된다.




Count()


- RDD 내에 있는 요소들의 개수를 세는 함수. 이 함수 역시 action에 해당한다.


- collect와 마찬가지로 각각의 task가 자신의 파티션에 있는 entry의 개수를 센 후 그 결과를 SparkContext에 전송한다.

print(rangeRDD.count())

print(subRDD.count())


filter()


- 함수 결과가 참인 경우에만 요소들을 통과시키는 함수. 결과로 새로운 RDD를 생성한다. action 아님.

def ten(value):

    if(value < 10):

        return True

    else:

        return False


filteredRDD = subRDD.filter(ten)

print(filteredRDD.collect())


Lambda()


- 런타임에서 이름을 할당 받을 필요가 없는 한 줄 짜리 익명 함수를 만들고 싶은 경우에 사용. 


lambdaRDD = subRDD.filter(lambda x:x < 10)

lambdaRDD.collect()


evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)

evenRDD.collect()




그 밖의 함수


- first() : dataset에서 첫 번째 entry만 가지고 오고 싶다.


take() : dataset에서 해당 개수만큼 가져오고 싶다.


- takeOrdered() : 해당 개수만큼 데이터를 가져오는데 정렬해서 가져올 수가 있다.(오름차순, 내림차순)


- top() : takeOrdered()와 비슷.


- first()와 take()의 결과는 RDD가 얼마나 partition 되었느냐에 따라 그 결과 값이 달라 질 수 있다. 또한 collect 대신 take(n)을 써서 값을 가져올 수도 있다.


- reduce()는 계산된 결과를 하나의 값으로 합쳐준다. reduce는 파티션 레벨에서 한 번 적용되고 파티션으로부터 결과를 받아 다시 합쳐진다.


print(filteredRDD.first())

print(filteredRDD.take(4))

print(filteredRDD.takeOrdered(4, lambda s: -s))

print(filteredRDD.top(4, lambda s:-s))


from operator import add

print(filteredRDD.reduce(add))

print(filteredRDD.reduce(lambda a,b:a+b))

print(filteredRDD.reduce(lambda a,b,:a-b))

print(filteredRDD.repartition(4).reduce(lambda a,b:a-b))                 # repartition을 하면 값이 -45가 아니라 21이 나오는데

                                                                                          왜 그러는지 잘 모르겠음

print(filteredRDD.repartition(4).reduce(lambda a,b: a+b))                


- takeSample() : dataset으로부터 랜덤으로 원소들을 리턴한다. withReplacement 파라미터가 있는데 True일 경우 동일한 원소가 여러번 리턴될 수 있다. seed 파라미터는 랜덤 넘버를 생성할 때 seed 값으로 설정할 수 있다. action이기 때문에 실제 메모리에서 계산이 이루어진다.


- countByValue() : 값으로 그룹화 한 후 그 개수를 count한다.


print(filteredRDD.takeSample(withReplacement=True, num=6, seed=500))

repetitiveRDD = sc.parallelize([1,2,3,1,2,3,1,2,1,2,3,3,3,4,5,4,6])

print(repetitiveRDD.countByValue())


- flatmap() : map()을 수행할 때 때때로 collection이나 튜플이 리턴될 수도 있다. map()이 적용된 RDD는 iterator로 만들어진 새로운 RDD를 얻는데 iterator 안에 포함된 값으로 RDD를 구성하기 원할 경우에 flatmap()을 사용한다.


wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']

wordsRDD = sc.parallelize(wordsList, 4)


singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x+ 's'))

print(singularAndPluralWordsRDDMap.collect())

singularAndPluralWordsRDDMap = wordsRDD.flatMap(lambda x: (x, x+ 's'))

print(singularAndPluralWordsRDDMap.collect())


simpleRDD = sc.parallelize([2,3,4])

print(simpleRDD.map(lambda x: range(1,x)).collect())

print(simpleRDD.flatMap(lambda x: range(1,x)).collect())


- reduceByKey() : pair RDD로 이루어져 있는 경우에 적용 가능. ex) [(a,1), (b,2)]. 키는 튜플의 첫 번째 원소가 되고 값은 두 번째 원소가 된다.


- reduceByKey()는 대규모 분산 dataset에 대해 매우 효과적으로 동작한다. 그 이유는 node를 통해 data가 셔플이 일어나기 전에 각각 파티션에서 키를 통해 출력 데이터를 결합할 수 있기 때문이다.


- 아래 그림은 reduceByKey 동작을 잘 설명하고 있다. data 셔플이 일어나기 전에 lambda 함수를 적용해서 출력을 결합시키고 그 이후에 최종 결과물을 만들기 위해 lambda 함수를 다시 적용해서 결과를 출력한다.

- groupByKey() : 모든 key-value 쌍이 셔플한다. 네트워크 상에서 불필요한 데이터 전송을 야기한다. 또한 하나의 키가 많은 key-value 쌍을 가지고 있다면 out-of-memory를 발생시킬 수도 있다. (spark에서는 메모리가 차면 disk로 자동 swap 시키긴 하지만 이 작업은 자주 일어나는게 아니라 한번에 일괄적으로 동작하기 때문에 out-of-memory가 발생할 수 있음)


import math

pairRDD = sc.parallelize([('a',1), ('a',2), ('b',1)])

print(pairRDD.groupByKey().mapValues(lambda x: list(x)).collect())

print(pairRDD.groupByKey().mapValues(lambda x: math.fsum(x)).collect())

print(pairRDD.reduceByKey(add).collect())




Caching RDDs


- 효율적인 RDD 메모리 관리를 위해서는 컨텐츠를 메모리에 보관하는게 유리하다. 하지만 메모리는 제한적이기 때문에 너무 많은 메모리가 저장되어 있으면 spark에서는 자동적으로 RDD를 삭제할 것이다.


- cache() 함수를 통해 만들어진 RDD를 메모리에 상주 시킬 수가 있다. 너무 많은 RDD를 메모리에 상주시키면 spark가 뻑나고, 가장 적은 빈도로 사용했던(LRU) RDD 부터 삭제를 시작한다. 해당 RDD에 접근할 때에 spark에서는 RDD를 다시 생성 시키긴 하지만 생성 시간이 오래 걸린다.


filteredRDD.setName('My Filtered RDD')

filteredRDD.cache()

print(filteredRDD.is_cached)


- spark는 자동으로 RDD를 관리하고 메모리가 꽉차면 disk로 옮겨 쓴다. 가장 효율적인 방법은 RDD 사용을 끝냈으면 unpersist() 함수를 써서 RDD를 메모리에 caching 하는 것을 막는 것이다.


- toDebugString() 함수는 현재 생성된 RDD 정보를 보여준다.


- getStorageLevel()은 RDD가 현재 어느 위치에 저장되어 있는지(메모리 or disk) 보여준다.


print(filteredRDD.toDebugString())

filteredRDD.unpersist()

print(filteredRDD.getStorageLevel())

filteredRDD.cache()

print(filteredRDD.getStorageLevel())


pyspark 구동 방식


- 내부적으로 spark JVM에서 돌아가고, pySpark는 Py4J를 사용하는 JVM 안의 python code 에서 돌아간다.


- Py4J는 python 프로그램이 JVM 안의 자바 object를 동적으로 접근 가능하게 도와준다.


- 함수는 python interpreter 안에 java object가 있고 java collection이 표준 python collection method를 통해 접근 가능할 때 호출이 가능하게 된다.


https://github.com/KonanAcademy/spark/blob/master/part3/week3/Learning_Apache_Spark.ipynb