'Spark'에 해당되는 글 18건

  1. 2016/03/21 용비 (Chapter 3) 17. Actions
  2. 2015/04/08 용비 (Chapter 3) 16. RDD Operations
  3. 2015/04/08 용비 (Chapter 3) 15. Creating RDDs
  4. 2015/03/16 용비 (Chapter 3) 14. RDD Basics
  5. 2015/03/12 용비 (Chapter 2) 13. Initializing a SparkContext

(Chapter 3) 17. Actions

Spark 2016/03/21 17:15 용비

그동안 서로간의 변형을 통해서 어떻게 RDD 생성하는지를 알아보았다. 하지만, 어떤 경우에는 Dataset 가지고 직접적으로 어떤 일을 하고 싶을 수도 있다. Action RDD operation 번째 형태이다. Action driver program 마지막 값을 되돌려 주거나 외부 storage system 데이터를 쓰는 작업을 수행한다. Action RDD 호출하는 곳에서 필요로 하는 변형에 대해서 평가하기 때문에 실제적인 ouput 만들어낼 필요가 있다.


앞의 섹션의 log example에서 계속해서 살펴보자면 badLinesRDD 대해 어떤 정보를 출력하고 싶을 수도 있다. 그렇게 하기 위해서는 2가지 action 사용할 있다. count() 숫자를 값을 리턴하고, take() RDD element collection 리턴한다. 샘플코드는 3-15, 3-17에서 있다.


Example 3-15. Python error count using actions

print "Input had " + badLinesRDD.count() + " concerning lines"

print "Here are 10 examples: "

for line in badLinesRDD.take(10):

     print line


Example 3-16. Scala rror count using actions

println("Input had " + badLinesRDD.count() + " concerning lines")

println("Here are 10 examples:")

badLinesRDD.take(10).foreach(println)


Example 3-17. Java error count using actions

System.out.println("Input had " + badLinesRDD.count() + " concerning lines")

System.out.println("Here are 10 examples:")

for (String line : badLinesRDD.take(10)) {

     System.out.println(line);

}


위의 예제에서, driver program에서 작은 숫자의 RDD element 추출하기 위해서 take() 사용했다. Driver 정보를 출력하기 위해서 반복하여 출력했다. RDD 전체 RDD 추출하기 위해서 collect() 함수를 가지고 있다. 만약 프로그램 필터에서 RDD 아주 작은 크기로 줄여서 내부적으로 다루고자 하는 경우에 유용하게 사용할 있다. 다만, 전체 dataset collect() 사용하여 하나의 machine 메모리 크기에 적합해야 한다는 것을 유념해야 한다. 따라서 collect() large dataset 대상으로는 사용하지 않아야 한다.


대부분의 경우, RDD driver 직접적으로 collect()함수를 사용하여 데이터를 추출할 없다. 왜냐하면 dataset 너무 크기 때문이다. 이러한 경우에는 HDFS Amazon S3 같은 분산 저장환경에 데이터를 쓰는 경우가 일반적이다. 또한 saveAsTextFile() action, saveAsSequenceFile()이나 다양한 내장된 형태로 다른 많은 action 사용하여 RDD 내용을 저장할 있다. 데이터를 추출하는 다른 option 대해서는 Chapter 5에서 다룰 것이다.


새로운 action 매번 호출할 때마다 전체 RDD " 처음부터"(from scratch) 계산되어야 한다는 것을 아는 것이 중요하다. 이러한 비효율성을 피하기 위해서 44페이지에서 "Persistence (Caching)"으로 다루는 중간 결과를 유지할 있다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/743

(Chapter 3) 16. RDD Operations

Spark 2015/04/08 13:21 용비

RDD 다음 2가지 형태의 Operation 지원한다.

  • Transformation
  • Action

Transformation map, filter 같은 새로운 RDD return하는 RDD상의 operation이다. Action driver program으로 결과를 return하거나 storage 결과를 write하고, count first 같은 연산을 시작하는 operation이다. Spark에서는 transformation action 아주 다르게 취급한다. 따라서, 여러분이 수행하고자 하는 작업의 형태에 대해서 이해하는 것이 아주 중요하다. 만약 여러분이 여전히 주어진 기능에 대해서 transformation인지, action인지 혼란스럽다면 return 타입을 통해 있다. Transformation 새로운 RDD return하지만, action 다른 data type return한다.


Transformations


