메모장

빅데이터 분산 컴퓨팅 정리(19~23강 python spark 관련) 본문

교육(KOCW, 오프라인)/빅데이터분산컴퓨팅(하둡에코시스템)

빅데이터 분산 컴퓨팅 정리(19~23강 python spark 관련)

hiandroid 2017. 9. 5. 18:53
반응형

RDD(Resilient Distributed Dataset)

 - Resilient: 메모리 내에서 데이터가 손실되는 경우, 다시 생성할 수 있다.

 - Distributed: 클러스터를 통해 메모리에 분산되어 저장된다.

 - DataSet: 초기 데이터는 파일을 통해 가져올 수 있다.

 - RDD는 스파크에서의 기본적인 데이터의 단위


대부분의 스파크 프로그래밍은 RDD를 통한 동작으로 구성된다.


RDD생성 방법

1. 텍스트파일로 생성

ex.

sc.textFile("myfile.txt")

sc.textFile("mydata/*.log")

sc.textFile("myfile1.txt, myfile2.txt")


2. 메모리에 있는 데이터를 통해 생성

ex.

num = [1,2,3,4]

rdd = sc.parallelize(num)


parallelize로 생성


3. RDD를 통해 생성

>>> newRDD = sc.textFile("text.txt")

>>> newRDD_uc = newRDD.map(lambda line: \ line.upper())   - line을 읽어서 전부 대문자로 변경

>>> newRDD_uc.count()

4


RDD함수의 종류

Actions: 값을 리턴(RDD의 특정 값을 가져옴)

RDD -> value


Transformations: 현재의 것에 기초하여 새로운 RDD를 정의

Base RDD -> New RDD


RDD에 있는 데이터는 절대 바꿀수 없다.


주요 Transformation 함수

map(function): 주어진 RDD의 각 레코드(라인) 별로 기능을 수행하여 새로운 RDD를 생성

ex.

>>> newRDD = sc.textFile("text.txt")

>>> newRDD_uc = newRDD.map(lambda line: \ line.upper())   - line을 읽어서 전부 대문자로 변경


filter(function): 주어진 RDD를 라인(레코드)별로 조건에 맞는 라인으로 새로운 RDD를 생성

ex.

>>> newRDD = sc.textFIle("text.txt")

>>> newRDD.count()

4


>>> newRDD_ft = newRDD.filter(lambda line: \ line.startswith('T')    - line을 읽어서 첫 글자가 T로 시작하는 것만 남김

>>> newRDD_ft.count()

2


flatMap(function): base RDD의 각 라인별 엘리먼트를 각 엘리먼트 단위로 매핑

ex.

>>> sc.textFIle("test.txt") \ .map(lambda line: line.split()) \

map일 경우

[ ["time", "can"]

["time","bend"]

["would", "you"] ]

flatMap일 경우

["time", "can", "time", "bend", "would", "you"]


distinct: 중복제거

ex.

>>> sc.textFIle("test.txt") \ .flatMap(lambda line: line.split()).distinct()

flatMap만 할 경우

["time", "can", "time", "bend", "would", "you"]

distinct할 경우

["time", "can", "bend", "would", "you"]


주요 Action 함수

count(): RDD의 요소의 개수를 반환

ex.

>>> newRDD = sc.textFIle("test.txt")

>>> newRDD.count()

4


take(n): RDD의 첫번째 요소부터 n개의 요소를 리스트로 반환

ex.

>>> newRDD = sc.textFile("test.txt")

>>> newRDD take(2)    - newRDD의 1,2번째를 리스트로 반환시킴

[    u`Time can bring you down',

     u`Time can bend your knees'

]


collect(n): RDD의 모든 요소를 반환

ex.

>>> newRDD = sc.textFIle("test.txt")

>>> newRDD.collect()

[u" Time can bring you donw",

u' Time can bend your knees',

u' Would you know my name',

u" If I saw you in heaven"]

 - 앞의 u는 유니코드를 뜻함


saveAsTextFile(path): RDD를 파일로 저장

ex.

>>> newRDD = sc.textFIle("test.txt")

>>> newRDD_ft = newRDD.filter(lambda line: \ linestartswith('T'))

>>> newRDD_ft.saveAsTextFle("output")

 - T로 시작하는 라인만 가져와서 output이라는 파일로 저장



Lazy Execution

 - 맨 나중에 실행됨(action이 되기전에는 기본 틀만 만들고 action이 되면 그때부터 실질적 작업 수행)

 - RDD의 데이터는 action함수로 인한 작업이 수행 될 때까지, 처리되지 않음


체인형식

>>> sc.textFIle("test.txt").map(lambda line: line.upper()) \ .filter(lambda line: line.startswith('T')).count()

2



여러번 사용할 경우

def toUpper(s):

return s.upper()


newRDD = sc.textFile("test.txt")

newRDD = map(toUpper).take(2)



한번만 사용할 경우

newRDD.map(lambda line: line.upper()).take(2)




Spark context

 - spark에서는 driver와 executors사이에 통신이 일어남

 - driver는 spark job을 가지고 있으며, 이를 실행하기 위해서는 executors에게 일(tasks)을 나누어 줘야 함

 - executors에서 작업이 끝나면 다시 driver에게 결과를 리턴


    <-> Worker(executor)

driver(sparkContext) <-> Master(cluster manager) <-> Worker(executor)

    <-> Worker(executor)



map함수

def sub(value):

return (value - 1)


subRDD = xrangeRDD.map(sub)


print "원본 데이터: ", xrangeRDD.glom().collect()

print "sub 데이터: ", subRDD.glom().collect()


결과 창

원본 데이터 : [[1,], [2], [3], [4], [5], ... [8, 9]]

sub 데이터 : [[0], [1], [2], [3], [4], ... [7, 8]]


flatMap 함수

list of list는 list로 풀어서 생성

tuple경우 key, value를 모두 분해하여 list로 생성


wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']

wordsRDD = sc.parallelize(wordsList, 4)


RDDMap = wordsRDD.map(lambda x: (x, x + 's'))

RDDFlatMap = wordsRDD.flatMap(lambda x: (x, x + 's'))


print RDDMap.collect()

print RDDFlatMap.collect()

print RDDMap.count()

print RDDFlatMap.count()


결과창

[{'cat', 'cats'}, {'elephant', 'elephants'}, {'rat', 'rats'}, '{rat', 'rats'}, {'cat', 'cats'}]

