본문 바로가기
역량 UP!/Architecture

Saga Pattern(outbox pattern) 좀 더 보기!

by 태하팍 2025. 10. 24.
반응형

2025.10.24 - [역량 UP!/Architecture] - MSA에서 분산 트랜잭션 처리는?(Saga+outbox)


위 내용을 토대로 아키텍처를 설계했는데
Saga pattern중 오케스트레이션 말고 코레오그래피의 흐름을 따라가며 좀 더 보자!

1) Outbox 테이블

CREATE TABLE outbox (
  id            BIGINT PRIMARY KEY AUTO_INCREMENT,
  aggregate_type   VARCHAR(40) NOT NULL,   -- "ORDER","INVENTORY","PAYMENT"
  aggregate_id     VARCHAR(64) NOT NULL,   -- ex) orderId
  event_type       VARCHAR(60) NOT NULL,   -- "OrderCreated","StockReserved"...
  event_version    INT NOT NULL,           -- 이벤트 스키마 버전 (호환성 관리)
  payload          JSON NOT NULL,          -- 이벤트 본문
  headers          JSON NULL,              -- traceId, correlationId, idempotencyKey 등
  status           ENUM('PENDING','SENT','FAILED') DEFAULT 'PENDING',
  created_at       DATETIME NOT NULL,
  last_attempt_at  DATETIME NULL,
  attempts         INT DEFAULT 0,
  UNIQUE KEY uk_idem (aggregate_type, aggregate_id, event_type, event_version, JSON_EXTRACT(payload,'$.idempotencyKey'))
);

aggregate_id: 순서보장과 멱등을 위한 핵심 키(ex. order_id)
payload: event body(Json)
status/attempts: 메시지 릴레이(producer)가 재시도 관리
idempotencykey: 같은 이벤트 중복 발행, 소비 방지

트랜잭션 내 작업 예시(주문 서비스):

  1. orders INSERT (status=PENDING)
  2. outbox INSERT (event_type=‘OrderCreated’, payload에 orderId 등)
  3. 한 트랜잭션 커밋.

2) Kafka 이벤트—토픽/키/포맷

Topic 설계(권장)

  • order-events, inventory-events, payment-events (도메인별 Topic)
  • 키 = aggregateId(orderId) 로 고정 → 같은 주문의 이벤트는 같은 파티션에 들어감 → 순서 보장 (Kafka는 같은 key 내 순서를 보장).
{
  "eventId": "a8f3…",           // UUID (멱등 체크용)
  "eventType": "PaymentFailed",  // 타입
  "eventVersion": 1,             // 스키마 버전
  "aggregateType": "ORDER",
  "aggregateId": "O123",
  "occurredAt": "2025-10-22T12:34:56Z",
  "traceId": "t-9b7…",           // 분산추적
  "idempotencyKey": "O123#PaymentFailed#1",

  "data": {                      // 본문
    "orderId": "O123",
    "reason": "CARD_DECLINED",
    "pgTxnId": "PG-777",
    "amount": 39000,
    "currency": "KRW"
  }
}
  • eventId / idempotencyKey: 컨슈머 중복처리(멱등) 용.
  • traceId: Zipkin/Jaeger로 추적.

아키텍처의 흐름은 아래와 같음
주문 → 재고예약 → 결제 → 완료/실패의 흐름을 이벤트로 연결.

정상 플로우

  1. OrderCreated (주문서비스 → order-events)
  2. StockReserveRequested (주문 서비스 → inventory-events)
  3. StockReserved (재고서비스 → inventory-events)
  4. PaymentRequested (재고서비스→ payment-events)
  5. PaymentSucceeded (결제서비스 → payment-events)
  6. OrderCompleted (주문서비스 → order-events)

실패/보상(Compensation) 플로우 (결제 실패)

1~4 동일 →
5. PaymentFailed (결제서비스)
6. StockReleaseRequested (주문서비스 발행)
7. StockReleased (재고서비스)
8. OrderCanceled (주문서비스)

핵심: 보상 이벤트(Release/Cancel)를 명시적으로 발행해서 이전 단계의 상태를 되돌립니다.

 

Outbox 테이블에 저장되는 데이터

OrderCreated

BEGIN;

INSERT INTO orders (id, user_id, item_id, qty, status)
VALUES ('O123', 'U1', 'A100', 1, 'PENDING');

INSERT INTO outbox (
  aggregate_type,
  aggregate_id,
  event_type,
  event_version,
  payload,
  headers,
  status,
  created_at
)
VALUES (
  'ORDER',                           -- aggregate_type
  'O123',                            -- aggregate_id
  'OrderCreated',                    -- event_type
  1,                                 -- event_version
  '{"orderId":"O123","userId":"U1","itemId":"A100","qty":1,"status":"PENDING"}',
  '{"traceId":"abc123","idempotencyKey":"O123#OrderCreated"}',
  'PENDING',
  NOW()
);

COMMIT;

Message Relay(중계기)가 읽어서 Kafka에 발행

status='PENDING' 인 Outbox 데이터를 폴링해서 발행합니다. 또는 CDC

SELECT * FROM outbox
WHERE status='PENDING'
ORDER BY created_at
LIMIT 100;

읽은 후 kafka Producer로 전송

ProducerRecord<String, String> record = new ProducerRecord<>(
    "order-events",            // topic
    "O123",                    // key (orderId)
    outboxRow.payload          // value (JSON)
);
producer.send(record);

