2015. 11. 25. 15:35ㆍBig Data
이번에는 spark를 이용하여 word count를 해보려고 한다.
참고 reference는 아래와 같다.
기본적인 개요는 아래와 같다.
1. base RDD와 pair RDD 만들기
2. pair RDD counting 하기
3. 단어 개수 세기와 평균 값 구하기.
4. 파일을 읽어서 word count 하기
1. base RDD와 pair RDD 만들기
아래와 같이 base RDD를 만들어 본다.
from pyspark import SparkContext
sc = SparkContext()
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
base RDD가 만들어졌다면, 이젠 pair RDD를 만든다. pair RDD는 각각의 element가 pair 튜플 형태로 구성되어 있는 RDD를 만한다. 우리가 만들 pair RDD 역시 ('<word>', 1) 형태의 튜플을 띄면 된다.
wordPairs = wordsRDD.map(lambda x:(x,1))
위와 같이 따라했다면 아마도 pair RDD가 [('cat',1), ('elephant',1), ('rat',1), ('rat',1), ('cat',1)] 형태로 만들어졌을 것이다.
2. pair RDD counting 하기
이젠 이렇게 만들어진 pair RDD를 word를 기준으로 counting 할 것이다. counting을 하기 위해 몇몇의 접근 방법이 있는데
첫 번째는 바로 collect() 함수를 이용하는 것이다. 이 접근법은 data가 작을 경우에 사용이 가능하며, work에서 병렬 처리를 하는 것보다 수행 속도가 느리다.
두 번째는 groupByKey()를 이용한 방법이다. groupByKey()에는 두 가지 문제점이 있는데
- 적절한 파티션으로 데이터 이동 시 많은 양의 데이터 트래픽이 요구된다.
- 결과 리스트가 매우 클 수가 있다. 결과 리스트가 매우 클 경우에 메모리 overflow가 발생한다. 예를들어 위키피디아에서 a나 the의 count를 센다면 그 양은 어마어마할 것이다.
wordsGrouped = wordPairs.groupByKey()
wordCountsGrouped = wordsGrouped.mpa(lambda args: (args[0], len(args[1])))
print(wordCountsGrouped.collect())
세 번째는 reduceByKey()를 이용한 방법이다. 가장 효율적인 방법으로, 일단 각자의 파티션 내에서 reduceByKey()를 적용하고 셔플을 하기 때문에 큰 dataset에서도 효율적이다.
wordCounts = wordPairs.reduceByKey(lambda x,y: x+y)
print(wordCounts.collect())
3. 단어 개수 세기와 평균 값 구하기
유니크한 단어 개수를 아래와 같이 계산한다.
uniqueWords = wordCounts.count()
print(uniqueWords)
단어 개수가 계산되면 reduce를 이용해서 mean을 구한다.
from operator import add
totalCount = wordCounts.map(lambda args: (args[1])).reduce(add)
average = totalCount / float(uniqueWords)
print(totalCount)
print(round(average,2))
4. 파일을 읽어서 word count 하기
위에서 작업한 코드들을 이용해서 word를 count하는 함수를 하나 만든다.
def wordCount(wordListRDD):
wordPairs = wordsRDD.map(lambda x:(x,1))
wordCounts = wordPairs.reduceByKey(lambda x,y: x+y)
return wordCounts
print(wordCount(wordsRDD).collect())
대문자를 소문자로 변환해주고, 특수 문자를 제거하는 함수를 만든다.
import re
def removePunctuation(text):
return re.sub("['!,@#$%^&*()_.:-]", "", text).lower()
print(removePunctuation('Hi, you!'))
print(removePunctuation(' No unser_score!"))
이제 파일을 읽은 다음 위에 만든 특수 문자를 제거하는 함수를 적용해 보자.
shakespeareRDD = (sc.textFile('pg100.txt', 8).map(removePunctuation)
print('\n'.join(shakespeareRDD.zipWithIndex().map(lambda args: '{0}: {1}'.format(args[0], args[1])).take(15)))
단어를 획득하기 위해서 스페이스 별로 단어를 구분해 줄 필요가 있고, 빈 라인은 제거할 필요가 있다.
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(' '))
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x!= '')
이제 단어를 counting 해 본 후 가장 많이 나오는 단어 15개만 추려서 출력해 보자.
top15WordsAndCounts = shakeWordsRDD.map(lambda x:(x,1)).reduceByKey(add).takeOrdered(15, lambda args:-args[1])
print('\n'.join(map(lambda args:'{0}: {1}'.format(args[0], args[1]), top15WordsAndCounts)))