메모장

빅데이터 분산을 위한 스파크2 프로그래밍 본문

교육(KOCW, 오프라인)

빅데이터 분산을 위한 스파크2 프로그래밍

hiandroid 2017. 9. 11. 16:28
반응형

스파크

 - 하둡과는 달리 메모리를 이용한 데이터 저장방식을 제공

 - 머신러닝 등 반복적인 데이터 처리가 필요한 분야에서 높은 성능을 보여줌

 - 스파크2.0부터는 자바, 스칼라, 파이썬, R스크립트로도 스파크 애플리케이션 작성 가능


스파크 라이브러리

스파크SQL: 하이브와 연동이 가능한 스키마기반 데이터분석 모듈

스파크 스트리밍: 실시간 스트리밍 데이터를 다루는 모듈

GraphX: 그래프 알고리즘 처리 모듈

SparkR: 통계분석프로그램인 R과의 연동을 지원하는 모듈

MLlib: 머신러닝 알고리즘 수행 모듈


하둡 파일시스템 기본적인 동작 방법

 - 분석할 데이터를 하둡파일시스템인 HDFS에 저장해 두고 HDFS상에서 맵리듀스 프로그램을 이용해 데이터를 처리

 - 하둡파일시스템은 하나의 네임노드와 여러개의 데이터노드로 구성되며, 하나의 네임노드가 나머지 데이터 노드를 관리하는 형태로 동작


파이썬 워드카운트


wordcount.py

 - 메인함수가 있고 스파크컨텍스트를 만든 다음 RDD를 생성하고 필요한 연산을 적용한뒤 결과를 저장

from pyspark import SparkContext, SparkConf
import sys

class WordCount:

def getSparkContext(self, appName, master):
conf = SparkConf().setAppName(appName).setMaster(master)
conf.set("spark.local.ip", "127.0.0.1");
conf.set("spark.driver.host", "127.0.0.1");
return SparkContext(conf=conf)

def getInputRDD(self, sc, input):
return sc.textFile(input)

def process(self, inputRDD):
words = inputRDD.flatMap(lambda s : s.split(" "))
wcPair = words.map(lambda s : (s,1))
return wcPair.reduceByKey(lambda x, y: x + y)

if __name__ == "__main__":
wc = WordCount()
sc = wc.getSparkContext("appName", sys.argv[1])
inputRDD = wc.getInputRDD(sc, sys.argv[2])
resultRDD = wc.process(inputRDD)
resultRDD.saveAsTextFile(sys.argv[3])
sc.stop()

wordcount_text.py

import unittest
from pyspark import SparkContext, SparkConf
from wordcount import WordCount

class WordCountTest(unittest.TestCase):
def testWordCount(self):
wc = WordCount()
sc = wc.getSparkContext("appName", "local[*]")
input = ["Apache Spark is a fast and general engine for large-scale data processing.", "Spark runs on both Windows and UNIX-like systems"]

inputRDD = sc.parallelize(input)
resultRDD = wc.process(inputRDD)
resultMap = resultRDD.collectAsMap()

self.assertEqual(resultMap['Spark'], 2)
self.assertEqual(resultMap['UNIX-like'], 1)
self.assertEqual(resultMap['runs'], 1)

print(resultMap)

sc.stop()

위의 wordcount.py를 가지고 README.md파일을 wordcount해서 testresult에 결과값을저장

- spark-submit(스파크서브밋으로 실행) wordcount.py(연산수행할 파이썬소스) README.md(워드카운트할 파일) testresult(결과물위치)

./bin/spark-submit file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/source/Python/ch1/wordcount.py local[*] file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/README.md file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/source/Python/testresult



pyspark 셸로 개발(wordcount)

