2016.04.26

"긴장하라 스파크와 스톰", 떠오르는 아파치 에이펙스

Ian Pointer | InfoWorld
데이터토렌트(DataTorrent)의 RTS 플랫폼에서 파생된 새로운 오픈소스 스트리밍 분석 솔루션인 에이펙스(Apex)는 놀라운 속도와 간소한 프로그래밍 기능을 제공한다. 한번 살펴보자.

스트리밍 분석이라고 하면 대부분의 경우 가장 먼저 스파크(Spark)를 떠올린다. 스파크는 사실 유사 스트리밍 기술이지만 스파크 2.0에 도입된 정형화된 스트리밍(Structured Streaming) 기능은 큰 진전이다. 또 하나 떠오르는 이름은 진정한 스트리밍 솔루션인 스톰(Storm)이다. 현재 버전 1.0이며 사용하기 어렵다는 평을 극복하기 위해 애쓰는 중이다.

이 스파크와 스톰 두 가지 외에 2015년 6월 처음 등장한 아파치 에이펙스(Apache Apex)도 있다. 에이펙스는 코어 프로세싱 엔진, 다양한 대시보드와 입수(ingesting) 및 모니터링 도구, 그리고 데이터 과학자를 위한 그래픽 방식의 흐름 기반 프로그래밍 시스템인 dt어셈블(dtAssemble)을 포함하는, 데이터토렌트와 그 강력한 RTS 플랫폼에서 파생된 솔루션이다.

RTS는 스파크만큼의 큰 인기는 없지만 GE와 캐피털 원(Capital One)의 실무 환경에 구축됐으며 초당 수십억 건의 이벤트를 처리하고 20ms 이내에 이벤트에 응답할 정도로 확장이 가능하다는 점을 입증했다.

RTS 플랫폼의 중심에 위치하는 처리 엔진인 에이펙스는 데이터토렌트가 아파치에 제공한 것이다. 에이펙스는 기존 하둡 생태계에서 실행되도록 설계됐으며 YARN을 사용해 필요에 따라 확장 또는 축소되고, 내결함성(fault tolerance)을 위해 HDFS를 활용한다. 완전한 RTS 플랫폼의 다양한 부가 기능들을 제공하지는 않지만 데이터 처리 플랫폼에서 사람들이 기대할 법한 대부분의 주요 기능을 제공한다.

에이펙스 애플리케이션 샘플
아주 기본적인 에이펙스 파이프라인을 살펴보면서 핵심 개념을 파악해 보자. 이 예시를 통해 카프카(Kafka)의 로깅 라인을 읽으면서 로그 라인 형식의 수를 확인하고 이 수를 콘솔에 쓸 것이다. 이번 기고에서 코드의 일부를 소개하겠지만 깃허브에서 완전한 애플리케이션을 찾을 수 있다.

에이펙스의 핵심 개념은 입력을 받고 출력을 생성하는 메소드(methods)를 구현하는 자바 클래스인 연산자(operator)다(스톰에 익숙하다면 볼트 및 스파우트 개념과 비슷하다고 느낄 것이다). 또한 각 연산자는 데이터 입력 또는 출력을 위한 포트 집합을 정의한다. 메서드는 InputPort에서 입력을 읽거나 OutportPort를 통해 다운스트림으로 데이터를 전송한다.

연산자를 통한 데이터 흐름은 스트림을 시간 기반 데이터 윈도우로 분할하는 방식으로 모델링되지만 스파크의 마이크로배칭(microbatching)과 달리 입력 데이터 처리를 위해 윈도우의 끝까지 기다릴 필요가 없다.


DataTorrent

다음 예에서는 3개의 연산자가 필요하며 각각의 연산자는 에이펙스가 지원하는 3가지 형식의 연산자에 해당한다. 카프카에서 라인을 읽기 위한 입력 연산자, 로깅 형식을 계산하기 위한 일반(generic) 연산자, 그리고 콘솔에 쓰기 위한 출력 연산자다. 첫 번째와 세 번째의 경우 에이펙스의 말할(Malhar) 라이브러리를 사용하면 되지만, 표시되는 여러 형식의 로깅 수를 계산하기 위해서는 맞춤형 비즈니스 로직을 구현해야 한다.

로그카운터오퍼레이터(LogCounterOperator)를 구현하기 위한 코드는 다음과 같다.

