Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

Expand Down Expand Up @@ -193,5 +194,10 @@ public void addImageToExistingProblem(Long problemId, MultipartFile imageFile) {
problemDomainService.saveProblem(problem);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void problemCountAdjustment(Long problemId, boolean isSolved) {
int correctInc = isSolved ? 1 : 0;
problemDomainService.problemCountAdjustment(problemId, correctInc);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.ezcode.codetest.application.submission.dto.event;

public record ProblemCountAdjustmentEvent(

Long problemId,

boolean isSolved

) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,8 @@ public String getLanguageVersion() {
public String getUserEmail() {
return user.getEmail();
}

public Long getProblemId() {
return getProblem().getId();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ezcode.codetest.application.submission.port;

import org.ezcode.codetest.application.submission.dto.event.GitPushStatusEvent;
import org.ezcode.codetest.application.submission.dto.event.ProblemCountAdjustmentEvent;
import org.ezcode.codetest.application.submission.dto.event.SubmissionErrorEvent;
import org.ezcode.codetest.application.submission.dto.event.SubmissionJudgingFinishedEvent;
import org.ezcode.codetest.application.submission.dto.event.TestcaseListInitializedEvent;
Expand All @@ -17,4 +18,6 @@ public interface SubmissionEventService {
void publishSubmissionError(SubmissionErrorEvent event);

void publishGitPushStatus(GitPushStatusEvent event);

void publishProblemCountAdjustment(ProblemCountAdjustmentEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.ezcode.codetest.application.submission.dto.event.ProblemCountAdjustmentEvent;
import org.ezcode.codetest.application.submission.dto.event.SubmissionErrorEvent;
import org.ezcode.codetest.application.submission.dto.event.SubmissionJudgingFinishedEvent;
import org.ezcode.codetest.application.submission.dto.event.TestcaseEvaluatedEvent;
Expand All @@ -24,6 +25,7 @@
import org.ezcode.codetest.domain.submission.model.TestcaseEvaluationInput;
import org.ezcode.codetest.domain.submission.service.SubmissionDomainService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -56,11 +58,13 @@ public void runTestcases(SubmissionContext ctx) throws InterruptedException {
}
}

@Transactional
public void finalizeAndPublish(SubmissionContext ctx) {
SubmissionResult submissionResult = submissionDomainService.finalizeSubmission(ctx);

publishFinalResult(ctx);
publishProblemSolve(submissionResult);
publishProblemCountAdjustment(ctx, submissionResult);
}

public void publishSubmissionError(String sessionKey, Exception e) {
Expand Down Expand Up @@ -103,4 +107,10 @@ private void publishFinalResult(SubmissionContext ctx){
private void publishProblemSolve(SubmissionResult submissionResult) {
problemEventService.publishProblemSolveEvent(submissionResult);
}

private void publishProblemCountAdjustment(SubmissionContext ctx, SubmissionResult submissionResult) {
submissionEventService.publishProblemCountAdjustment(
new ProblemCountAdjustmentEvent(ctx.getProblemId(), submissionResult.isSolved())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public void enqueueCodeSubmission(Long problemId, CodeSubmitRequest request, Aut
}

@Async("judgeSubmissionExecutor")
@Transactional
public void processSubmissionAsync(SubmissionMessage msg) {
try {
log.info("[Submission RUN] Thread = {}", Thread.currentThread().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public class ExecutorConfig {
@Bean(name = "consumerExecutor")
public Executor consumerExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setCorePoolSize(40);
executor.setMaxPoolSize(80);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("consumer-");
executor.initialize();
return executor;
Expand All @@ -25,9 +25,9 @@ public Executor consumerExecutor() {
@Bean(name = "judgeSubmissionExecutor")
public Executor judgeSubmissionExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setCorePoolSize(40);
executor.setMaxPoolSize(80);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("submission-");
executor.initialize();
return executor;
Expand All @@ -36,9 +36,9 @@ public Executor judgeSubmissionExecutor() {
@Bean(name = "judgeTestcaseExecutor")
public Executor judgeTestcaseExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(25);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(500);
executor.setCorePoolSize(40);
executor.setMaxPoolSize(80);
executor.setQueueCapacity(2000);
executor.setThreadNamePrefix("testcase-");
executor.initialize();
return executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ public interface ProblemRepository {
Optional<Problem> findProblemWithTestcasesById(Long problemId);

boolean existsByTitleAndIsDeletedIsFalse(String title);

void problemCountAdjustment(Long problemId, int correctInc);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;

import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -125,4 +126,8 @@ public ProblemInfo getProblemInfo(Long problemId) {
public void saveProblem(Problem problem) {
problemRepository.save(problem);
}

public void problemCountAdjustment(Long problemId, int correctInc) {
problemRepository.problemCountAdjustment(problemId, correctInc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ public record SubmissionResult(
boolean hasBeenSolved

) {
public static SubmissionResult from(UserProblemResult result, List<String> problemCategory, boolean hasBeenSolved) {
public static SubmissionResult of(UserProblemResult upr, List<String> problemCategory, boolean allPassed) {
boolean before = upr.isCorrect();
boolean now = allPassed && !before;

return SubmissionResult.builder()
.userId(result.getUser().getId())
.userId(upr.getUser().getId())
.problemCategory(problemCategory)
.isSolved(result.isCorrect())
.hasBeenSolved(hasBeenSolved)
.isSolved(now)
.hasBeenSolved(before)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,22 @@ public SubmissionResult finalizeSubmission(SubmissionContext ctx) {

boolean allPassed = ctx.isPassed();

return getUserProblemResult(ctx.user().getId(), ctx.getProblem().getId()).map(
result -> {
ctx.incrementTotalSubmissions();
if (!result.isCorrect() && allPassed) {
modifyUserProblemResult(result, true);
ctx.incrementCorrectSubmissions();
return SubmissionResult.from(result, ctx.getCategories(), false);
}
return SubmissionResult.from(result, ctx.getCategories(), true);
})
.orElseGet(() -> {
ctx.incrementTotalSubmissions();
if (allPassed) {
ctx.incrementCorrectSubmissions();
}
return SubmissionResult.from(createUserProblemResult(
UserProblemResult.builder()
.user(ctx.user())
.problem(ctx.getProblem())
.isCorrect(allPassed)
.build()
), ctx.getCategories(), false);
UserProblemResult upr = getUserProblemResult(ctx.user().getId(), ctx.getProblem().getId())
.map(existing -> {
if (!existing.isCorrect() && allPassed) {
modifyUserProblemResult(existing, true);
}
);
return existing;
})
.orElseGet(() -> createUserProblemResult(
UserProblemResult.builder()
.user(ctx.user())
.problem(ctx.getProblem())
.isCorrect(allPassed)
.build()
));

return SubmissionResult.of(upr, ctx.getCategories(), allPassed);
}

public boolean handleEvaluationAndUpdateStats(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package org.ezcode.codetest.infrastructure.event.config;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;

import org.ezcode.codetest.infrastructure.event.listener.RedisJudgeQueueConsumer;
Expand All @@ -14,10 +11,8 @@
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.connection.stream.Consumer;

import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
Expand All @@ -30,6 +25,9 @@ public class RedisStreamConfig {

private final RedisTemplate<String, String> redisTemplate;
private final Executor consumerExecutor;
private final RedisStreamConsumerRegistrar consumerRegistrar;

private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;

@PostConstruct
public void initConsumerGroup() {
Expand All @@ -39,7 +37,7 @@ public void initConsumerGroup() {
if (Boolean.FALSE.equals(exists)) {
log.info("Redis Stream 'judge-queue'를 생성합니다.");
redisTemplate.opsForStream().add("judge-queue", Map.of(
"emitterKey", "dummy",
"sessionKey", "dummy",
"problemId", "0",
"languageId", "0",
"userId", "0",
Expand All @@ -53,21 +51,20 @@ public void initConsumerGroup() {
connection.xGroupDelConsumer(
"judge-queue".getBytes(),
"judge-group",
getConsumerName().replace("consumer-", "")
consumerRegistrar.getConsumerName()
);
return null;
});
} catch (Exception e) {
log.warn("DELCONSUMER 중 오류 발생: {}", e.getMessage());
log.warn("기존 컨슈머 삭제 중 오류 발생: {}", e.getMessage());
}

redisTemplate.opsForStream().createGroup("judge-queue", ReadOffset.latest(), "judge-group");

log.info("Redis Stream 'judge-queue'에 대한 Consumer Group 'judge-group'을 생성했습니다.");
} catch (Exception e) {
log.info("예외 발생: {}, 메시지: {}", e.getClass(), e.getMessage());
if (e.getCause() instanceof io.lettuce.core.RedisBusyException) {
log.info("Redis Consumer Group 'judge-group'이 이미 존재하여 생성을 건너뜁니다.");
log.info("이미 존재하는 Consumer Group이므로 생성을 생략합니다.");
} else {
log.error("Redis Consumer Group 초기화에 실패했습니다.", e);
throw e;
Expand All @@ -80,34 +77,20 @@ public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
RedisConnectionFactory factory,
RedisJudgeQueueConsumer consumer
) {
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.executor(consumerExecutor)
.pollTimeout(Duration.ofSeconds(2))
.errorHandler(e ->
log.error("[Redis Listener] 예외 발생 - 컨테이너가 죽었을 수 있음", e))
.build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(factory, options);

container.receive(
Consumer.from("judge-group", getConsumerName()),
StreamOffset.create("judge-queue", ReadOffset.lastConsumed()),
consumer
);
this.container = StreamMessageListenerContainer.create(factory, options);

container.start();
return container;
}
consumerRegistrar.registerConsumer(this.container, consumer);
this.container.start();

private String getConsumerName() {
try {
return "consumer-" + InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.warn("호스트명 확인 실패, UUID 사용: {}", e.getMessage());
return "consumer-" + UUID.randomUUID().toString().substring(0, 8);
}
return this.container;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.ezcode.codetest.infrastructure.event.config;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

import org.ezcode.codetest.infrastructure.event.listener.RedisJudgeQueueConsumer;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Getter
@Component
public class RedisStreamConsumerRegistrar {

private final String consumerName;

public RedisStreamConsumerRegistrar() {
this.consumerName = initConsumerName();
}

public void registerConsumer(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
RedisJudgeQueueConsumer consumer) {
container.receive(
Consumer.from("judge-group", consumerName),
StreamOffset.create("judge-queue", ReadOffset.lastConsumed()),
consumer
);
}

private String initConsumerName() {
try {
return "consumer-" + InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.warn("호스트명 확인 실패, UUID 사용: {}", e.getMessage());
return "consumer-" + UUID.randomUUID().toString().substring(0, 8);
}
}
}
Loading
Loading