$백엔드 개발자 Rueun의 기술 블로그|Java · Spring · 클린 아키텍처🌱
#Development

Redis Streams로 비동기 분석 파이프라인 설계하기

@2025-10-02·21 min read·📖 series: 클린 아키텍처로 실제 서비스 만들기

Redis Streams로 비동기 분석 파이프라인 설계하기

문서 분석 시스템의 핵심 기능은 분석이다. 그런데 분석은 빠르지 않다. 문서에서 이미지를 추출하고, 유사 이미지를 탐색하고, 표절 여부를 판정하는 과정은 문서 크기와 내용에 따라 수 초에서 수십 초까지 걸린다.

이 긴 작업을 사용자의 HTTP 요청에 묶어두면 어떻게 될까? 클라이언트는 응답을 수십 초 동안 기다려야 하고, 서버의 쓰레드는 그 시간 동안 블로킹된다. 동시 요청이 늘어나면 쓰레드 풀이 금방 고갈된다.

해결책은 비동기 처리다. 사용자에게는 즉시 응답하고, 분석은 별도 프로세스에서 비동기로 실행한다. 이 글에서는 이 비동기 파이프라인을 어떻게 설계했는지, 그리고 왜 Redis Streams를 선택했는지를 정리한다.

왜 비동기인가

동기 처리와 비동기 처리의 차이를 구체적으로 살펴보자.

동기 처리 (문제 상황):

클라이언트 → POST /documents → 서버 (문서 저장 + 분석 시작 + 분석 완료 대기) → 응답
                                ←————— 30초 ——————————————————→

비동기 처리 (채택한 방식):

클라이언트 → POST /documents → 서버 (문서 저장 + 이벤트 발행) → 202 Accepted
                                ←— 100ms —→

(별도 프로세스) 이벤트 수신 → 분석 파이프라인 실행 → 완료

비동기 방식에서 사용자는 100ms 이내에 응답을 받는다. 분석 상태는 폴링이나 웹소켓으로 별도로 조회할 수 있다. 서버의 HTTP 쓰레드는 문서 저장까지만 담당하고, 분석은 독립된 프로세스(analysis-processor)가 맡는다.

메시지 브로커 선택: Kafka vs RabbitMQ vs Redis Streams

비동기 파이프라인을 구현하려면 메인 서비스에서 분석 프로세서로 이벤트를 전달할 방법이 필요하다. 세 가지 후보를 검토했다.

Kafka

장점: 높은 처리량, 로그 보존, 다중 소비자, 강력한 파티셔닝 전략, 대규모 이벤트 스트리밍에 검증된 솔루션.

단점: 운영 복잡성이 높다. ZooKeeper(또는 KRaft) 클러스터, 브로커 클러스터, 토픽 관리, 모니터링 설정까지 상당한 인프라 투자가 필요하다. 메시지 순서가 중요한 파이프라인에서 파티션 설계가 까다롭다.

이 시스템의 이벤트 처리량은 Kafka를 정당화할 만큼 크지 않았다. Kafka의 강점은 초당 수십만 건의 이벤트를 처리하거나, 이벤트 로그를 장기 보존해야 할 때 빛난다.

RabbitMQ

장점: 유연한 라우팅(Exchange, Binding), 직관적인 메시지 모델, 비교적 쉬운 운영.

단점: 별도의 RabbitMQ 클러스터를 구성하고 운영해야 한다. 이미 Redis를 인프라에서 사용 중인데, 메시지 브로커 하나를 위해 새로운 인프라 컴포넌트를 추가하는 것이 부담스러웠다.

Redis Streams

장점: 이미 인프라에 Redis가 있다. Redis Streams는 Redis 5.0부터 내장된 기능으로, 별도 설치나 운영이 필요 없다. Consumer Group을 통한 at-least-once 보장, 메시지 ACK, 처리되지 않은 메시지 재처리가 가능하다.

단점: Kafka에 비해 처리량이 낮고, 메시지 보존 정책이 단순하다. Redis 메모리가 가득 차면 오래된 메시지가 삭제될 수 있다. 운영 중인 Redis 클러스터에 장애가 생기면 캐시뿐 아니라 메시지 브로커도 함께 영향을 받는다.

최종 선택: Redis Streams

결정 기준은 다음과 같았다.

  1. 운영 복잡성 최소화: 이미 Redis를 사용 중이므로 추가 인프라 없이 도입 가능하다.
  2. 처리량 요구사항: 이 시스템의 이벤트 처리량은 Redis Streams로 충분히 감당할 수 있다.
  3. 기능 충분성: Consumer Group, ACK 기반 at-least-once, 미처리 메시지 재처리 기능이 이 파이프라인에 필요한 모든 것을 제공한다.
