11package org .ezcode .codetest .infrastructure .event .config ;
22
3- import java .net .InetAddress ;
4- import java .net .UnknownHostException ;
53import java .time .Duration ;
64import java .util .Map ;
7- import java .util .UUID ;
85import java .util .concurrent .Executor ;
96
107import org .ezcode .codetest .infrastructure .event .listener .RedisJudgeQueueConsumer ;
1411import org .springframework .data .redis .connection .RedisConnectionFactory ;
1512import org .springframework .data .redis .connection .stream .MapRecord ;
1613import org .springframework .data .redis .connection .stream .ReadOffset ;
17- import org .springframework .data .redis .connection .stream .StreamOffset ;
1814import org .springframework .data .redis .core .RedisTemplate ;
1915import org .springframework .data .redis .stream .StreamMessageListenerContainer ;
20- import org .springframework .data .redis .connection .stream .Consumer ;
2116
2217import jakarta .annotation .PostConstruct ;
2318import lombok .RequiredArgsConstructor ;
@@ -30,6 +25,9 @@ public class RedisStreamConfig {
3025
3126 private final RedisTemplate <String , String > redisTemplate ;
3227 private final Executor consumerExecutor ;
28+ private final RedisStreamConsumerRegistrar consumerRegistrar ;
29+
30+ private StreamMessageListenerContainer <String , MapRecord <String , String , String >> container ;
3331
3432 @ PostConstruct
3533 public void initConsumerGroup () {
@@ -39,7 +37,7 @@ public void initConsumerGroup() {
3937 if (Boolean .FALSE .equals (exists )) {
4038 log .info ("Redis Stream 'judge-queue'를 생성합니다." );
4139 redisTemplate .opsForStream ().add ("judge-queue" , Map .of (
42- "emitterKey " , "dummy" ,
40+ "sessionKey " , "dummy" ,
4341 "problemId" , "0" ,
4442 "languageId" , "0" ,
4543 "userId" , "0" ,
@@ -53,21 +51,20 @@ public void initConsumerGroup() {
5351 connection .xGroupDelConsumer (
5452 "judge-queue" .getBytes (),
5553 "judge-group" ,
56- getConsumerName (). replace ( "consumer-" , "" )
54+ consumerRegistrar . getConsumerName ()
5755 );
5856 return null ;
5957 });
6058 } catch (Exception e ) {
61- log .warn ("DELCONSUMER 중 오류 발생: {}" , e .getMessage ());
59+ log .warn ("기존 컨슈머 삭제 중 오류 발생: {}" , e .getMessage ());
6260 }
6361
6462 redisTemplate .opsForStream ().createGroup ("judge-queue" , ReadOffset .latest (), "judge-group" );
6563
6664 log .info ("Redis Stream 'judge-queue'에 대한 Consumer Group 'judge-group'을 생성했습니다." );
6765 } catch (Exception e ) {
68- log .info ("예외 발생: {}, 메시지: {}" , e .getClass (), e .getMessage ());
6966 if (e .getCause () instanceof io .lettuce .core .RedisBusyException ) {
70- log .info ("Redis Consumer Group 'judge-group'이 이미 존재하여 생성을 건너뜁니다 ." );
67+ log .info ("이미 존재하는 Consumer Group이므로 생성을 생략합니다 ." );
7168 } else {
7269 log .error ("Redis Consumer Group 초기화에 실패했습니다." , e );
7370 throw e ;
@@ -80,34 +77,20 @@ public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
8077 RedisConnectionFactory factory ,
8178 RedisJudgeQueueConsumer consumer
8279 ) {
83- StreamMessageListenerContainer
84- .StreamMessageListenerContainerOptions <String , MapRecord <String , String , String >> options =
85- StreamMessageListenerContainer
80+ var options = StreamMessageListenerContainer
8681 .StreamMessageListenerContainerOptions
8782 .builder ()
8883 .executor (consumerExecutor )
8984 .pollTimeout (Duration .ofSeconds (2 ))
85+ .errorHandler (e ->
86+ log .error ("[Redis Listener] 예외 발생 - 컨테이너가 죽었을 수 있음" , e ))
9087 .build ();
9188
92- StreamMessageListenerContainer <String , MapRecord <String , String , String >> container =
93- StreamMessageListenerContainer .create (factory , options );
94-
95- container .receive (
96- Consumer .from ("judge-group" , getConsumerName ()),
97- StreamOffset .create ("judge-queue" , ReadOffset .lastConsumed ()),
98- consumer
99- );
89+ this .container = StreamMessageListenerContainer .create (factory , options );
10090
101- container .start ();
102- return container ;
103- }
91+ consumerRegistrar .registerConsumer (this .container , consumer );
92+ this .container .start ();
10493
105- private String getConsumerName () {
106- try {
107- return "consumer-" + InetAddress .getLocalHost ().getHostName ();
108- } catch (UnknownHostException e ) {
109- log .warn ("호스트명 확인 실패, UUID 사용: {}" , e .getMessage ());
110- return "consumer-" + UUID .randomUUID ().toString ().substring (0 , 8 );
111- }
94+ return this .container ;
11295 }
11396}
0 commit comments