public class LogCounterOperator extends BaseOperator {
private HashMap<String, Integer> counter;
public transient DefaultInputPort<String> input = new DefaultInputPort<String>() {
@Override
public void process(String text) {
String type = text.substring(0, text.indexOf(' '));
Integer currentCounter = counter.getOrDefault(type, 0);
counter.put(type, currentCounter+1);
}
};
public transient DefaultOutputPort<Map<String, Integer>> output = new DefaultOutputPort<>();
@Override
public void endWindow() {
output.emit(counter);
}
@Override
public void setup(OperatorContext context){
counter = new HashMap();
}
}


로그 형식 계산에는 간단한 해쉬맵(HashMap)을 사용하고 연산자를 통한 데이터 흐름 처리에 입력용과 출력용 하나씩, 두 개의 포트를 정의한다. 형식이 지정되므로 호환되지 않는 연산자를 끼워 맞추려고 하면 배포 후 문제가 발생하는 것이 아니라 컴파일 시에 오류가 발생한다. 여기서는 입력과 출력 포트를 하나씩만 정의했지만 입출력을 여러 개 지정하는 것도 가능하다.

일반 연산자의 라이프사이클은 단순하다. 에이펙스는 먼저 초기화가 필요한 부분에서 setup()을 호출한다. 앞서 예에서 setup()은 해쉬맵 생성을 처리한다. 그런 다음 beginWindow()를 호출해서 입력 처리의 새 윈도우/배치가 시작됨을 나타낸 다음 이 윈도우 중에 연산자를 통과해 흐르는 모든 데이터 항목에 대해 process()를 호출한다.

현재 윈도우의 시간이 소진되면 에이펙스는 endWindow()를 호출한다. 윈도우별 로직은 필요 없으므로 베이스오퍼레이터(BaseOperator)에 있는 빈 beginWindow()정의를 그대로 둔다. 다만 매 윈도우의 끝에서는 현재 수를 전송해야 하므로 출력 포트를 통해 해쉬맵을 내보낸다.

한편 오버라이드된 process() 메소드는 로그 라인의 첫 번째 단어를 취해 카운터를 업데이트하는 비즈니스 로직을 처리한다. 마지막으로, 에이펙스가 필요한 정리 작업을 위해 파이프라인을 폐기할 때 호출되는 teardown() 메소드가 있다. 이 예에서는 아무것도 할 필요가 없지만 예시를 위해 HashMap을 비웠다.

연산자를 구축했으므로 이제 파이프라인 자체를 구축할 수 있다. 이번에도 역시 스톰 토폴로지 경험이 있다면 다음의 코드가 친숙할 것이다.

public void populateDAG(DAG dag, Configuration conf) {
KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("KafkaInput", new KafkaSinglePortStringInputOperator());
kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
LogCounterOperator logCounter = dag.addOperator("LogCounterOperator", new LogCounterOperator());
ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
dag.addStream("LogLines", kafkaInput.outputPort, logCounter.input);
dag.addStream("Console", logCounter.output, console.input);
}


먼저 DAG, 즉 연산자 노드를 정의한다. 그런 다음 그래프의 가장자리(에이펙스 용어로 "스트림")를 정의한다. 이러한 스트림은 연산자의 출력 포트를 다른 연산자의 입력 포트에 연결한다. 여기서는 카프카를 로그카운터오퍼레이터(LogCounterOperator)에 연결하고 출력 포트를 콘솔아웃풋오퍼레이터(ConsoleOutputOperator)에 연결한다. 그게 전부다. 컴파일해서 애플리케이션을 실행하면 해쉬맵이 표준 출력으로 인쇄되는 것을 볼 수 있다.

{INFO=1}
{ERROR=1, INFO=1}
{ERROR=1, INFO=2}
{ERROR=1, INFO=2, DEBUG=1}


이런 식이다.

말할(Malhar), 유용한 블록 상자
연산자의 좋은 점은 작고 잘 정의된 코드라서 구축하고 테스트하기 쉽다는 데 있다. 레고 블록처럼 서로 잘 결합된다. 대부분의 경우 자체 레고 블록을 만들어야 할 필요도 없다.

이는 말할(Malhar) 덕분이다. 레고로 치면 말할은 표준 2x4 블록부터 아주 가끔씩만 필요한 힌지 블록까지 모든 블록이 들어가 있는 거대한 레고 바구니라고 할 수 있다. 스플렁크(Splunk)에서 값을 읽고, 이 정보를 FTP 사이트에 저장된 텍스트 파일의 정보와 결합한 다음, 그 결과를 H베이스(HBase)에 저장해야 하는가? 말할을 사용하면 다 된다.

