11package org .ezcode .codetest .application .submission .service ;
22
33import java .util .List ;
4+ import java .util .Optional ;
45import java .util .UUID ;
56import java .util .concurrent .CompletableFuture ;
67import java .util .concurrent .Executor ;
1415import org .ezcode .codetest .application .submission .model .ReviewResult ;
1516import org .ezcode .codetest .application .submission .model .SubmissionContext ;
1617import org .ezcode .codetest .application .submission .port .ExceptionNotifier ;
18+ import org .ezcode .codetest .application .submission .port .LockManager ;
1719import org .ezcode .codetest .application .submission .port .QueueProducer ;
1820import org .ezcode .codetest .domain .submission .exception .SubmissionException ;
1921import org .ezcode .codetest .domain .submission .exception .code .SubmissionExceptionCode ;
@@ -61,24 +63,21 @@ public class SubmissionService {
6163 private final QueueProducer queueProducer ;
6264 private final Executor judgeTestcaseExecutor ;
6365 private final ExceptionNotifier exceptionNotifier ;
66+ private final LockManager lockManager ;
6467
6568 public SseEmitter enqueueCodeSubmission (Long problemId , CodeSubmitRequest request , AuthUser authUser ) {
69+
70+ boolean acquired = lockManager .tryLock (authUser .getId (), problemId );
71+
72+ if (!acquired ) {
73+ throw new SubmissionException (SubmissionExceptionCode .ALREADY_JUDGING );
74+ }
75+
6676 SseEmitter emitter = new SseEmitter (10 * 60 * 1000L );
6777 String emitterKey = authUser .getId () + "_" + UUID .randomUUID ();
6878
69- emitter .onCompletion (() -> log .info ("[SSE 완료] 정상 종료됨" ));
70- emitter .onTimeout (() -> {
71- log .warn ("[SSE 타임아웃] 연결 시간이 초과되었습니다" );
72- emitter .completeWithError (new SubmissionException (SubmissionExceptionCode .EMITTER_SEND_ERROR ));
73- emitterStore .remove (emitterKey );
74- });
75- emitter .onError (e -> {
76- log .error ("[SSE 에러 발생] 예외: {}" , e .toString (), e );
77- emitterStore .remove (emitterKey );
78- });
79-
8079 log .info ("[SSE 저장] emitterKey: {}" , emitterKey );
81- emitterStore .save (emitterKey , emitter );
80+ emitterStore .saveWithCallbacks (emitterKey , emitter );
8281
8382 queueProducer .enqueue (
8483 new SubmissionMessage (emitterKey , problemId , request .languageId (), authUser .getId (), request .sourceCode ())
@@ -95,49 +94,24 @@ public void submitCodeStream(SubmissionMessage msg) {
9594 User user = userDomainService .getUserById (msg .userId ());
9695 Language language = languageDomainService .getLanguage (msg .languageId ());
9796 ProblemInfo problemInfo = problemDomainService .getProblemInfo (msg .problemId ());
98- SseEmitter emitter = emitterStore .get (msg .emitterKey ()).orElseThrow (
99- () -> new SubmissionException (SubmissionExceptionCode .EMITTER_NOT_FOUND )
100- );
97+ SseEmitter emitter = emitterStore .getOrElseThrow (msg .emitterKey ());
10198
10299 int totalTestcaseCount = problemInfo .getTestcaseCount ();
103100 SubmissionContext context = SubmissionContext .initialize (totalTestcaseCount );
104101
105102 for (Testcase tc : problemInfo .testcaseList ()) {
106- CompletableFuture .runAsync (() -> {
107- try {
108- log .info ("[Judge RUN] Thread = {}" , Thread .currentThread ().getName ());
109- String token = judgeClient .submitAndGetToken (
110- new CodeCompileRequest (msg .sourceCode (), language .getJudge0Id (), tc .getInput ())
111- );
112- JudgeResult result = judgeClient .pollUntilDone (token );
113-
114- AnswerEvaluation evaluation = submissionDomainService .handleEvaluationAndUpdateStats (
115- TestcaseEvaluationInput .from (tc , result ), problemInfo , context
116- );
117- emitter .send (JudgeResultResponse .fromEvaluation (result , evaluation ));
118- } catch (Exception e ) {
119- if (context .notified ().compareAndSet (false , true )) {
120- emitter .completeWithError (e );
121- emitterStore .remove (msg .emitterKey ());
122- exceptionNotificationHelper (e );
123- }
124- } finally {
125- context .countDown ();
126- }
127- }, judgeTestcaseExecutor );
103+ runTestcaseAsync (tc , msg , language .getJudge0Id (), problemInfo , context , emitter );
128104 }
129105
130106 if (!context .latch ().await (60 , TimeUnit .SECONDS )) {
131107 emitter .completeWithError (new SubmissionException (SubmissionExceptionCode .TESTCASE_TIMEOUT ));
132- emitterStore .remove (msg .emitterKey ());
133108 return ;
134109 }
135110
136111 emitter .send (SseEmitter .event ()
137112 .name ("final" )
138113 .data (context .toFinalResult (totalTestcaseCount )));
139- emitter .complete ();
140- emitterStore .remove (msg .emitterKey ());
114+ emitter .complete ();
141115
142116 SubmissionData submissionData = SubmissionData .base (
143117 user , problemInfo , language , msg .sourceCode (), context .getCurrentMessage ()
@@ -148,8 +122,10 @@ public void submitCodeStream(SubmissionMessage msg) {
148122 );
149123 } catch (Exception e ) {
150124 emitterStore .get (msg .emitterKey ()).ifPresent (emitter -> emitter .completeWithError (e ));
151- emitterStore .remove (msg .emitterKey ());
152125 exceptionNotificationHelper (e );
126+ } finally {
127+ emitterStore .remove (msg .emitterKey ());
128+ lockManager .releaseLock (msg .userId (), msg .problemId ());
153129 }
154130 }
155131
@@ -162,7 +138,6 @@ public List<GroupedSubmissionResponse> getSubmissions(AuthUser authUser) {
162138 }
163139
164140 public CodeReviewResponse getCodeReview (Long problemId , CodeReviewRequest request ) {
165-
166141 Problem problem = problemDomainService .getProblem (problemId );
167142 Language language = languageDomainService .getLanguage (request .languageId ());
168143
@@ -171,8 +146,36 @@ public CodeReviewResponse getCodeReview(Long problemId, CodeReviewRequest reques
171146 return new CodeReviewResponse (reviewResult .reviewContent ());
172147 }
173148
174- private void exceptionNotificationHelper (Exception e ) {
175- if (e instanceof SubmissionException se ) {
149+ private void runTestcaseAsync (
150+ Testcase tc , SubmissionMessage msg , Long judge0Id ,
151+ ProblemInfo problemInfo , SubmissionContext context , SseEmitter emitter
152+ ) {
153+ CompletableFuture .runAsync (() -> {
154+ try {
155+ log .info ("[Judge RUN] Thread = {}" , Thread .currentThread ().getName ());
156+ String token = judgeClient .submitAndGetToken (
157+ new CodeCompileRequest (msg .sourceCode (), judge0Id , tc .getInput ())
158+ );
159+ JudgeResult result = judgeClient .pollUntilDone (token );
160+
161+ AnswerEvaluation evaluation = submissionDomainService .handleEvaluationAndUpdateStats (
162+ TestcaseEvaluationInput .from (tc , result ), problemInfo , context
163+ );
164+ emitter .send (JudgeResultResponse .fromEvaluation (result , evaluation ));
165+ } catch (Exception e ) {
166+ if (context .notified ().compareAndSet (false , true )) {
167+ emitter .completeWithError (e );
168+ emitterStore .remove (msg .emitterKey ());
169+ exceptionNotificationHelper (e );
170+ }
171+ } finally {
172+ context .countDown ();
173+ }
174+ }, judgeTestcaseExecutor );
175+ }
176+
177+ private void exceptionNotificationHelper (Throwable t ) {
178+ if (t instanceof SubmissionException se ) {
176179 var code = se .getResponseCode ();
177180 exceptionNotifier .sendEmbed (
178181 "채점 예외" ,
@@ -192,10 +195,9 @@ private void exceptionNotificationHelper(Exception e) {
192195 • 성공 여부: false
193196 • 상태코드: 500
194197 • 메시지: %s
195- """ .formatted (e . getMessage ()),
198+ """ .formatted (Optional . ofNullable ( t . getMessage ()). orElse ( "No message" )),
196199 "submitCodeStream"
197200 );
198201 }
199-
200202 }
201203}
0 commit comments