일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 정리
- AWS
- 데이터 엔지니어링
- 가상환경
- 데이터 파이프라인
- TIL
- sql
- redshift
- PYTHON
- 데이터 웨어하우스
- 운영체제
- HADOOP
- Docker
- TCP
- 파이썬
- 데이터베이스
- dockerfile
- airflow.cfg
- Django
- 컴퓨터네트워크
- 데브코스
- S3
- http
- linux
- 자료구조
- 데이터엔지니어링
- 컴퓨터 네트워크
- airflow
- Go
- 종류
- Today
- Total
홍카나의 공부방
[Hadoop] Apache Spark 2.0 개요와 RDD, DataSet 본문
이번 글은 유데미 - 하둡 강좌에서 알려주는 내용을 바탕으로 실제 빅데이터 처리에 빈번하게 사용하는 스파크에 대해서 알아보는 글이다. 이 글에서 설명하는 배경지식은 모두 스파크 2.x 버전을 기준으로 함을 미리 알린다. ( 스파크 3.x 기준은 추후 작성 예정 )
Apache Spark
스파크는 대규모 데이터셋 처리에 사용하는 프레임워크다. 스파크는 다음과 같은 특징이 있다.
- 매퍼와 리듀서의 관점에서 생각해야 하는 맵리듀스의 제약을 벗어날 수 있다.
- 맵리듀스보다 10~100배는 빠를 수 있다.
- 생태계가 풍부하다.
- DAG Engine이 내장되어 있다.
- Java, Scala, Python과 같은 친숙한 언어로 작성이 가능하다. ( 스파크는 Scala를 기반으로 만들어졌다. )
스파크는 하나의 주된 개념을 기반으로 구축되었는데, 바로 Resilient Distributed Dataset(RDD, 탄력적인 분산 데이터셋)다.
RDD
위 그림에서 살펴볼 수 있듯이, 스파크는 다양한 데이터 추상화를 제공하는데 그중 기본적인 데이터 구조는 RDD, DataFrame, DataSet이다. 가장 먼저 시작된 RDD는 분산 데이터 컬렉션으로, 불변하다는 특징 때문에 한 번 생성된 RDD는 변경할 수 없다. 대신 변환(Transformation)을 통해 새로운 RDD를 생성할 수는 있다. RDD는 클러스터 내부의 여러 노드에 걸쳐 분산 저장되며, 데이터 손실 시 재생성할 수 있는 메타 데이터도 포함하고 있다.
RDD 연산은 크게 변환과 행동이라는 두 가지 유형이 있으며, 변환은 새로운 RDD를 반환하는 것이고 행동(Action)은 실제 결과를 반환하는 것이다. RDD는 low-level API로 많은 control 기능을 제공하지만, 그만큼 복잡할 수도 있어서 DataFrame과 DataSet과 같은 고수준의 데이터 추상화를 선택해서 사용할 수 있다.
DataFrame과 DataSet
RDD는 그저 정보를 가진 행(row)일 뿐이다. 그래서 RDD를 DataFrame 객체로 확장하면 실제 열(Column)을 가진 구조화된 객체로 실행할 수 있고, SQL처럼 데이터를 가져올 수 있다.
DataFrame 접근 방식의 강점은 데이터를 더 효율적으로 저장하거나, SQL 쿼리를 최적화하여 Spark를 더 효율적이고 빠르게 실행할 수 있다는 것이다. 아래 코드는 DataFrame을 이용한 pyspark 코드다.
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
def loadMovieNames():
movieNames = {}
with open("ml-100k/u.item") as f:
for line in f:
fields = line.split("|")
movieNames[int(fields[0])] = fields[1]
return movieNames
def parseInput(line):
fields = line.split()
return Row(movieID = int(fields[1]), rating = float(fields[2])
if __name__ = "__main__":
# 스파크 세션 객체 구축 (Spark 2.0 방식)
# 실패가 발생할 경우, 새로운 스파크 세션 구축 또는 이전 세션 get
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
movieNames = loadMovieNames()
# lines라는 RDD로 u.data를 불러옴
lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
movies = lines.map(parseInput)
movieDataset = spark.createDataFrame(movies)
averageRatings = movieDataset.groupBy("movieID").avg("rating")
counts = movieDataset.groupBy("movieID").count()
averagesAndCounts = counts.join(averageRatings, "movieID")
topTen = averagesAndCounts.orderBy("avg(rating)").take(10)
for movie in topTen:
print (movieNames[movie[0]], movie[1], movie[2])
# 세션 멈춤
spark.stop()
'Data Engineering > Hadoop' 카테고리의 다른 글
[Hadoop] MongoDB와 Hadoop (0) | 2024.05.27 |
---|---|
[Hadoop] Hive 소개와 작동 방식 (5) | 2024.05.26 |
[Hadoop] Apache Pig의 개요와 사용법 (0) | 2024.05.19 |
[Hadoop] HDFS 개요와 HDFS 접근 방법, MapReduce (0) | 2024.05.12 |
[Hadoop] NCP에서 하둡 실습 환경 구축하기 (1) | 2024.05.11 |