SQS consumer는 만들고 나서가 시작이다
첫 구현은 메시지 처리만 다룬다
SQS consumer의 첫 구현은 보통 단순하다.
ReceiveMessage
-> unmarshal
-> handle
-> DeleteMessage정상 메시지를 읽고 처리한 뒤 삭제하면 된다. 테스트도 쉽게 통과한다.
하지만 운영 기준에서는 여기서 끝나지 않는다. 메시지 수신 실패가 반복될 때 어떻게 할지, handler가 실패한 메시지를 삭제할지, 반복 실패는 어디까지 재시도할지, 서버 종료 시 long polling 중인 consumer가 어떻게 멈출지 정해야 한다.
consumer는 메시지를 읽는 코드가 아니라 실패를 분류하는 코드에 가깝다.
Receive 실패와 handler 실패를 구분하기
처음에는 handler가 성공하면 삭제하고, 실패하면 삭제하지 않으면 된다고 생각했다.
messages, _ := queue.Receive(ctx)
for _, message := range messages {
if err := handle(ctx, message); err == nil {
_ = queue.Delete(ctx, message)
}
}이 정책 자체는 맞는 방향이다. 실패 메시지를 삭제하지 않으면 visibility timeout 이후 다시 처리된다.
다만 몇 가지 질문이 남는다.
ReceiveMessage자체가 계속 실패하면 얼마나 자주 재시도할까.- 잘못된 JSON 메시지는 계속 재시도해야 할까.
- 도메인 검증 실패는 영구 실패일까, 일시 실패일까.
- receive count가 충분히 높아진 메시지는 누가 DLQ로 보내나.
- 서버 종료 시 long polling 대기 중이면 언제 멈추나.
이 질문을 정리하지 않으면 consumer는 정상 상황에서는 잘 돌지만, 장애 상황에서는 시끄럽고 예측하기 어려운 코드가 된다.
삭제할 실패와 재시도할 실패
consumer에서 나눠야 하는 실패는 크게 세 가지다.
첫 번째는 메시지 형식 오류다. JSON이 깨졌거나 필수 필드가 없다. 재시도해도 성공할 가능성이 낮다. 이런 메시지는 로그를 남기고 삭제하는 편이 낫다.
두 번째는 도메인 일시 실패다. DB 연결 실패, lock timeout, 외부 서비스 일시 오류 같은 경우다. 메시지를 삭제하지 않고 재시도해야 한다.
세 번째는 반복 실패다. 같은 메시지가 여러 번 실패하면 애플리케이션이 직접 삭제하기보다 SQS redrive 정책으로 DLQ에 보내는 편이 운영상 명확하다.
malformed message -> delete
transient failure -> keep
repeated failure -> keep, let redrive move to DLQ
success -> deleteDLQ와 visibility timeout 기준 정하기
consumer 루프에는 long polling과 backoff를 함께 둔다.
long polling은 빈 큐에서 불필요한 요청을 줄인다. backoff는 receive 자체가 실패할 때 빠른 재시도를 막는다.
handler 실패는 에러 종류에 따라 나눈다. 영구 실패는 삭제하고, 일시 실패는 삭제하지 않는다. receive count가 임계값을 넘으면 에러 로그를 남기되 삭제하지 않는다. DLQ 이동은 SQS redrive 정책이 담당한다.
종료 흐름도 별도로 본다. context cancel 시 receive 대기나 backoff 대기에서 빠르게 종료되어야 한다. 서버 shutdown에서는 consumer goroutine이 끝났는지 기다리는 편이 좋다.
long polling과 shutdown 처리
package consumer
import (
"context"
"errors"
"time"
)
type Message struct {
ID string
ReceiptHandle string
Body []byte
ReceiveCount int
}
type Queue interface {
Receive(ctx context.Context, max int, waitSeconds int32) ([]Message, error)
Delete(ctx context.Context, receiptHandle string) error
}
type Handler interface {
Handle(ctx context.Context, msg Message) error
}
type Consumer struct {
queue Queue
handler Handler
maxReceiveCount int
}
func (c *Consumer) Start(ctx context.Context) {
backoff := time.Second
for {
messages, err := c.queue.Receive(ctx, 10, 10)
if err != nil {
if ctx.Err() != nil {
return
}
if !sleep(ctx, backoff) {
return
}
backoff = nextBackoff(backoff, 30*time.Second)
continue
}
backoff = time.Second
for _, msg := range messages {
c.handleOne(ctx, msg)
}
}
}
func (c *Consumer) handleOne(ctx context.Context, msg Message) {
err := c.handler.Handle(ctx, msg)
if err == nil {
_ = c.queue.Delete(ctx, msg.ReceiptHandle)
return
}
if errors.Is(err, ErrBadMessage) {
_ = c.queue.Delete(ctx, msg.ReceiptHandle)
return
}
if msg.ReceiveCount >= c.maxReceiveCount {
// 삭제하지 않는다. redrive policy가 DLQ로 이동시킨다.
return
}
// 일시 실패는 삭제하지 않는다.
}
var ErrBadMessage = errors.New("bad message")
func sleep(ctx context.Context, d time.Duration) bool {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
func nextBackoff(current, max time.Duration) time.Duration {
next := current * 2
if next > max {
return max
}
return next
}코드의 핵심은 복잡한 추상화가 아니다. 실패를 삭제할지, 남길지, DLQ에 맡길지 명확히 나누는 것이다.
consumer는 운영 정책을 코드로 표현한다
SQS consumer는 정상 메시지를 처리하는 순간보다 실패 메시지를 만났을 때 품질이 드러난다.
처음 구현에서는 receive와 delete만 보여도 충분해 보인다. 하지만 운영 전에 최소한 다음 항목은 확인해야 한다.
- receive 실패에 backoff가 있는가
- malformed 메시지는 계속 재시도하지 않는가
- 일시 실패 메시지는 삭제하지 않는가
- 반복 실패는 DLQ로 이동할 수 있는가
- context cancel 시 빠르게 종료되는가
- 로그에 message id와 도메인 식별자가 남는가
이 체크리스트를 통과하면 consumer는 단순한 polling 루프에서 운영 가능한 구성 요소에 가까워진다.