본문 바로가기
  • Let's go grab a data
Data/SPARK

9. 스파크 key/value pair RDD 예제

by pub-lican-ai 2016. 9. 5.
반응형

9. 스파크 key/value RDD 예제 #spark #reduceByKey #groupByKey #combineByKey # mapValues #keys #values #sortByKey


*파란색은 스크립트, 검은색은 결과입니다.


map() pair RDD 생성 [Transformation 함수]

스칼라에서 README를 spark context 객체의 textFile 메서드를 이용해 읽어오면 RDD 객체가 생성됨

이후 map 함수를 이용해 첫 번째 단어를 키로 사용한 pair RDD 생성

scala> val lines = sc.parallelize(List("holden likes coffee","panda likes long strings and coffee"))

scala> pairs = lines.map(x=>(x.split(" ")(0),x))

scala> pairs.first()

lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[112] at parallelize at <console>:32

pairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[113] at map at <console>:34

res98: (String, String) = (holden,holden likes coffee)


filter() 단순 필터 적용 [Transformation 함수]

scala> pairs.filter{case(key,value)=>value.length>20}.first()

res106: (String, String) = (panda,panda likes long strings and coffee)


flatMap() 라인별 단어별 잘라서 단어들의 집합으로 변환 [Transformation 함수]

scala> val pairs1 = lines.flatMap(x=>x.split(" ")).map(word=>(word,1)).take(10)

pairs1: Array[(String, Int)] = Array((holden,1), (likes,1), (coffee,1), (panda,1), (likes,1), (long,1), (strings,1), (and,1), (coffee,1))


mapValues() 각 value에 count를 위한 1을 붙이고 [Transformation 함수]

reduceByKey() key별 (value의 총합, 값 갯수) [Transformation 함수]

scala> pairs1.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).take(6)

res126: Array[(String, (Int, Int))] = Array((long,(1,1)), (coffee,(2,2)), (holden,(1,1)), (likes,(2,2)), (panda,(1,1)), (strings,(1,1)))


combineByKey() key별 집합 연산 일반적으로 사용 -> map-side 집합연산

한 파티션 내의 데이터들을 하나씩 처리. 각 데이터는 이전에 나온 적이 없는 키를 갖고 있거나 이전에 나온 적이 있는 키

새로운 데이터 경우 createCombiner()함수로 해당 키에 대한 accumulator 생성.

이전에 나온 키의 경우 mergeValue()함수로 합함.

파티션별 계산이 끝나고 RDD전체에서 최종적으로 결과를 합칠 때 동일 키에 대한 accummulator를 가지면 mergeCombiner()함수로 합함.

//예제 -> input이 없음. 함수 인자들만 참고

scala> val result = input.combineByKey((v)=>(v,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).map{case(key,value)=>(key,vlaue._1/value._2.toFloat)}

scala> result.collectAsMap().map(println(_))


병렬화 수준지정. 스파크에 특정 개수의 파티션을 사용하라고 요청

val data = Seq(("a",3),("b",4),("a",1))

sc.parallelize(data).reduceByKey((x,y)=>x+y).take(2)

sc.parallelize(data).reduceByKey((x,y)=>x+y,10).take(2)


join()

데이터 조인, left right outer join, cross join, inner join 가능

//storeAdd ={ (Store("R"),"1026"),(Store("P"),"748"),(Store("P"),"3101"),(Store("S"),"Seattle")}

//storeRate = {(Store("R"),4.9),(Store("P"),4.8)}

//storeAdd.join(storeRate) = {(Store("R"),("1026",4.9)), (Store("P"),("748",4.8)), (Store("P"),("3101",4.8))}


sort() 데이터 정렬

문자열 비교 함수

//val input:RDD[(Int,Venue)] =

implicit val sortIntegerByString = new Ordering[Int]{

    override def compare(a:Int, b:Int) = a.toString.compare(b.toString)

}

//rdd.sortByKey(sortIntegerByString)

반응형