메모장
빅데이터 분산 컴퓨팅 정리(19~23강 python spark 관련) 본문
빅데이터 분산 컴퓨팅 정리(19~23강 python spark 관련)
hiandroid 2017. 9. 5. 18:53RDD(Resilient Distributed Dataset)
- Resilient: 메모리 내에서 데이터가 손실되는 경우, 다시 생성할 수 있다.
- Distributed: 클러스터를 통해 메모리에 분산되어 저장된다.
- DataSet: 초기 데이터는 파일을 통해 가져올 수 있다.
- RDD는 스파크에서의 기본적인 데이터의 단위
대부분의 스파크 프로그래밍은 RDD를 통한 동작으로 구성된다.
RDD생성 방법
1. 텍스트파일로 생성
ex.
sc.textFile("myfile.txt")
sc.textFile("mydata/*.log")
sc.textFile("myfile1.txt, myfile2.txt")
2. 메모리에 있는 데이터를 통해 생성
ex.
num = [1,2,3,4]
rdd = sc.parallelize(num)
parallelize로 생성
3. RDD를 통해 생성
>>> newRDD = sc.textFile("text.txt")
>>> newRDD_uc = newRDD.map(lambda line: \ line.upper()) - line을 읽어서 전부 대문자로 변경
>>> newRDD_uc.count()
4
RDD함수의 종류
Actions: 값을 리턴(RDD의 특정 값을 가져옴)
RDD -> value
Transformations: 현재의 것에 기초하여 새로운 RDD를 정의
Base RDD -> New RDD
RDD에 있는 데이터는 절대 바꿀수 없다.
주요 Transformation 함수
map(function): 주어진 RDD의 각 레코드(라인) 별로 기능을 수행하여 새로운 RDD를 생성
ex.
>>> newRDD = sc.textFile("text.txt")
>>> newRDD_uc = newRDD.map(lambda line: \ line.upper()) - line을 읽어서 전부 대문자로 변경
filter(function): 주어진 RDD를 라인(레코드)별로 조건에 맞는 라인으로 새로운 RDD를 생성
ex.
>>> newRDD = sc.textFIle("text.txt")
>>> newRDD.count()
4
>>> newRDD_ft = newRDD.filter(lambda line: \ line.startswith('T') - line을 읽어서 첫 글자가 T로 시작하는 것만 남김
>>> newRDD_ft.count()
2
flatMap(function): base RDD의 각 라인별 엘리먼트를 각 엘리먼트 단위로 매핑
ex.
>>> sc.textFIle("test.txt") \ .map(lambda line: line.split()) \
map일 경우
[ ["time", "can"]
["time","bend"]
["would", "you"] ]
flatMap일 경우
["time", "can", "time", "bend", "would", "you"]
distinct: 중복제거
ex.
>>> sc.textFIle("test.txt") \ .flatMap(lambda line: line.split()).distinct()
flatMap만 할 경우
["time", "can", "time", "bend", "would", "you"]
distinct할 경우
["time", "can", "bend", "would", "you"]
주요 Action 함수
count(): RDD의 요소의 개수를 반환
ex.
>>> newRDD = sc.textFIle("test.txt")
>>> newRDD.count()
4
take(n): RDD의 첫번째 요소부터 n개의 요소를 리스트로 반환
ex.
>>> newRDD = sc.textFile("test.txt")
>>> newRDD take(2) - newRDD의 1,2번째를 리스트로 반환시킴
[ u`Time can bring you down',
u`Time can bend your knees'
]
collect(n): RDD의 모든 요소를 반환
ex.
>>> newRDD = sc.textFIle("test.txt")
>>> newRDD.collect()
[u" Time can bring you donw",
u' Time can bend your knees',
u' Would you know my name',
u" If I saw you in heaven"]
- 앞의 u는 유니코드를 뜻함
saveAsTextFile(path): RDD를 파일로 저장
ex.
>>> newRDD = sc.textFIle("test.txt")
>>> newRDD_ft = newRDD.filter(lambda line: \ linestartswith('T'))
>>> newRDD_ft.saveAsTextFle("output")
- T로 시작하는 라인만 가져와서 output이라는 파일로 저장
Lazy Execution
- 맨 나중에 실행됨(action이 되기전에는 기본 틀만 만들고 action이 되면 그때부터 실질적 작업 수행)
- RDD의 데이터는 action함수로 인한 작업이 수행 될 때까지, 처리되지 않음
체인형식
>>> sc.textFIle("test.txt").map(lambda line: line.upper()) \ .filter(lambda line: line.startswith('T')).count()
2
여러번 사용할 경우
def toUpper(s):
return s.upper()
newRDD = sc.textFile("test.txt")
newRDD = map(toUpper).take(2)
한번만 사용할 경우
newRDD.map(lambda line: line.upper()).take(2)
Spark context
- spark에서는 driver와 executors사이에 통신이 일어남
- driver는 spark job을 가지고 있으며, 이를 실행하기 위해서는 executors에게 일(tasks)을 나누어 줘야 함
- executors에서 작업이 끝나면 다시 driver에게 결과를 리턴
<-> Worker(executor)
driver(sparkContext) <-> Master(cluster manager) <-> Worker(executor)
<-> Worker(executor)
map함수
def sub(value):
return (value - 1)
subRDD = xrangeRDD.map(sub)
print "원본 데이터: ", xrangeRDD.glom().collect()
print "sub 데이터: ", subRDD.glom().collect()
결과 창
원본 데이터 : [[1,], [2], [3], [4], [5], ... [8, 9]]
sub 데이터 : [[0], [1], [2], [3], [4], ... [7, 8]]
flatMap 함수
list of list는 list로 풀어서 생성
tuple경우 key, value를 모두 분해하여 list로 생성
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
RDDMap = wordsRDD.map(lambda x: (x, x + 's'))
RDDFlatMap = wordsRDD.flatMap(lambda x: (x, x + 's'))
print RDDMap.collect()
print RDDFlatMap.collect()
print RDDMap.count()
print RDDFlatMap.count()
결과창
[{'cat', 'cats'}, {'elephant', 'elephants'}, {'rat', 'rats'}, '{rat', 'rats'}, {'cat', 'cats'}]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
5
10
filter 함수 적용
filter함수에는 true/false를 리턴하는 함수를 넘겨야 하며, 모든 데이터에 대해 true인 값들만 유지
def five(value):
if(value < 5):
return True
else:
return False;
filteredRDD = subRDD.filter(five)
filteredRDD2 = subRDD.filter(lambda x: x>5)
print "sub 데이터: ", subRDD.glom().collect()
print "Partition별 5보다 작은 데이터", filteredRDD.glom().collect()
print "Partition별 5보다 큰 데이터", filteredRDD2.glom.collect()
print "filteredRDD Partition 개수 : ", filteredRDD.getNumPartition()
print "filteredRDD2 Partition 개수 : ", filteredRDD2.getNumPartitions()
결과창
sub 데이터 : [[0], [1], [2], [3], [4], [5], [6], [7, 8]]
Partition별 5보다 작은 데이터 [[0], [1], [2], [3], [4], [], [], []]
Partition별 5보다 큰 데이터 [[], [], [], [], [], [], [6], [7, 8]]
filteredRDD Partition개수 : 8
filteredRDD2 Partition개수 : 8
wordCount
first() : 첫 번째 파티션의 첫번쨰 element만 가져옴
take(n): 첫 번째 파티션부터 n개수만큼 리턴
- n이 첫 번째 파티션개수보다 많으면 2번째 파티션으로 부터 가져옴
takeOrdered(n): 오름차순으로 n개 리턴
- tuple일 경우, key값 기준으로 오름차순
top(n): 내림차순으로 정렬하여 n개 리턴
- tuple일 경우, key값 기준으로 내림차순
파일 변수에 저장
text = sc.textFile("file:/home/nation909/dev/Apps/spark-2.1.0-bin-hadoop2.7/sample.txt", 4)
>>> text.glom().collect()
reduce 함수
>>> data = range(1,11)
>>> data
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> data1 = sc.parallelize(data,4)
>>> data1.glom().collect()
[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
>>> data1.takeSample(withReplacement=False, num=6)
[4, 2, 5, 6, 8, 7]
reduceByKey
- 같은 노드의 같은 키 값 기준으로 values를 미리 병합
- 셔플링할때, 네트워크의 부하를 줄여줌
groupByKey
- 특별한 작업없이 모든 페어데이터들이 키 값을 기준으로 셔플링 일어남
- 네트워크의 부하가 많이 생김
- 하나의 key값이 많은 데이터가 몰릴 경우 out of memory발생 가능
'교육(KOCW, 오프라인) > 빅데이터분산컴퓨팅(하둡에코시스템)' 카테고리의 다른 글
빅데이터 분산 컴퓨팅 정리(9~14강 HIVE관련) (0) | 2017.09.05 |
---|---|
빅데이터 분산 컴퓨팅 정리(5~8강 HDFS와 맵리듀스) (0) | 2017.08.30 |
빅데이터 분산 컴퓨팅 정리(3~4강 하둡설치 및 기본 명령어) (0) | 2017.08.30 |
빅데이터 분산 컴퓨팅 정리(1~2강 분산처리시스템과 하둡컨셉) (0) | 2017.08.30 |