-
Notifications
You must be signed in to change notification settings - Fork 97
Patch for Github issue #524 #526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
6c5dd25
5a18f2f
e325921
ee88f45
9b75259
bc4e6da
58ddfcb
8add4dc
90c2539
09f292f
d9753a6
ecd53c0
9c87d7b
7c29fef
b6cf766
b1af68c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,9 @@ | |
| import java.util.concurrent.BlockingQueue; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -67,6 +69,7 @@ public class AnalyticsClient { | |
| private final ScheduledExecutorService flushScheduler; | ||
| private final AtomicBoolean isShutDown; | ||
| private final String writeKey; | ||
| private volatile Future<?> looperFuture; | ||
|
|
||
| public static AnalyticsClient create( | ||
| HttpUrl uploadUrl, | ||
|
|
@@ -130,7 +133,9 @@ public AnalyticsClient( | |
|
|
||
| this.currentQueueSizeInBytes = 0; | ||
|
|
||
| if (!isShutDown.get()) looperExecutor.submit(new Looper()); | ||
| if (!isShutDown.get()) { | ||
| this.looperFuture = looperExecutor.submit(new Looper()); | ||
| } | ||
|
|
||
| flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); | ||
| flushScheduler.scheduleAtFixedRate( | ||
|
|
@@ -218,6 +223,8 @@ public void shutdown() { | |
| // we can shutdown the flush scheduler without worrying | ||
| flushScheduler.shutdownNow(); | ||
|
|
||
| // Wait for the looper to complete processing before shutting down executors | ||
| waitForLooperCompletion(); | ||
| shutdownAndWait(looperExecutor, "looper"); | ||
| shutdownAndWait(networkExecutor, "network"); | ||
|
|
||
|
|
@@ -226,19 +233,76 @@ public void shutdown() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Wait for the looper to complete processing all messages before proceeding with shutdown. This | ||
| * prevents the race condition where the network executor is shut down before the looper finishes | ||
| * submitting all batches. | ||
| */ | ||
| private void waitForLooperCompletion() { | ||
| if (looperFuture != null) { | ||
| try { | ||
| // Wait for the looper to complete processing the STOP message and finish | ||
| // Use a reasonable timeout to avoid hanging indefinitely | ||
| looperFuture.get(5, TimeUnit.SECONDS); | ||
| log.print(VERBOSE, "Looper completed successfully."); | ||
| } catch (Exception e) { | ||
| log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); | ||
MichaelGHSeg marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // Cancel the looper if it's taking too long or if there's an error | ||
| if (!looperFuture.isDone()) { | ||
| looperFuture.cancel(true); | ||
| log.print(VERBOSE, "Looper was cancelled due to timeout or error."); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
243
to
259
|
||
|
|
||
| public void shutdownAndWait(ExecutorService executor, String name) { | ||
| boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); | ||
| try { | ||
| executor.shutdown(); | ||
| final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); | ||
|
|
||
| log.print( | ||
| VERBOSE, | ||
| "%s executor %s.", | ||
| name, | ||
| executorTerminated ? "terminated normally" : "timed out"); | ||
| boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS); | ||
| if (terminated) { | ||
| log.print(VERBOSE, "%s executor terminated normally.", name); | ||
| return; | ||
| } | ||
| if (isLooperExecutor) { | ||
| // not terminated within timeout -> force shutdown | ||
| log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); | ||
MichaelGHSeg marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| List<Runnable> dropped = executor.shutdownNow(); // interrupts running tasks | ||
| log.print( | ||
| VERBOSE, | ||
| "%s shutdownNow returned %d queued tasks that never started.", | ||
| name, | ||
| dropped.size()); | ||
|
|
||
| // optional short wait to give interrupted tasks a chance to exit | ||
| boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.SECONDS); | ||
| log.print( | ||
| VERBOSE, | ||
| "%s executor %s after shutdownNow().", | ||
| name, | ||
| terminatedAfterForce ? "terminated" : "still running (did not terminate)"); | ||
|
|
||
| if (!terminatedAfterForce) { | ||
| // final warning — investigate tasks that ignore interrupts | ||
| log.print( | ||
| ERROR, | ||
| "%s executor still did not terminate; tasks may be ignoring interrupts.", | ||
| name); | ||
| } | ||
| } | ||
| } catch (InterruptedException e) { | ||
| // Preserve interrupt status and attempt forceful shutdown | ||
| log.print(ERROR, e, "Interrupted while stopping %s executor.", name); | ||
| Thread.currentThread().interrupt(); | ||
| if (isLooperExecutor) { | ||
| List<Runnable> dropped = executor.shutdownNow(); | ||
| log.print( | ||
| VERBOSE, | ||
| "%s shutdownNow invoked after interrupt; %d tasks returned.", | ||
| name, | ||
| dropped.size()); | ||
| } | ||
MichaelGHSeg marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
|
|
@@ -299,8 +363,22 @@ public void run() { | |
| "Batching %s message(s) into batch %s.", | ||
| batch.batch().size(), | ||
| batch.sequence()); | ||
| networkExecutor.submit( | ||
| BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); | ||
| try { | ||
| networkExecutor.submit( | ||
| BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); | ||
| } catch (RejectedExecutionException e) { | ||
| log.print( | ||
| ERROR, | ||
| e, | ||
| "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", | ||
| batch.sequence()); | ||
| // Notify callbacks about the failure | ||
| for (Message msg : batch.batch()) { | ||
| for (Callback callback : callbacks) { | ||
| callback.failure(msg, e); | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+373
to
+388
|
||
|
|
||
| currentBatchSize.set(0); | ||
| messages.clear(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.