Notice
Recent Posts
Recent Comments
Link
관리 메뉴

look-forest

Routing Model을 이용한 Log 수집 본문

Middleware/RabbitMQ (메시지 브로커)

Routing Model을 이용한 Log 수집

studyHub 2026. 2. 22. 21:16

Routing 모델

메시지를 Routing Key에 따라 특정 큐에 전달하는 기능.

Direct Exchange와 Topic Exchange 모두에서 사용 가능.

특징 Direct Exchange Topic Exchange
라우팅 키 매칭 방식 정확히 일치 패턴 기반 (*, # 지원)
특징 매칭에 제한이 있음 매우 유연(다양한 패턴 매칭)
사용 사례 단순 명확한 목적의 라우팅 복잡하고 동적인 라우팅
  • Topic 의 경우 패턴 매칭 기반 라우팅으로 binding key와 매칭되는 메시지만 수신
    • * : 한 단어에 해당하는 모든 단어 수신
    • # : 0개 이상의 단어 (와일드 카드)

주요 특징

  1. 고성능
    • 메시지를 필요한 곳에만 전달하기 때문에, 네트워크 부하 감소 효과가 있고 브로드캐스팅보다 자원 효율적 사용
  2. 라우팅 키 기반의 메시지 분배
    • 각 큐는 하나 이상의 라우팅 키매칭
      (Direct Exchange의 경우 라우팅 키가 정확히 일치해야 해서 1:1 로 동작한다고 알고 있는데,
      동일한 라우팅 키로 여러 큐에 바인딩 할 수 있으므로 1:N으로 동작)
    • Fanout 방식과 달리, 메시지가 특정 큐로만 전달
  3. 바인딩 설정
    • Exchange와 큐 사이의 관계를 바인딩 키(binding key)를 통해 매핑하여 메시지가 전달
    • 라우팅 키와 바인딩 키가 일치 하지 않을 경우 메시지가 전달이 안됨
    • 다수의 큐와 라우팅을 관리할 경우 복잡성이 늘어남

메시지 흐름

    1. Producer가 메시지와 함께 라우팅 키를 설정해 Exchange로 전송
    2. Exchange는 바인딩 키를 확인하고, 해당 키와 매칭되는 큐로 메시지를 전달
    3. Consumer는 해당 큐에서 메시지를 소비

Routing 모델의 활용 사례

  • 로그 수집
    • 각각 log.* 나 *.error으로 끝나는 모든 에러
  • 주문 상태 전이 (주문 완료시 order.completed 를 키로 배송 지시, 재고 차감 지시, 이메일 마케팅 전송 큐 등에 분배)
    • 주문의 완료(order.completed) 이후 배송지시(order.completed.shipped), 재고 차감(order.completed.inventory),
      마케팅 이메일 발송(order.completed.email) 등의 큐 전달
  • 채팅의 방 개설 단위로 분류하여 메시지 전달
    • chat.room1, chat.room2,,

Routing Model을 이용한 Log 수집

구현 기능

  • Exception이 발생했을 때, 해당 메시지를 송/수신하는 로직으로 메시지를 routing
  • 에러 로그별 매칭되는 큐로 전달도 가능

로그를 수집하는 Routing 모델 예제 (using DirectExchange)

RabbitMQConfig

3개의 Queue Bean 선언(Error, Warn, Info), 3개의 Binding Bean 선언 (to DirectExchage) 

@Configuration
public class RabbitMQConfig {
	public static final String ERROR_QUEUE = "error_queue";
	public static final String WARN_QUEUE = "warn_queue";
	public static final String INFO_QUEUE = "info_queue";

	public static final String DIRECT_EXCHANGE = "direct_exchange";

	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange(DIRECT_EXCHANGE);
	}

	@Bean
	public Queue errorQueue() {
		return new Queue(ERROR_QUEUE, false);
	}

	@Bean
	public Queue warnQueue() {
		return new Queue(WARN_QUEUE, false);
	}

	@Bean
	public Queue infoQueue() {
		return new Queue(INFO_QUEUE, false);
	}

	@Bean
	public Binding errorBinding() {
		return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");
	}

	@Bean
	public Binding warnBinding() {
		return BindingBuilder.bind(warnQueue()).to(directExchange()).with("warn");
	}

	@Bean
	public Binding infoBinding() {
		return BindingBuilder.bind(infoQueue()).to(directExchange()).with("info");
	}
}

 

