본문 바로가기
Big Data

[SPARK] word count (pyspark)

by 유주원 2015. 11. 25.

이번에는 spark를 이용하여 word count를 해보려고 한다. 


참고 reference는 아래와 같다.


http://nbviewer.ipython.org/github/spark-mooc/mooc-setup/blob/master/lab1_word_count_student.ipynb#-(4f)-Count-the-words-


기본적인 개요는 아래와 같다.


1. base RDD와 pair RDD 만들기


2. pair RDD counting 하기


3. 단어 개수 세기와 평균 값 구하기.


4. 파일을 읽어서 word count 하기


1. base RDD와 pair RDD 만들기


아래와 같이 base RDD를 만들어 본다.


from pyspark import SparkContext


sc = SparkContext()


wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']

wordsRDD = sc.parallelize(wordsList, 4)


base RDD가 만들어졌다면, 이젠 pair RDD를 만든다. pair RDD는 각각의 element가 pair 튜플 형태로 구성되어 있는 RDD를 만한다. 우리가 만들 pair RDD 역시 ('<word>', 1) 형태의 튜플을 띄면 된다.


wordPairs = wordsRDD.map(lambda x:(x,1))


위와 같이 따라했다면 아마도 pair RDD가 [('cat',1), ('elephant',1), ('rat',1), ('rat',1), ('cat',1)] 형태로 만들어졌을 것이다.


2. pair RDD counting 하기


이젠 이렇게 만들어진 pair RDD를 word를 기준으로 counting 할 것이다. counting을 하기 위해 몇몇의 접근 방법이 있는데 

첫 번째는 바로 collect() 함수를 이용하는 것이다. 이 접근법은 data가 작을 경우에 사용이 가능하며, work에서 병렬 처리를 하는 것보다 수행 속도가 느리다.


두 번째는 groupByKey()를 이용한 방법이다. groupByKey()에는 두 가지 문제점이 있는데 

- 적절한 파티션으로 데이터 이동 시 많은 양의 데이터 트래픽이 요구된다.

- 결과 리스트가 매우 클 수가 있다. 결과 리스트가 매우 클 경우에 메모리 overflow가 발생한다. 예를들어 위키피디아에서 a나 the의 count를 센다면 그 양은 어마어마할 것이다.


wordsGrouped = wordPairs.groupByKey()

wordCountsGrouped = wordsGrouped.mpa(lambda args: (args[0], len(args[1])))

print(wordCountsGrouped.collect())


세 번째는 reduceByKey()를 이용한 방법이다. 가장 효율적인 방법으로, 일단 각자의 파티션 내에서 reduceByKey()를 적용하고 셔플을 하기 때문에 큰 dataset에서도 효율적이다.


wordCounts = wordPairs.reduceByKey(lambda x,y: x+y)

print(wordCounts.collect())


3. 단어 개수 세기와 평균 값 구하기


유니크한 단어 개수를 아래와 같이 계산한다.


uniqueWords = wordCounts.count()

print(uniqueWords)


단어 개수가 계산되면 reduce를 이용해서 mean을 구한다.


from operator import add


totalCount = wordCounts.map(lambda args: (args[1])).reduce(add)

average = totalCount / float(uniqueWords)

print(totalCount)

print(round(average,2))


4. 파일을 읽어서 word count 하기


위에서 작업한 코드들을 이용해서 word를 count하는 함수를 하나 만든다.


def wordCount(wordListRDD):

    wordPairs = wordsRDD.map(lambda x:(x,1))

     wordCounts = wordPairs.reduceByKey(lambda x,y: x+y)


     return wordCounts


print(wordCount(wordsRDD).collect())


대문자를 소문자로 변환해주고, 특수 문자를 제거하는 함수를 만든다.


import re


def removePunctuation(text):

    return re.sub("['!,@#$%^&*()_.:-]", "", text).lower()


print(removePunctuation('Hi, you!'))

print(removePunctuation(' No unser_score!"))


이제 파일을 읽은 다음 위에 만든 특수 문자를 제거하는 함수를 적용해 보자.


shakespeareRDD = (sc.textFile('pg100.txt', 8).map(removePunctuation)

print('\n'.join(shakespeareRDD.zipWithIndex().map(lambda args: '{0}: {1}'.format(args[0], args[1])).take(15)))


단어를 획득하기 위해서 스페이스 별로 단어를 구분해 줄 필요가 있고, 빈 라인은 제거할 필요가 있다.


shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(' '))

shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x!= '')


이제 단어를 counting 해 본 후 가장 많이 나오는 단어 15개만 추려서 출력해 보자.


top15WordsAndCounts = shakeWordsRDD.map(lambda x:(x,1)).reduceByKey(add).takeOrdered(15, lambda args:-args[1])

print('\n'.join(map(lambda args:'{0}: {1}'.format(args[0], args[1]), top15WordsAndCounts)))


https://github.com/KonanAcademy/spark/blob/master/part3/week4/Building_a_word_count_application.ipynb