sample.txt 내용
Once more I summon you Out of the past With poignant love, You who nourished the poet And the lover. I see your gray eyes Looking out to sea In those Rockport summers, Keeping a distance Within the closeness Which was never intrusive Opening out Into the world.
sample.txt파일을 inputRDD에 저장 >>> inputRDD = sc.textFile("file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/sample.txt") >>> inputRDD.glom().collect() [[u'Once more', u'I summon you', u'Out of the past', u'With poignant love,', u'You who nourished the poet', u'And the lover.', u'I see your gray eyes', u'Looking out to sea'], [u'In those Rockport summers,', u'Keeping a distance', u'Within the closeness', u'Which was never intrusive', u'Opening out', u'Into the world.']] words변수에 flatMap으로 가공하는데 띄어쓰기한 경우도 분리 >>> words = inputRDD.flatMap(lambda str :str.split(" ")) >>> words.glom().collect() [[u'Once', u'more', u'I', u'summon', u'you', u'Out', u'of', u'the', u'past', u'With', u'poignant', u'love,', u'You', u'who', u'nourished', u'the', u'poet', u'And', u'the', u'lover.', u'I', u'see', u'your', u'gray', u'eyes', u'Looking', u'out', u'to', u'sea'], [u'In', u'those', u'Rockport', u'summers,', u'Keeping', u'a', u'distance', u'Within', u'the', u'closeness', u'Which', u'was', u'never', u'intrusive', u'Opening', u'out', u'Into', u'the', u'world.']] words1변수에 map으로 가공하는데 띄어쓰기한 경우도 분리 >>> words1 = inputRDD.map(lambda str : str.split(" ")) >>> words1.glom().collect() [[[u'Once', u'more'], [u'I', u'summon', u'you'], [u'Out', u'of', u'the', u'past'], [u'With', u'poignant', u'love,'], [u'You', u'who', u'nourished', u'the', u'poet'], [u'And', u'the', u'lover.'], [u'I', u'see', u'your', u'gray', u'eyes'], [u'Looking', u'out', u'to', u'sea']], [[u'In', u'those', u'Rockport', u'summers,'], [u'Keeping', u'a', u'distance'], [u'Within', u'the', u'closeness'], [u'Which', u'was', u'never', u'intrusive'], [u'Opening', u'out'], [u'Into', u'the', u'world.']]] wcPair변수에 map으로 key,value로 만드는데 단어를 key로 1를 value로 가공 >>> wcPair = words.map(lambda s: (s,1)) >>> wcPair.glom().collect() [[(u'Once', 1), (u'more', 1), (u'I', 1), (u'summon', 1), (u'you', 1), (u'Out', 1), (u'of', 1), (u'the', 1), (u'past', 1), (u'With', 1), (u'poignant', 1), (u'love,', 1), (u'You', 1), (u'who', 1), (u'nourished', 1), (u'the', 1), (u'poet', 1), (u'And', 1), (u'the', 1), (u'lover.', 1), (u'I', 1), (u'see', 1), (u'your', 1), (u'gray', 1), (u'eyes', 1), (u'Looking', 1), (u'out', 1), (u'to', 1), (u'sea', 1)], [(u'In', 1), (u'those', 1), (u'Rockport', 1), (u'summers,', 1), (u'Keeping', 1), (u'a', 1), (u'distance', 1), (u'Within', 1), (u'the', 1), (u'closeness', 1), (u'Which', 1), (u'was', 1), (u'never', 1), (u'intrusive', 1), (u'Opening', 1), (u'out', 1), (u'Into', 1), (u'the', 1), (u'world.', 1)]]
resultRDD에 key를 기준으로 같은 key일 경우 값을 더해서 결과물 출력 >>> resultRDD = wcPair.reduceByKey(lambda x, y: x+y) >>> resultRDD.glom().collect() [[(u'sea', 1), (u'closeness', 1), (u'Keeping', 1), (u'Looking', 1), (u'see', 1), (u'Which', 1), (u'You', 1), (u'summers,', 1), (u'poet', 1), (u'you', 1), (u'world.', 1), (u'a', 1), (u'eyes', 1), (u'was', 1), (u'I', 2), (u'poignant', 1), (u'nourished', 1), (u'intrusive', 1), (u'those', 1), (u'And', 1), (u'past', 1), (u'With', 1), (u'Into', 1), (u'Rockport', 1), (u'the', 5), (u'lover.', 1)], [(u'gray', 1), (u'never', 1), (u'Opening', 1), (u'distance', 1), (u'love,', 1), (u'Once', 1), (u'who', 1), (u'Within', 1), (u'summon', 1), (u'to', 1), (u'Out', 1), (u'In', 1), (u'of', 1), (u'out', 2), (u'your', 1), (u'more', 1)]] resultRDD를 testresult2디렉토리에 결과물 저장 >>> resultRDD.saveAsTextFile("file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/source/Python/ch1/testresult2") 스파크SQL