LogConsumer

@Component
public class LogConsumer {

	@RabbitListener(queues = RabbitMQConfig.ERROR_QUEUE)
	public void consumeError(String message) {
		System.out.println("[ERROR]를 받음 : " + message);
	}

	@RabbitListener(queues = RabbitMQConfig.WARN_QUEUE)
	public void consumeWarn(String message) {
		System.out.println("[WARN]를 받음 : " + message);
	}

	@RabbitListener(queues = RabbitMQConfig.INFO_QUEUE)
	public void consumeInfo(String message) {
		System.out.println("[INFO]를 받음 : " + message);
	}
}

 

LogPublisher

Exchange에 라우팅 키와 함께 메시지 전송

@Component
@RequiredArgsConstructor
public class LogPublisher {

	private final RabbitTemplate rabbitTemplate;

	public void publish(String routingKey, String message) {
		rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, routingKey, message);
		System.out.println("[#] message published: " + routingKey + ":" + message);
	}
}

 

Exception 처리 로직(CustomExsceptionHandler → LogPublisher로 전송)

@Component
@RequiredArgsConstructor
public class CustomExceptionHandler {

	private final LogPublisher logPublisher;

	//에러 처리
	public void handleException(Exception e) {
		String routingKey = e instanceof IllegalArgumentException ? "warn" : "error";
		logPublisher.publish(routingKey, "Exception Log: " + e.getMessage());
	}

	// 메시지 처리
	public void handleMessage(String message) {
		String routingKey = "info";
		logPublisher.publish(routingKey, "Info Log: " + message);
	}
}

 

테스트

curl -X GET "http://localhost:8080/api/logs/error" 
curl -X GET "http://localhost:8080/api/logs/warn" 
curl -X POST "http://localhost:8080/api/logs/info" \
 -H "Content-Type: application/json" \
 -d "\"System initialized successfully.\""


패턴별 로그를 수집하는 Routing 모델 예제 (using TopicExchange)

개별 로그와 더불어 모든 로그를 수집하는 큐를 만들어보자.

  • 패턴을 만들기 위해, Routing Key에 log.을 추가
  • 각각의 로그에 대한 개별 큐에 추가하여, 모든 로그를 수집하는 큐 생성(all_log_queue)
  • all_log_queue에 대한 바인딩 키를 log.*로 설정

RabbitMQConfig 추가/변경 사항

@Configuration
public class RabbitMQConfig {
	...
	public static final String ALL_LOG_QUEUE = "all_log_queue";

	public static final String TOPIC_EXCHANGE = "topic_exchange";

	@Bean
	public TopicExchange topicExchange() {
		return new TopicExchange(TOPIC_EXCHANGE);
	}

	...

	@Bean
	public Queue allLogQueue() {
		return new Queue(ALL_LOG_QUEUE, false);
	}

	...

	@Bean
	public Binding allLogBinding() {
		return BindingBuilder.bind(allLogQueue()).to(topicExchange()).with("log.*");
	}
}

 

Exceptionhandler 변경 사항: 패턴을 만들기 위해 log. 추가

@Component
@RequiredArgsConstructor
public class CustomExceptionHandler {

	private final LogPublisher logPublisher;

	//에러 처리
	public void handleException(Exception e) {
		String routingKey = e instanceof IllegalArgumentException ? "log.warn" : "log.error";
		logPublisher.publish(routingKey, "Exception Log: " + e.getMessage());
	}

	// 메시지 처리
	public void handleMessage(String message) {
		String routingKey = "log.info";
		logPublisher.publish(routingKey, "Info Log: " + message);
	}
}

 

Consumer 추가 사항

@Component
public class LogConsumer {

	...

	@RabbitListener(queues = RabbitMQConfig.ALL_LOG_QUEUE)
	public void consumeAllLogs(String message) {
		System.out.println("[All LOGS]를 받음 : " + message);
	}
}

참고 자료 & 이미지 출처
RabbitMQ를 이용한 비동기 아키텍처 한방에 해결하기