['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']

5

10


filter 함수 적용

filter함수에는 true/false를 리턴하는 함수를 넘겨야 하며, 모든 데이터에 대해 true인 값들만 유지


def five(value):

if(value < 5):

return True

else:

return False;


filteredRDD = subRDD.filter(five)

filteredRDD2 = subRDD.filter(lambda x: x>5)


print "sub 데이터: ", subRDD.glom().collect()

print "Partition별 5보다 작은 데이터", filteredRDD.glom().collect()

print "Partition별 5보다 큰 데이터", filteredRDD2.glom.collect()

print "filteredRDD Partition 개수 : ", filteredRDD.getNumPartition()

print "filteredRDD2 Partition 개수 : ", filteredRDD2.getNumPartitions()


결과창

sub 데이터 : [[0], [1], [2], [3], [4], [5], [6], [7, 8]]

Partition별 5보다 작은 데이터 [[0], [1], [2], [3], [4], [], [], []]

Partition별 5보다 큰 데이터 [[], [], [], [], [], [], [6], [7, 8]]

filteredRDD Partition개수 : 8

filteredRDD2 Partition개수 : 8



wordCount

first() : 첫 번째 파티션의 첫번쨰 element만 가져옴

take(n): 첫 번째 파티션부터 n개수만큼 리턴

 - n이 첫 번째 파티션개수보다 많으면 2번째 파티션으로 부터 가져옴

takeOrdered(n): 오름차순으로 n개 리턴

 - tuple일 경우, key값 기준으로 오름차순

top(n): 내림차순으로 정렬하여 n개 리턴

 - tuple일 경우, key값 기준으로 내림차순



파일 변수에 저장

text = sc.textFile("file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/sample.txt", 4)

>>> text.glom().collect()


reduce 함수


>>> data = range(1,11)

>>> data

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


>>> data1 = sc.parallelize(data,4)

>>> data1.glom().collect()

[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]] 


>>> data1.reduce(lambda a,b : a+b)
55
>>> data1.reduce(lambda a, b : a - b)
21


>>> data2 = sc.parallelize(data,2)
>>> data2.glom().collect()
[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], []]

>>> data2.reduce(lambda a,b : a+b)
55
>>> data2.reduce(lambda a, b : a - b)
-53

즉, 리듀스는 +나 *할때만 사용가능(-나 /를 할 경우 파티션에 따라 값이 다르게 나옴)

takeSample 함수

withReplacement: 중복허용여부
num: sample 개수
seed: 값을 줄 경우, 항상 같은 samples 추출

중복 허용
>>> data1.takeSample(withReplacement=True, num=6)
[4, 9, 3, 9, 1, 3]

중복 불가능

>>> data1.takeSample(withReplacement=False, num=6)

[4, 2, 5, 6, 8, 7]


see를 줄경우 항상 값은 값을 추출
>>> data1.takeSample(withReplacement=True, num=6, seed=500)
[7, 9, 7, 7, 10, 8]
>>> data1.takeSample(withReplacement=True, num=6, seed=500)
[7, 9, 7, 7, 10, 8]

>>> data1.takeSample(withReplacement=False, num=6, seed=500)
[1, 3, 6, 4, 7, 10]
>>> data1.takeSample(withReplacement=False, num=6, seed=500)
[1, 3, 6, 4, 7, 10]

reduceByKey, GroupByKey 함수 (groupByKey는 비추천 함)
- 항상 key값이 있어야 함


reduceByKey

- 같은 노드의 같은 키 값 기준으로 values를 미리 병합

- 셔플링할때, 네트워크의 부하를 줄여줌


groupByKey

- 특별한 작업없이 모든 페어데이터들이 키 값을 기준으로 셔플링 일어남

- 네트워크의 부하가 많이 생김

- 하나의 key값이 많은 데이터가 몰릴 경우 out of memory발생 가능



반응형