
1. 들어가며
요즘 이 책에서 '한글'로 설명하는 내용을 '코드'로 구현해보고 있습니다. 회사에서 업무 일정이 조금 빠듯하고 지칠 때도 있지만, 그래도 최근에 이 책을 정말 재밌게 읽고 있습니다. 내용이 좀 딱딱할 수 있지만 한 줄 한줄이 지금까지 쌓은 인사이트들의 집합이라는 생각을 하면서 추가했습니다. 재밌게 읽으실 수 있기를 바랍니다.
모든 코드는 hwanseok-dev/data-platform에 포함되어 있습니다.
본문에서 다루는 코드는 ingest-2023-11-17 태그를 기준으로 작성되었습니다.
2. 수집 계층이란
데이터 플랫폼에서 수집 계층은 소스 시스템에서 데이터를 가져오는 역할을 합니다. 소스 시스템에서 생성 및 수집된 데이터를 우리의 서버로 가져오는 첫 번째 관문입니다.
3. 소스 시스템에서 수집되는 데이터의 대분류
수집되는 데이터는 크게 두 가지의 형태로 분류할 수 있습니다.
1. 스트리밍 데이터
2. 배치 데이터
스트리밍 데이터는 한 번에 하나의 데이터에 액세스해서 수집하는 형태의 데이터입니다. 배치 데이터는 CSV, JSON, XML 파일처럼 FTP나 별도의 배치 액세스를 통해서 가져오는 데이터를 말합니다. 스트리밍 데이터와 배치 데이터를 각각 StreamPack, BatchPack이라는 클래스로 구현했습니다. StreamPack과 BatchPack은 모두 AbstractPack을 상속합니다. 수집되는 방식의 차이와 별개로 우리의 수집 서버에 도달하려면 반드시 가지고 있어야하는 정보들을 AbstractPack의 필드로 추가했습니다.
3.1. AbstractPack :: 수집되는 모든 데이터에 있어야하는 정보들을 담고 있는 클래스
@Getter
public abstract class AbstractPack {
private final Long time; // 데이터 수집 시간
private final Integer agentId; // 에이전트 식별자
private final String agentName; // 에이전트 이름
private final Map<String, String> tags = new HashMap<>(); // 소스 시스템의 식별자
}
소스 시스템에서 생성된 데이터를 가져오는 주체를 에이전트라고 부릅니다. 에이전트가 데이터를 가져올 때에는 시간(time) 그리고 에이전트를 식별하기 위한 정보(agentId, agentName)를 함께 보내줍니다. 여러개의 에이전트를 통합 관리하기 위해서는 시스템에서 관리하기 편한 int 타입의 식별자(agentId)
그리고 사람이 구분하기 위한 String 타입의 에이전트 이름(agentName)
가 반드시 필요합니다
Agent 1 : 12312312429
Agent 2 : 1492748292
...
이렇게만 구분되어 있으면 수집된 데이터에 문제가 발생했을 때, 어떤 소스 시스템에서 수집된 데이터가 문제인지 직관적으로 알기 어렵습니다.
이 외에도 소스 시스템에 설치된 애플리케이션의 역할, 소스 시스템이 설치된 Region 그리고 Node/Pod/Container 등의 정보를 통해서 식별된 에이전트를 구분해서 관리하기 위한 정보가 필요합니다. 이러한 정보들을 tag라고 명명하고 다양한 형태의 식별자를 공통된 방식으로 처리하기 위해 Map<String, String> 타입으로 처리했습니다.
3.2. StreamPack :: 스트리밍 방식으로 생성, 수집되는 데이터
public class StreamPack extends AbstractPack {
private final Map<String, Object> fields = new HashMap<>(); // 소스 시스템에서 발생한 데이터
}
StreamPack은 tags보다 더욱 범용적인 형태로 데이터를 수집할 수 있도록 Map<String, Object> 타입으로 정의했습니다. 숫자 타입으로 수집되는 모니터링 시계열 데이터, 문자열로 수집되는 errorClass 정보, json 형태로 수집되는 debugLog 정보, 매우 긴 문자열로 수집되는 stackTrace 정보 등을 모두 수용할 수 있도록 Object 타입을 사용해야합니다.
3.3. BatchPack :: 배치방식으로 생성, 수집되는 데이터
public class BatchPack extends AbstractPack {
private String fileName;
private String document;
}
BatchPack은 파일에 저장되어 있는 다양한 형태의 각 line을 담고 있습니다.
// csv 포멧
field1,field2,field3
value01,value02,valu03
value11,value12,value13
// log 파일 포멧
[2023-05-25 14:11:00,444 GMT][INFO ][io.dataplatform.ingest.App.run(15)] System.exit()
[2023-05-25 14:11:00,502 GMT][INFO ][io.undertow.stop(259)] stopping server: Undertow - 2.2.22.Final
4. 데이터 수집하기
이렇게 정의된 StreamPack과 BatchPack을 수집하는 최초의 진입점은 아래와 같이 구현합니다.
@PostMapping("/api/v1/ingest/stream")
public ResponseEntity<Boolean> ingestStream(@RequestBody StreamPack pack) { ... }
@PostMapping("/api/v1/ingest/batch")
public ResponseEntity<Boolean> ingestBatch(@RequestParam(value = "file") MultipartFile file) { ... }
4.1. HttpHeaderUtil :: 에이전트 식별자 조회하기
Controlller에서 가장 먼저 해야하는 역할은 요청정보를 검증하는 것이라 생각합니다. 우리는 agentId, agentName가 항상 수집되는지 검증해야 합니다. 에이전트의 정보는 HttpHeader를 통해서 조회할 수 있도록 설계했습니다.
추후 Application Load Balancer의 도입이 필요할 때 URI, Http Header의 정보만을 사용해서 처리할 수 있도록 하기 위함입니다. payload안에 숨겨진 데이터는 전체를 조회하지 않는 이상 복잡한 처리를 수행하기 어렵습니다. 그리고 에이전트 정보와 에이전트가 수집한 데이터도 데이터의 성격이 다르기 때문에 각각의 방법으로 처리하는 것이 개발 과정에서 복잡성을 낮춰주기도 합니다.
@PostMapping("/api/v1/ingest/stream")
public ResponseEntity<Boolean> ingestStream(@RequestBody StreamPack pack,
HttpServletRequest request) {
Integer agentId = null;
String agentName = null;
String resourceType = null;
try {
agentId = HttpHeaderUtil.getOrThrowAgentId(request);
agentName = HttpHeaderUtil.getOrThrowAgentName(request);
resourceType = HttpHeaderUtil.getOrThrowResourceType(request);
streamPackSerivce.process(agentId, agentName, resourceType, pack);
} ...
}
resourceType은 수집 계층 이후에서 최적화된 처리를 하기 위한 정보입니다. 같은 방식으로 미리 수집해둡니다.
@Service
public class StreamPackService {
public void process(Integer agentId, String agentName, String resourceType, StreamPack pack){
// update pack
pack.updateTime();
pack.updateAgentInfo(agentId, agentName);
pack.putTag("resourceType", resourceType);
...
}
}
위 코드에서는 Service에서 time, agentInfo를 업데이트하고 있습니다. 몇 가지 개선 포인트가 보이지만 우선 빠른 진행을 위해서 다음 단계로 넘어가보겠습니다.
문제점 1 : time, agentInfo를 처리하는 로직은 StreamPack, BatchPack 모두 처리해야하는 공통적이고 반복적이고 반드시 일어나야하는 행위이다. 이 과정을 Service에서 처리하는 것보다 Controller 앞단에서 수행해서 앞으로 작성할 Service의 핵심 로직과 분리가 필요해보인다.
문제점 2 : StreamPack이 항상 agentId, agentName 필드를 가지고 있다. 소스 시스템에서 수집 서버로 매번 같은 데이터가 반복적으로 전송된다. 하루에 수천만번에서 수억번 전송될 수 있는 데이터이기 때문에 좀 더 compact한 방식으로 처리할 수 있는 방법이 필요하다.
4.2. 배치 데이터 처리하기
배치 데이터는 배치 데이터인 이유는 크게 두 가지로 나뉩니다. 데이터가 N분마다 생성되거나, N분마다 수집되거나. 스트림으로 처리하기 어려울만큼 데이터가 많거나, 스트림으로 처리할 기술력/시간이 배치로 처리됩니다. 어쨌든 하나의 파일에 여러개의 데이터가 한 번에 저장되어있기 때문에, 이를 풀어헤지는 작업이 필요합니다.
public void process(Integer agentId, String agentName, String resourceType, MultipartFile file){
// split line by line
String fileName = file.getName();
DecompressionStrategy decompressor = chooseDecompressor(file.getName());
}
배치 데이터를 수집할 때 제일 먼저 해야하는 것은 배치 데이터의 종류를 파악해서 그에 맞는 대응 방법을 결정하는 것입니다.
private DecompressionStrategy chooseDecompressor(String fileName){
if (fileName.endsWith(".gz")) {
return new GzipDecompressionStrategy();
}
return new CommonDecompressionStrategy();
}
아래의 예시에서는 파일 이름으로 GZIPInputStream을 사용해야하는지 여부를 결정합니다.
public class GzipDecompressionStrategy implements DecompressionStrategy{
@Override
public BufferedReader getBufferedReader(InputStream inputStream) throws IOException {
return new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream)));
}
}
다음으로 파일에 포함된 데이터가 각각 분리되었을 때 어떻게 처리할 것인지를 결정합니다.
private Function<String, BatchPack> chooseBatchPackGenerator(String resourceType, Integer agentId, String agentName, String fileName){
String uppercase = resourceType.toUpperCase();
switch (uppercase) {
case "VPC" :
case "ELB" :
case "CLOUD_FRONT":
case "WAF":
return line -> {
BatchPack batchPack = new BatchPack();
batchPack.updateTime();
batchPack.updateAgentInfo(agentId, agentName);
batchPack.setFileName(fileName);
batchPack.setDocument(line);
batchPack.putResourceType(uppercase);
return batchPack;
};
...
}
}
CSV 포멧인 경우, 각 파일의 첫 번째 라인에 Column 정보가 포함되어 있어 이 정보를 미리 저장해두어야 합니다. JSON 포멧인 경우 JsonParser를 통해서 반정형 데이터를 정형 데이터로 변환할 수 있습니다. 각각의 배치 파일에 맞는 처리 방법을 구분해서 적용해야 합니다.
이렇게 압축해제 방법과 처리 방법을 분리해서 관리하면, 실제로 처리를 수행하는 로직을 매우 간단하고 확장성있게 가져갈 수 있습니다.
try (InputStream is = file.getInputStream();
BufferedReader reader = decompressor.getBufferedReader(is)) {
String line = reader.readLine();
while (line != null) {
// convert line to pack
BatchPack pack = generator.apply(line);
// add to core
BatchPackCore.getInstance().add(pack);
count.increase();
line = reader.readLine();
}
// add meta
BatchIngestCountMeta.getInstance().add(agentId, agentName, resourceType, count.getValue());
}
5. 마치며
수집 계층에서 고려해야할 주요 사항은 다음과 같습니다.
- 스트림/배치 데이터를 구분해서 처리한다.
StreamPack과 BatchPack을 별도의 클래스로 구분해서 별도의 방식으로 처리했습니다. - 데이터를 수집한 에이전트에 대한 식별하는 스트림/배치에 상관없이 공통적으로 처리되어야 한다
time, AgentInfo에 대한 정보는 AbstractPack에서 공통적으로 처리되었습니다. - 스트림/배치 데이터 각각에 대해서는 최대한 범용적인 포멧으로 처리되어야 한다.
에이전트를 식별하기 위한 Tag들은 Map<String, String>으로, StreamPack의 데이터는 Map<String, Object>로, BatchPack의 데이터를 String document로 구현했습니다. 여기서 주목할 점은 소스 시스템 별로 개별적인 수집 클래스를 만들지 않는다는 점입니다. 소스 시스템에서 수집 계층으로 데이터를 가져오기 위해 필요한 데이터 변환 과정이 크지 않도록 해야합니다. 동시에 새로운 소스 시스템이 추가되어도 기존 방식으로 수집되던 파이프라인에는 영향이 없어야 합니다. 예를 들면, 새로운 소스 시스템이 추가되어도 시스템 재시작이 필요없어야 합니다. - 수집 계층을 일급 객체로 구축하는 것이 견고한 아키텍쳐를 만들어줍니다.
배치 파일을 압축 해제하기 위한 DecompressionStrategy, 압축 해제된 각 line을 BatchPack으로 처리하기 위한 Function<String, BatchPack>을 압축을 해제하는 로직에서 분리했습니다. 각 소스시스템에 최적화된 방식를 제공하면서 유지보수의 편리성도 가져가기 위한 방법입니다.
'DEV' 카테고리의 다른 글
Everything is a File (How a storage device is treated as a file) (0) | 2025.04.13 |
---|---|
Java Thread Tutorial | Thread 생성부터 synchronized까지 (0) | 2023.11.30 |