본문 바로가기

Big Data/Storm

Apache Storm #2 (WordCount 샘플 Topology 만들기)

Apache Storm

WordCount 샘플 Topology 만들기





1. 프로젝트 생성

IDE로 IntelliJ를 사용하여 Java Maven 프로젝트를 생성한다. Java 버젼은 1.8로 선택했다. 



프로젝트가 만들어지면 pom.xml에서 storm-core 디펜던시를 추가한다. 



이렇게 하면 Storm 코드를 local에서 테스트하기 위한 모든 준비가 끝이다.



2. WordCounting 샘플


Word Counting은 가장 많이 사용되는 샘플이다. Storm으로 해당 샘플을 작성해본다.


2.1 Spout 


Spout는 데이터를 읽어오는 역할을 한다. 스톰은 Tuple 형식으로 데이터를 다룬다.

아래 작성한 SentenceSpout는 5줄의 미리 작성한 text를 연속적으로 계속 생산하는 역할을 한다. BaseRushSpout를 상속하여 기본 메소드를 정의하면 된다. open() 에서 Collector를 세팅하고, declareOutputField()에서 output 데이터를 설정한다. nextTuple()이 실제 Tuple을 생성하는 메소드다. 





2.2 Bolt

Spout로 계속 만들어지는 문장의 단어들을 카운팅하기 위해서는 여러 operation이 필요할 것이다.

첫번째로 문장을 단어로 바꿔주는 Bolt와 그 단어들을 카운팅하는 Bolt. 그리고 그것을 출력하는 Bolt 총 3개를 만들것이다.


2.2.1 SplitBolt

문장을 단어로 변경할 Bolt를 만들자. BaseRichBolt를 상속받아 작성을 한다. SentenceSpout에서 OutputField로 만든 "sentence" 키로 입력을 받아 "word"로 출력하게 될 것이다.

BaseRichSpout에서 필요한 메스드와 유사한 동작을 하는 메소드를 구현하면되는데 prepare()가 spout의 open()과 유사하게 Collector를 정의한다. spout와 동일하게 declareOutputFields()로 출력 데이터를 정의하고, execute()를 통해 입력받은 tuple을 처리하는 구조이다.

간단하기 해당 Bolt에서는 문장을 짤라 단어로 출력을 하고 있다.




2.2.2 WordCountBolt

단어들로 짤라진 데이터를 카운팅하기 위한 Bolt이다. 입력은 "word" 키로 만들어진 Tuple이고 출력은 "word","count"의 단어당 카운팅 횟수를 출력할 것이다. HashMap을 통해 word를 카운팅한 값을 계속 유지하고 카운팅 될때마다 collector로 출력을 한다.




2.2.3 ReportBolt

카운팅된 입력 Tuple을 받아서 화면에 출력하는 Bolt이다. 출력용(출력을 다른 BOlt에서 사용할 일이 없음) 이기 때문에 출력 데이터를 정의 할 필요가 없다. execute() 에서는 단순히 입력받은 word count 값을 HashMap에 저장 유지하고 있을 것이다.

cleanup()은 Bolt가 셧다운될대 실행되는 메소드이다. 이때 최종 카운팅될 워드 카운팅을 출력할 것 이다.




2.3 Topology

만들어놓은 Spout와 Bolt들을 연결하는 Topology를 만들 것 이다. 

TopologyBuilder에 Spout와 Bolt를 연결하고 Cluster에 submit을 하면 실행 할 수 있다.


우선 setSpout로 SentenceSpot를 연결한다. Spout와 Bolt는 setSpout, setBolt 메소드로 Key(String)값과 같이 등록한다. 


SentenceSpout -> SplitBolt -> WordCountBolt -> ReportBolt 구조로 데이터가 흘러가면서 연산이 되게 등록을 한다.( Grouping에 대한 설명은 생략한다. 참조: http://bcho.tistory.com/997 )


10초 후 등록한 Topology를 종료시키면 REportBolt에서 cleanup에서 작성한 출력이 나올 것이다.




2.4 최종 출력

많은 로그와 함께 아래 그림처럼 워드가 카운팅된 화면이 출력됬다. Spout에서 waiting시간을 5ms로 했는데 이것을 좀더 짧게 설정한다면 10초동안 유입된 데이터가 많아지기 떄문에 카운팅 숫자는 전체적으로 올라갈 것이다.





3. 병렬 분산 처리

위 예제는 모든 Spout, Bolt를 싱글 처리하는 예제다. 이 예제를 병렬 분산 처리되는 Topology로 변경하여 실행해 본다.


setSpout, setBolt 메소드에서 3번쨰 인자로 숫자를 입력할 수 있는데 이것은 실행되는 쓰레드 갯수라고 보면된다. 

SplitBolt를 setNumTasks()는 Task생성 갯수이다. SplitBolt는 4개의 Bolt가 2개의 쓰레드에서 실행된다고 보면 된다.


Config의 SetNumWorkers()를 통해 Work의 갯수를 지정할 수 있다. 







Source Ref.: https://github.com/Minsub/StormStudy


끝!



'Big Data > Storm' 카테고리의 다른 글

Apache Storm #3 (Stream windows API: Trident)  (0) 2016.10.28
Apache Storm #1 (Cluster 환경 구축)  (0) 2016.10.20