Skip to content

Commit

Permalink
Add ResponseBodyProcessor timeout (#250)
Browse files Browse the repository at this point in the history
* Add ResponseBodyProcessor timeout

* Update documentation

Co-authored-by: Alexander Bigerl <[email protected]>
  • Loading branch information
nck-mlcnv and bigerl committed Jun 13, 2024
1 parent 85e4e3f commit 74d004a
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ metrics:
## Durations

Durations are used to define time spans in the configuration.
They can be used for the `timeout`-property of the workers or for the `completionTarget`-property of the tasks.
They can be used for the `timeout`-property of the workers or the response body processors or for the `completionTarget`-property of the tasks.
Duration values can be defined as a XSD duration string or as a string with a number and a unit.
The following units are supported:
- `s` or `sec`or `secs` for seconds
Expand Down
9 changes: 5 additions & 4 deletions docs/configuration/response_body_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ To use a response body processor, it needs to be defined in the configuration fi
in the `responseBodyProcessors` list.

## Properties
| property | required | description | example |
|-------------|----------|------------------------------------------------------------------------------------|-------------------------------------|
| contentType | yes | The content type of the response body. | `"application/sparql-results+json"` |
| threads | no | The number of threads that are used to process the response bodies. (default is 1) | `2` |
| property | required | description | example |
|-------------|----------|--------------------------------------------------------------------------------------------------------------------|-------------------------------------|
| contentType | yes | The content type of the response body. | `"application/sparql-results+json"` |
| threads | no | The number of threads that are used to process the response bodies. (default is 1) | `2` |
| timeout | no | The maximum duration that the response body processor can take to process a response body. (default is 10 minutes) | `10m` |
1 change: 1 addition & 0 deletions example-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,4 @@ storages:
responseBodyProcessors:
- contentType: "application/sparql-results+json"
threads: 1
timeout: 1 min
6 changes: 4 additions & 2 deletions schema/iguana-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@
"threads": {
"type": "integer",
"minimum": 1
},
"timeout" : {
"type": "string"
}
},
"required": [
"contentType",
"threads"
"contentType"
],
"title": "ResponseBodyProcessor"
},
Expand Down
24 changes: 17 additions & 7 deletions src/main/java/org/aksw/iguana/cc/worker/ResponseBodyProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import java.util.concurrent.*;

public class ResponseBodyProcessor {
public record Config(String contentType, Integer threads) {
public Config(String contentType, Integer threads) {
public record Config(String contentType, Integer threads, Duration timeout) {
public Config(String contentType, Integer threads, Duration timeout) {
this.contentType = contentType;
this.threads = threads == null ? 1 : threads;
this.timeout = timeout == null ? Duration.ofMinutes(10) : timeout;
}
}

Expand All @@ -26,20 +27,24 @@ public record Key(long contentLength, long xxh64) {}
public ResponseBodyProcessor(Config config) {
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(config.threads == null ? 1 : config.threads);
this.languageProcessor = LanguageProcessor.getInstance(config.contentType);
this.timeout = config.timeout;
}

public ResponseBodyProcessor(String contentType) {
this(new Config(contentType, null));
this(new Config(contentType, null, null));
}

private static final Logger LOGGER = LoggerFactory.getLogger(ResponseBodyProcessor.class);

private final Duration timeout;

private final ConcurrentHashMap.KeySetView<Key, Boolean> seenResponseBodies = ConcurrentHashMap.newKeySet();

private final List<LanguageProcessor.LanguageProcessingData> responseDataMetrics = Collections.synchronizedList(new ArrayList<>());
private final LanguageProcessor languageProcessor;

private final ThreadPoolExecutor executor;
private final ScheduledExecutorService executorHandler = Executors.newScheduledThreadPool(1);

public boolean add(long contentLength, long xxh64, BigByteArrayOutputStream bbaos) {
final var key = new Key(contentLength, xxh64);
Expand All @@ -51,23 +56,28 @@ public boolean add(long contentLength, long xxh64, BigByteArrayOutputStream bbao
}

private void submit(Key key, BigByteArrayOutputStream bigByteArrayOutputStream) {
executor.execute(() -> {
final var future = executor.submit(() -> {
var processingResult = languageProcessor.process(new BigByteArrayInputStream(bigByteArrayOutputStream), key.xxh64);
responseDataMetrics.add(processingResult);
});
executorHandler.schedule(() -> {
if (!future.isDone()) {
future.cancel(true);
LOGGER.warn("ResponseBodyProcessor timed out for key: {}", key);
}
}, timeout.toSeconds(), TimeUnit.SECONDS);
}

public List<LanguageProcessor.LanguageProcessingData> getResponseDataMetrics() {
if (executor.isTerminated()) {
return responseDataMetrics;
}

final var timeout = Duration.ofMinutes(10);
LOGGER.info(MessageFormat.format("Shutting down ResponseBodyProcessor with {0}min timeout to finish processing. {1} tasks remaining.", timeout.toMinutes(), executor.getQueue().size()));
LOGGER.info(MessageFormat.format("Shutting down ResponseBodyProcessor with {0} min timeout to finish processing. {1} tasks remaining.", timeout.toMinutes() + "." + (timeout.toSecondsPart() / (double) 60), executor.getQueue().size()));
boolean noTimeout;
try {
executor.shutdown();
noTimeout = executor.awaitTermination(10, TimeUnit.MINUTES);
noTimeout = executor.awaitTermination(timeout.toSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/resources/iguana-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@
"threads": {
"type": "integer",
"minimum": 1
},
"timeout" : {
"type": "string"
}
},
"required": [
"contentType",
"threads"
"contentType"
],
"title": "ResponseBodyProcessor"
},
Expand Down

0 comments on commit 74d004a

Please sign in to comment.