에이펙스가 매력적인 이유는 말할에 이렇게 방대한 연산자가 포함된 덕분에 사용하는 입장에서는 대부분의 경우 비즈니스 로직에 대해서만 생각하면 된다는 데 있다. 말할 연산자에 대한 문서는 그다지 많지 않지만 리포지토리의 거의 모든 항목에는 테스트가 포함되어 있으므로 어떻게 작동하는지 간편히 확인해볼 수 있다.

에이펙스에는 몇 가지 기능이 더 있다. 일반적인 메트릭 및 보고 외에 dtCli 애플리케이션(dtCliapplication)을 사용하면 제출된 애플리케이션을 런타임에 동적으로 변경할 수 있다. 전체 애플리케이션을 내릴 필요 없이 로깅 라인을 HDFS에 쓰는 연산자를 추가하고 싶은가? 다른 DAG 기반 시스템과 달리 에이펙스에서는 그게 가능하다.

오픈소스 데이터 스트림 처리 엔진은 많지만 그 가운데서도 에이펙스는 돋보이는 주자다. 폭넓은 커넥터(connectors)를 제공하는 말할 라이브러리가 있고 에이펙스 자체도 내결함성과 낮은 지연, 안정성을 위한 튼튼한 기반을 제공하므로 손쉽게 실무에 바로 적용이 가능하다. 한 가지 유의해야 할 점은 연산자 개념은 플링크(Flink)와 스파크의 고수준 구조보다는 저수준의 작동부에 조금 더 가깝다는 점이다.

개발자가 기존 프레임워크의 애플리케이션을 더 쉽게 이식할 수 있도록 데이터토렌트가 아파치 빔(Apache Beam)을 위한 아파치 러너(Apex runner)를 구현해주면 좋을 것 같다. 어쨌든 스트리밍 데이터 처리 엔진을 고려한다면 에이펙스도 한 번 살펴볼 것을 권한다. editor@itworld.co.kr


2016.04.26

"긴장하라 스파크와 스톰", 떠오르는 아파치 에이펙스

Ian Pointer | InfoWorld
데이터토렌트(DataTorrent)의 RTS 플랫폼에서 파생된 새로운 오픈소스 스트리밍 분석 솔루션인 에이펙스(Apex)는 놀라운 속도와 간소한 프로그래밍 기능을 제공한다. 한번 살펴보자.

스트리밍 분석이라고 하면 대부분의 경우 가장 먼저 스파크(Spark)를 떠올린다. 스파크는 사실 유사 스트리밍 기술이지만 스파크 2.0에 도입된 정형화된 스트리밍(Structured Streaming) 기능은 큰 진전이다. 또 하나 떠오르는 이름은 진정한 스트리밍 솔루션인 스톰(Storm)이다. 현재 버전 1.0이며 사용하기 어렵다는 평을 극복하기 위해 애쓰는 중이다.

이 스파크와 스톰 두 가지 외에 2015년 6월 처음 등장한 아파치 에이펙스(Apache Apex)도 있다. 에이펙스는 코어 프로세싱 엔진, 다양한 대시보드와 입수(ingesting) 및 모니터링 도구, 그리고 데이터 과학자를 위한 그래픽 방식의 흐름 기반 프로그래밍 시스템인 dt어셈블(dtAssemble)을 포함하는, 데이터토렌트와 그 강력한 RTS 플랫폼에서 파생된 솔루션이다.

RTS는 스파크만큼의 큰 인기는 없지만 GE와 캐피털 원(Capital One)의 실무 환경에 구축됐으며 초당 수십억 건의 이벤트를 처리하고 20ms 이내에 이벤트에 응답할 정도로 확장이 가능하다는 점을 입증했다.

RTS 플랫폼의 중심에 위치하는 처리 엔진인 에이펙스는 데이터토렌트가 아파치에 제공한 것이다. 에이펙스는 기존 하둡 생태계에서 실행되도록 설계됐으며 YARN을 사용해 필요에 따라 확장 또는 축소되고, 내결함성(fault tolerance)을 위해 HDFS를 활용한다. 완전한 RTS 플랫폼의 다양한 부가 기능들을 제공하지는 않지만 데이터 처리 플랫폼에서 사람들이 기대할 법한 대부분의 주요 기능을 제공한다.

