Notice
Recent Posts
Recent Comments
Link
관리 메뉴

look-forest

경쟁 소비자 패턴(Work Queue 모델)과 큐의 메시지 상태 본문

Middleware/RabbitMQ (메시지 브로커)

경쟁 소비자 패턴(Work Queue 모델)과 큐의 메시지 상태

studyHub 2026. 2. 17. 19:53

Consumer간 작업 분배 - WorkQueue

Work Queues : Competing Consumers Pattern

Work Queue 패턴에서는 여러 소비자가 하나의 큐에서 메시지를 가져가 경쟁적으로 처리한다.

작업 부하를 효율적으로 분산하고, 병렬 처리를 가능하게 만들어 처리량을 향상시키는 효과가 있다.

  • Round-Robin 방식(default)과 Fair Dispatch 방식을 사용하여 메시지를 Consumer 간에 분배
  • Fair Dispatch 방식은 개발자가 코드 레벨에서 컨슈머간 분배 조정.
    메시지 수동 확인(Manual Acknowledgement) 모드로 설정해야함. (AMQP 기본 값 Auto)
    메시지 처리 비중(Prefetch Count) 설정 등을 통해 조정 가능

주요 특징

  1. 경쟁적인 메시지 소비
    여러 Consumer가 동일한 메시지 큐에서 메시지를 가져가 처리하나,
    특정 메시지는 한 번에 하나의 Consumer에 의해 처리되므로 중복 처리는 되지 않는다.
  2. 작업 분산
    메시지가 여러 Consumer 간에 분배되어 병렬로 처리되므로 작업 부하를 효율적으로 분산
  3. 확장성
    Consumer를 추가하거나 제거함으로써 작업 처리 능력을 동적으로 확장하거나 축소
  4. 내결함성
    Consumer 중 하나가 실패하더라도 다른 Consumer가 작업을 이어받아 처리할 수 있어 중단없이 작동

구성

Config는 별반 다를 바 없으나, 오류 확인을 위해 큐의 Duration을 true로 설정하여 서버를 내려도 데이터가 유지되도록 했다.

@Configuration
public class RabbitMQConfig {
	// 큐 네임 설정
	public static final String QUEUE_NAME = "WorkQueue";

	//Spring이 시작될 때 RabbitMQ 서버에 “이 큐를 생성하라”라고 선언하기 위해
	@Bean
	public Queue queue() {
		//QUEUE_NAME은 메시지가 쌓이고 처리될 큐의 이름을 정의
		return new Queue(QUEUE_NAME, true); //영속화 여부. true: 서버 셧다운 시 데이터 보관
	}


	//RabbitTemplate: 래빗엠큐로 메시지를 보내는 데 사용되는 템플릿 클래스
	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		return new RabbitTemplate(connectionFactory); //ConnectionFactory: 래빗엠큐의 연결 관리
	}

	//SimpleMessageListenerContainer: 래빗엠큐에서 메시지를 수신하고 처리하는 리스너 컨테이너
	@Bean
	public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
		MessageListenerAdapter listenerAdapter) {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.setQueueNames(QUEUE_NAME);
		container.setMessageListener(listenerAdapter);
		container.setAcknowledgeMode(AcknowledgeMode.AUTO); //default이나 명시
		return container;
	}

	//MessageListenerAdapter: 메시지를 처리할 리스너 어댑터
	@Bean
	public MessageListenerAdapter listenerAdapter(WorkQueueConsumer workQueueTask) {
		return new MessageListenerAdapter(workQueueTask, "workQueueTask");
	}
}

 

그 외 Controller, Producer, Consumer는 역할이 별반 다를게 없다.

사실 Work Queue 전용 설정이 따로 있는 것이 아니고, 하나의 Queue를 여러 Consumer가 같이 소비하면 그게 곧 Work Queue 패턴이다. 같은 큐를 바라보는 @RabbitListener 인스터스가 여러 개 있으면 Work Queue 처럼 동작한다.

하나의 RabbitMQ 서버에 여러 Spring 서버를 띄워서 확인해보자.

실행

jar로 빌드하여 spring을 두대 띄워본 후 curl로 요청을 3번 보내보니, 아래와 같이 1,3 / 2 요청이 분산됨을 확인할 수 있었다.

8080 포트로 요청을 3개 보낸 후 8080 및 8081 포트의 Consumer에 메시지가 각각 분배되어 처리


메시지

어드민에서 메시지 확인하기

의도적으로 Consumer에서 오류를 발생하게 하고, 어드민에서 확인해보자.

큐의 상태를 보면 메시지가 상태에 따라 분류되는데,

  • ready
    • 메시지가 큐에 있지만, 아직 컨슈머에게 전달되지 않은 상태
    • 이 상태의 메시지는 대기 중이며, 컨슈머가 연결되면 전달될 준비가 됨
    • 큐의 적재량이 많이지면 ready 메시지 수가 증가
  • unacked
    • 메시지가 컨슈머에게 전달되었으나, 아직 확인되지 않은 상태
    • 컨슈머가 메시지를 처리하고 확인(ACK)를 보내면, RabbitMQ는 해당 메시지를 삭제
    • 컨슈머가 확인을 보내지 못하거나 연결이 끊어지면, 메시지는 다시 ready 상태로 돌아감(설정에 따라 다름)

메시지 삭제가 필요한 경우, admin에서 purge 버튼을 눌러서 삭제

  • purge 명령은 큐에 있는 ready 메시지를 삭제 (unacked 메시지는 purge로 삭제되지 않음)
  • 삭제된 메시지는 복구되지 않으며, 데이터는 소실
  • unacked 메시지를 삭제하려면 컨슈머가 연결을 끊거나 RabbitMQ 큐를 재시작해야 함
    (컨슈머가 연결을 끊으면 unacked 메시지는 다시 ready 상태로 돌아가므로)

일반적인 문제 해결 방법

  • ready가 많을 경우
    • 컨슈머 수를 늘리거나 컨슈머의 메시지 처리 속도를 최적화해야 함.
  • unacked가 많을 경우
    • 컨슈머 코드 수정 (프로그램 에러일 가능성이 높다)
      unacked 상태에서 consumer 를 제대로 소비하도록 프로그램을 수정하면 unacked 에서도 처리가 되어 삭제된다.
    • 컨슈머 연결 상태를 확인 하여 재연결 혹은 재시작 필요
  •  

참고 자료 & 이미지 출처
RabbitMQ를 이용한 비동기 아키텍처 한방에 해결하기