[SPARK] RDD를 DataFrame으로 변환해주기
file을 읽어서 RDD로 만든 다음 해당 RDD를 DataFrame으로 변환해 주려고 한다.
일단 json 데이터를 파일로 읽어서 아래와 같이 RDD로 바꿔 보자.
val conf = new SparkConf().setAppName("test")
val sc = new SparkContext(conf)
val rdd = sc.textFile("test.json")
해당 rdd를 df로 변환하자. df로 변환하기 위해서는 SQLContext를 사용해야 하는데, 이를 위해 maven에 spark-sql을 추가해 주자.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.4.1</version>
</dependency>
이제 rdd를 df로 변환해 주자.
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = rdd.toDF()
println(df.printSchema)
이렇게 찍어보면 column이 _1: string 이런식으로 string type의 column 하나로만 정의되어 있는 것을 확인할 수 있다.
df의 장점은 각 colume별 데이터들을 나눠서 볼 수 있다는 장점이 있기에 json 데이터를 df column으로 변환해 주기 위한 중간 과정을 해야한다.
일단 값을 구조화하기 위한 class를 정의해 주자. 참고로 class는 해당 코드가 있는 함수 외부에 선언해줘야 동작한다.
(나 같은 경우에는 같은 코드에 class 정의했더니 type 오류가 발생했음..)
case class Raw(key:String, value1:String, value2:String)
이렇게 class를 선언해줬다면 rdd에 map을 적용해서 해당 값들을 Raw class 구조로 바꾸자.
val rawRdd = rdd.map { item =>
Raw(item.getOrElse("key", ""), item.getOrElse("value1", ""), item.getOrElse("value2", "")
이제 rawRdd를 df로 변환한 후, printSchema를 찍어보면 key, value1, value2 이렇게 column이 생성되는 것을 확인할 수가 있다.
df는 sql처럼 join도 가능하며 아래와 같이 사용할 수 있다.
val dfResult = df.join(df2, df("df_key") === df2("df2_key"), "left_outer")
join의 default는 inner join이며 주의할 사항으로는 df와 df2를 outer join을 할 경우, column 명이 같으면 에러가 난다는 것이다. join 하고자 하는 df의 column 명을 다르게 써주어야 한다.
또 하나의 주의 사항으로 이렇게 join을 하면 meta conflict라는 에러가 발생하는데, 이게 어떤 의미인지 정확히 모르겠다. 피하는 방법은 hadoop config에서 summary-metadata를 사용하지 않음으로 설정하면 된다.
sc.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
이렇게 join한 결과를 parquet로 저장할 수 있다.
dfResult.write.mode(SaveMode.Append).parquet("/output/")