에이펙스 애플리케이션 샘플
아주 기본적인 에이펙스 파이프라인을 살펴보면서 핵심 개념을 파악해 보자. 이 예시를 통해 카프카(Kafka)의 로깅 라인을 읽으면서 로그 라인 형식의 수를 확인하고 이 수를 콘솔에 쓸 것이다. 이번 기고에서 코드의 일부를 소개하겠지만 깃허브에서 완전한 애플리케이션을 찾을 수 있다.

에이펙스의 핵심 개념은 입력을 받고 출력을 생성하는 메소드(methods)를 구현하는 자바 클래스인 연산자(operator)다(스톰에 익숙하다면 볼트 및 스파우트 개념과 비슷하다고 느낄 것이다). 또한 각 연산자는 데이터 입력 또는 출력을 위한 포트 집합을 정의한다. 메서드는 InputPort에서 입력을 읽거나 OutportPort를 통해 다운스트림으로 데이터를 전송한다.

연산자를 통한 데이터 흐름은 스트림을 시간 기반 데이터 윈도우로 분할하는 방식으로 모델링되지만 스파크의 마이크로배칭(microbatching)과 달리 입력 데이터 처리를 위해 윈도우의 끝까지 기다릴 필요가 없다.


DataTorrent

다음 예에서는 3개의 연산자가 필요하며 각각의 연산자는 에이펙스가 지원하는 3가지 형식의 연산자에 해당한다. 카프카에서 라인을 읽기 위한 입력 연산자, 로깅 형식을 계산하기 위한 일반(generic) 연산자, 그리고 콘솔에 쓰기 위한 출력 연산자다. 첫 번째와 세 번째의 경우 에이펙스의 말할(Malhar) 라이브러리를 사용하면 되지만, 표시되는 여러 형식의 로깅 수를 계산하기 위해서는 맞춤형 비즈니스 로직을 구현해야 한다.

로그카운터오퍼레이터(LogCounterOperator)를 구현하기 위한 코드는 다음과 같다.

public class LogCounterOperator extends BaseOperator {
private HashMap<String, Integer> counter;
public transient DefaultInputPort<String> input = new DefaultInputPort<String>() {
@Override
public void process(String text) {
String type = text.substring(0, text.indexOf(' '));
Integer currentCounter = counter.getOrDefault(type, 0);
counter.put(type, currentCounter+1);
}
};
public transient DefaultOutputPort<Map<String, Integer>> output = new DefaultOutputPort<>();
@Override
public void endWindow() {
output.emit(counter);
}
@Override
public void setup(OperatorContext context){
counter = new HashMap();
}
}


로그 형식 계산에는 간단한 해쉬맵(HashMap)을 사용하고 연산자를 통한 데이터 흐름 처리에 입력용과 출력용 하나씩, 두 개의 포트를 정의한다. 형식이 지정되므로 호환되지 않는 연산자를 끼워 맞추려고 하면 배포 후 문제가 발생하는 것이 아니라 컴파일 시에 오류가 발생한다. 여기서는 입력과 출력 포트를 하나씩만 정의했지만 입출력을 여러 개 지정하는 것도 가능하다.

일반 연산자의 라이프사이클은 단순하다. 에이펙스는 먼저 초기화가 필요한 부분에서 setup()을 호출한다. 앞서 예에서 setup()은 해쉬맵 생성을 처리한다. 그런 다음 beginWindow()를 호출해서 입력 처리의 새 윈도우/배치가 시작됨을 나타낸 다음 이 윈도우 중에 연산자를 통과해 흐르는 모든 데이터 항목에 대해 process()를 호출한다.

현재 윈도우의 시간이 소진되면 에이펙스는 endWindow()를 호출한다. 윈도우별 로직은 필요 없으므로 베이스오퍼레이터(BaseOperator)에 있는 빈 beginWindow()정의를 그대로 둔다. 다만 매 윈도우의 끝에서는 현재 수를 전송해야 하므로 출력 포트를 통해 해쉬맵을 내보낸다.

