'Learning Spark'에 해당되는 글 4건

  1. 2015/03/16 용비 (Chapter 3) 14. RDD Basics
  2. 2015/03/12 용비 (Chapter 2) 13. Initializing a SparkContext
  3. 2015/03/08 용비 (Chapter 2) 12. Standalone Applications
  4. 2015/03/01 용비 (Chapter 2) 10. Python and Scala Shell

(Chapter 3) 14. RDD Basics

Spark 2015/03/16 21:00 용비

Chapter 3. Programming with RDDs

 

이 장에서는 Data를 가지고 작업을 수행할 수 있는 Spark's Core Abstraction RDD(Resilient Distributed Dataset)에 대해서 소개하고자 한다. RDD는 간단히 말해서 element들의 distributed collection이다. Spark내에서 모든 작업들은 결과를 계산하기 위해서 새로운 RDD를 생성하거나 기존 RDD를 변형하거나 RDD상에서 연산을 호출하는 것으로써 이루어진다. Spark의 엔진에서는 자동으로 RDD내의 Data  Cluster로 분산시키고,  수행하고자 하는 작업을 병렬로 처리한다.
 

Data Scientist Engineer 모두 이번 장을 읽어야 한다. Spark에서 RDD Core Concept이기 때문이다. Interactive Shell에 있는 example들을 실행해보기를 강력히 추천한다. 이번 장의 모든 code들은 이 책의 Git Hub Repository에 있다. (https://github.com/databricks/learning-spark)

 

RDD Basics

 

Spark에서 RDD는 간단하게 말하자면 Object들의 Distributed Collection이라고 할 수 있다. 각각의 RDD Cluster의 여러 다른 Node에서 계산되는데 사용되는 multiple partition으로 나누어져 있다. RDD는 다양한 형태의 User-Defined Class Python, Java, Scala Object를 포함할 수 있다.

 

User RDD를 외부 Dataset loading하는 경우와 Driver Program을 통해 Object Collection을 분산 처리하는 경우의 2가지 방법을 통해서 생성할 수 있다. 우리는 이미 SparkContext.textFile()을 사용해서 String 형태의 RDDtext file에서 loading하는 것을 살펴 보았다.

 

(Example 3-1. Creating an RDD of strings with textFile() in Pythpn)

>>> lines = sc.textFile("README.md")

 

한번 생성되면, RDD는 다음 2가지 형태의 연산을 제공한다. : transformation and action

 

Transformation(변형)은 이전 RDD로부터 새로운 RDD를 구성한다. 예를 들면, 우리가 전에 보았던 한가지transformation은 기술한 것과 매칭되는 data filtering이다. 예제로 만든 text file에서는 Python을 포함하는 string으로 이루어진 새로운 RDD를 생성하는데 사용할 수 있다.

 

>>> pythonLines = lines.filter(lambda line: "Python" in line)

 

한편, action RDD를 기반으로 결과를 계산하고, driver program에 그 결과를 return하거나 HDFS와 같은 외부storage system에 그 결과를 저장하는 것이다. 앞에서 살펴본 action의 한가지 예인 first() RDD에서 첫번째element return한다.

 

>>> pythonLines.first()

u'## Interactive Python Shell'

 

Transformation action의 차이점은 Spark RDD를 계산하는 방식 때문이다. 비록 여러분이 어느 때라도 새로운RDD를 정의할 수 있을지라도, Spark는 단지 최초에는 action에서 RDD를 사용하는 lazy fashion으로 계산을 수행한다. 이러한 접근법은 처음에는 일반적이지 않게 보일지도 모르지만, big data로 작업할 때는 이해가 된다. 예를 들어, 위에서 언급한 첫번째 예를 고려해 보자. 우리는 text file을 정의하고 Python 문자열을 가진 string을 추출해 낼 것이다. 만약, 우리가 lines = sc.textFile()을 호출했을 때, Spark text file내의 모든 line load하고 저장한다면 많은 storage space를 낭비하게 되고, 수많은 line filter output을 받게 될 것이다. 대신에, Sparktransformation의 전체 chain에 대해서 한차례 보고, 그 결과에 필요한 data에 대해서만 연산할 수 있다. 사실 first() action 수행할 때, Spark는 단지 first match line에 해당하는 line을 찾을 때까지 scan만 하면 된다. 전체 파일에 대해서 read하지 않아도 된다.

 

마지막으로 Spark RDD는 기본적으로 여러분이 action을 실행할 때마다 매번 재계산된다. 만약, 여러 action에서RDD를 재사용하고자 한다면, RDD.persist()를 사용하여 RDD Persist(지속하여 유지)하도록 Spark에게 요청할 수 있다. 최초 연산 후에, Spark RDD content memory (Cluster내의 여러 machine에 나누어진 memory)에 저장할 것이다. 그리고 이후 여러 action에서 재사용할 수 있다. Memory 대신 Disk RDD persist(지속적으로 유지)하는 것도 가능하다. 기본적으로 persist하지 않도록 하는 것이 비정상적으로 보일지도 모르지만, big dataset에 대해서 생각해 보면 이해가 된다. 만약 여러분이 RDD를 재사용하지 않을 것이라면, Spark는 단지 한차례 data를 이용하여 결과를 계산할 것이기 때문에 storage space를 낭비할 이유가 없다.

 

실제로는 memory data 중에서 subset load하고 반복적인 query를 수행하기 위하여 persist를 종종 사용한다.예를 들어, Python 문자열을 포함하고 있는 README line들에 대해 여러 결과를 계산하고 싶어한다면 다음과 같은 코드를 작성할 수 있다.

 

>>> pythonLines.persist()

>>> pythoneLines.count()

2

>>> pythonLines.first()

u'## Interactive Python Shell'

 

요약하자면, 모든 Spark program shell session은 다음과 같이 동작할 것이다.

a.External Data로부터 input RDD를 생성한다.

b.     Filter()와 같은 transformation을 사용하여 새로운 RDD를 정의하기 위해 변형한다.

c. 재사용을 위한 중간 결과물인 RDD를 유지하기 위하여 Spark persist()를 요청한다.

d.     Spark에 의해 최적화되고, 실행되는 병렬 연산을 시작하기 위해 count() first()와 같은 action을 실행한다.

 

이 장의 나머지 부분에서는 이 과정에 대해서 자세히 들여다볼 것이다. Spark의 가장 기본이 되는 공통 RDD operation에 대해서도 다룰 것이다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/661

Spark application이 한번 link된 이후에는 Spark Package를 프로그램에 import하여 SparkContext를 생성할 수 있다. Application을 설정하기 위해서 SparkConf object를 생성하고, 그 이후에 SparkContext를 생성할 수 있다. 각각의 언어에 따라 지원되는 간단한 example은 다음과 같다.

 

[Initializing Spark in Python]

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")

sc = SparkContext(conf)

 

[Initializing Spark in Java]

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");

JavaSparkContext sc = new JavaSparkContext(conf);

 

[Initializing Spark in Scala]

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._ 

val conf = new SparkConf().setMaster("local").setAppName("My App")

val sc = new SparkContext("local", "My App")

 

위의 예제에서는 2개의 파라미터를 넘겨서 SparkContext 초기화를 위한 최소한의 방법을 보여준다.

·         Cluster URL : 예제에서는 "local"이라고 쓰인 부분이다. 어떻게 Cluster에 접속할지를 Spark에 말해주는 부분이다. "local" Cluster에 연결하지 않고, local machine의 하나의 thread에서 Spark에서 실행하는 특별한 값을 의미한다.

·         Application name : 예제에서는 "My App"이라고 쓰인 부분이다. 이것은 Cluster에 연결할 때, cluster manager UI에서 표시되는 application identify이다.

 

추가적인 파라미터들은 application이 어떻게 실행되는지 혹은 Cluster 상에 추가되는 code에 대한 설정을 위하여 존재한다. 하지만, 이 부분은 책의 나중에 다룰 것이다.

 

SparkContext를 초기화한 이후에 이전에 보여준 RDD를 생성하고 다루는 모든 method를 사용할 수 있다.

 

끝으로, Spark를 종료하기 위해서는 SparkContext stop() method를 호출하거나 application에서 System.exit(0)이나 sys.exit()를 호출하여 간단하게 종료할 수 있다.

 

이 짧은 overview는 랩탑에서 Standalone Spark application을 실행하기에는 충분하다. 더 많은 advanced configuration을 위해서 이 책의 이후 챕터에서 어떻게 application Cluster에 접속하는지, Application 패키징을 포함하여 어떻게 자동으로 worker node code가 배포되는지에 대해서 다룰 것이다.

 

Conclusion

 

이 챕터에서는 Spark를 다운로드하고 랩탑에서 실행하고, standalone application을 통해 interactive하게 동작하는 부분을 살펴보았다. Spark Core Concept에 대해서 간단히 살펴보고, Spark를 이용한 프로그래밍도 살펴보았다. SparkContext RDD를 생성하는 driver program과 병렬 연산을 처리하는 부분에 대해서도 살펴보았다. 다음 장에서는 RDD operate가 어떻게 이루어지는지에 대해서 더 깊이 살펴볼 것이다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/660

Spark에 대한 빠른 여행에서 빠뜨린 마지막 조각은 어떻게 standalone program을 사용하느냐는 것이다. Interactive하게 실행하는 것을 제외하고, Spark Java, Scala, Python으로 작성된 Standalone application에도 link될 수 있다. Shell에서 그것을 사용하는 주요 차이점은 직접 SparkContext를 초기화 해주어야 한다는 것이다. 그 이후, API는 동일하게 사용할 수 있다.

 

Spark linking하는 과정은 언어에 따라 다르다. Java Scala에서는 application Maven dependency Apache에 의해publish spark-core artifact를 넣어 주면 된다. 이 글이 쓰여질 시점에 최종 Spark 버전은 1.0.0이고, Maven coordinate는 다음과 같다.

 

groupId = org.apache.spark

artifactId = spark-core_2.10

version = 1.0.0

 

Maven에 익숙하지 않다면, maven public repository library를 등록하여 link할 수 있는 Java 기반 언어에 대한 유명한package management tool이다. 프로젝트를 build하는데 Maven을 사용할 수 있다. 혹은 Maven Repository에 연동할 수 있는 다른 툴-Scala SBT tool이나 Gradle과 같은-을 사용할 수도 있다. Eclipse와 같은 유명한 통합 개발 환경 또한 프로젝트에Maven dependency를 직접 추가할 수 있도록 지원한다.

 

Python에서는 Python Script로 간단하게 application을 작성할 수 있다. 하지만, Spark에 포함된 특별한 bin/spark-submit script를 사용하여 실행해야만 한다.  script Spark Python API function으로 사용할 수 있도록 환경을 설정한다. Script는 다음처럼 간단하게 실행할 수 있다.

 

bin/spark-submit my_script.py

 

(윈도우 시스템에서는 / 대신 \(backslash)를 사용해야 함에 주의!)

(NOTE : 1.0 이전 Spark 버전에서는 Python application을 실행하기 위해서 bin/pyspark my_script.py을 사용하라)

 

Application Spark link하기 위한 상세 예제는 공식 Spark Document Quick Spark Guide에 나와 있다. 그 문서의 최종본에는 appendix에 전체 example도 또한 포함되어 있다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/659

Introduction to Spark's Python and Scala Shells

Spark는 즉각적인 Data Analysis를 쉽게 할 수 있는 interactive shell을 제공한다. Spark shell R, Python, Scala와 같은 다른 형태의 shell이나 Bash, windows command prompt와 같은 OS시스템의 shell을 사용해본 경험이 있다면 쉽게 익숙해질 수 있을 것이다.

 

하지만, 하나의 machine에 있는 disk memory에 있는 데이터를 취급하는 대부분의 다른 shell들과는 다르게 Spark shell은 여러 machine disk memory에 분산되어 있는 데이터로 작업할 수 있도록 한다. 그리고 SparkData 처리 process를 자동으로 분산처리하고 보호한다.

 

Spark memory Data load할 수 있기 때문에, 많은 분산 컴퓨팅 환경에서 수초안에 - 심지어 수십대의 machine에서 테라바이트 단위의 data를 처리하는 경우에도 - 처리를 완료할 수 있다. 그러므로 shell을 이용해 sort of iterative, ad-hoc, exploratory analysis를 처리하는 경우에 Spark를 이용해서 처리하는 것이 적합하다. Spark Cluster에 연결하는 것을 지원하기 위해 많이 사용하는 Python, Scala shell 모두 지원한다.

 

[NOTE]

이 책의 대부분의 코드는 Spark에서 지원하는 모든 language로 이루어져 있지만, interactive shell Python Scala만 이용가능하다. Shell API를 배우는데 아주 유용하기 때문에 Java 개발자들도 Python이나 Scala 언어로 모든 example을 사용해 보기를 추천한다. API는 모든 언어에서 동일하다.

 

Spark's Shell power를 설명하는 가장 쉬운 방법은 간단한 data analysis에 사용해 보는 것이다. 공식적인 Spark document에 있는 Quick Start Guide에 있는 example을 돌려보자.

 

첫번째로 Spark's Shell Open한다.

Python version Spark Shell open하기 위해서는(우리는 PySpark Shell이라고도 부른다.) Spark directory로 들어가서 다음을 치면 된다.

 

bin/pyspark (윈도우 시스템에서는 bin\pyspark)

 

Scala versionShell open하기 위해서는 다음을 친다.

 

bin/spark-shell

 

Shell prompt는 몇 초 안에 나타날 것이다. Shell을 시작할 때, 많은 log message를 볼 수 있을 것이다. Log output을 깨끗하게 하기 위해서 [Enter] 키보드를 치면 shell prompt를 볼 수 있다. PySpark Shell을 실행하고 난 후의 화면은 다음 그림과 같다.

사용자 삽입 이미지

[The PySpark Shell With Default Logging Output]


Shell 내에 주의를 산만하게 하는 화면에 출력된 logging statements를 볼 수 있을 것이다. Logging에 대한 길이를 조절할 수 있다. 이렇게 하기 위해서 conf directory log4j.properties 파일을 만들수 있다. Spark 개발자에게는 이미 log4j.properties.template 파일이 포함되어 있다. Logging을 덜 나오게 하려면 conf/log4j.properties.template 파일을 conf/log4j.properties 파일로 복사하고 다음 라인을 찾는다.

 

log4j.rootCategory=INFO, console

 

다음과 같이 변경하여 WARN에 대한 message만 보도록 log level을 낮출 수 있다.

 

log4j.rootCategory=WARN, console

 

Shell을 다시 open했을 때, 이제는 output이 다음과 같이 나타날 것이다.

사용자 삽입 이미지

[The PySpark Shell With Less Logging Output]


[IPYTHON 사용]

Ipython은 많은 사용자들이 애용하고 있는 tab 키보드를 치면 이후 문장을 자동으로 완성시켜주는  Python Shell의 확장판이다. 설치에 대한 안내는 http://ipython.org 에서 찾아볼 수 있다. 변수 1 IPYTHON 환경 설정을 해줌으로써 Spark에서 Ipython을 사용할 수 있다.

 

IPYTHON=1 ./bin/pyspark

 

Web 브라우저 버전의 Ipython Notebook을 사용하기 위해서는 다음을 사용하라.

 

IPYTHON_OPTS="notebook" ./bin/pyspark

 

윈도우 시스템에서는 환경변수 설정을 다음과 같이 하면 된다.

 

set IPYTHON=1
bin\pyspark

 

우리는 Spark를 이용해 Cluster 환경에서 자동으로 분산되어 있는 collection으로부터 계산하는 operation을 수행해 볼 것이다. 이러한 collection RDD (Resilient Distributed Dataset)라고 부른다. RDD Spark에서 분산되어 있는Data와 계산에 대한 가장 기본적인 추상화 계층이다.

 

RDD에 대해서 더 자세히 말하기 전에, local text file로 아래와 같이 ad-hoc data analysis하기 위한 하나의 shell을 간단히 작성해 보자.

 

[Python line count]

>>> lines = sc.textFile("README.md") # Create an RDD called lines

>>> lines.count() # Count the number of items in this RDD

127

>>> lines.first() # First item in this RDD, i.e. first line of README.md

u'# Apache Spark'

 

[Scala line count]

scala> val lines = sc.textFile("README.md") // Create an RDD called lines

lines: spark.RDD[String] = MappedRDD[...]

scala> lines.count() // Count the number of items in this RDD

res0: Long = 127

scala> lines.first() // First item in this RDD, i.e. first line of README.md

res1: String = # Apache Spark

 

Shell에서 빠져 나오려면 Ctrl+D를 누르면 빠져나올 수 있다.

 

위의 예에서, lines라고 불리는 변수는 RDD이고 text file로부터 local machine에 생성된다. 우리는 RDD 상에서 dataset의 구성요소 수를 세거나 (여기서는 text file 내의 line ), 첫번째 라인을 출력하는 것과 같은 여러 병렬 연산들을 수행할 수 있다. 우리는 다음 장에서 RDD에 대해서 더 깊이 다룰 것이다. 하지만, 더 나아가기 전에 Spark의 기본적인 개념들에 대해서 설명하는 시간을 갖도록 하자.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/657