⚠️Redis Streams의 한계를 인지하고 선택하라

Redis Streams는 Redis 메모리를 사용한다. 메시지 처리 지연이 지속되면 메모리 압박이 생긴다. MAXLEN 옵션으로 스트림 크기를 제한하거나, 주기적으로 처리된 메시지를 정리해야 한다. 또한 Redis 클러스터 장애 시 캐시와 메시지 파이프라인이 동시에 영향받는 단일 장애점이 될 수 있다. 이 트레이드오프를 수용할 수 있는 규모와 요구사항일 때만 선택해야 한다.

이벤트 발행 구조: DocumentUploadedEvent

문서 업로드가 완료되면 DocumentUploadedEvent가 발행된다. 이 이벤트를 구독하는 analysis-processor가 분석 파이프라인을 시작한다.

이벤트 클래스는 도메인 레이어에 정의된다.

JAVA
public record DocumentUploadedEvent(
    Long documentId,
    String documentKey,
    DocumentMeta documentMeta
) {
    public static DocumentUploadedEvent from(Document document, DocumentMeta meta) {
        return new DocumentUploadedEvent(
            document.getId(),
            document.getDocumentKey(),
            meta
        );
    }
}

서비스에서 이벤트를 발행하는 코드는 단순하다.

JAVA
@RequiredArgsConstructor
@Service
public class DocumentService {
    private final DocumentRepository documentRepository;
    private final ApplicationEventPublisher eventPublisher;
 
    @Transactional
    public DocumentResponse uploadDocument(UploadDocumentCommand command) {
        Document document = Document.createNew(
            command.domain(), command.serviceType(),
            command.memberSrl(), command.clientIp(),
            command.title(), command.evaluationSettings(),
            command.documentCategory(), command.fileId()
        );
 
        Document saved = documentRepository.save(document);
 
        // 도메인 이벤트 발행
        eventPublisher.publishEvent(
            DocumentUploadedEvent.from(saved, command.documentMeta())
        );
 
        return DocumentResponse.from(saved);
    }
}

TransactionalEventListener가 왜 중요한가

이벤트 발행에서 가장 중요한 설계 결정은 언제 이벤트를 발행하느냐다.

Spring의 기본 @EventListener는 이벤트가 발행되는 즉시 처리한다. 이 경우 트랜잭션이 커밋되기 전에 이벤트가 처리될 수 있다.

트랜잭션 시작
  → Document DB 저장
  → eventPublisher.publishEvent() 호출
    → (즉시) 분석 프로세서가 이벤트 수신
    → (즉시) documentId로 DB 조회 시도
    → "Document not found" 오류 발생 (아직 커밋 안 됨!)
트랜잭션 커밋

이 타이밍 문제를 해결하는 것이 @TransactionalEventListener다.

JAVA
@Component
@RequiredArgsConstructor
public class DocumentUploadEventHandler {
    private final RedisStreamPublisher redisStreamPublisher;
 
    // 트랜잭션이 성공적으로 커밋된 후에만 실행
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleDocumentUploaded(DocumentUploadedEvent event) {
        redisStreamPublisher.publish(
            "analysis:document-uploaded",
            event
        );
    }
}

AFTER_COMMIT 단계를 지정하면 다음을 보장한다.

  • DB 트랜잭션이 성공적으로 커밋된 후에만 이벤트 리스너가 실행된다.
  • 트랜잭션이 롤백되면 이벤트 리스너는 실행되지 않는다.
🚨트랜잭션 커밋 전 이벤트 발행의 위험성

트랜잭션 커밋 전에 이벤트를 발행하면 두 가지 문제가 생긴다. 첫째, 소비자가 아직 DB에 없는 데이터를 조회하려 해서 실패한다. 둘째, 이후 트랜잭션이 롤백되어도 이미 발행된 이벤트는 취소할 수 없다. 결과적으로 DB에는 데이터가 없는데 분석 작업이 진행되는 불일치가 발생한다. @TransactionalEventListener(phase = AFTER_COMMIT)은 이 두 문제를 모두 해결한다.

Redis Streams 발행 구현

JAVA
@RequiredArgsConstructor
@Component
public class RedisStreamPublisher {
    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;
 
    public void publish(String streamKey, Object event) {
        try {
            String payload = objectMapper.writeValueAsString(event);
            Map<String, String> fields = Map.of("payload", payload);
            redisTemplate.opsForStream().add(
                RecordId.autoGenerate(),
                streamKey,
                fields
            );
        } catch (JsonProcessingException e) {
            throw new CoreException(CommonErrorType.INTERNAL_SERVER_ERROR,
                "Failed to serialize event: " + e.getMessage());
        }
    }
}