한편 오버라이드된 process() 메소드는 로그 라인의 첫 번째 단어를 취해 카운터를 업데이트하는 비즈니스 로직을 처리한다. 마지막으로, 에이펙스가 필요한 정리 작업을 위해 파이프라인을 폐기할 때 호출되는 teardown() 메소드가 있다. 이 예에서는 아무것도 할 필요가 없지만 예시를 위해 HashMap을 비웠다.

연산자를 구축했으므로 이제 파이프라인 자체를 구축할 수 있다. 이번에도 역시 스톰 토폴로지 경험이 있다면 다음의 코드가 친숙할 것이다.

public void populateDAG(DAG dag, Configuration conf) {
KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("KafkaInput", new KafkaSinglePortStringInputOperator());
kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
LogCounterOperator logCounter = dag.addOperator("LogCounterOperator", new LogCounterOperator());
ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
dag.addStream("LogLines", kafkaInput.outputPort, logCounter.input);
dag.addStream("Console", logCounter.output, console.input);
}


먼저 DAG, 즉 연산자 노드를 정의한다. 그런 다음 그래프의 가장자리(에이펙스 용어로 "스트림")를 정의한다. 이러한 스트림은 연산자의 출력 포트를 다른 연산자의 입력 포트에 연결한다. 여기서는 카프카를 로그카운터오퍼레이터(LogCounterOperator)에 연결하고 출력 포트를 콘솔아웃풋오퍼레이터(ConsoleOutputOperator)에 연결한다. 그게 전부다. 컴파일해서 애플리케이션을 실행하면 해쉬맵이 표준 출력으로 인쇄되는 것을 볼 수 있다.

{INFO=1}
{ERROR=1, INFO=1}
{ERROR=1, INFO=2}
{ERROR=1, INFO=2, DEBUG=1}


이런 식이다.

말할(Malhar), 유용한 블록 상자
연산자의 좋은 점은 작고 잘 정의된 코드라서 구축하고 테스트하기 쉽다는 데 있다. 레고 블록처럼 서로 잘 결합된다. 대부분의 경우 자체 레고 블록을 만들어야 할 필요도 없다.

이는 말할(Malhar) 덕분이다. 레고로 치면 말할은 표준 2x4 블록부터 아주 가끔씩만 필요한 힌지 블록까지 모든 블록이 들어가 있는 거대한 레고 바구니라고 할 수 있다. 스플렁크(Splunk)에서 값을 읽고, 이 정보를 FTP 사이트에 저장된 텍스트 파일의 정보와 결합한 다음, 그 결과를 H베이스(HBase)에 저장해야 하는가? 말할을 사용하면 다 된다.

에이펙스가 매력적인 이유는 말할에 이렇게 방대한 연산자가 포함된 덕분에 사용하는 입장에서는 대부분의 경우 비즈니스 로직에 대해서만 생각하면 된다는 데 있다. 말할 연산자에 대한 문서는 그다지 많지 않지만 리포지토리의 거의 모든 항목에는 테스트가 포함되어 있으므로 어떻게 작동하는지 간편히 확인해볼 수 있다.

에이펙스에는 몇 가지 기능이 더 있다. 일반적인 메트릭 및 보고 외에 dtCli 애플리케이션(dtCliapplication)을 사용하면 제출된 애플리케이션을 런타임에 동적으로 변경할 수 있다. 전체 애플리케이션을 내릴 필요 없이 로깅 라인을 HDFS에 쓰는 연산자를 추가하고 싶은가? 다른 DAG 기반 시스템과 달리 에이펙스에서는 그게 가능하다.

오픈소스 데이터 스트림 처리 엔진은 많지만 그 가운데서도 에이펙스는 돋보이는 주자다. 폭넓은 커넥터(connectors)를 제공하는 말할 라이브러리가 있고 에이펙스 자체도 내결함성과 낮은 지연, 안정성을 위한 튼튼한 기반을 제공하므로 손쉽게 실무에 바로 적용이 가능하다. 한 가지 유의해야 할 점은 연산자 개념은 플링크(Flink)와 스파크의 고수준 구조보다는 저수준의 작동부에 조금 더 가깝다는 점이다.

개발자가 기존 프레임워크의 애플리케이션을 더 쉽게 이식할 수 있도록 데이터토렌트가 아파치 빔(Apache Beam)을 위한 아파치 러너(Apex runner)를 구현해주면 좋을 것 같다. 어쨌든 스트리밍 데이터 처리 엔진을 고려한다면 에이펙스도 한 번 살펴볼 것을 권한다. editor@itworld.co.kr


X