그림 발췌

https://blogs.msdn.microsoft.com/bigdatasupport/2015/09/14/understanding-sparks-sparkconf-sparkcontext-sqlcontext-and-hivecontext/

Cluster에서 운영

- 분산 모드에서 스파크는 하나의 Driver Program과 여러 Worker Node, Master/Slave 구조를 가진다.

DriverWorker NodeExecutor와 통신하는데, Driver는 자신만의 자바 프로세스에서 돌아가며, Executor 또한 독립된 자바 프로세스다.

- 하나의 Spark ApplicationCluster Manager라고 불리는 외부 서비스를 사용하여 여러 개의 머신에서 실행 된다.



- Driver: DriverMain Method가 실행되는 프로세스로, SparkContext를 생성하고 RDD를 만들고, TransformationAction을 실행하는 사용자 코드를 실행하는 프로세스

1) 사용자 프로그램을 Task로 변환

2) Executor에서 Task들의 Scheduling


- Executor: Spark Task를 실행하는 작업 프로세스. Spark application 실행 시 최초 한 번 실행되며, 대개 application이 끝날 때까지 계속 동작하지만, 오류로 죽더라도 Spark application은 계속 실행함

1) Application을 구성하는 작업들을 실행하여, 드라이버에 결과를 리턴

2) Executor 안에 존재하는 블록 매니저라는 서비스를 통해 사용자 프로그램에서 Cache하는 RDD를 저장하기 위한 메모리 저장소 제공


- Cluster Manager: Spark와 붙이거나 뗄 수 있는 컴포넌트. (Yarn, mesos, 내장 Manager 등 다양한 외부 매니저들 위에서 동라가게)


Program 실행하기

- 어떤 Cluster Manager를 사용하든 Spark는 사용자 프로그램을 Sparksubmit 할 수 있는  단일 스크립트인 Spark-submit을 제공

$ ./bin/spark-submit <script.py>  //localSpark program 실행

$ ./bin/spark-submit --master spark://host  :7077 --executor-memory 10g <script.py>

--master flag는 접속할 cluster의 주소를 지정해주는데, spark:// URLSpark의 단독 모드를 사용한 cluster 의미


- Spark-submit –master flag

1) spark://host:port: spark 단독 클러스터의 지정 포트 접속 (default: 7077)

2) memos://host:port: mesos cluster에 지정 포트 접속 (default: 5050)

3) yarn: yarn cluster 접속 (hadoop directory -> HADOOP_CONF_DIR 환경 변수 설정 필요)

4) local: 로컬 모드에서 싱글 코어로 실행

5) local[N]: N개코어로 로컬모드에서 실행

6) local[*]: 로컬모드에서 머신이 가지고 있는 만큼의 코어로 실행


- executor-memory <MEM>: executor 당 메모리 할당


- 자세한 내용은 spark-submit --help  참고

'기타 > 분산 컴퓨팅' 카테고리의 다른 글

Spark Accumulator  (0) 2017.04.07
Spark Data 불러오기/저장하기  (0) 2017.04.07
SparkContext, Reduce/Group By Key  (0) 2017.04.07
Spark Pair RDD 개념  (0) 2017.04.07
Spark RDD 개념 및 예제  (0) 2017.04.07

본 내용은 러닝스파크 책 참고


Accumulator

- Sparkmap()이나 조건 지정을 위해 filter()에 함수를 넘길 때, 보통 Driver Program에 정의된 변수를 사용

(Driver ProgramWorker Node) - 해당 내용은 kkn1220.tistory.com/131에서 참고


- 하지만, 클러스터에서 실행 중인 각각의 작업들은 변수의 복사본을 받아 작업하게 되므로, 업데이트된 내용이 다시 드라이버 프로그램으로 리턴 해주지는 않는다이에 Accumulator 변수는 이러한 제한을 풀어준다.


- Accumulator: Worker Node에서 Driver Program으로 보내는 값의 집합 연산에  대해 문법 제공. 가장 흔한 사용 방법 중 하나는, 작업 수행 중에 발생하는 일에 대한 개수를 디버깅 목적으로 헤아림

-> Worker node에 복사본을 전달하고, 어플리케이션 종료 시점에 각 worker node로부터 수집하여 결과를 생성


- 라인수 세기 예제

- 해당 예제에서 Transformation 계산이 끝난 후, 카운터의 값을 출력

- 올바른 값은 반드시 saveAsTextFile이 실행된 다음에 얻을 수 있음. 그 위의 map()lazy execution 방식을 따르기 때문에 수반되는 accumulator 값 증가는 반드시 map()이 실제로 실행 되는 시점인 saveAs~ 액션이 수행되어야만 강제하기 때문



요약하면, Accumulator는 다음과 같이 동작함

1) Driver Program에서 SparkContext.accumulator(IV) 메소드를 호출하여 초기 값을 가진 Accumulator 를 만든다.

2) 반환 타입은 Accumulator[T] 객체이며, T는 초기 값의 타입이다.

3) Spark Closure의 작업 노드 코드에서 Accumulator+= 메소드를 써서 값을 더한다.

4) Driver Program에서 value 속성을 불러 Accumulator 값에 접근한다.


여기에서 Worker nodetask에서는 AccumulatorValue()에 접근하지 못하는 것을 유념! 즉 이 task의 관점에

Accumulator쓰기 전용변수인 셈. 이는 Accumulator가 매 업데이트마다 통신할 필요가 없도록 효율적으로 구되어 있기 때문

'기타 > 분산 컴퓨팅' 카테고리의 다른 글

Spark Cluster 운영  (0) 2017.04.07
Spark Data 불러오기/저장하기  (0) 2017.04.07
SparkContext, Reduce/Group By Key  (0) 2017.04.07
Spark Pair RDD 개념  (0) 2017.04.07
Spark RDD 개념 및 예제  (0) 2017.04.07

본문의 내용은 러닝스파크 책 참고


Data불러오기/저장하기

- 스파크에서는 Text file, JSON, CSV, object file 등 다양한 파일 포맷을 지원

1) Text파일

- 불러오기

- <변수명> = sc.textFile(<file_path>)

새로운 RDD 생성 (Python단어라인 추출하기)


- 저장하기

- result.saveAsTextFile(<output_file_name>)


2) JSON

- 불러오기 

가장 간단한 방법은 데이터를 Text File로 불러온 뒤, JSON parser를 사용하여 값들을 매핑


데이터 샘플


- 저장하기 //팬더를 좋아하는 사람으로 필터

result.saveAsTextFile(<output_file_name>)  



3) CSV (Comma Separated Value)

-데이터 샘플

- 10,539 줄의 Traffic

- Comma로 구분이 안되어 있어 , 변환 작업


-데이터 불러오기 (TextFile로 불러오기)


- 전체적으로 CSV 불러오기


- 저장하기


'기타 > 분산 컴퓨팅' 카테고리의 다른 글

Spark Cluster 운영  (0) 2017.04.07
Spark Accumulator  (0) 2017.04.07
SparkContext, Reduce/Group By Key  (0) 2017.04.07
Spark Pair RDD 개념  (0) 2017.04.07
Spark RDD 개념 및 예제  (0) 2017.04.07

+ Recent posts