[PYSPARK] linear regression

2016. 4. 5. 17:48machine learning

이번 시간에는 pyspark를 이용하여 linear regression을 배워볼 것이며 million song dataset이라는 데이터를 사용하여 회귀분석을 할 것이다.


우선 가장 먼저 해야할 작업이 당연 spark context를 만드는 작업이다.


from pyspark import SparkContext

sc = SparkContext();


context 생성 후에 millionsong.txt 파일을 읽어온다. 이 때 분산 파티션은 2로 설정해준다.


rawData = sc.textFile("millionsong.txt", 2)


읽어들인 rawData를 살펴보면 아래와 같음을 확인할 수가 있다.



이제 우리는 LabeledPoint라는 객체를 사용할 것이다. 이 LabeledPoint는 pyspark에서 제공하는 클래스로써 label이라는 속성과 feature라는 속성값을 지니고 있다. 우리는 이것들을 잘 활용해서 linear regression 에 이를 이용할 것이다.


from pyspark.mllib.regression import LabeledPoint

import numpy as np


해당 rawData의 라인을 집어넣으면 LabeledPoint 객체로 리턴하는 함수를 작성해 보자.


def parsePoint(line):

   parse_data = line.split(',')

   return LebeledPoint(parse_data[0], parse_data[1:])  


해당 라인이 input으로 들어오면 콤마 단위로 라인을 나눠서 첫번째 콤마 부분은 label쪽으로, 나머지는 feature 쪽으로 저장된 LabeledPoint를 리턴하게 된다.


위에서 samplePoints로써 5개만 가져온 데이터에 대해 LabelPoint를 만들면 아래와 같다.


parsedSamplePoints = [parsePoint(line) for line in samplePoints]

firstPointFeatures = parsedSamplePoints[0].features

firstPointLabel = parsedSamplePoints[0].label)



이제 사전 밑작업은 끝났으니, 본격적으로 training, validation, test set으로 구분 지은 후 실제 머신 러닝을 적용해 보자.


weights = [.8, .1, .1]

seed = 42

parsedTrainData, parsedValData, parsedTestData = parsedData.randomSplit(weight, seed)

parsedTrainData.cache()

parsedValData.cache()

parsedTestData.cache()


8:1:1의 비율로 traindata, validationdata, testdata set을 만들었다. 후에 다시 사용할 것을 대비하여 cache를 적용하였다.


이제 회귀 예측 중 가장 기본적인 방법인 라벨 평균법을 이용해서 linear regression을 해보자.


라벨 평균법은 말 그대로 라벨을 다더한후 개수로 나눈 값을 예측하는 방법이다.


averageTrainYear = (parsedTrainData.map(lambda pt: pt.label).mean())


위에서 구한 평균 값이 실제 값과 얼마나 차이가 있는지의 오차율을 보기 위해서 root mean squared error 를 구현해야 한다. 아래와 같이 구현하자


def squaredError(label, prediction):

    return (label - prediction) ** 2


def calcRMSE(labelsAndPreds):

    return np.sqrt(labelsAndPreds.map(lambda args: squaredError(args[0], args[1])).mean())

 


위의 식은 실제 값과 예측 값의 차이를 구한 후 제곱을 해준 후 루트를 씌운 후 평균 값을 구하는 것이다. 해당 하는 값이 작을 수록 예측 값과 실제 값이 일치한다고 할 수가 있다.


그럼 이제 위에서 구한 라벨 평균 값의 RMSE를 구해보자


labelsAndPredsTrain = parsedTrainData.map(lambda obj : (obj.label, averageTrainYear))

rmseTrainBase = calcRMSE(labelsAndPredsTrain)


labelsAndPredsVal = parsedValData.map(lambda obj : (obj.label, averageTrainYear))

rmseValBase = calcRMSE(labelsAndPredsVal)


labelsAndPredsTest = parsedTestData.map(lambda obj : (obj.label, averageTrainYear))

rmseTestBase = calaRMSE(labelsAndPredsTrain)



