본문 바로가기
Bigdata Components/SPARK

[SPARK] Spark 세부 동작(Transformation, Action)

by Blue____ 2024. 1. 6.

이전 글에서는 spark의 기본적인 개념과 아키텍쳐, 대략적인 동작 과정을 살펴 보았습니다. 

이번 글에서는 Spark의 세부 동작 과정의 중요 개념인 Transformation, Action과 Executor memory 구조에 대해서 살펴 볼 예정입니다. 


Spark 동작 과정

 

RDD(Resilient Distributed Datasets)

 

Spark의 Transformation, Action 과정을 설명하기 전에 Spark의 기본적인 데이터구조인, RDD(Resilient Distributed Datasets)에 대해 간단한 설명이 필요합니다.

 

RDD는 spark에서의 데이터구조이고, 쉽게 말해 데이터들의 집합이라고 생각하면 편할 것 같습니다. RDD라는 데이터 집합은 partition이라는 논리적 분할 데이터로 구성되어 있으며, 물리적으로는 객체들의 집합 데이터라고 이해하면 편합니다.

 

  • RDD의 특징
    • read-only의 partition된 record들의 집합이며, 다른 Storage(외부 파일, HDFS, S3 등)와 다른 RDD(다른 RDD의 연산 결과)로부터도 만들어 질 수 있습니다.
    • 또한, Driver에 있는 collection을 parallelize합니다. 간단하게 풀어 설명하면, driver에 있는 list, 배열, 다른 collection을 spark의 RDD로 변환한다는 것을 의미하며, 이는 여러 노드에 병렬로 분산됩니다.
    • 추가적으로 외부 스토리지(예를 들어, HDFS, S3 등)에 있는 데이터를 참조해서 만들어진다는 특징이 있습니다.

 

Transformation

 

RDD Transformation은 RDD의 데이터에 변화를 주는 Spark operation들을 통칭한다고 정의할 수 있습니다. 

  • 위의 내용을 풀어 설명하면, Transformation의 결과는 새로운 하나, 혹은 복수의 RDD가 되며, 기존 RDD는 불변하기 때문에 Trnasformation의 결과는 항상 새로운 RDD됩니다.
  • 이렇게 Transformation의 결과("Transformation과 Action 과정의 시각화" 참고)를 RDD operator graph, RDD dependency graph 등으로 칭합니다. 

Transforamtion의  가장 큰 특징 중의 하나는 Lazy operation이라는 것입니다.

  • 직역하면, 느긋한 혹은 지연된 실행? 정도로 해석이 되는데, 말 그대로 해당 코드(Transformation에 해당 하는)의 수행(Operation) 시점이 아니라, Action 과정의 함수가 호출되어야 앞선의 Transformation의 RDD들이 수행됩니다.
  • 즉, Transformation 과정에서의 함수들(아래에서 간단한 테스트 진행 예정)이 호출될 때는 Operation 자체에 대해서 기록 및 계획만 세우고, 뒤에 Action이 호출(쉽게 말해 데이터 결과를 return하는)이 되어야 앞선의 RDD transformation들이 순차적으로 수행되어, data(결과)를 return합니다.

Transformation과 Action 과정의 시각화

 

RDD가 Transformation되는 방식은 크게 두가지가 있는데, 두 종류 모두 RDD 내부의 parition들과 관련이 있습니다.

 

Narrow Transformation

Narrow Transformation은 하나의 파티션(RDD의 논리적 데이터 부분들 - 물리적으로 파티션은 여러 노드에 분산되어 존재한다.) 사이의 데이터 이동 없는(partition들의 단방향 이동은 있습니다.) 작업이 Narrow Transformation입니다.

Narrow Transformation

 

대표적인 Narrow Transformation에 해당하는 함수는 map(), mapPartition(), fillter(), union()이 있다.

 

Narrow Transformation의 대표 함수

 

위의 그림("Narrow Transformation")에서 화살표(Transformation)은 partition의 데이터 이동이 아니라, Transformation을 통해 새로운 RDD(partition들의 집합)가 생성되는 걸로 생각하시면 됩니다. 

 

Wider Transformation