데이터셋
- SQL과 유사한 방식의 연산 제공(스파크 2.0부터 데이터프레임은 데이터셋으로 통합)
- 스파크SQL에서 사용하는 분산 데이터 모델

스파크세션
- 데이터프레임을 생성하기 위해 SparkSEssion을 이용해야 함.
- 스파크세션은 인스턴스 생성을 위한 build()메서드를 제공(이 메서드는 기존인스턴스를 재사용하거나 새로운 인스턴스를 생성할 수 있음)

데이터프레임
- org.aoache.spark.sql.Row타입의 데이터로 구성된 데이터셋
- 데이터베이스의 테이블 또는 R의 데이터프레임과 유사한 방법으로 데이터를 다룰 수 있는 다양한 메서드 제공

DataFrameReader
- 스파크세션의 read()메서드를 통해 접근하며, jdbc, json, parquet등 다양한 유형의 데이터 소스로부터 데이터프레임을 생성하는 메서드 제공

DataFrameWriter
- Dataset의 write()메서드를 통해 접근하며, 데이터셋에 저장된 데이터를 파일시스템, 데이터베이스 등 다양한 저장소에 저장할 때 사용하는 메서드 제공

로우, 칼럼
- 데이터프레임을 구성하는 요소인 로우와 칼럼을 표현하는 모델이자 API
- 데이터프레임에 포함된 데이터를 처리할 때 사용하는 대부분의 메서드 제공

functions
- 칼럼과 더불어 데이터프레임을 이용해 데이터를 처리할 때 사용할 수 있는 각종 함수를 제공하는 오브젝트
- sum, stddev와 같은 다양한 집계, 통계함수를 제공

StructType, StructField
- 데이터에 대한 스키마 정보를 나타내는 API
- StructType은 데이터프레임의 레코드에 대한 구조 정보를 나타냄
- StructField는 레코드의 필드 정보를 나타냄

GroupedData, GroupedDataSet
- groupBy()메서드 등에 의해 그루핑 연산을 수행할때 사용

스파크SQL코드 작성 흐름
1. 스파크세션 생성
2. 스파크세션으로부터 데이터셋 또는 데이터프레임 생성
3. 생성된 데이터셋 또는 데이터프레임을 이용해서 데이터 처리
4. 처리된 결과 데이터를 외부 저장소에 저장
5. 스파크세션 종료


1. 스파크세션 생성
from pyspark.sql import SparkSession

spark = SparkSession\
.builder\
.appName("sample")\
.master("local[*]")\
.getOrCreate()

# builder()메서드는 스파크세션을 생성 할 수있는 빌더인스턴스를 생성
# appName()과 master()는 빌더가 제공하는 메서드로서 각각 애플리케이션 이름과 마스터정보를 설정
# config()메서드를 이용해 config("spark.local.ip", "127.0.0.1")와 같은 형태로 지정
from pyspark.sql import SparkSession

# 1. 스파크 세션 생성
spark = SparkSession\
.builder\
.appName("sample")\
.master("local[*]")\
.getOrCreate()
# builder()메서드는 스파크세션을 생성 할 수있는 빌더인스턴스를 생성
# appName() master()는 빌더가 제공하는 메서드로서 각각 애플리케이션 이름과 마스터정보를 설정
# config()메서드를 이용해 config("spark.local.ip", "127.0.0.1")와 같은 형태로 지정


# 2. 스파크세션으로부터 데이터셋 또는 데이터프레임 생성
source = "file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/README.md"
df = spark.read.text(source)
# 스파크세션의 read()메서드는 DataFrameReader 인스턴스를 돌려주는데,
# 다양한 유형의 데이터로 부터 데이터프레임을 생성 할 수있음
# 데이터소스로 텍스트파일을 지정했기 때문에 text()메서드 사용


# 3. 생성된 데이터프레임 또는 데이터셋을 이용해 데이터처리
# 데이터프레임을 사용해 단어를 분리하고 단어별 개수를 세는 코드
from pyspark.sql.functions import *

wordDF = df.select(explode(split(col("value"), " ")).alias("word"))
result = wordDF.groupBy("word").count()

print wordDF
print result
주석으로 인해 에러가 날 수도 있다...(주석 지우면 에러 안남)


반응형