Transformation 새로운 RDD return하는 operation이다. Lazy evaluation section에서 논의되겠지만, transformed RDD 단지 여러분이 action에서 사용할 , 느긋하게(lazily) 계산된다. 많은 transformation(변형) element측면에서 일어난다. Transformation 한번에 하나의 element상에서 동작하지만, 모든 transformation 그런 것은 아니다.


예를 들면, 수많은 메시지를 담고 있는 log.txt라는 로그 파일을 가지고 있다고 생각해보자. 그리고 중에서 error 메시지만을 추출하고 싶다. 여기에 앞에서 보았던 filter transformation 사용할 있다. Spark 3가지 언어로 filter API 살펴보면 다음과 같다.


Example 3-8. Python filter example

inputRDD = sc.textFile("log.txt")

errorsRDD = inputRDD.filter(lambda x:"error" in x)


Example 3-9. Scala filter example

var inputRDD = sc.textFile("log.txt")

var errorsRDD = inputRDD.filter(line => line.contains("error"))


Example 3-10. Java filter example

JavaRDD<String> inputRDD = sc.textFile("log.txt");

JavaRDD<String> errorsRDD = inputRDD.filter(

    new Function<String, Boolean>() {

         public Boolean call (String x) {return x.contains("error");}

    }

);


Filter operation 기존의 inputRDD 대한 돌연변이가 아님에 주의하라. 전혀 새로운 RDD 대한 포인터를 return한다. inputRDD 프로그램내에서-예를 들어, 다른 단어들을 검색하는데-나중에 재사용할 있다. Warning이라는 단어를 검색하는데 inputRDD 사용해보자. , 새로운 transformation union 사용하여 error warning 단어를 포함하고 있는 모든 line 출력할 있다. 이에 대한 Python code 있지만, union() function 3가지 언어에서 모두 동일하다.


Example 3-11. Python union example

errorsRDD = inputRDD.filter(lambda x:"error" in x)

warningsRDD = inputRDD.filter(lambda x:"warning" in x)

badLinesRDD = errorsRDD.union(warningsRDD)


union filter와는 약간 다르다. 하나의 RDD 아니라 2개의 RDD 가지고 동작하기 때문이다. Transformation 복수 개의 input RDD 가지고 동작할 있다.


끝으로, transformation 사용하여 서로 다른 새로운 RDD 이끌어낼 , Spark 다른 RDD 사이의 dependency 집합을 추적하고 유지한다. 그것을 lineage graph라고 한다. Spark 요구에 맞춰 RDD 계산할 , 정보를 이용하고 Persistent RDD 일부 정보가 유실되었을 , 이를 이용해 복구한다.



<Figure 3-1. RDD lineage graph created during log analysis>

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/681

(Chapter 3) 15. Creating RDDs

Spark 2015/04/08 13:20 용비

Spark RDD 생성하는 다음 2가지 방법을 제공한다.

  • Loading external dataset
  • Parallelizing collection in driver program

RDD 생성하기 위한 가장 간단한 방법은 memory 있는 collection 가져다가 parallelize method 통해 SparkContext 전달하는 방법이다. 방법은 Spark 공부할 , 아주 유용하다. Shell에서 간단하게 RDD 빠르게 생성하여 여러 operation 수행할 있기 때문이다. 그러나, prototyping이나 testing 목적이 아니라면, 하나의 machine에서 메모리에 전체 dataset 가지고 있는 형태이므로 폭넓게 사용되는 것이 아니라는 점에 주의하라.


(Example 3-2. Python Parallelize Example)

lines = sc.parallelize(["pandas", "I like pandas"])


(Example 3-3. Scala Parallelize Example)

var lines = sc.parallelize(List("pandas", "I like pandas"))


(Example 3-4. Java Parallelize Example)

JavaRDD<String> lines = sc.parallelize(Array.asList("pandas", "I like pandas"));


RDD 생성하는 일반적인 방법은 external storage로부터 data loading하는 것이다. External dataset loading하는 것은 Chapter 5에서 상세히 다룰 것이다. 그러나 우리는 이미 text file로부터 SparkContext.textFile(…) 이용하여 String 포함하는 RDD 생성하기 위해 data loading하는 것을 이미 살펴보았다.


(Example 3-5. Python textFile Example)

lines = sc.textFile("/path/to/README.md")


(Example 3-6. Scala textFile Example)

var lines = sc.textFile("/path/to/README.md")


(Example 3-7. Java textFile Example)

JavaRDD<String> lines = sc.textFile("/path/to/README.md");

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/680

