diff --git a/pom.xml b/pom.xml
index dea8aad..9922b96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
io.github.oneteme.traceapi
traceapi-core
- 0.0.20
+ 0.0.21
jar
traceapi-core
traceapi-core
diff --git a/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java b/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java
index 63a9497..f7e5534 100644
--- a/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java
+++ b/src/main/java/org/usf/traceapi/core/ScheduledSessionDispatcher.java
@@ -12,8 +12,10 @@
import static org.usf.traceapi.core.State.DISPACH;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -50,12 +52,12 @@ public ScheduledSessionDispatcher(SessionDispatcherProperties properties, Predic
public boolean add(Session... sessions) {
if(state != DISABLE) { // CACHE | DISPATCH
- safeQueue(q-> addAll(q, sessions));
+ doSync(q-> addAll(q, sessions));
log.trace("{} sessions buffered", queue.size());
return true;
}
else {
- log.warn("{} sessions rejected because dispatcher.state={}", sessions.length, state);
+ log.warn("{} sessions rejected, dispatcher.state={}", sessions.length, state);
return false;
}
}
@@ -84,25 +86,26 @@ private void dispatch() {
}
catch (Exception e) {// do not throw exception : retry later
log.warn("error while dispatching {} sessions, attempts={} because : {}", cs.size(), attempts, e.getMessage()); //do not log exception stack trace
- safeQueue(q-> {
- q.addAll(0, cs);
+ }
+ if(attempts > 0) { //exception | !dispatch
+ doSync(q-> {
+ q.addAll(0, cs);
if(properties.getBufferMaxSize() > -1 && q.size() > properties.getBufferMaxSize()) {
var diff = q.size() - properties.getBufferMaxSize();
q.subList(properties.getBufferMaxSize(), cs.size()).clear(); //remove exceeding cache sessions (LIFO)
log.warn("{} last sessions have been removed from buffer", diff);
}
- return null;
});
- }
+ }
}
}
public List peekSessions() {
- return safeQueue(q-> {
+ return applySync(q-> {
if(q.isEmpty()) {
return emptyList();
}
- var s = queue.stream();
+ var s = q.stream();
if(nonNull(filter)) {
s = s.filter(filter);
}
@@ -110,21 +113,21 @@ public List peekSessions() {
});
}
- public List popSessions() {
- return safeQueue(q-> {
+ List popSessions() {
+ return applySync(q-> {
if(q.isEmpty()) {
return emptyList();
}
if(isNull(filter)) {
- var c = new ArrayList<>(q);
+ var c = new SessionList(q);
q.clear();
return c;
}
var c = new SessionList(q.size());
for(var it=q.iterator(); it.hasNext();) {
- var s = it.next();
- if(filter.test(s)) {
- c.add(s);
+ var o = it.next();
+ if(filter.test(o)) {
+ c.add(o);
it.remove();
}
}
@@ -132,12 +135,18 @@ public List popSessions() {
});
}
- private T safeQueue(Function, T> queueFn) {
+ private void doSync(Consumer> cons) {
+ synchronized(queue){
+ cons.accept(queue);
+ }
+ }
+
+ private T applySync(Function, T> fn) {
synchronized(queue){
- return queueFn.apply(queue);
+ return fn.apply(queue);
}
}
-
+
public void shutdown() throws InterruptedException {
updateState(DISABLE); //stop add Sessions
log.info("shutting down scheduler service");
@@ -146,7 +155,7 @@ public void shutdown() throws InterruptedException {
while(!executor.awaitTermination(5, SECONDS)); //wait for last save complete
}
finally {
- dispatch();
+ tryDispatch();
}
}
@@ -161,6 +170,10 @@ public SessionList() {
public SessionList(int initialCapacity) {
super(initialCapacity);
}
+
+ public SessionList(Collection extends Session> c) {
+ super(c);
+ }
}
@FunctionalInterface
diff --git a/src/main/java/org/usf/traceapi/core/Session.java b/src/main/java/org/usf/traceapi/core/Session.java
index 90fc8b0..1739717 100644
--- a/src/main/java/org/usf/traceapi/core/Session.java
+++ b/src/main/java/org/usf/traceapi/core/Session.java
@@ -42,16 +42,16 @@ default void lock(){
}
default void unlock() {
- if(getLock().get() > 0) {
- getLock().decrementAndGet();
- }
- else {
- log.warn("no more lock");
- }
+ getLock().decrementAndGet();
}
default boolean wasCompleted() {
- return getLock().get() == 0;
+ var c = getLock().get();
+ if(c < 0) {
+ log.warn("illegal session lock state={}, {}", c, this);
+ return true;
+ }
+ return c == 0;
}
static String nextId() {