이번 글에서는 spark(pyspark)-shell 환경에서 spark-shell 어플리케이션의 환경 변수(conf)를 변경하는 방법에 대해서 알아 볼 예정입니다.
spark-shell 환경을 사용하다 보면, 사용 와중에 spark-context의 환경 변수(conf)를 변경해야 하는 경우가 생깁니다. 예를 들어, dynamicAllocation 설정을 추가한다 던지, executor의 instance 수를 조정하고 싶은 경우가 그런 경우입니다.
하지만 spark(pyspark)-shell 환경에서 작업을 하다보면, spark-session을 다시 설정한다고 해도 원하는대로 적용이 안되는 경우를 경험하신 분들이 많으실 겁니다. 이러한 경우, 처음에 올라온 spark-shell의 session을 다시 구성하는 법을 살펴보겠습니다.
아래의 Error는 spark-shell 환경에서 작업을 하다, Executor의 memory가 부족해 OOM이 발생한 경우입니다.
Py4JJavaError: An error occurred while calling o1607.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 79 in stage 20.0 failed 4 times, most
recent failure: Lost task 79.3 in stage 20.0 (TID 2740, {worker-node-1}, executor 329): org.apache.
spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:
128)
...
Error 로그를 살펴보면, worker-node-1, Executor의 Memory가 부족해 특정 코드에서 OOM이 발생하게 되었습니다. 이러한 경우, 가장 간단한 튜닝 방법은 Executor의 Memory를 늘려주는 방법입니다. 따라서 아래와 같이 Executor의 메모리를 4G로 늘려, SparkSession을 build하였습니다.
sc = SparkSession \
.builder.config("spark.executor.memory","4") \
.config("spark.executor.cores","2") \
.getOrCreate()
sc.getConf().getAll()
[('spark.executor.memory', '4G'),
('spark.executor.cores','2')
...
새로운 SparkSession을 생성햇고, 해당 sc 변수에 담긴 SparkContext의 conf 내용을 확인했습니다. 위와 같이 정상적으로 변경된 memory와 core가 적용되어있는 것으로 확인됩니다.
하지만, 이후에도 기존과 동일한 Error가 지속 발생했습니다. 이유는 "SparkSession의 기존 환경 변수값이 변경되지 않았기 때문입니다." sc.getConf().getAll()을 통해 확인한 conf 내역은 적용된 것처럼 보이지만, 실제 Session 내부적으로는 변경되지 않았던 것입니다.
spark-history server에서 해당 spark-shell application의 environment 영역을 확인해보면 기존의 설정 값과 동일한 것으로 확인이 됩니다. 즉, spark-shell은 접속과 동시에 SparkContext가 sc라는 변수에 담겨 생성이 되는데, 해당 Session의 환경 변수를 변경하기 위해선 기존의 Session을 중지 시킨 후, 새로운 SparkContext를 build 하여야 합니다. (결국, SparkContext도 Container 형태로 application이 올라오는데, 기존 설정으로 생성된 Container에서는 변경이 안됩니다.)
방법은 아주 간단한데, 아래 코드와 같이 SparkContext가 담긴 sc변수의 stop() 함수를 이용해 기존의 Session을 종료한 후에, 다시 새로운 SparkSession을 생성하면 됩니다. (sc변수에 Spark-Context가 없다면, Spark-Context가 담긴 변수를 재확인 해야 합니다.)
sc.stop()
sc = SparkSession \
.builder.config("spark.executor.memory","4") \
.config("spark.executor.cores","2") \
.getOrCreate()
sc.getConf().getAll()
[('spark.executor.memory', '4G'),
('spark.executor.cores','2')
...
결과적으로 위의 방법을 통해, OOM 이슈를 해결할 수 있었습니다. 단, spark-shell의 경우 기본적으로 가벼운 Task들을 위한 환경이므로, 무거운 spark task의 경우는 spark-submit을 통해 진행을 권장합니다.
'Bigdata Components > SPARK' 카테고리의 다른 글
[SPARK] java.io.FileNotFoundException: {file_name}.conf ERROR 해결 방법 (0) | 2024.01.08 |
---|---|
[SPARK] spark job과 partition의 개념 (1) | 2024.01.06 |
[SPARK] Spark의 Execuotr memory 구조 (0) | 2024.01.06 |
[SPARK] Spark 세부 동작(Transformation, Action) (0) | 2024.01.06 |
[SPARK] Spark 개념 정리 및 Architecture (0) | 2024.01.06 |