(Chapter 3) 14. RDD Basics

Spark 2015/03/16 21:00 용비

Chapter 3. Programming with RDDs

 

이 장에서는 Data를 가지고 작업을 수행할 수 있는 Spark's Core Abstraction RDD(Resilient Distributed Dataset)에 대해서 소개하고자 한다. RDD는 간단히 말해서 element들의 distributed collection이다. Spark내에서 모든 작업들은 결과를 계산하기 위해서 새로운 RDD를 생성하거나 기존 RDD를 변형하거나 RDD상에서 연산을 호출하는 것으로써 이루어진다. Spark의 엔진에서는 자동으로 RDD내의 Data  Cluster로 분산시키고,  수행하고자 하는 작업을 병렬로 처리한다.
 

Data Scientist Engineer 모두 이번 장을 읽어야 한다. Spark에서 RDD Core Concept이기 때문이다. Interactive Shell에 있는 example들을 실행해보기를 강력히 추천한다. 이번 장의 모든 code들은 이 책의 Git Hub Repository에 있다. (https://github.com/databricks/learning-spark)

 

RDD Basics

 

Spark에서 RDD는 간단하게 말하자면 Object들의 Distributed Collection이라고 할 수 있다. 각각의 RDD Cluster의 여러 다른 Node에서 계산되는데 사용되는 multiple partition으로 나누어져 있다. RDD는 다양한 형태의 User-Defined Class Python, Java, Scala Object를 포함할 수 있다.

 

User RDD를 외부 Dataset loading하는 경우와 Driver Program을 통해 Object Collection을 분산 처리하는 경우의 2가지 방법을 통해서 생성할 수 있다. 우리는 이미 SparkContext.textFile()을 사용해서 String 형태의 RDDtext file에서 loading하는 것을 살펴 보았다.

 

(Example 3-1. Creating an RDD of strings with textFile() in Pythpn)

>>> lines = sc.textFile("README.md")

 

한번 생성되면, RDD는 다음 2가지 형태의 연산을 제공한다. : transformation and action

 

Transformation(변형)은 이전 RDD로부터 새로운 RDD를 구성한다. 예를 들면, 우리가 전에 보았던 한가지transformation은 기술한 것과 매칭되는 data filtering이다. 예제로 만든 text file에서는 Python을 포함하는 string으로 이루어진 새로운 RDD를 생성하는데 사용할 수 있다.

 

>>> pythonLines = lines.filter(lambda line: "Python" in line)

 

한편, action RDD를 기반으로 결과를 계산하고, driver program에 그 결과를 return하거나 HDFS와 같은 외부storage system에 그 결과를 저장하는 것이다. 앞에서 살펴본 action의 한가지 예인 first() RDD에서 첫번째element return한다.

 

>>> pythonLines.first()

u'## Interactive Python Shell'

 

Transformation action의 차이점은 Spark RDD를 계산하는 방식 때문이다. 비록 여러분이 어느 때라도 새로운RDD를 정의할 수 있을지라도, Spark는 단지 최초에는 action에서 RDD를 사용하는 lazy fashion으로 계산을 수행한다. 이러한 접근법은 처음에는 일반적이지 않게 보일지도 모르지만, big data로 작업할 때는 이해가 된다. 예를 들어, 위에서 언급한 첫번째 예를 고려해 보자. 우리는 text file을 정의하고 Python 문자열을 가진 string을 추출해 낼 것이다. 만약, 우리가 lines = sc.textFile()을 호출했을 때, Spark text file내의 모든 line load하고 저장한다면 많은 storage space를 낭비하게 되고, 수많은 line filter output을 받게 될 것이다. 대신에, Sparktransformation의 전체 chain에 대해서 한차례 보고, 그 결과에 필요한 data에 대해서만 연산할 수 있다. 사실 first() action 수행할 때, Spark는 단지 first match line에 해당하는 line을 찾을 때까지 scan만 하면 된다. 전체 파일에 대해서 read하지 않아도 된다.

 

마지막으로 Spark RDD는 기본적으로 여러분이 action을 실행할 때마다 매번 재계산된다. 만약, 여러 action에서RDD를 재사용하고자 한다면, RDD.persist()를 사용하여 RDD Persist(지속하여 유지)하도록 Spark에게 요청할 수 있다. 최초 연산 후에, Spark RDD content memory (Cluster내의 여러 machine에 나누어진 memory)에 저장할 것이다. 그리고 이후 여러 action에서 재사용할 수 있다. Memory 대신 Disk RDD persist(지속적으로 유지)하는 것도 가능하다. 기본적으로 persist하지 않도록 하는 것이 비정상적으로 보일지도 모르지만, big dataset에 대해서 생각해 보면 이해가 된다. 만약 여러분이 RDD를 재사용하지 않을 것이라면, Spark는 단지 한차례 data를 이용하여 결과를 계산할 것이기 때문에 storage space를 낭비할 이유가 없다.

 

실제로는 memory data 중에서 subset load하고 반복적인 query를 수행하기 위하여 persist를 종종 사용한다.예를 들어, Python 문자열을 포함하고 있는 README line들에 대해 여러 결과를 계산하고 싶어한다면 다음과 같은 코드를 작성할 수 있다.

 

>>> pythonLines.persist()

>>> pythoneLines.count()

2

>>> pythonLines.first()

u'## Interactive Python Shell'

 

요약하자면, 모든 Spark program shell session은 다음과 같이 동작할 것이다.

a.External Data로부터 input RDD를 생성한다.

b.     Filter()와 같은 transformation을 사용하여 새로운 RDD를 정의하기 위해 변형한다.

c. 재사용을 위한 중간 결과물인 RDD를 유지하기 위하여 Spark persist()를 요청한다.

d.     Spark에 의해 최적화되고, 실행되는 병렬 연산을 시작하기 위해 count() first()와 같은 action을 실행한다.

 

이 장의 나머지 부분에서는 이 과정에 대해서 자세히 들여다볼 것이다. Spark의 가장 기본이 되는 공통 RDD operation에 대해서도 다룰 것이다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/661

Spark application이 한번 link된 이후에는 Spark Package를 프로그램에 import하여 SparkContext를 생성할 수 있다. Application을 설정하기 위해서 SparkConf object를 생성하고, 그 이후에 SparkContext를 생성할 수 있다. 각각의 언어에 따라 지원되는 간단한 example은 다음과 같다.

 

[Initializing Spark in Python]

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")

sc = SparkContext(conf)

 

[Initializing Spark in Java]

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");

JavaSparkContext sc = new JavaSparkContext(conf);

 

[Initializing Spark in Scala]

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._ 

val conf = new SparkConf().setMaster("local").setAppName("My App")

val sc = new SparkContext("local", "My App")

 

위의 예제에서는 2개의 파라미터를 넘겨서 SparkContext 초기화를 위한 최소한의 방법을 보여준다.

·         Cluster URL : 예제에서는 "local"이라고 쓰인 부분이다. 어떻게 Cluster에 접속할지를 Spark에 말해주는 부분이다. "local" Cluster에 연결하지 않고, local machine의 하나의 thread에서 Spark에서 실행하는 특별한 값을 의미한다.

·         Application name : 예제에서는 "My App"이라고 쓰인 부분이다. 이것은 Cluster에 연결할 때, cluster manager UI에서 표시되는 application identify이다.

 

추가적인 파라미터들은 application이 어떻게 실행되는지 혹은 Cluster 상에 추가되는 code에 대한 설정을 위하여 존재한다. 하지만, 이 부분은 책의 나중에 다룰 것이다.

 

SparkContext를 초기화한 이후에 이전에 보여준 RDD를 생성하고 다루는 모든 method를 사용할 수 있다.

 

끝으로, Spark를 종료하기 위해서는 SparkContext stop() method를 호출하거나 application에서 System.exit(0)이나 sys.exit()를 호출하여 간단하게 종료할 수 있다.

 

이 짧은 overview는 랩탑에서 Standalone Spark application을 실행하기에는 충분하다. 더 많은 advanced configuration을 위해서 이 책의 이후 챕터에서 어떻게 application Cluster에 접속하는지, Application 패키징을 포함하여 어떻게 자동으로 worker node code가 배포되는지에 대해서 다룰 것이다.

 

Conclusion

 

이 챕터에서는 Spark를 다운로드하고 랩탑에서 실행하고, standalone application을 통해 interactive하게 동작하는 부분을 살펴보았다. Spark Core Concept에 대해서 간단히 살펴보고, Spark를 이용한 프로그래밍도 살펴보았다. SparkContext RDD를 생성하는 driver program과 병렬 연산을 처리하는 부분에 대해서도 살펴보았다. 다음 장에서는 RDD operate가 어떻게 이루어지는지에 대해서 더 깊이 살펴볼 것이다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/660