diff --git a/db/mysql/init/02-data.sql b/db/mysql/init/02-data.sql index b32cbf6..9813463 100644 --- a/db/mysql/init/02-data.sql +++ b/db/mysql/init/02-data.sql @@ -6,7 +6,7 @@ VALUES ('host1@test.com', '$2a$10$FAkvDPjLjmwJhd9o6YDStePThk5E1ZKHNNFFcjKqSFj.Vz ('member1@test.com', '$2a$10$FAkvDPjLjmwJhd9o6YDStePThk5E1ZKHNNFFcjKqSFj.VzF6qbWQa', '멤버1', NOW(), NOW(), 'system', 'system'); INSERT INTO `groups` (product_name, category, description, original_price, target_participants, divided_unit, recruitment_minutes, deadline_at, meeting_location, meeting_at, host_member_id, host_member_name, status, current_participants, created_time, updated_time, created_by, updated_by) -VALUES ('코스트코 견과류 믹스 3kg', '식품', '신선한 견과류입니다.', 30000, 102, '500g씩', 1440, NOW() + INTERVAL 1 DAY, '강남역 3번 출구', NOW() + INTERVAL 2 DAY, 1, '호스트1', 'RECRUITING', 2, NOW(), NOW(), 'system', 'system'); +VALUES ('코스트코 견과류 믹스 3kg', '식품', '신선한 견과류입니다.', 30000, 4, '500g씩', 1440, NOW() + INTERVAL 1 DAY, '강남역 3번 출구', NOW() + INTERVAL 2 DAY, 1, '호스트1', 'RECRUITING', 2, NOW(), NOW(), 'system', 'system'); INSERT INTO group_members (group_id, user_id, nickname, group_member_type, group_member_status, queue_number, joined_at, created_time, updated_time, created_by, updated_by) VALUES (1, 1, '호스트1', 'HOST', 'JOINED', 1, NOW(), NOW(), NOW(), 'system', 'system'), @@ -18,7 +18,7 @@ DROP PROCEDURE IF EXISTS generate_dummy_users$$ CREATE PROCEDURE generate_dummy_users() BEGIN DECLARE i INT DEFAULT 1; - WHILE i <= 5000 DO + WHILE i <= 5 DO INSERT INTO users (email, password, nickname, created_time, updated_time, created_by, updated_by) VALUES ( CONCAT('test', i, '@test.com'), diff --git a/docker-compose.yml b/docker-compose.yml index 507a9e3..7ea4eba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,8 +17,11 @@ services: ports: - "8080:8080" depends_on: - - mysql - - redis + mysql: + condition: service_healthy + redis: + condition: service_started + restart: on-failure mysql: image: mysql:8.0 @@ -44,6 +47,12 @@ services: volumes: - ./db/mysql/data:/var/lib/mysql - ./db/mysql/init:/docker-entrypoint-initdb.d + healthcheck: + test: [ "CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p1234" ] + interval: 5s + timeout: 3s + retries: 10 + start_period: 40s redis: image: redis:7-alpine @@ -51,11 +60,11 @@ services: deploy: resources: limits: - cpus: '0.5' - memory: 512M + cpus: '1.0' + memory: 1024M reservations: - cpus: '0.25' - memory: 256M + cpus: '1.0' + memory: 1024M restart: always ports: - "6379:6379" @@ -72,4 +81,5 @@ services: ports: - "9104:9104" depends_on: - - mysql + mysql: + condition: service_healthy diff --git a/script/k6/script.js b/script/k6/script.js index 5d85e64..699da02 100644 --- a/script/k6/script.js +++ b/script/k6/script.js @@ -12,7 +12,7 @@ export let options = { scenarios: { concurrent_join: { executor: 'per-vu-iterations', - vus: 5000, + vus: 5, iterations: 1, }, }, diff --git a/src/main/java/com/app/groupdeal/application/group/consumer/GroupJoinEventConsumer.java b/src/main/java/com/app/groupdeal/application/group/consumer/GroupJoinEventConsumer.java new file mode 100644 index 0000000..177c474 --- /dev/null +++ b/src/main/java/com/app/groupdeal/application/group/consumer/GroupJoinEventConsumer.java @@ -0,0 +1,139 @@ +package com.app.groupdeal.application.group.consumer; + +import com.app.groupdeal.application.group.service.GroupMemberService; +import com.app.groupdeal.application.group.service.GroupService; +import com.app.groupdeal.domain.group.model.GroupMember; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.stream.*; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + + +@Slf4j +@Component +@RequiredArgsConstructor +public class GroupJoinEventConsumer { + + private final GroupMemberService groupMemberService; + private final GroupService groupService; + private final StringRedisTemplate redisTemplate; + + private static final String STREAM_KEY = "group:join:stream"; + private static final String CONSUMER_GROUP = "group-join-processor"; + private static final String CONSUMER_NAME = "worker-1"; + + @PostConstruct + public void init() { + createConsumerGroupIfNotExists(); + + Thread consumerThread = new Thread(() -> startConsuming(), "event-consumer-worker-1"); + consumerThread.setDaemon(false); + consumerThread.start(); + + log.info("✅ Consumer 스레드 시작 완료"); + } + + private void createConsumerGroupIfNotExists() { + try { + redisTemplate.opsForStream().createGroup(STREAM_KEY, CONSUMER_GROUP); + log.info("✅ Consumer Group 생성: {}", CONSUMER_GROUP); + } catch (Exception e) { + log.info("ℹ️ Consumer Group 이미 존재: {}", CONSUMER_GROUP); + } + } + + private void startConsuming() { + log.info("🚀 이벤트 소비 시작: {}", CONSUMER_NAME); + + while (!Thread.currentThread().isInterrupted()) { + try { + consumeBatch(); + } catch (Exception e) { + log.error("❌ 이벤트 소비 중 오류", e); + sleep(1000); + } + } + } + + private void consumeBatch() { + List> records = + redisTemplate.opsForStream().read( + Consumer.from(CONSUMER_GROUP, CONSUMER_NAME), + StreamReadOptions.empty() + .count(10) + .block(Duration.ofSeconds(2)), + StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()) + ); + + if (records != null && !records.isEmpty()) { + log.info("📥 이벤트 {} 건 수신", records.size()); + + for (MapRecord record : records) { + processEvent(record); + } + } + } + + @Transactional + protected void processEvent(MapRecord record) { + String eventId = record.getId().getValue(); + + try { + Map event = record.getValue(); + + Long groupId = Long.parseLong((String) event.get("groupId")); + Long userId = Long.parseLong((String) event.get("userId")); + String nickname = (String) event.get("nickname"); + Integer queueNumber = Integer.parseInt((String) event.get("queueNumber")); + + log.info("⚙️ [이벤트 처리 시작] eventId={}, groupId={}, userId={}, queueNumber={}", + eventId, groupId, userId, queueNumber); + + GroupMember member = groupMemberService.joinGroup(groupId, userId, nickname, queueNumber); + groupService.increaseParticipant(groupId); + + log.info("✅ [이벤트 처리 완료] eventId={}, userId={}, queueNumber={}, GroupMemberId={}", + eventId, member.getUserId(), member.getQueueNumber(), member.getGroupMemberId()); + + redisTemplate.opsForStream().acknowledge(STREAM_KEY, CONSUMER_GROUP, record.getId()); + + } catch (Exception e) { + log.error("❌ [이벤트 처리 실패] eventId={}", eventId, e); + handleFailedEvent(record, e); + } + } + + private void handleFailedEvent(MapRecord record, Exception e) { + log.warn("⚠️ 실패 이벤트를 DLQ로 이동: {}", record.getId().getValue()); + + redisTemplate.opsForStream().add( + "group:join:dlq", + Map.of( + "originalEventId", record.getId().getValue(), + "error", e.getMessage(), + "data", record.getValue().toString() + ) + ); + + redisTemplate.opsForStream().acknowledge(STREAM_KEY, CONSUMER_GROUP, record.getId()); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("⚠️ Consumer 중단됨"); + } + } +} + diff --git a/src/main/java/com/app/groupdeal/application/group/dto/QueueResult.java b/src/main/java/com/app/groupdeal/application/group/dto/QueueResult.java new file mode 100644 index 0000000..cd64557 --- /dev/null +++ b/src/main/java/com/app/groupdeal/application/group/dto/QueueResult.java @@ -0,0 +1,20 @@ +package com.app.groupdeal.application.group.dto; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class QueueResult { + private boolean success; + private Long queueNumber; + private String eventId; + + public static QueueResult success(Long queueNumber, String eventId) { + return new QueueResult(true, queueNumber, eventId); + } + + public static QueueResult full(Long queueNumber) { + return new QueueResult(false, queueNumber, null); + } +} diff --git a/src/main/java/com/app/groupdeal/application/group/facade/GroupFacadeService.java b/src/main/java/com/app/groupdeal/application/group/facade/GroupFacadeService.java index 2803cb2..d97346a 100644 --- a/src/main/java/com/app/groupdeal/application/group/facade/GroupFacadeService.java +++ b/src/main/java/com/app/groupdeal/application/group/facade/GroupFacadeService.java @@ -1,5 +1,6 @@ package com.app.groupdeal.application.group.facade; +import com.app.groupdeal.application.group.dto.QueueResult; import com.app.groupdeal.application.group.service.GroupMemberService; import com.app.groupdeal.application.group.service.GroupQueueService; import com.app.groupdeal.application.group.service.GroupService; @@ -116,4 +117,32 @@ public JoinGroupResponseDto joinGroup(Long groupId, Long userId, String nickname } } + @Transactional + public JoinGroupResponseDto joinGroupWithEvent(Long groupId, Long userId, String nickname) { + + Group group = groupService.findById(groupId); + group.validateJoinable(); + + QueueResult result = queueService.issueQueueNumberWithEvent( + groupId, + userId, + nickname, + group.getTargetParticipants() + ); + + if (!result.isSuccess()) { + throw new BusinessException(ErrorType.GROUP_FULL); + } + + log.info("✅ [유저 {}] 참여 요청 접수 (순번: {}, 이벤트: {})", + userId, result.getQueueNumber(), result.getEventId()); + + return JoinGroupResponseDto.pending( + group, + userId, + nickname, + result.getQueueNumber() + ); + } + } diff --git a/src/main/java/com/app/groupdeal/application/group/service/GroupQueueService.java b/src/main/java/com/app/groupdeal/application/group/service/GroupQueueService.java index 22dc38a..e4dca30 100644 --- a/src/main/java/com/app/groupdeal/application/group/service/GroupQueueService.java +++ b/src/main/java/com/app/groupdeal/application/group/service/GroupQueueService.java @@ -1,11 +1,13 @@ package com.app.groupdeal.application.group.service; +import com.app.groupdeal.application.group.dto.QueueResult; import com.app.groupdeal.domain.group.model.Group; import com.app.groupdeal.domain.group.repository.GroupRepository; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Service; import java.time.Duration; @@ -17,6 +19,46 @@ public class GroupQueueService { private final StringRedisTemplate redisTemplate; + private final RedisScript queueAndPublishScript; + + + @PostConstruct + public void init() { + } + + public QueueResult issueQueueNumberWithEvent( + Long groupId, + Long userId, + String nickname, + Integer maxParticipants) { + + String queueKey = "group:" + groupId + ":queue"; + String streamKey = "group:join:stream"; + + List result = redisTemplate.execute( + queueAndPublishScript, + List.of(queueKey, streamKey), + groupId.toString(), + userId.toString(), + nickname, + maxParticipants.toString() + ); + + String status = (String) result.get(0); + Long queueNumber = Long.parseLong(result.get(1).toString()); + + if ("FULL".equals(status)) { + log.warn("❌ [그룹 {}] [유저 {}] 정원 초과 (순번: {})", + groupId, userId, queueNumber); + return QueueResult.full(queueNumber); + } + + String eventId = (String) result.get(2); + log.info("✅ [그룹 {}] [유저 {}] 순번 발급 + queueNumber: {} (eventId: {})", + groupId, userId, queueNumber, eventId); + + return QueueResult.success(queueNumber, eventId); + } public Long issueQueueNumber(Long groupId) { @@ -47,4 +89,4 @@ public void resetQueue(Long groupId) { redisTemplate.delete(key); log.info("🗑️ [그룹 {}] Redis 순번 삭제", groupId); } -} \ No newline at end of file +} diff --git a/src/main/java/com/app/groupdeal/domain/group/model/GroupMember.java b/src/main/java/com/app/groupdeal/domain/group/model/GroupMember.java index 6e57207..06c1345 100644 --- a/src/main/java/com/app/groupdeal/domain/group/model/GroupMember.java +++ b/src/main/java/com/app/groupdeal/domain/group/model/GroupMember.java @@ -23,6 +23,10 @@ public class GroupMember extends BaseDomain { private LocalDateTime joinedAt; private LocalDateTime leftAt; + public Long getId() { + return this.groupMemberId; + } + @Builder public GroupMember(Long groupMemberId, Long groupId, Long userId, String nickname, GroupMemberType groupMemberType, diff --git a/src/main/java/com/app/groupdeal/global/config/RedisConfig.java b/src/main/java/com/app/groupdeal/global/config/RedisConfig.java index 2ced909..8368552 100644 --- a/src/main/java/com/app/groupdeal/global/config/RedisConfig.java +++ b/src/main/java/com/app/groupdeal/global/config/RedisConfig.java @@ -4,12 +4,46 @@ import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.RedisScript; + +import java.util.List; @Configuration public class RedisConfig { + @Bean + public RedisScript queueAndPublishScript() { + String script = """ + local queue_key = KEYS[1] + local stream_key = KEYS[2] + local group_id = ARGV[1] + local user_id = ARGV[2] + local nickname = ARGV[3] + local max_participants = tonumber(ARGV[4]) + + local queue_num = redis.call('INCR', queue_key) + + if queue_num > max_participants then + redis.call('DECR', queue_key) + return { 'FULL', queue_num - 1 } + end + + local event_id = redis.call('XADD', stream_key, '*', + 'groupId', group_id, + 'userId', user_id, + 'nickname', nickname, + 'queueNumber', queue_num, + 'timestamp', redis.call('TIME')[1] + ) + + return { 'SUCCESS', queue_num, event_id } + """; + + return RedisScript.of(script, List.class); + } + @Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } -} \ No newline at end of file +} diff --git a/src/main/java/com/app/groupdeal/presentation/group/controller/GroupController.java b/src/main/java/com/app/groupdeal/presentation/group/controller/GroupController.java index d84ba15..5e41a61 100644 --- a/src/main/java/com/app/groupdeal/presentation/group/controller/GroupController.java +++ b/src/main/java/com/app/groupdeal/presentation/group/controller/GroupController.java @@ -63,7 +63,7 @@ public ResponseEntity> joinGroupTest( @PathVariable Long groupId, @RequestParam Long userId, @RequestParam String nickname){ - JoinGroupResponseDto response = groupFacadeService.joinGroup(groupId, userId, nickname); + JoinGroupResponseDto response = groupFacadeService.joinGroupWithEvent(groupId, userId, nickname); return ResponseEntity.ok(ApiResponse.success(response)); } diff --git a/src/main/java/com/app/groupdeal/presentation/group/dto/JoinGroupResponseDto.java b/src/main/java/com/app/groupdeal/presentation/group/dto/JoinGroupResponseDto.java index 0b35276..386b681 100644 --- a/src/main/java/com/app/groupdeal/presentation/group/dto/JoinGroupResponseDto.java +++ b/src/main/java/com/app/groupdeal/presentation/group/dto/JoinGroupResponseDto.java @@ -25,6 +25,9 @@ public class JoinGroupResponseDto { private Integer currentParticipants; private Integer progressRate; + private ProcessingStatus processingStatus; + private String message; + private MemberInfo joinMember; @Getter @@ -40,6 +43,11 @@ public static class MemberInfo { private LocalDateTime joinedAt; } + public enum ProcessingStatus { + PENDING, + COMPLETED + } + public static JoinGroupResponseDto ofWithQueue(Group group, GroupMember groupMember) { return JoinGroupResponseDto.builder() .groupId(group.getGroupId()) @@ -48,6 +56,7 @@ public static JoinGroupResponseDto ofWithQueue(Group group, GroupMember groupMem .targetParticipants(group.getTargetParticipants()) .currentParticipants(group.getCurrentParticipants()) .progressRate(group.calculateProgressRate()) + .processingStatus(ProcessingStatus.COMPLETED) .joinMember(MemberInfo.builder() .userId(groupMember.getUserId()) .nickname(groupMember.getNickname()) @@ -58,6 +67,30 @@ public static JoinGroupResponseDto ofWithQueue(Group group, GroupMember groupMem .build(); } + public static JoinGroupResponseDto pending( + Group group, + Long userId, + String nickname, + Long queueNumber) { + return JoinGroupResponseDto.builder() + .groupId(group.getGroupId()) + .productName(group.getProductName()) + .status(group.getStatus()) + .targetParticipants(group.getTargetParticipants()) + .currentParticipants(group.getCurrentParticipants()) + .progressRate(group.calculateProgressRate()) + .processingStatus(ProcessingStatus.PENDING) + .message("참여 요청이 접수되었습니다") + .joinMember(MemberInfo.builder() + .userId(userId) + .nickname(nickname) + .memberType(null) + .queueNumber(queueNumber.intValue()) + .joinedAt(null) + .build()) + .build(); + } + public static JoinGroupResponseDto of(Group group, GroupMember groupMember) { return JoinGroupResponseDto.builder() .groupId(group.getGroupId()) @@ -66,6 +99,7 @@ public static JoinGroupResponseDto of(Group group, GroupMember groupMember) { .targetParticipants(group.getTargetParticipants()) .currentParticipants(group.getCurrentParticipants()) .progressRate(group.calculateProgressRate()) + .processingStatus(ProcessingStatus.COMPLETED) .joinMember(MemberInfo.builder() .userId(groupMember.getUserId()) .nickname(groupMember.getNickname())