느리고 큰 외부 API 응답, 스트리밍으로 처리하기
클린 아키텍처로 실제 서비스 만들기
- 01멀티 모듈 프로젝트, 왜 클린 아키텍처로 시작했나
- 02상태 머신을 도메인 객체로 표현하는 방법
- 03Port & Adapter 패턴으로 외부 스토리지 연동하기
- 04Redis Streams로 비동기 분석 파이프라인 설계하기
- 05느리고 큰 외부 API 응답, 스트리밍으로 처리하기← 현재
- 06Dead Letter Queue로 장애를 설계 레벨에서 대비하기
- 07QueryDSL로 복잡한 동적 검색 쿼리 설계하기
- 08프로젝트 회고: 잘한 설계, 아쉬운 설계
문제의 시작: 외부 AI API가 너무 느리다
문서 분석 시스템을 개발하면서 가장 먼저 마주친 현실적인 문제는 외부 AI 분석 API의 응답 특성이었다. 문서를 넘기면 AI가 내용을 분석해서 결과를 돌려주는 구조인데, 이 API는 두 가지 특징을 갖고 있었다.
첫째, 응답이 느리다. 짧게는 5초, 길게는 30초 이상 걸리는 경우도 있었다. AI가 문서를 분석하면서 결과를 점진적으로 생성하는 방식(LLM의 token-by-token 생성)이기 때문이다.
둘째, 응답이 크다. 분석 결과가 길어질 경우 수십 KB에서 수백 KB에 달하는 JSON 응답이 반환된다.
처음에는 단순하게 일반 REST 방식으로 구현했다. RestTemplate을 쓰거나 WebClient로 .block()을 호출해서 응답 전체를 한 번에 받는 방식이었다. 결과는 예상대로였다.
- 응답이 올 때까지 스레드가 블로킹된다
- 응답 전체가 메모리에 올라온 뒤에야 처리를 시작할 수 있다
- 클라이언트 입장에서 "진행 중인지 실패한 것인지" 알 수 없는 침묵이 지속된다
- 타임아웃 설정이 까다롭다 — 너무 짧으면 정상 응답도 끊고, 너무 길면 장애 감지가 늦다
이 문제를 해결하기 위해 스트리밍 방식으로 전환하기로 결정했다.
일반 REST vs 스트리밍: 어떻게 다른가
일반 REST vs 스트리밍 방식 비교
스트리밍 방식의 핵심 이점은 첫 번째 데이터가 오는 즉시 처리를 시작할 수 있다는 점이다. 전체 응답을 기다릴 필요가 없고, 메모리에 전체 응답을 올릴 필요도 없다.
AbstractApiClient: 왜 추상 클래스로 설계했나
외부 API를 다루는 클라이언트가 여러 개 필요했다. AI 분석 API, 문서 변환 API, 메타데이터 API 등 각각 다른 베이스 URL을 가지지만, 모두 공통적으로 필요한 것들이 있었다.
- 구조화된 로깅: 어떤 API가, 어느 엔드포인트에, 어떤 파라미터로 호출됐는지
- 에러 처리: 외부 API 오류를 내부 예외(
CoreException)로 일관되게 변환 - 공통 인터페이스:
get(),post(),postStreaming()등 통일된 호출 방식
이 공통 관심사를 각 클라이언트에 중복으로 작성하는 것은 DRY 원칙 위반이다. 그렇다고 인터페이스로만 정의하면 구현을 강제할 수 없다.
추상 클래스가 적합한 이유가 여기 있다. 공통 구현을 추상 클래스에 두고, 각 클라이언트가 다를 수 있는 부분(getBaseUrl())만 추상 메서드로 위임한다.
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractApiClient {
private final HttpClientExecutor httpClientExecutor;
// 각 구현체가 반드시 제공해야 하는 것: 베이스 URL
protected abstract String getBaseUrl();
protected <R extends ExternalApiResponse, Q extends ExternalQueryParams> Mono<R> get(
String uri, Class<R> responseType, Q queryParams) {
final Map<String, Object> params = queryParams != null
? queryParams.toMap()
: Collections.emptyMap();
log.info("[API-GET] baseUrl={}, uri={}, params={}", getBaseUrl(), uri, params);
return httpClientExecutor.get(getBaseUrl(), uri, responseType, params)
.doOnSuccess(response -> log.info("[API-GET-SUCCESS] uri={}", uri))
.doOnError(error -> log.error("[API-GET-ERROR] uri={}, error={}", uri, error.getMessage()))
.onErrorMap(error -> new CoreException(
CommonErrorType.EXTERNAL_API_ERROR, error.getMessage(), error));
}
// POST 스트리밍 전용
protected <T extends ExternalApiRequest> Flux<String> postStreaming(
String uri, T requestBody) {
log.info("[API-POST-STREAMING] baseUrl={}, uri={}", getBaseUrl(), uri);
return httpClientExecutor.postStreaming(getBaseUrl(), uri, requestBody)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return new String(bytes);
})
.doOnComplete(() -> log.info("[API-POST-STREAMING-COMPLETE] uri={}", uri))
.doOnError(error -> log.error(
"[API-POST-STREAMING-ERROR] uri={}, error={}", uri, error.getMessage()));
}
protected <T extends ExternalApiRequest, R extends ExternalApiResponse> Mono<R> post(
String uri, T requestBody, Class<R> responseType) {
return httpClientExecutor.post(getBaseUrl(), uri, requestBody, responseType)
.onErrorMap(error -> new CoreException(
CommonErrorType.EXTERNAL_API_ERROR, error.getMessage(), error));
}
}구현체는 단 한 가지만 책임진다.
@Component
public class AiAnalysisApiClient extends AbstractApiClient {
@Value("${ai.api.base-url}")
private String baseUrl;
public AiAnalysisApiClient(HttpClientExecutor httpClientExecutor) {
super(httpClientExecutor);
}
@Override
protected String getBaseUrl() {
return baseUrl;
}
public Flux<String> streamAnalysis(AnalysisRequest request) {
return postStreaming("/v1/analyze", request);
}
}공통 상태(필드)나 공통 구현이 있으면 추상 클래스가 적합하다. 이 경우 httpClientExecutor라는 공통 의존성과 get(), post() 등의 공통 구현이 있기 때문에 추상 클래스를 선택했다. 단순히 계약만 정의한다면 인터페이스가 맞다.
postStreaming()의 핵심: DataBuffer를 String으로 변환하기
스트리밍의 핵심 코드를 자세히 살펴보자.
return httpClientExecutor.postStreaming(getBaseUrl(), uri, requestBody)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer); // ← 이게 핵심
return new String(bytes);
})httpClientExecutor.postStreaming()은 Flux<DataBuffer>를 반환한다. DataBuffer는 Spring WebFlux에서 사용하는 바이트 버퍼 추상화인데, 일반 byte[]와 달리 풀(pool)에서 관리되는 메모리를 사용한다.
DataBufferUtils.release()를 반드시 호출해야 하는 이유
DataBuffer는 Netty의 ByteBuf를 내부적으로 사용하는 경우가 많다. ByteBuf는 레퍼런스 카운팅(reference counting) 방식으로 메모리를 관리하는데, 사용 후 release()를 호출하지 않으면 레퍼런스 카운트가 0이 되지 않아서 메모리 풀로 반환되지 않는다.
- 메모리 릭: 처리한 DataBuffer가 GC 대상이 되지 않고 메모리에 계속 쌓인다
- Netty 경고 로그:
LEAK: ByteBuf.release() was not called before it's garbage-collected같은 경고가 출력된다 - 장기 운영 시 OOM: 트래픽이 많은 환경에서 결국 OutOfMemoryError로 이어진다
올바른 패턴은 다음과 같다.
.map(dataBuffer -> {
// 1. DataBuffer에서 데이터를 byte[]로 복사
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
// 2. 복사 후 즉시 release — 이 순서가 중요하다
DataBufferUtils.release(dataBuffer);
// 3. 복사한 byte[]로 String 생성
return new String(bytes, StandardCharsets.UTF_8);
})데이터를 byte[]로 복사한 직후 release()를 호출하는 것이 포인트다. return 이후에 호출하면 이미 참조가 사라진 후이므로 release() 자체가 실행되지 않을 수 있다.
스트리밍 처리 흐름 전체 다이어그램
스트리밍 처리 흐름 — 계층 간 DataBuffer 변환
동기(.block()) vs 비동기: 이 프로젝트의 선택
Spring WebFlux와 Reactor를 쓰다 보면 반드시 마주치는 질문이 있다. .block()을 써서 동기로 처리할 것인가, 아니면 비동기 파이프라인을 끝까지 유지할 것인가.
이론적으로는 .block() 사용을 최소화해야 한다. Reactor의 이벤트 루프 스레드에서 .block()을 호출하면 해당 스레드가 블로킹되면서 전체 처리량이 저하된다.
하지만 이 프로젝트에서는 의도적으로 일부 동기 처리를 선택했다. 그 이유는 다음과 같다.
비동기(Flux/Mono 유지)로 처리한 것
- AI 분석 스트리밍 응답 수신: 데이터가 오는 즉시 처리해야 하므로 Flux 유지
- 여러 문서 병렬 분석:
Flux.merge()또는Mono.zip()으로 병렬 처리
동기(.block())로 처리한 것
- 분석 결과를 DB에 저장하기 전 최종 취합: JPA는 동기 방식이므로 어차피 블로킹 필요
- Spring MVC 컨트롤러와 연동: 프로젝트가 WebFlux가 아닌 MVC 기반이므로
핵심 판단 기준: 이미 블로킹 I/O가 불가피한 경계(JPA, MVC)에서는 .block()이 자연스럽다. 블로킹 없이 끝까지 비동기로 갈 수 있는 경우에만 Flux/Mono를 유지한다.
실제 서비스 레이어 코드 예시는 아래와 같다.
@Service
@RequiredArgsConstructor
public class DocumentAnalysisService {
private final AiAnalysisApiClient aiApiClient;
private final AnalysisResultRepository resultRepository;
public AnalysisResult analyze(Document document) {
// 스트리밍으로 수신하면서 StringBuilder에 누적
StringBuilder accumulatedResult = new StringBuilder();
aiApiClient.streamAnalysis(new AnalysisRequest(document.getContent()))
.doOnNext(chunk -> {
// 청크 도착 시마다 실시간 처리 가능 (예: WebSocket으로 전달)
accumulatedResult.append(chunk);
})
.blockLast(); // MVC 환경이므로 최종 결과를 기다림
// 누적된 결과를 파싱하고 DB에 저장
AnalysisResult result = AnalysisResult.create(
document.getId(),
accumulatedResult.toString()
);
return resultRepository.save(result);
}
}설계 회고: 이 구조에서 배운 것
AbstractApiClient 구조를 쓰면서 느낀 가장 큰 이점은 새로운 외부 API 클라이언트를 추가할 때의 편함이었다. getBaseUrl()만 구현하면 로깅, 에러 변환, 스트리밍 처리가 모두 자동으로 따라온다.
반면 아쉬운 점도 있었다. 추상 클래스를 상속하는 구조는 테스트 시 모킹이 인터페이스보다 까다롭다. httpClientExecutor를 직접 목으로 교체해야 하는데, 추상 클래스의 protected 메서드를 통해야 하므로 테스트 코드가 약간 복잡해진다.
지금 다시 설계한다면 AbstractApiClient를 인터페이스 + 컴포지션 방식으로 리팩토링하는 것을 고려할 것 같다. 추상 클래스의 공통 구현을 별도 ApiClientTemplate 클래스로 분리하고, 각 클라이언트가 이를 의존성으로 주입받는 방식이다. 이렇게 하면 상속 계층 없이도 공통 로직을 재사용할 수 있다.
그럼에도 현재 구조는 팀 내 코드 일관성을 빠르게 확보하는 데 효과적이었고, 실제 운영 중에 메모리 릭 없이 안정적으로 동작하고 있다.