발행 성공 하면?

UPDATE outbox
SET status='SENT',
    last_attempt_at=NOW()
WHERE id = :outboxId;

 

결제 서비스가 구독하는 이벤트

토픽 예시: order-events
파티션 키: orderId (순서 보장)
구독하는 주요 이벤트(예):

  • OrderCreated: 주문이 생성됨(상태: PENDING). → 결제 승인 시도.
  • OrderCanceled: 주문 취소됨 → 결제 중이면 취소(승인취소, 환불) 시도.
    • OrderCreated 수신 → PG 승인 요청 → 실패 (카드한도초과, 네트워크 오류 등)
BEGIN;

UPDATE payments
SET status = 'FAILED',
    fail_reason = 'CARD_DECLINED',
    updated_at = NOW()
WHERE order_id = 'O123';

INSERT INTO outbox (
  aggregate_type, aggregate_id, event_type, event_version,
  payload, headers, status, created_at
) VALUES (
  'PAYMENT', 'O123', 'PaymentFailed', 1,
  '{
     "orderId":"O123",
     "amount":15000,
     "reason":"CARD_DECLINED"
   }',
  '{"traceId":"trace-abc","idempotencyKey":"pay:O123#failed"}',
  'PENDING', NOW()
);

COMMIT;

처리 흐름 (성공 케이스: 승인/확정)

  1. OrderCreated 수신
    • 멱등 체크(이미 같은 orderId로 PENDING 이상 있으면 스킵)
    • paymentsPENDING 저장(없으면 생성)
  2. PG 승인 요청 (idempotencyKey = pay:O123)
    • PG API 호출(승인/authorize, 또는 승인+매입 capture 즉시 처리 형태)
    • PG가 응답 200(승인성공) → AUTHORIZED (또는 CAPTURED)
    • PG가 4xx/5xx → 실패 흐름(아래 2-3)
  3. DB 트랜잭션 (성공 시)
BEGIN;

UPDATE payments
SET status = 'AUTHORIZED',               -- 또는 'CAPTURED'
    pg_txn_id = 'pg-20251022-xyz',
    pg_auth_code = 'A12345',
    updated_at = NOW()
WHERE order_id = 'O123';

INSERT INTO outbox (
  aggregate_type, aggregate_id, event_type, event_version,
  payload, headers, status, created_at
) VALUES (
  'PAYMENT', 'O123', 'PaymentAuthorized', 1,
  '{
     "orderId":"O123",
     "amount":15000,
     "pgTxnId":"pg-20251022-xyz",
     "authCode":"A12345",
     "status":"AUTHORIZED"
   }',
  '{"traceId":"trace-abc","idempotencyKey":"pay:O123#authorized"}',
  'PENDING', NOW()
);

COMMIT;

4. Relay가 Outbox를 발행

  • 토픽: payment-events, key=O123, value=위 JSON
  • 주문 서비스가 구독 → OrderServiceorders.statusPAID 로 업데이트, OrderPaid 이벤트 발행.

결제 성공 이벤트(PaymentAuthirized)

{
  "eventId": "e-7fca3",           // UUID
  "eventType": "PaymentAuthorized",
  "occurredAt": "2025-10-22T15:10:30Z",
  "aggregateId": "O123",          // = orderId (Kafka key도 O123)
  "data": {
    "orderId": "O123",
    "paymentId": "P-555",
    "amount": 15000,
    "method": "CARD",
    "currency": "KRW",
    "authCode": "A1B2C3"
  },
  "headers": {
    "traceId": "tr-9012",
    "producer": "payment-service"
  }
}

 참고: 발행(Publish)과 구독(Consume)은 “Topic”으로 연결된다

Kafka는 발행자(Producer)  구독자(Consumer)  Topic 단위로 연결 합니다.
즉, “같은 Topic 이름”을 알고 있어야 서로 연결됩니다.

OrderService - Producer

@Service
public class OrderEventPublisher {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderCreated(Order order) {
        String topic = "order-events"; // 발행할 토픽명
        String key = order.getId();    // partition key (순서보장용)
        
        OrderCreatedEvent event = new OrderCreatedEvent(order);
        String payload = new ObjectMapper().writeValueAsString(event);
        
        kafkaTemplate.send(topic, key, payload);
    }
}

Inventory Service - Consumer

@Service
@KafkaListener(topics = "order-events", groupId = "inventory-group")
public class OrderEventConsumer {

    private final InventoryService inventoryService;

    public OrderEventConsumer(InventoryService inventoryService) {
        this.inventoryService = inventoryService;
    }

    @KafkaHandler
    public void consumeOrderCreated(String message) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        OrderCreatedEvent event = mapper.readValue(message, OrderCreatedEvent.class);
        
        // 주문 생성 이벤트 수신 → 재고 예약 로직 수행
        inventoryService.reserveStock(event.getOrderId(), event.getItems());
    }
}

이렇게 “order-events” 토픽을 구독(consume)하고, OrderCreated 이벤트를 받으면 재고 예약 로직을 수행하게 됩니다.

여러 개 구독 가능?

@KafkaListener(topics = {"order-events", "payment-events"}, groupId = "order-service-group")
public void consumeEvents(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    if (topic.equals("order-events")) {
        // 주문 관련 이벤트
    } else if (topic.equals("payment-events")) {
        // 결제 결과 처리
    }
}

쌉가능합니다! ㅋㅋ

끝~

 

 

 

반응형