세가지 set 모두 21.506의 값을 나타내고 있다. 0에 가까울 수록 정확도가 높은 모델이라고 할 수 있기 때문에 21.506은 그리 좋은 모델이라고 할 수가 없다. 아래의 왼쪽 그래프가 RMSE가 0인 그래프를 의미하고 라벨평균법을 이용한 모델 그래프는 오른쪽 그래프와 같다고 할 수가 있다. 어떠한 값이 input으로 들어와도 평균 값만을 리턴하기 때문에 한가지 값에 대해 위로 수직인 그래프가 만들어진다.




이번에는 실제로 왼쪽과 같은 그래프를 만들기 위해 weight를 찾고 그 weight를 이용한 모델을 적용할 것이다. 



모든 입력 값에 대해 weight를 적용해 볼 수 없기 때문에, 처음 weight 값을 정의하고 그 값에 대해 gradient descent를 적용함으로써 그 다음번 weight 값을 찾아나가는 방식으로 해서 가장 적은 RMSE의 weight를 찾는 것이 이번 학습의 목표이다.

일단 weight와 labeledPoint 값이 입력으로 주어지면 gradient descent를 해주는 함수를 만들자.


def gradientSummand(weights, lp):

    return (weights.dot(lp.features) - lp.label) * lp.features


정확히 말해서 무슨 의미인지 하나도 모르겠다. 그냥 저렇게 하면 gradient descent가 구해지는 구나라고만 이해하고 있을 뿐이다. -_-;;


실제 값과 예측 값을 리턴해주는 함수도 하나 만들자


def getLabeledPrediction(weights, observation):

    return (observation.label, weights.dot(observation.features))


weight와 labeledPoint 값이 들어오면 실제 라벨과 labeledPoint의 특징 데이터와 weight 간의 행렬 곱을 통해 나온 예측 결과 값을 리턴해 주는 함수이다.


이제 실제로 linear regression gradient descent를 해보자


def linregGradientDescent(trainData, numIters):

    n = trainData.count()

    d = len(trainData.take(1)[0].features)

    w = np.zeros(d)


    alpha = 1.0

    errorTrain = np.zeros(numIters)

    for i in range(numIters):

        labelsAndPredsTrain = trainData.map(lambda lp:getLabeledPrediction(w, lp))

        errorTrain[i] = calcRMSE(labelsAndPredsTrain)


        gradient = trainData.map(lambda lp: gradientSummand(w,lp)).sum()


        alpha_i = alpha / (n * np.sqrt(i+1))

        w -= alpha_i * gradient

    return w, errorTrain


처음 weight는 trainData의 특징 데이터와 동일한 크기이지만 값은 0으로 설정되어 있다. alpha 값은 weight가 이동할 수 있는 단계를 의미하는데 1로 설정되어 있다. 

weight가 이동할 수 있는 단계에 대해 간략하게 설명하자면 gradient descent의 궁극적인 목표는 RMSE가 0에 가까운 값을 찾는 것이 목표이다. 이 값을 찾기 위해서 weight를 이동시켜가면서 RMSE를 구해야 하는데 이 이동할 수 있는 크기가 alpha라고 할 수 있다. alpha 값이 크면 RMSE가 0인 지점을 찾는 것이 빠를 수 있지만 이동 거리가 크기 때문에 자칫 0인 지점을 놓칠 수도 있다. alpha 값이 작으면 RMSE가 0인 지점은 확실히 찾을 수 있겠으나, 그 속도와 계산량이 커지는 단점이 있다.


errorTrain은 각 weight 단계에서의 RMSE를 모아 놓은 배열이다. 나중에 이 값이 가장 작을 때의 weight를 모델에 적용시키면 된다.


gradient 구하는 부분과 alpha 구하는 부분은 잘 모르겠다. 나중에 다시 공부하기로.....


이제 위에서 구한 weight값을 이용해 RMSE를 구해보자


numIters = 50

weightsLR0, errorTrainLR0 = linregGradientDescent(parsedTrainData, numIters)

