From 27f977c47437c9710621a5f4200e2f8b91f179e7 Mon Sep 17 00:00:00 2001 From: u$f Date: Wed, 22 May 2024 18:49:31 +0200 Subject: [PATCH] edit --- pom.xml | 2 +- .../core/ScheduledSessionDispatcher.java | 49 ++++++++++++------- .../java/org/usf/traceapi/core/Session.java | 14 +++--- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index dea8aad..9922b96 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 io.github.oneteme.traceapi traceapi-core - 0.0.20 + 0.0.21 jar traceapi-core traceapi-core diff --git a/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java b/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java index 63a9497..f7e5534 100644 --- a/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java +++ b/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java @@ -12,8 +12,10 @@ import static org.usf.traceapi.core.State.DISPACH; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -50,12 +52,12 @@ public ScheduledSessionDispatcher(SessionDispatcherProperties properties, Predic public boolean add(Session... sessions) { if(state != DISABLE) { // CACHE | DISPATCH - safeQueue(q-> addAll(q, sessions)); + doSync(q-> addAll(q, sessions)); log.trace("{} sessions buffered", queue.size()); return true; } else { - log.warn("{} sessions rejected because dispatcher.state={}", sessions.length, state); + log.warn("{} sessions rejected, dispatcher.state={}", sessions.length, state); return false; } } @@ -84,25 +86,26 @@ private void dispatch() { } catch (Exception e) {// do not throw exception : retry later log.warn("error while dispatching {} sessions, attempts={} because : {}", cs.size(), attempts, e.getMessage()); //do not log exception stack trace - safeQueue(q-> { - q.addAll(0, cs); + } + if(attempts > 0) { //exception | !dispatch + doSync(q-> { + q.addAll(0, cs); if(properties.getBufferMaxSize() > -1 && q.size() > properties.getBufferMaxSize()) { var diff = q.size() - properties.getBufferMaxSize(); q.subList(properties.getBufferMaxSize(), cs.size()).clear(); //remove exceeding cache sessions (LIFO) log.warn("{} last sessions have been removed from buffer", diff); } - return null; }); - } + } } } public List peekSessions() { - return safeQueue(q-> { + return applySync(q-> { if(q.isEmpty()) { return emptyList(); } - var s = queue.stream(); + var s = q.stream(); if(nonNull(filter)) { s = s.filter(filter); } @@ -110,21 +113,21 @@ public List peekSessions() { }); } - public List popSessions() { - return safeQueue(q-> { + List popSessions() { + return applySync(q-> { if(q.isEmpty()) { return emptyList(); } if(isNull(filter)) { - var c = new ArrayList<>(q); + var c = new SessionList(q); q.clear(); return c; } var c = new SessionList(q.size()); for(var it=q.iterator(); it.hasNext();) { - var s = it.next(); - if(filter.test(s)) { - c.add(s); + var o = it.next(); + if(filter.test(o)) { + c.add(o); it.remove(); } } @@ -132,12 +135,18 @@ public List popSessions() { }); } - private T safeQueue(Function, T> queueFn) { + private void doSync(Consumer> cons) { + synchronized(queue){ + cons.accept(queue); + } + } + + private T applySync(Function, T> fn) { synchronized(queue){ - return queueFn.apply(queue); + return fn.apply(queue); } } - + public void shutdown() throws InterruptedException { updateState(DISABLE); //stop add Sessions log.info("shutting down scheduler service"); @@ -146,7 +155,7 @@ public void shutdown() throws InterruptedException { while(!executor.awaitTermination(5, SECONDS)); //wait for last save complete } finally { - dispatch(); + tryDispatch(); } } @@ -161,6 +170,10 @@ public SessionList() { public SessionList(int initialCapacity) { super(initialCapacity); } + + public SessionList(Collection c) { + super(c); + } } @FunctionalInterface diff --git a/src/main/java/org/usf/traceapi/core/Session.java b/src/main/java/org/usf/traceapi/core/Session.java index 90fc8b0..1739717 100644 --- a/src/main/java/org/usf/traceapi/core/Session.java +++ b/src/main/java/org/usf/traceapi/core/Session.java @@ -42,16 +42,16 @@ default void lock(){ } default void unlock() { - if(getLock().get() > 0) { - getLock().decrementAndGet(); - } - else { - log.warn("no more lock"); - } + getLock().decrementAndGet(); } default boolean wasCompleted() { - return getLock().get() == 0; + var c = getLock().get(); + if(c < 0) { + log.warn("illegal session lock state={}, {}", c, this); + return true; + } + return c == 0; } static String nextId() {