홍카나의 공부방

[Hadoop] Apache Spark 2.0 개요와 RDD, DataSet 본문

Data Engineering/Hadoop

[Hadoop] Apache Spark 2.0 개요와 RDD, DataSet

홍문관카페나무 2024. 5. 19. 15:48

 

 

이번 글은 유데미 - 하둡 강좌에서 알려주는 내용을 바탕으로 실제 빅데이터 처리에 빈번하게 사용하는 스파크에 대해서 알아보는 글이다. 이 글에서 설명하는 배경지식은 모두 스파크 2.x 버전을 기준으로 함을 미리 알린다. ( 스파크 3.x 기준은 추후 작성 예정 )

 


 

Apache Spark

 

스파크는 대규모 데이터셋 처리에 사용하는 프레임워크다. 스파크는 다음과 같은 특징이 있다.

 

  • 매퍼와 리듀서의 관점에서 생각해야 하는 맵리듀스의 제약을 벗어날 수 있다.
  • 맵리듀스보다 10~100배는 빠를 수 있다.
  • 생태계가 풍부하다.
  • DAG Engine이 내장되어 있다.
  • Java, Scala, Python과 같은 친숙한 언어로 작성이 가능하다. ( 스파크는 Scala를 기반으로 만들어졌다. )

스파크는 하나의 주된 개념을 기반으로 구축되었는데, 바로 Resilient Distributed Dataset(RDD, 탄력적인 분산 데이터셋)다.

 

 

RDD

출처 : https://www.databricks.com/glossary/what-is-rdd

 

 

 

위 그림에서 살펴볼 수 있듯이, 스파크는 다양한 데이터 추상화를 제공하는데 그중 기본적인 데이터 구조는 RDD, DataFrame, DataSet이다. 가장 먼저 시작된 RDD는 분산 데이터 컬렉션으로, 불변하다는 특징 때문에 한 번 생성된 RDD는 변경할 수 없다. 대신 변환(Transformation)을 통해 새로운 RDD를 생성할 수는 있다. RDD는 클러스터 내부의 여러 노드에 걸쳐 분산 저장되며, 데이터 손실 시 재생성할 수 있는 메타 데이터도 포함하고 있다.

 

RDD 연산은 크게 변환과 행동이라는 두 가지 유형이 있으며, 변환은 새로운 RDD를 반환하는 것이고 행동(Action)은 실제 결과를 반환하는 것이다. RDD는 low-level API로 많은 control 기능을 제공하지만, 그만큼 복잡할 수도 있어서 DataFrame과 DataSet과 같은 고수준의 데이터 추상화를 선택해서 사용할 수 있다.

 

DataFrame과 DataSet

 

RDD는 그저 정보를 가진 행(row)일 뿐이다. 그래서 RDD를 DataFrame 객체로 확장하면 실제 열(Column)을 가진 구조화된 객체로 실행할 수 있고, SQL처럼 데이터를 가져올 수 있다.

 

DataFrame 접근 방식의 강점은 데이터를 더 효율적으로 저장하거나, SQL 쿼리를 최적화하여 Spark를 더 효율적이고 빠르게 실행할 수 있다는 것이다. 아래 코드는 DataFrame을 이용한 pyspark 코드다.

 

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
    	for line in f:
            fields = line.split("|")
            movieNames[int(fields[0])] = fields[1]
    return movieNames
    
def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2])

if __name__ = "__main__":
	
    # 스파크 세션 객체 구축 (Spark 2.0 방식)
    # 실패가 발생할 경우, 새로운 스파크 세션 구축 또는 이전 세션 get
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
    
    movieNames = loadMovieNames()
    
    # lines라는 RDD로 u.data를 불러옴
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    movies = lines.map(parseInput)
    movieDataset = spark.createDataFrame(movies)
    
    averageRatings = movieDataset.groupBy("movieID").avg("rating")
    
    counts = movieDataset.groupBy("movieID").count()
    
    averagesAndCounts = counts.join(averageRatings, "movieID")
    
    topTen = averagesAndCounts.orderBy("avg(rating)").take(10)
    
    for movie in topTen:
    	print (movieNames[movie[0]], movie[1], movie[2])
        
    # 세션 멈춤
    spark.stop()

 

 

반응형