본문 바로가기
  • Let's go grab a data
Data/SPARK

8.스파크 RDD의 연산 기본 함수 예제

by pub-lican-ai 2016. 3. 2.
반응형

8. 스파크 RDD의 연산 기본 함수 예제 #spark #filter #union #map #flatMap #distinct #intersection #subtract #reduceByKey




*파란색은 스크립트, 검은색은 결과입니다.


sc.textFile() 텍스트 파일 읽어오기 [SparkContext 객체]

스칼라에서 README를 spark context 객체의 textFile 메서드를 이용해 읽어오면 RDD 객체가 생성됨

scala> val inputRDD = sc.textFile("/usr/local/lib/spark/README.md")

inputRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at <console>:27


filter() 주어진 조건에 해당하는 데이터만 선별 [Transformation 함수]
filter 메서드는 이미 존재하는 RDD를 변경하는 것이 아니라 완전히 새로운 RDD에 대한 포인터 리턴함
scala> val sparkRDD = inputRDD.filter(line => line.contains("spark"))
sparkRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at filter at <console>:29

scala> val apacheRDD = inputRDD.filter(line=>line.contains("apache"))

apacheRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at filter at <console>:29


union() 2개 이상의 RDD를 조합함, 합집합 [Transformation 함수]
union()는 새로운 RDD 포인터를 리턴함
scala> val unionRDD = sparkRDD.union(apacheRDD)
unionRDD: org.apache.spark.rdd.RDD[String] = UnionRDD[4] at union at <console>:33

count() RDD의 데이터 갯수 [Actions 함수]
scala> sparkRDD.count()
res3: Long = 10

scala> apacheRDD.count()
res4: Long = 8

scala> unionRDD.count()
res5: Long = 18

take() RDD에서 데이터를 가져옴 [Actions 함수]
scala> inputRDD.take(3).foreach(println)
# Apache Spark
Spark is a fast and general cluster computing system for Big Data. It provides

scala> sparkRDD.take(3).foreach(println)
<http://spark.apache.org/>
guide, on the [project web page](http://spark.apache.org/documentation.html)
["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).

scala> apacheRDD.take(3).foreach(println)

<http://spark.apache.org/>

guide, on the [project web page](http://spark.apache.org/documentation.html)

and [project wiki](https://cwiki.apache.org/confluence/display/SPARK).


sc.parallelize() 데이터세트를 넘겨주고 RDD를 생성시킴 [SparkContext 객체]
scala> val input = sc.parallelize(List(1,2,3,4)) 
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:27

map() 데이터를 가공함, 반환 타입이 같지 않아도 되서 유용함 [Transformation 함수]
scala> val result = input.map(x=>x*x)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:29

scala> println(result.collect().mkString(","))
1,4,9,16

flatMap() 각 입력 데이터에 대해 여러 개의 아웃풋 데이터 생성 [Transformation 함수]
scala> val lines = sc.parallelize(List("hello spark","hi"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> val words = lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at <console>:29

scala> words.first()
res11: String = hello


scala> val rdd1 = sc.parallelize(List("coffee","coffee","tea","milk"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:27

scala> val rdd2 = sc.parallelize(List("coffee","cola","water"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:27

distinct() 중복 제거한 RDD 반환 [Transformation 함수] shuffling
scala> rdd1.distinct().foreach(println)
milk
coffee
tea

intersection() 두개 이상의 RDD 데이터 교집합 RDD 반환 [Transformation 함수shuffling
scala> rdd1.intersection(rdd2).foreach(println)
coffee

subtract() 첫번째 RDD 항목들 중 두 번째 RDD 항목 제외한 RDD 반환 [Transformation 함수]
scala> rdd1.subtract(rdd2).foreach(println)
milk
tea

cartesian()  카테시안 곱 계산 [Transformation 함수] shuffling
scala> rdd1.cartesian(rdd2).foreach(println)
(coffee,coffee)
(coffee,cola)
(coffee,water)
(coffee,coffee)
(coffee,cola)
(coffee,water)
(tea,coffee)
(tea,cola)
(tea,water)
(milk,coffee)
(milk,cola)
(milk,water)

sample(withReplacement,fraction,[seed]) 복원 추출/비복원 추출로 표본 뽑아냄 [Transformation 함수]
scala> rdd1.sample(false,0.5).foreach(println)
coffee
tea
milk

scala> val data = sc.parallelize(List(1,2,3,3))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:27

collect() RDD의 모든 데이터 요소 리턴 [Actions 함수]
scala> data.collect()
res21: Array[Int] = Array(1, 2, 3, 3)

countByValue() RDD에 있는 각 값의 개수 리턴 [Actions 함수]
scala> data.countByValue()
res22: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2, 2 -> 1)

top(n) 상위 n개 요소 리턴 [Actions 함수]
scala> data.top(2)
res23: Array[Int] = Array(3, 3)

takeSample(withReplacement,n,[seed]) 복원/비복원 추출로 n개 표본 뽑아냄 [Actions 함수]
scala> data.takeSample(false,1).take(1).foreach(println)
3

reduce(func) RDD 값들을 병렬로 병합연산, 같은 타입 리턴 [Actions 함수]
scala> data.reduce((x,y)=>x+y)
res30: Int = 9
scala> data.reduce(_+_)
res31: Int = 9

aggregate(zeroValue) reduce와 유사하나 다른 타입 리턴, 아래 예제에서는 (총합,갯수)로 리턴 [Actions 함수]
scala> data.aggregate((0,0))((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2))
res32: (Int, Int) = (9,4)

persist() 영속화, 데이터를 JVM 힙heap 영역에 객체 형태로 저장함 [캐싱 함수]
scala> val input = sc.parallelize(List(1,2,3,4,5))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:27
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel
scala> val result = input.map(x=>x*x)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[39] at map at <console>:30
scala> result.persist(StorageLevel.DISK_ONLY)
res34: result.type = MapPartitionsRDD[39] at map at <console>:30
scala> println(result.count())
5
scala> println(result.collect().mkString(","))
1,4,9,16,25


반응형