Wider Transformation는 여러 파티션에 걸쳐서 데이터를 처리하는 작업을 의미합니다. 즉, 파티션 사이에서 데이터의 이동인 Shuffle이 필요한 작업들입니다. 다른 말로, shuffle transformation이라고도 부릅니다. 

 

Wider Transformation

 

대표 함수로는 groupby(), reduceByKey(), Join(), aggregate(), repartition() 등이 있습니다. 

Wider Transformation의 대표 함수

위와 같이 groupby(), reduceByKey(), Join(), aggregate(), repartition() 등은 Transformation 과정에서 파티션 사이의 데이터 이동(shuffle)이 굉장히 많이 발생하게 됩니다.

 

Action

RDD actions 은 value를 리턴하는 operation입니다. 앞의 내용인, Transformation이 lazy 하다는 것은, RDD의 transformation 선언은 해당 RDD에 대한 action에 해당하는 operation이 있을때 수행된다는 것을 의미한다. "Transformation과 Action 과정 시각화" 그림을 보면, 이해가 더 편한데, 앞선의 Transforatmation RDD들은 action의 함수 호출이 되면, 앞선의 Transformation RDD들이 다 수행이되면서 data를 return하게 됩니다. 

 

Action의 대표 함수는 collect(), count(), countByValue(), take(), foreach() 등이 있습니다.

 

Spark history UI 를 통해 확인

테스트는 매우 간단한 Transformation, action 들의 함수들로 이루어져있습니다. 우선, 아래와 같은 테스트 케이스들이 있습니다.

  • Narrow Transforamation → action
# , 1~10 rdd paralize. rdd .
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# narrow transformation
# 1~10 RDD map
key_value_rdd = rdd.map(lambda x: ("" if x % 2 == 0 else "", x))

# collect() - action
key_value_rdd.collect()

# partition 확인
key_value_rdd.getNumPartitions()
: 2

 

우선, 위의 코드를 순서대로 보면, narrow transformation인 map() 함수를 통해  transformation를 수행하고, collect()를 통해 action을 수행할 예정입니다. 

 

먼저, Transformation 코드인 map 함수까지 코드 수행 후, spark history UI에서 확인해보면, 제출된 jobs들이 아직 확인되지 않습니다. 즉, Transformation만 수행하였기 때문에 연산과정이 기록되지 않습니다. (Transforamtion의 LAZY하다는 특징)

 

 

이제 action을 수행하는 함수인 collect()를 호출합니다.

 

action ( collect() )을 수행하고 나면, 위와 같이 job이 생기게 되고, job 안에 하나의 stage, stage안에 총 2개의 task가 수행된 것을 확인됩니다. 

 

 

job detail을 확인해보면, collect()를 통해 action이 이루어졌고, Narrow Transforamation 만을 진행했기 때문에 어떠한 shuffle도 일어나지 않은걸로 확인됩니다.  즉, 파티션 사이의 데이터 이동(shuffle)이 일어나지 않은 것이 확인됩니다. 

 

  • Wider Transforamation → action
# 1~10 rdd paralize
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# wider transformation (groupby key, value groupy)
key_value_rdd = rdd.map(lambda x: ("" if x % 2 == 0 else "", x))
grouped_rdd = key_value_rdd.groupByKey()

# action
# collect()
result = grouped_rdd.collect()

# partition
key_value_rdd.getNumPartitions()
: 2

 

groupbykey()를 통해 wider transformation 수행 후, 위와 동일하게 collect()함수로 action 수행합니다. 

 

 

action ( collect() ) 이후, 이번에는 2개의 stage, 총 4개의 task가 수행된 것으로 확인됩니다. 

 

 

이번에는 groupbykey()라는 wider transformation 함수를 수행했기에 shuffle R/W가 214Byte가 일어난 것으로 확인됩니다. 즉, wider transformation 함수인 groupbykey()로 인해 partition 간의 데이터 이동(Shuffle)이 일어나게 됩니다.

 

 

이번 글에서는 Spark의 세부 동작(Transformation, Action)을 확인 했습니다. 다음 글에서는 Spark의 executor memory 구조에 대해서 알아볼 예정입니다.