labelsAndPreds = parsedValData.map(lambda lp: getLabeledPrediction(weightsLR0, lp))

rmseValLR0 = calcRMSE(labelsAndPreds)



기존 라벨 평균법이 21.506, 그리고 weight gradient descent가 18.253으로 결과가 나타났다. 아래 그래프는 Iterate가 반복될수록 변경되는 errorTrainLR0 값을 그래프로 도식화 한것이다. 횟수가 진행될수록 error율이 점점 수렴되는 것을 확인할 수가 있다.




지금까지는 라벨평균법 보다는 더 효율이 좋은 gradient descent를 이용해서 모델을 구하는 것을 알아봤다. 

하지만 과연 어떻게 하면 RMSE를 더 줄이고 효과적인 모델을 만들수가 있을까?? 

더 많은 iteration을 사용하면 어떨까?? regularization을 적용하면 더 좋은 값이 나타날까??


우리는 pyspark에서 제공하는 LinearRegressionWithSGD 모델을 통해 위의 가설들을 테스트 해볼 것이다.

help(LinearRegressionWithSGD)를 통해 아래의 파라미터들을 확인해 볼 수가 있다.



data 

The training data, an RDD of LabeledPoint 

iterations 

The number of iterations (default : 100) 

step 

The step parameter used in SGD (default : 1.0) 

miniBatchFraction 

Fraction of data to be used for each SGD iteration (default : 1.0) 

initialWeights 

The initial weights (default : None) 

regParam 

The regularizer parameter (default : 0.0) 

regType 

The type of regularizer used for training our model 

intercept 

Boolean parameter which indicates the use or not of the augmented representation features are activated or not 

validateData 

Boolean parameter which indicates if the algorithm should validate data before training (default : True) 


우리는 이제 아래와 같이 LinearRegressionWithSGD를 통해 모델은 만든 후 파라미터 값을 바꿔가며 RMSE가 가장 적었을때의 파라미터 값을 찾아나갈 것이다.


firstModel = LinearRegressionWithSGD.train(parsedTrainData,    

iterations=numIters,

step=alpha,

miniBatchFraction=miniBatchFrac,

initialWeights=None,

regParam=reg,

regType=regType,

intercept=useIntercept)


firstModel의 predict라는 함수를 사용하여 예측값과 실제값의 차이를 살펴보자


labelsAndPreds = parsedValData.map(lambda lp: (lp.label, firstModel.predict(lp.features)))

rmseValLR1 = calcRMSE(labelsAndPreds)


결과 값은 19.025가 나온다. 기존의 gradient descent보다 더 안좋은 결과가 나왔다.


더 좋은 결과를 내는 regParam 값을 찾아 보자.


bestRMSE = rmseValLR1

bestRegParam = reg

bestModel = firstModel


numIters = 500

alpha = 1.0

miniBatchFrac = 1.0

for reg in [1e-10, 1e-5, 1.0]:

    model = LinearRegressionWithSGD.train(parsedTrainData, 

                                                                        numIters, alpha, 

miniBatchFrac, 

regParam=reg,

regType='l2',

intercept=True)

    labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))

    rmseValGrid = calcRMSE(labelsAndPreds)


    if rmseValGrid < bestRMSE:

        bestRMSE = rmseValGrid

        bestRegParam = reg

        bestModel = model


위에서 찾은 regParam 값을 통해 RMSE를 16.681까지 떨궜다.


지금까지는 모든 실험에 대해 alpha 값을 1로 고정시켜 놓았다. 이번에는 다양한 alpha를 적용시켜보자. alpha 값과 함께 iteration 횟수도 함께 변경해 보자. 


reg = bestRegParam

modelRMSEs= []


for alpha in [1e-5, 10]:

    for numlters in [500, 5]:

        model = LinearRegressionWithSGD.train(parsedTrainData,

numIters,

alpha,

miniBatchFrac,

regParam=reg,

regType='l2',

intercept=True)

  labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))

  rmseVal = calcRMSE(labelsAndPreds)

  modelRMSEs.append(rmseVal)