Chapter 3. Programming with RDDs
이 장에서는 Data를 가지고 작업을 수행할 수 있는 Spark's Core Abstraction인 RDD(Resilient Distributed Dataset)에 대해서 소개하고자 한다. RDD는 간단히 말해서 element들의 distributed collection이다. Spark내에서 모든 작업들은 결과를 계산하기 위해서 새로운 RDD를 생성하거나 기존 RDD를 변형하거나 RDD상에서 연산을 호출하는 것으로써 이루어진다. Spark의 엔진에서는 자동으로 RDD내의 Data를 Cluster로 분산시키고, 수행하고자 하는 작업을 병렬로 처리한다.
Data Scientist나 Engineer 모두 이번 장을 읽어야 한다. Spark에서 RDD가 Core Concept이기 때문이다. Interactive Shell에 있는 example들을 실행해보기를 강력히 추천한다. 이번 장의 모든 code들은 이 책의 Git Hub Repository에 있다. (https://github.com/databricks/learning-spark)
RDD Basics
Spark에서 RDD는 간단하게 말하자면 Object들의 Distributed Collection이라고 할 수 있다. 각각의 RDD는 Cluster의 여러 다른 Node에서 계산되는데 사용되는 multiple partition으로 나누어져 있다. RDD는 다양한 형태의 User-Defined Class인 Python, Java, Scala Object를 포함할 수 있다.
User는 RDD를 외부 Dataset을 loading하는 경우와 Driver Program을 통해 Object의 Collection을 분산 처리하는 경우의 2가지 방법을 통해서 생성할 수 있다. 우리는 이미 SparkContext.textFile()을 사용해서 String 형태의 RDD를text file에서 loading하는 것을 살펴 보았다.
(Example 3-1. Creating an RDD of strings with textFile() in Pythpn)
>>> lines = sc.textFile("README.md")
한번 생성되면, RDD는 다음 2가지 형태의 연산을 제공한다. : transformation and action
Transformation(변형)은 이전 RDD로부터 새로운 RDD를 구성한다. 예를 들면, 우리가 전에 보았던 한가지transformation은 기술한 것과 매칭되는 data filtering이다. 예제로 만든 text file에서는 Python을 포함하는 string으로 이루어진 새로운 RDD를 생성하는데 사용할 수 있다.
>>> pythonLines = lines.filter(lambda line: "Python" in line)
한편, action은 RDD를 기반으로 결과를 계산하고, driver program에 그 결과를 return하거나 HDFS와 같은 외부storage system에 그 결과를 저장하는 것이다. 앞에서 살펴본 action의 한가지 예인 first()는 RDD에서 첫번째element를 return한다.
>>> pythonLines.first()
u'## Interactive Python Shell'
Transformation과 action의 차이점은 Spark과 RDD를 계산하는 방식 때문이다. 비록 여러분이 어느 때라도 새로운RDD를 정의할 수 있을지라도, Spark는 단지 최초에는 action에서 RDD를 사용하는 lazy fashion으로 계산을 수행한다. 이러한 접근법은 처음에는 일반적이지 않게 보일지도 모르지만, big data로 작업할 때는 이해가 된다. 예를 들어, 위에서 언급한 첫번째 예를 고려해 보자. 우리는 text file을 정의하고 Python 문자열을 가진 string을 추출해 낼 것이다. 만약, 우리가 lines = sc.textFile(…)을 호출했을 때, Spark가 text file내의 모든 line을 load하고 저장한다면 많은 storage space를 낭비하게 되고, 수많은 line의 filter output을 받게 될 것이다. 대신에, Spark는transformation의 전체 chain에 대해서 한차례 보고, 그 결과에 필요한 data에 대해서만 연산할 수 있다. 사실 first() action 수행할 때, Spark는 단지 first match line에 해당하는 line을 찾을 때까지 scan만 하면 된다. 전체 파일에 대해서 read하지 않아도 된다.
마지막으로 Spark의 RDD는 기본적으로 여러분이 action을 실행할 때마다 매번 재계산된다. 만약, 여러 action에서RDD를 재사용하고자 한다면, RDD.persist()를 사용하여 RDD를 Persist(지속하여 유지)하도록 Spark에게 요청할 수 있다. 최초 연산 후에, Spark는 RDD content를 memory (Cluster내의 여러 machine에 나누어진 memory)에 저장할 것이다. 그리고 이후 여러 action에서 재사용할 수 있다. Memory 대신 Disk에 RDD를 persist(지속적으로 유지)하는 것도 가능하다. 기본적으로 persist하지 않도록 하는 것이 비정상적으로 보일지도 모르지만, big dataset에 대해서 생각해 보면 이해가 된다. 만약 여러분이 RDD를 재사용하지 않을 것이라면, Spark는 단지 한차례 data를 이용하여 결과를 계산할 것이기 때문에 storage space를 낭비할 이유가 없다.
실제로는 memory에 data 중에서 subset을 load하고 반복적인 query를 수행하기 위하여 persist를 종종 사용한다.예를 들어, Python 문자열을 포함하고 있는 README line들에 대해 여러 결과를 계산하고 싶어한다면 다음과 같은 코드를 작성할 수 있다.
>>> pythonLines.persist()
>>> pythoneLines.count()
2
>>> pythonLines.first()
u'## Interactive Python Shell'
요약하자면, 모든 Spark program과 shell session은 다음과 같이 동작할 것이다.
a.External Data로부터 input RDD를 생성한다.
b. Filter()와 같은 transformation을 사용하여 새로운 RDD를 정의하기 위해 변형한다.
c. 재사용을 위한 중간 결과물인 RDD를 유지하기 위하여 Spark에 persist()를 요청한다.
d. Spark에 의해 최적화되고, 실행되는 병렬 연산을 시작하기 위해 count()나 first()와 같은 action을 실행한다.
이 장의 나머지 부분에서는 이 과정에 대해서 자세히 들여다볼 것이다. Spark의 가장 기본이 되는 공통 RDD operation에 대해서도 다룰 것이다.
트랙백 주소 :: http://www.yongbi.net/trackback/661
트랙백 RSS :: http://www.yongbi.net/rss/trackback/661
댓글을 달아 주세요
댓글 RSS 주소 : http://www.yongbi.net/rss/comment/662