Consumer Group으로 at-least-once 보장

Redis Streams의 Consumer Group은 다음 두 가지를 보장한다.

  1. 동일 메시지가 그룹 내 하나의 소비자에게만 전달된다. 여러 analysis-processor 인스턴스가 실행되어도 하나만 메시지를 처리한다.
  2. ACK 이전까지 메시지가 재처리 대상이 된다. 소비자가 메시지를 받았지만 처리에 실패하거나 ACK 없이 종료되면, 다른 소비자가 해당 메시지를 다시 처리할 수 있다.
JAVA
@Component
@RequiredArgsConstructor
public class DocumentAnalysisConsumer {
    private final RedisTemplate<String, String> redisTemplate;
    private final AnalysisPipelineService analysisPipelineService;
    private final ObjectMapper objectMapper;
 
    private static final String STREAM_KEY = "analysis:document-uploaded";
    private static final String GROUP_NAME = "analysis-processor-group";
    private static final String CONSUMER_NAME = "processor-" + UUID.randomUUID();
 
    @Scheduled(fixedDelay = 100)
    public void consume() {
        List<MapRecord<String, Object, Object>> records =
            redisTemplate.opsForStream().read(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                StreamReadOptions.empty().count(10).block(Duration.ofMillis(50)),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())
            );
 
        if (records == null || records.isEmpty()) return;
 
        for (MapRecord<String, Object, Object> record : records) {
            try {
                String payload = (String) record.getValue().get("payload");
                DocumentUploadedEvent event =
                    objectMapper.readValue(payload, DocumentUploadedEvent.class);
 
                analysisPipelineService.startAnalysis(event);
 
                // 처리 성공 시 ACK
                redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, record.getId());
 
            } catch (Exception e) {
                // ACK 없이 넘어가면 나중에 XPENDING으로 재처리 가능
                log.error("Failed to process record {}: {}", record.getId(), e.getMessage());
            }
        }
    }
}

처리 성공 시에만 ACK를 보낸다. ACK를 받지 못한 메시지는 Redis의 PEL(Pending Entry List)에 남아 있어 재처리가 가능하다.

Dead Letter Queue 패턴

반복적으로 처리에 실패하는 메시지는 무한 재시도 루프에 빠질 수 있다. 이를 방지하기 위해 support/deadletter 모듈에 Dead Letter Queue 패턴을 구현했다.

JAVA
@Component
@RequiredArgsConstructor
public class DeadLetterHandler {
    private final RedisTemplate<String, String> redisTemplate;
 
    private static final int MAX_RETRY_COUNT = 3;
    private static final String DLQ_KEY = "analysis:dead-letter";
 
    public void handleFailedRecord(String streamKey, String groupName,
                                    RecordId recordId, String payload,
                                    Exception cause) {
        int retryCount = getRetryCount(recordId);
 
        if (retryCount >= MAX_RETRY_COUNT) {
            // 최대 재시도 초과 → DLQ로 이동
            Map<String, String> dlqEntry = Map.of(
                "originalStream", streamKey,
                "recordId", recordId.getValue(),
                "payload", payload,
                "errorMessage", cause.getMessage(),
                "failedAt", Instant.now().toString()
            );
            redisTemplate.opsForStream().add(DLQ_KEY, dlqEntry);
 
            // 원본 스트림에서 ACK (더 이상 재처리 안 함)
            redisTemplate.opsForStream().acknowledge(streamKey, groupName, recordId);
 
            log.warn("Message moved to DLQ after {} retries: {}", MAX_RETRY_COUNT, recordId);
        }
    }
}

DLQ에 쌓인 메시지는 알람과 함께 운영팀이 확인하고 수동으로 재처리하거나 폐기한다.

파이프라인 전체 흐름 다이어그램

비동기 분석 파이프라인 전체 흐름

클라이언트
POST /documents
DocumentService
@Transactional · 문서 저장 → MariaDB
커밋 성공 후
TransactionalEventListener
phase = AFTER_COMMIT · Redis XADD
Redis Streams
analysis:document-uploaded · Consumer Group
XREADGROUP
analysis-processor
AnalysisJob 생성 → 파이프라인 실행
COMPLETED
분석 완료 · ACK
FAILED
재시도 → DLQ
Dead Letter
최대 재시도 초과

트랜잭션 커밋과 이벤트 발행 사이의 실패 시나리오

@TransactionalEventListener(AFTER_COMMIT)을 사용해도 완전한 exactly-once를 보장하지는 않는다. DB 커밋은 성공했지만 Redis에 이벤트를 발행하기 전에 서버가 재시작되면 어떻게 될까?

