본문 바로가기

Big Data/Flink

Flink 시작하기 #2 Basic API Concept



#DataSet and DataStream
Flink는 크게 Streaming과 Batch 를 처리하는 방식에 따라 사용하는 API가 나뉜다.
DataStream은 Streaming을 처리하기 위한 클래스이고 Batch는 DataSet을 사용한다. 두 클래스를 처리하는 elements가 무한으로 들어오느냐 아니면 끝이 있는 bounded 데이터냐가 가장 큰 차이이다.
두 클래스 모두 immutable한 속성을 가지고 있다. 즉, 변경되지 않는 콜렉션의 개념이다.


#Anatomy of a Flink Program
Flink 프로그램은 다음의 순서로서 동자
  1. Obtain an execution environment: (ExecutionEnviroment를 생성해 DataStream, DataSet을 만들기 위한 준비)
  2. Load/create the initial data, : (data Source를 생성해 input 데이터를 가져옴)
  3. Specify transformations on this data, : (데이터를 변환 및 가공)
  4. Specify where to put the results of your computations : (계산된 결과를 저장하거나 활용)
  5. Trigger the program execution : (주기적으로 프로그램 실행)


#Lazy Evaluation
flink는 lazy evaluation방식을 사용한다. DataStream은 chain 방식으로 데이터를 단계별로 변환 시키는 구조를 가지고 있다.
변환하는 각 메소드를 실행할 때 마다 계산을 수행하지 않는다. Sink 메소드가 실행되는 순간 지정된 transformation들이 실행되는 구조이다.


#Specifying Keys
일부 transformations (join, keyBy, groupBy등) 들은 key 를 가지는 데이터 타입이 필요하다. 하지만 Flink는 key/value가 필수 조건이 아니다.
꼭 key/value로 구분되지 않는 데이터를 사용한다면 가상으로 key를 설정해서 해당 transformation operator를 사용할 수 있다.

key를 지정하는 여러 방식이 있다. 기본적으로 keyBy 메소드를 사용해서 key를 지정한다. 

-순서로 지정
Tuple로 데이터 타입을 지정할 경우 순서로 지정이 가능하다. 위 예제 기준으로 keyBy(0)으로 지정했다면 (Int, String, Long) 중 첫 번째 Int값이 키가 되는 것이다.
val input: DataStream[(IntStringLong)] = // [...]
val keyed = input.keyBy(0)

-필드 익스프레션으로 지정
Java의 POJO 나 scala의 case class를 데이터 타입으로 사용할 경우 필드 이름으로 설정이 가능하다.
case class WC(word: String, count: Int)
 
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word")

-Selector function으로 지정
데이터의 값을 그대로 사용하지 않고 값을 변경하여 key를 지정할 수 있다. KeySelector 인터페이스를 사용하여 지정이 가능하다.
// Java
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words
  .keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
   });

// Scala
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )

#Specifying Transformation Functions
transformation를 수행하기 위해 function을 지정해야한다. flink는 다양한 지정방식이 있고 언어마다 지정에 대한 스타일이 다르다.

-Java
java는 함수형 언어가 아니기 때문에 scala보다 약간 코드가 길어질수밖에 없다. 하지만 개인적으로는 좀 더 명확하다는 것이 장점이라고 생각한다.

Implementing an interface
가장 일반적인 방법으로 인터페이스를 구현하는 것이다.
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});
data.map(new MyMapFunction());
Anonymous classes
이건 Flink에 특성이라기보단 Java의 특성인 익명 클래스를 사용하는 것이다. 재사용이 필요없는 transformation이라면 익명클래스가 더 효율적이다.
data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas
자바는 기본적으로 함수형 언어가 아니지만 1.8이 되면서 람다를 지원하게 되었다. 따라서 Flink에서도 사용이 가능하다. (Java 8 Guide.)
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rich function
모든 transformation function은 RichFunction으로 사용이 가능하다. RichFunction은 open, close, getRuntimeContext, setRuntimeContext 네가지 메소드를 추가로 사용할 수 있다. 사용방법은 인터페이스와 같다. 익명함수도 당연히 가능하다.
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});
class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});

data.map(new MyMapFunction());

-Scala
scala는 함수형 언어이기 때문에 flink의 transformation 메소드들과 굉장히 잘 어울린다. 코드를 쉽고 간단하게 지정이 가능하다.

Lambda Functions
scala의 기본 람다 방식을 모두 사용할 수 있다. scala를 사용한다면 가장 일반적인 방식이다.
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
Rich function
java와 마찬가지로 RichFunction을 사용할 수 있다.
class MyMapFunction extends RichMapFunction[StringInt] {
  def map(in: String):Int = { in.toInt }
})

data.map(new MyMapFunction())

#Supported Data types
flink는 DataStream, DataSet에서 사용하는 elements 타입에 제약이 있다. 
다음은 elements 타입으로 사용할 수 있는 7가지 타입이다.

  1. Java Tuples and Scala Case Classes
  2. Java POJOs
  3. Primitive Types
  4. Regular Classes
  5. Values
  6. Hadoop Writables
  7. Special Types


#Accumulators & Counters
accumulator는 주로 counter를 위해 사용한다. flink는 분산처리 시스템이기 때문에 parallel로 여러 노드에서 실행될 경우 counter를 job단위로 제대로 code에서 구현하기는 어렵다 그래서 accumulator를 사용한다.
기본적으로  flink는 IntCounter, LongCounter, DoubleCounter를 제공하고 있다. 
이외에 custom accumulator를 Accumulator / SimpleAccumulator 인터페이스를 구현하여 작성할 수 있다.


#참조