Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/mysql/init/02-data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ VALUES ('[email protected]', '$2a$10$FAkvDPjLjmwJhd9o6YDStePThk5E1ZKHNNFFcjKqSFj.Vz
('[email protected]', '$2a$10$FAkvDPjLjmwJhd9o6YDStePThk5E1ZKHNNFFcjKqSFj.VzF6qbWQa', '멤버1', NOW(), NOW(), 'system', 'system');

INSERT INTO `groups` (product_name, category, description, original_price, target_participants, divided_unit, recruitment_minutes, deadline_at, meeting_location, meeting_at, host_member_id, host_member_name, status, current_participants, created_time, updated_time, created_by, updated_by)
VALUES ('코스트코 견과류 믹스 3kg', '식품', '신선한 견과류입니다.', 30000, 102, '500g씩', 1440, NOW() + INTERVAL 1 DAY, '강남역 3번 출구', NOW() + INTERVAL 2 DAY, 1, '호스트1', 'RECRUITING', 2, NOW(), NOW(), 'system', 'system');
VALUES ('코스트코 견과류 믹스 3kg', '식품', '신선한 견과류입니다.', 30000, 4, '500g씩', 1440, NOW() + INTERVAL 1 DAY, '강남역 3번 출구', NOW() + INTERVAL 2 DAY, 1, '호스트1', 'RECRUITING', 2, NOW(), NOW(), 'system', 'system');

INSERT INTO group_members (group_id, user_id, nickname, group_member_type, group_member_status, queue_number, joined_at, created_time, updated_time, created_by, updated_by)
VALUES (1, 1, '호스트1', 'HOST', 'JOINED', 1, NOW(), NOW(), NOW(), 'system', 'system'),
Expand All @@ -18,7 +18,7 @@ DROP PROCEDURE IF EXISTS generate_dummy_users$$
CREATE PROCEDURE generate_dummy_users()
BEGIN
DECLARE i INT DEFAULT 1;
WHILE i <= 5000 DO
WHILE i <= 5 DO
INSERT INTO users (email, password, nickname, created_time, updated_time, created_by, updated_by)
VALUES (
CONCAT('test', i, '@test.com'),
Expand Down
24 changes: 17 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ services:
ports:
- "8080:8080"
depends_on:
- mysql
- redis
mysql:
condition: service_healthy
redis:
condition: service_started
restart: on-failure

mysql:
image: mysql:8.0
Expand All @@ -44,18 +47,24 @@ services:
volumes:
- ./db/mysql/data:/var/lib/mysql
- ./db/mysql/init:/docker-entrypoint-initdb.d
healthcheck:
test: [ "CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p1234" ]
interval: 5s
timeout: 3s
retries: 10
start_period: 40s

redis:
image: redis:7-alpine
container_name: redis
deploy:
resources:
limits:
cpus: '0.5'
memory: 512M
cpus: '1.0'
memory: 1024M
reservations:
cpus: '0.25'
memory: 256M
cpus: '1.0'
memory: 1024M
restart: always
ports:
- "6379:6379"
Expand All @@ -72,4 +81,5 @@ services:
ports:
- "9104:9104"
depends_on:
- mysql
mysql:
condition: service_healthy
2 changes: 1 addition & 1 deletion script/k6/script.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export let options = {
scenarios: {
concurrent_join: {
executor: 'per-vu-iterations',
vus: 5000,
vus: 5,
iterations: 1,
},
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.app.groupdeal.application.group.consumer;

import com.app.groupdeal.application.group.service.GroupMemberService;
import com.app.groupdeal.application.group.service.GroupService;
import com.app.groupdeal.domain.group.model.GroupMember;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.time.Duration;
import java.util.List;
import java.util.Map;


@Slf4j
@Component
@RequiredArgsConstructor
public class GroupJoinEventConsumer {

private final GroupMemberService groupMemberService;
private final GroupService groupService;
private final StringRedisTemplate redisTemplate;

private static final String STREAM_KEY = "group:join:stream";
private static final String CONSUMER_GROUP = "group-join-processor";
private static final String CONSUMER_NAME = "worker-1";
Comment on lines +30 to +32

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consume 속도가 이벤트가 많다면 느릴 것입니다.
고려해야할 사항:

  1. Consume 속도를 정확히 측정한다.
  2. Redis Streams 파티션과 워커의 개념을 인지하고, 다중 파티션과 다중 워커의 구조를 바꿔보는 것을 추천합니다.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

지금 PR 범위는 아닙니다!


@PostConstruct
public void init() {
createConsumerGroupIfNotExists();

Thread consumerThread = new Thread(() -> startConsuming(), "event-consumer-worker-1");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CONSUMER_NAME과 이 컨슈머를 매핑한 스레드 간의 이름이 헷갈릴 여지가 있어서 둘다 상수로 관리하면 어떨까요?

아니면 나중에 다중 consumer를 고려하여, 스레드 이름을 짓는건 별도 buildConsumerThreadName() 같은 메서드를 추출해도 괜찮아보이네요.

consumerThread.setDaemon(false);
consumerThread.start();

log.info("✅ Consumer 스레드 시작 완료");
}

private void createConsumerGroupIfNotExists() {
try {
redisTemplate.opsForStream().createGroup(STREAM_KEY, CONSUMER_GROUP);
log.info("✅ Consumer Group 생성: {}", CONSUMER_GROUP);
} catch (Exception e) {
log.info("ℹ️ Consumer Group 이미 존재: {}", CONSUMER_GROUP);
}
}

private void startConsuming() {
log.info("🚀 이벤트 소비 시작: {}", CONSUMER_NAME);

while (!Thread.currentThread().isInterrupted()) {
try {
consumeBatch();
} catch (Exception e) {
log.error("❌ 이벤트 소비 중 오류", e);
sleep(1000);
}
}
}

private void consumeBatch() {
List<MapRecord<String, Object, Object>> records =
redisTemplate.opsForStream().read(
Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
StreamReadOptions.empty()
.count(10)
.block(Duration.ofSeconds(2)),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())
Comment on lines +71 to +74
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

요런 옵션들은 따로 필드 레벨로 분리되면 좋겠습니다.

);

if (records != null && !records.isEmpty()) {
log.info("📥 이벤트 {} 건 수신", records.size());

for (MapRecord<String, Object, Object> record : records) {
processEvent(record);
}
Comment on lines +80 to +82
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Record는 Java DTO Class로 정의할 수는 없을까요?

}
}

@Transactional
protected void processEvent(MapRecord<String, Object, Object> record) {
String eventId = record.getId().getValue();

try {
Map<Object, Object> event = record.getValue();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map<Object, Object> 대신 Java DTO Class로 정의할 수는 없을까요?


Long groupId = Long.parseLong((String) event.get("groupId"));
Long userId = Long.parseLong((String) event.get("userId"));
String nickname = (String) event.get("nickname");
Integer queueNumber = Integer.parseInt((String) event.get("queueNumber"));

log.info("⚙️ [이벤트 처리 시작] eventId={}, groupId={}, userId={}, queueNumber={}",
eventId, groupId, userId, queueNumber);

GroupMember member = groupMemberService.joinGroup(groupId, userId, nickname, queueNumber);
groupService.increaseParticipant(groupId);

log.info("✅ [이벤트 처리 완료] eventId={}, userId={}, queueNumber={}, GroupMemberId={}",
eventId, member.getUserId(), member.getQueueNumber(), member.getGroupMemberId());

redisTemplate.opsForStream().acknowledge(STREAM_KEY, CONSUMER_GROUP, record.getId());

} catch (Exception e) {
log.error("❌ [이벤트 처리 실패] eventId={}", eventId, e);
handleFailedEvent(record, e);
}
}

private void handleFailedEvent(MapRecord<String, Object, Object> record, Exception e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이벤트 처리가 실패하면 곧바로 DLQ로 옮기기보다는 재시도 가능한 예외는 재시도할 수 있지 않을까요?

log.warn("⚠️ 실패 이벤트를 DLQ로 이동: {}", record.getId().getValue());

redisTemplate.opsForStream().add(
"group:join:dlq",
Map.of(
"originalEventId", record.getId().getValue(),
"error", e.getMessage(),
"data", record.getValue().toString()
)
);

redisTemplate.opsForStream().acknowledge(STREAM_KEY, CONSUMER_GROUP, record.getId());
}

private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("⚠️ Consumer 중단됨");
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.app.groupdeal.application.group.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public class QueueResult {
private boolean success;
private Long queueNumber;
private String eventId;

public static QueueResult success(Long queueNumber, String eventId) {
return new QueueResult(true, queueNumber, eventId);
}

public static QueueResult full(Long queueNumber) {
return new QueueResult(false, queueNumber, null);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.app.groupdeal.application.group.facade;

import com.app.groupdeal.application.group.dto.QueueResult;
import com.app.groupdeal.application.group.service.GroupMemberService;
import com.app.groupdeal.application.group.service.GroupQueueService;
import com.app.groupdeal.application.group.service.GroupService;
Expand Down Expand Up @@ -116,4 +117,32 @@ public JoinGroupResponseDto joinGroup(Long groupId, Long userId, String nickname
}
}

@Transactional
public JoinGroupResponseDto joinGroupWithEvent(Long groupId, Long userId, String nickname) {

Group group = groupService.findById(groupId);
group.validateJoinable();

QueueResult result = queueService.issueQueueNumberWithEvent(
groupId,
userId,
nickname,
group.getTargetParticipants()
);

if (!result.isSuccess()) {
throw new BusinessException(ErrorType.GROUP_FULL);
}

log.info("✅ [유저 {}] 참여 요청 접수 (순번: {}, 이벤트: {})",
userId, result.getQueueNumber(), result.getEventId());

return JoinGroupResponseDto.pending(
group,
userId,
nickname,
result.getQueueNumber()
);
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.app.groupdeal.application.group.service;

import com.app.groupdeal.application.group.dto.QueueResult;
import com.app.groupdeal.domain.group.model.Group;
import com.app.groupdeal.domain.group.repository.GroupRepository;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;

import java.time.Duration;
Expand All @@ -17,6 +19,46 @@
public class GroupQueueService {

private final StringRedisTemplate redisTemplate;
private final RedisScript<List> queueAndPublishScript;


@PostConstruct
public void init() {
}

public QueueResult issueQueueNumberWithEvent(
Long groupId,
Long userId,
String nickname,
Integer maxParticipants) {

String queueKey = "group:" + groupId + ":queue";
String streamKey = "group:join:stream";
Comment on lines +35 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GroupJoinEventConsumer에 이미 streamKey 상수가 있으니 활용되면 좋겠습니다.


List<Object> result = redisTemplate.execute(
queueAndPublishScript,
List.of(queueKey, streamKey),
groupId.toString(),
userId.toString(),
nickname,
maxParticipants.toString()
);

String status = (String) result.get(0);
Long queueNumber = Long.parseLong(result.get(1).toString());

if ("FULL".equals(status)) {
log.warn("❌ [그룹 {}] [유저 {}] 정원 초과 (순번: {})",
groupId, userId, queueNumber);
return QueueResult.full(queueNumber);
}

String eventId = (String) result.get(2);
log.info("✅ [그룹 {}] [유저 {}] 순번 발급 + queueNumber: {} (eventId: {})",
groupId, userId, queueNumber, eventId);

return QueueResult.success(queueNumber, eventId);
}


public Long issueQueueNumber(Long groupId) {
Expand Down Expand Up @@ -47,4 +89,4 @@ public void resetQueue(Long groupId) {
redisTemplate.delete(key);
log.info("🗑️ [그룹 {}] Redis 순번 삭제", groupId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ public class GroupMember extends BaseDomain {
private LocalDateTime joinedAt;
private LocalDateTime leftAt;

public Long getId() {
return this.groupMemberId;
}


@Builder
public GroupMember(Long groupMemberId, Long groupId, Long userId, String nickname, GroupMemberType groupMemberType,
Expand Down
Loading