이 경우 이벤트가 발행되지 않아 분석이 시작되지 않는다. 이를 처리하기 위한 보완책으로 배치 재처리를 구현했다.

JAVA
@Scheduled(fixedDelay = 60_000) // 1분마다
public void recoverPendingAnalysis() {
    // 일정 시간이 지났는데 PENDING 상태인 분석 작업 조회
    LocalDateTime threshold = LocalDateTime.now().minusMinutes(5);
    List<AnalysisJob> stuckJobs =
        analysisJobRepository.findPendingJobsCreatedBefore(threshold);
 
    for (AnalysisJob job : stuckJobs) {
        log.warn("Recovering stuck analysis job: {}", job.getId());
        redisStreamPublisher.publish("analysis:document-uploaded",
            DocumentUploadedEvent.from(job));
    }
}

5분이 지나도 PENDING 상태인 분석 작업을 주기적으로 확인해 이벤트를 재발행한다. Consumer Group의 at-least-once 보장과 결합하면 메시지 유실 없이 모든 분석이 반드시 처리된다.

ℹ️at-least-once vs exactly-once

at-least-once는 메시지가 최소 한 번은 처리됨을 보장하지만, 중복 처리 가능성이 있다. exactly-once는 정확히 한 번만 처리됨을 보장하지만 구현이 복잡하고 성능 비용이 크다. 이 시스템에서 중복 처리를 허용하는 대신, AnalysisJob의 상태 머신이 이미 완료된 작업을 재처리하지 않도록 막아준다. COMPLETED 상태에서는 다른 상태로 전이가 불가능하므로 중복 이벤트가 오더라도 무시된다.

이 설계의 한계와 주의사항

Redis 클러스터 가용성에 의존한다. Redis가 다운되면 이벤트 발행과 소비가 모두 중단된다. 캐시 장애와 메시지 파이프라인 장애가 동시에 발생한다. 가용성 요구사항이 매우 높다면 Redis Streams보다 독립적인 메시지 브로커를 사용하는 것이 적합하다.

메모리 관리가 필요하다. 스트림 크기를 제한하지 않으면 Redis 메모리가 소진될 수 있다. MAXLEN 옵션으로 스트림의 최대 크기를 제한한다.

JAVA
redisTemplate.opsForStream().add(
    StreamRecords.newRecord()
        .ofMap(fields)
        .withStreamKey(streamKey)
);
// XADD analysis:document-uploaded MAXLEN ~ 10000 * ...

메시지 순서가 보장되지 않을 수 있다. Consumer Group에서 여러 소비자가 병렬로 처리하면 같은 문서에 대한 이벤트가 순서 없이 처리될 수 있다. 이 시스템에서는 문서 키 기준으로 하나의 소비자만 처리하도록 파티셔닝 로직을 추가했다.

전체 설계를 돌아보며

1

동기 처리의 한계 인식

분석처럼 오래 걸리는 작업을 HTTP 요청에 묶어두면 서버 쓰레드가 낭비되고 사용자 경험이 나빠진다.

2

메시지 브로커 선택

Kafka의 강력함보다 운영 단순성을 우선해 이미 인프라에 있는 Redis의 Streams 기능을 활용했다.

3

데이터 정합성 확보

@TransactionalEventListener(AFTER_COMMIT)으로 DB 커밋 전 이벤트 발행 문제를 해결했다.

4

안정성 보장

Consumer Group ACK, Dead Letter Queue, 배치 재처리로 메시지 유실과 무한 루프를 방지했다.

5

도메인 상태 머신과의 연계

AnalysisStatusAnalysisStep의 전이 규칙이 중복 처리와 잘못된 상태 변경을 도메인 레벨에서 차단한다.

비동기 파이프라인 설계에서 가장 중요한 교훈은 기술 선택보다 실패 시나리오를 먼저 설계하라는 것이다. 정상 흐름은 어떤 기술을 써도 동작한다. 트랜잭션과 이벤트 발행의 타이밍, 소비자 장애 후 재처리, 반복 실패 메시지 처리 같은 예외 상황을 미리 설계에 반영했을 때 시스템이 신뢰할 수 있어진다.


이 시리즈는 여기서 마무리된다. 4편에 걸쳐 클린 아키텍처의 선택 배경부터 도메인 상태 머신, Port & Adapter 패턴, 비동기 파이프라인까지 실제 시스템에서 어떻게 적용했는지를 정리했다. 완벽한 설계는 없다. 각 결정에는 트레이드오프가 있고, 그 트레이드오프를 이해하고 팀이 수용할 수 있는 선택을 하는 것이 좋은 아키텍처 결정이라고 생각한다.

§ 목차