44 changes: 44 additions & 0 deletions docs/plans/2026-04-17-bulk-import-reid-race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Bulk Import re-ID Race Fix — Implementation Summary (v3)

**Goal:** Eliminate the race condition where bulk-import re-ID jobs kick off per-encounter immediately after each detection callback, missing intra-bulk matches because sibling annotations aren't yet in WBIA's matching index.

**Architecture:** Branch `processCallback` on bulk imports. When a detection callback belongs to a bulk import (detected via `importTaskId` in task parameters), defer identification. On each callback, check whether ALL MediaAssets in the ImportTask have terminal detection status. Only the LAST terminal callback (claimed atomically via JVM-global registry + persistent field for restart-survival) fires a single bulk-final ID pass over ALL eligible annotations from the ImportTask.

## Files Changed

| File | Change |
|------|--------|
| `identity/BulkFinalIdentificationRegistry.java` | NEW — JVM-global ConcurrentHashMap single-winner claim |
| `ia/Task.java` | ADD `bulkFinalIdentificationFired` persistent Boolean + getter/setter |
| `ia/package.jdo` | ADD JDO mapping for new Task field |
| `importer/ImportTask.java` | ADD `isAllDetectionTerminal()` — checks all MAs have terminal status |
| `identity/IBEISIA.java` | REWRITE identification block in `processCallback` (~1416-1441). Extracted helpers: `getImportTaskId`, `fireBulkFinalIdentificationIfReady`, `groupByEncounter`, `collectNewAnnotations`, `kickoffIdentificationForAnnotations` |

## Gate Logic (in order)

1. `skipIdent` property check → early return (unchanged)
2. `taskParametersSkipIdent(parentTask)` → early return (unchanged)
3. **NEW**: If `importTaskId` is in task parameters → bulk path:
   a. `itask.isAllDetectionTerminal()` → false? return (not ready yet)
   b. `rootTask.isBulkFinalIdentificationFired()` → true? return (restart survival)
   c. `BulkFinalIdentificationRegistry.tryClaim(importTaskId)` → false? return (another thread won)
   d. Mark persistent flag, commit
   e. Enumerate ALL `itask.getAnnotations()` filtered by `matchAgainst && validForIdentification`
   f. Group by encounter, kick off ID per encounter with location filters
4. If NOT bulk → today's per-callback behavior (unchanged logic, refactored into helpers)
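The gate order above can be sketched as a self-contained simulation. Everything here is an illustrative stand-in (a counter for detection status, an `AtomicBoolean` for the persistent flag, a map for the registry), not the real Shepherd/Task/ImportTask plumbing:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal simulation of gates 3a-3f: N callbacks arrive, only the
// LAST terminal callback fires the single bulk-final ID pass.
public class GateOrderSketch {
    static final ConcurrentHashMap<String, Boolean> CLAIMED = new ConcurrentHashMap<>();
    static final AtomicInteger terminalCallbacks = new AtomicInteger(0);
    static final AtomicBoolean persistentFired = new AtomicBoolean(false); // stands in for Task.bulkFinalIdentificationFired
    static final int TOTAL_ASSETS = 5;
    static int firedCount = 0;

    static void processCallback(String importTaskId) {
        boolean allTerminal = terminalCallbacks.incrementAndGet() >= TOTAL_ASSETS;
        if (!allTerminal) return;                                    // gate 3a: not ready yet
        if (persistentFired.get()) return;                           // gate 3b: restart survival
        if (CLAIMED.putIfAbsent(importTaskId, true) != null) return; // gate 3c: another thread won
        persistentFired.set(true);                                   // gate 3d: mark + commit
        firedCount++;                                                // gates 3e-3f: one bulk ID pass
    }

    public static void main(String[] args) {
        for (int i = 0; i < TOTAL_ASSETS; i++) processCallback("import-123");
        System.out.println("fired=" + firedCount); // prints fired=1
    }
}
```

Callbacks 1-4 bail out at gate 3a; only the fifth (last terminal) callback passes all gates and fires once.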

## Tests (22 total, all passing)

- `BulkFinalIdentificationRegistryTest` — 5 tests including concurrent 50-thread race
- `IBEISIABulkCallbackTest` — 5 tests for bulk detection via task parameters
- `IBEISIAGroupByEncounterTest` — 4 tests for encounter grouping
- `ImportTaskDetectionTerminalTest` — 8 tests for terminal status checking
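The concurrent race is the interesting case. A self-contained approximation of that test (inlining a `putIfAbsent`-based claim rather than importing the real registry class; names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// 50 threads race to claim the same import task; exactly one must win.
public class ClaimRaceSketch {
    private static final ConcurrentHashMap<String, Boolean> CLAIMED = new ConcurrentHashMap<>();

    static boolean tryClaim(String taskId) {
        return taskId != null && CLAIMED.putIfAbsent(taskId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) throws InterruptedException {
        final int threads = 50;
        final CountDownLatch start = new CountDownLatch(1);
        final AtomicInteger winners = new AtomicInteger(0);
        List<Thread> pool = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await(); // line every thread up on the same gate
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (tryClaim("import-123")) winners.incrementAndGet();
            });
            t.start();
            pool.add(t);
        }
        start.countDown(); // fire the race
        for (Thread t : pool) t.join();
        System.out.println("winners=" + winners.get()); // prints winners=1
    }
}
```

`putIfAbsent` is atomic, so exactly one thread observes a `null` previous value regardless of scheduling.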

## Edge Cases

- **Pending assets (human review required):** Terminal for gating purposes; no annotations created, so they won't be in the ID pass. Follow-up re-ID needed after review.
- **Restart mid-bulk:** Persistent `Task.bulkFinalIdentificationFired` on the root IA Task (via `ImportTask.getIATask()`) prevents re-fire.
- **Multi-species/multi-algorithm callbacks:** Claim is keyed on `importTaskId` (not callback task ID), so multiple callback tasks for the same import share the same claim. Root IA Task is resolved via `itask.getIATask()` to avoid broken `getRootTask()` when `addChild()` doesn't set parent pointers.
- **Zero eligible annotations after filtering:** `kickoffIdentificationForAnnotations` logs and returns.
- **Non-bulk regression:** Non-bulk path explicitly goes through same `collectNewAnnotations` + `kickoffIdentificationForAnnotations`, preserving identical behavior.
- **Multi-Tomcat:** Current implementation is single-Tomcat-safe. For multi-instance, replace registry with DB compare-and-set on the persistent field.
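The multi-instance variant mentioned above amounts to a conditional UPDATE that only one caller can win. The sketch below models that compare-and-set with an in-memory map standing in for the TASK table; the SQL in the comment is an assumption based on the JDO mapping, not verified against the actual schema:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Multi-Tomcat sketch: replace the in-JVM registry with a DB compare-and-set
// on the persistent flag. The real version would be one SQL statement, e.g.
//   UPDATE "TASK" SET "BULKFINALIDENTIFICATIONFIRED" = TRUE
//    WHERE "ID" = ? AND "BULKFINALIDENTIFICATIONFIRED" IS NOT TRUE
// (table/column names assumed), claiming only when exactly 1 row is updated.
public class DbCasSketch {
    // Boolean.FALSE models the unfired flag on the root IA Task row.
    static final Map<String, Boolean> taskTable = new ConcurrentHashMap<>();

    // replace(key, FALSE, TRUE) is atomic, like the conditional UPDATE:
    // it returns true for exactly one caller per task row.
    static boolean tryClaim(String rootTaskId) {
        return taskTable.replace(rootTaskId, Boolean.FALSE, Boolean.TRUE);
    }

    public static void main(String[] args) {
        taskTable.put("root-task-1", Boolean.FALSE); // row exists, flag unset
        System.out.println(tryClaim("root-task-1")); // prints true  (winner)
        System.out.println(tryClaim("root-task-1")); // prints false (already fired)
    }
}
```

Unlike the JVM registry, the winning row survives restarts and is visible to every Tomcat sharing the database.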
9 changes: 9 additions & 0 deletions src/main/java/org/ecocean/ia/Task.java
@@ -41,6 +41,7 @@ public class Task implements java.io.Serializable {
private String statusDetails = null;
private Long completionDateInMilliseconds;
private String queueResumeMessage;
private Boolean bulkFinalIdentificationFired = null;

public Task() {
this(Util.generateUUID());
@@ -509,6 +510,14 @@ public String toString() {
.toString();
}

public boolean isBulkFinalIdentificationFired() {
return Boolean.TRUE.equals(bulkFinalIdentificationFired);
}

public void markBulkFinalIdentificationFired() {
this.bulkFinalIdentificationFired = Boolean.TRUE;
}

public static Task load(String taskId, Shepherd myShepherd) {
Task t = null;

23 changes: 23 additions & 0 deletions src/main/java/org/ecocean/identity/BulkFinalIdentificationRegistry.java
@@ -0,0 +1,23 @@
package org.ecocean.identity;

import java.util.concurrent.ConcurrentHashMap;

public final class BulkFinalIdentificationRegistry {
private static final ConcurrentHashMap<String, Boolean> CLAIMED =
new ConcurrentHashMap<>();

private BulkFinalIdentificationRegistry() {}

public static boolean tryClaim(String taskId) {
if (taskId == null) return false;
return CLAIMED.putIfAbsent(taskId, Boolean.TRUE) == null;
}

public static void release(String taskId) {
if (taskId != null) CLAIMED.remove(taskId);
}

public static void clearForTesting() {
CLAIMED.clear();
}
}
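A quick usage sketch of the claim semantics (the registry body is duplicated here so the snippet compiles standalone; in the PR it lives in `org.ecocean.identity`):

```java
import java.util.concurrent.ConcurrentHashMap;

// Copy of the registry above, inlined so this file is self-contained.
final class BulkFinalIdentificationRegistry {
    private static final ConcurrentHashMap<String, Boolean> CLAIMED = new ConcurrentHashMap<>();
    private BulkFinalIdentificationRegistry() {}

    static boolean tryClaim(String taskId) {
        if (taskId == null) return false;
        return CLAIMED.putIfAbsent(taskId, Boolean.TRUE) == null;
    }

    static void release(String taskId) {
        if (taskId != null) CLAIMED.remove(taskId);
    }
}

public class RegistryUsage {
    public static void main(String[] args) {
        System.out.println(BulkFinalIdentificationRegistry.tryClaim("task-1")); // true: first claim wins
        System.out.println(BulkFinalIdentificationRegistry.tryClaim("task-1")); // false: already claimed
        System.out.println(BulkFinalIdentificationRegistry.tryClaim(null));     // false: null never claimable
        BulkFinalIdentificationRegistry.release("task-1");
        System.out.println(BulkFinalIdentificationRegistry.tryClaim("task-1")); // true: claimable after release
    }
}
```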
215 changes: 138 additions & 77 deletions src/main/java/org/ecocean/identity/IBEISIA.java
@@ -1443,94 +1443,155 @@ a few lines below here still has output (in mlservice-free IA.json) such as:

boolean skipIdent = Util.booleanNotFalse(IA.getProperty(context,
"IBEISIADisableIdentification"));
-            // now we pick up IA.intake(anns) from detection above (if applicable)
-            // should we cluster these based on MediaAsset instead? send them in groups to IA.intake()?
-            if (!skipIdent && (newAnns != null)) {
-                List<Annotation> needIdentifying = new ArrayList<Annotation>();
-                Shepherd myShepherd2 = new Shepherd(context);
-                myShepherd2.setAction("IBEISIA.processCallback-IA.intake");
-                myShepherd2.beginDBTransaction();
+        if (skipIdent) return rtn;
+
+        Shepherd myShepherd2 = new Shepherd(context);
+        myShepherd2.setAction("IBEISIA.processCallback-IA.intake");
+        myShepherd2.beginDBTransaction();
+        try {
             Task parentTask = Task.load(taskID, myShepherd2);
             // Task parametersSkipIdent looks at the parent task.. It works for non-ID Wildbooks. If you are sending multiple annotations
             // from a single image, and some need ID, some don't, you need to check downstream.
             if (taskParametersSkipIdent(parentTask)) {
                 System.out.println("NOTICE: IBEISIA.processCallback() " + parentTask +
                     " skipped identification");
-            } else {
-                Iterator<?> keys = newAnns.keys();
-                while (keys.hasNext()) {
-                    String maId = (String)keys.next();
-                    System.out.println("maId -> " + maId);
-                    JSONArray annIds = newAnns.optJSONArray(maId);
-                    if (annIds == null) continue;
-                    System.out.println(" ---> " + annIds);
-                    for (int i = 0; i < annIds.length(); i++) {
-                        String aid = annIds.optString(i, null);
-                        if (aid == null) continue;
-                        Annotation ann = ((Annotation)(myShepherd2.getPM().getObjectById(
-                            myShepherd2.getPM().newObjectIdInstance(Annotation.class, aid), true)));
-                        if (ann != null && IBEISIA.validForIdentification(ann,
-                            myShepherd2.getContext())) {
-                            needIdentifying.add(ann);
-                        }
-                    }
-                }
+                return rtn;
+            }
-            if (needIdentifying.size() > 0) {
-                // split the results into encounters
-                HashMap<String, ArrayList<Annotation> > needIdentifyingMap = new HashMap<String,
-                    ArrayList<Annotation> >();
-                for (Annotation annot : needIdentifying) {
-                    Encounter enc = annot.findEncounter(myShepherd2);
-                    if (enc != null) {
-                        if (needIdentifyingMap.containsKey(enc.getCatalogNumber())) {
-                            ArrayList<Annotation> annots = needIdentifyingMap.get(
-                                enc.getCatalogNumber());
-                            annots.add(annot);
-                            needIdentifyingMap.put(enc.getCatalogNumber(), annots);
-                        } else {
-                            ArrayList<Annotation> annots = new ArrayList<Annotation>();
-                            annots.add(annot);
-                            needIdentifyingMap.put(enc.getCatalogNumber(), annots);
-                        }
-                    }
-                }
-                // send to ID by Encounter
-                for (String encUUID : needIdentifyingMap.keySet()) {
-                    ArrayList<Annotation> annots = needIdentifyingMap.get(encUUID);
-                    JSONObject taskParameters = new JSONObject();
-                    JSONObject mf = new JSONObject();
-                    Encounter enc = myShepherd2.getEncounter(encUUID);
-                    if (enc != null && enc.getLocationID() != null) {
-                        ArrayList<String> locationIDs = new ArrayList<String>();
-                        List<String> matchTheseLocationIDs = LocationID.getIDForParentAndChildren(
-                            enc.getLocationID(), locationIDs, null);
-                        mf.put("locationIds", matchTheseLocationIDs);
-                    }
-                    taskParameters.put("matchingSetFilter", mf);
-                    Task subParentTask = new Task();
-                    subParentTask.setParameters(taskParameters);
-                    myShepherd2.storeNewTask(subParentTask);
-                    myShepherd2.updateDBTransaction();
-
-                    Task childTask = IA.intakeAnnotations(myShepherd2, annots, subParentTask,
-                        false);
-                    myShepherd2.storeNewTask(childTask);
-                    myShepherd2.updateDBTransaction();
-                    subParentTask.addChild(childTask);
-                    myShepherd2.updateDBTransaction();
-                }
-            } else {
-                System.out.println(
-                    "[INFO]: No annotations were suitable for identification. Check resulting identification class(es).");
-                myShepherd2.rollbackDBTransaction();
-            }
+
+            String importTaskId = getImportTaskId(parentTask);
+            if (importTaskId != null) {
+                fireBulkFinalIdentificationIfReady(importTaskId, parentTask, myShepherd2);
+            } else if (newAnns != null) {
+                List<Annotation> needIdentifying = collectNewAnnotations(newAnns, myShepherd2);
+                kickoffIdentificationForAnnotations(needIdentifying, myShepherd2, null);
+            }
+        } finally {
+            myShepherd2.rollbackAndClose();
+        }
+        return rtn;
     }

static String getImportTaskId(Task parentTask) {
if (parentTask == null) return null;
JSONObject params = parentTask.getParameters();
if (params == null) return null;
String id = params.optString("importTaskId", null);
return (id != null && !id.isEmpty()) ? id : null;
}

static boolean isBulkImportParentTask(Task parentTask) {
return getImportTaskId(parentTask) != null;
}

private static void fireBulkFinalIdentificationIfReady(
String importTaskId, Task parentTask, Shepherd myShepherd) {
ImportTask itask = myShepherd.getImportTask(importTaskId);
if (itask == null) {
System.out.println("WARN: fireBulkFinalIdentificationIfReady: " +
"ImportTask " + importTaskId + " not found");
return;
}
if (!itask.isAllDetectionTerminal()) return;
Task rootTask = itask.getIATask();
if (rootTask == null) rootTask = parentTask;
if (rootTask.isBulkFinalIdentificationFired()) {
System.out.println("INFO: fireBulkFinalIdentificationIfReady: " +
"already fired (persistent) for ImportTask " + importTaskId);
return;
}
if (!BulkFinalIdentificationRegistry.tryClaim(importTaskId)) {
System.out.println("INFO: fireBulkFinalIdentificationIfReady: " +
"another callback already claimed for ImportTask " + importTaskId);
return;
}
rootTask.markBulkFinalIdentificationFired();
myShepherd.updateDBTransaction();

java.util.Set<Annotation> all = itask.getAnnotations();
List<Annotation> eligible = new ArrayList<Annotation>();
for (Annotation a : all) {
if (a == null) continue;
if (!a.getMatchAgainst()) continue;
if (!validForIdentification(a, myShepherd.getContext())) continue;
eligible.add(a);
}
System.out.println("INFO: fireBulkFinalIdentificationIfReady: ImportTask " +
importTaskId + " -> ID on " + eligible.size() + " annotations across " +
itask.numberEncounters() + " encounters");
kickoffIdentificationForAnnotations(eligible, myShepherd, rootTask);
BulkFinalIdentificationRegistry.release(importTaskId);
}

static Map<String, List<Annotation>> groupByEncounter(
List<Annotation> anns, Shepherd myShepherd) {
Map<String, List<Annotation>> map = new java.util.LinkedHashMap<>();
if (anns == null) return map;
for (Annotation ann : anns) {
if (ann == null) continue;
Encounter enc = ann.findEncounter(myShepherd);
if (enc == null) continue;
map.computeIfAbsent(enc.getCatalogNumber(), k -> new ArrayList<>()).add(ann);
}
return map;
}

private static List<Annotation> collectNewAnnotations(
JSONObject newAnns, Shepherd myShepherd) {
List<Annotation> out = new ArrayList<Annotation>();
if (newAnns == null) return out;
Iterator<?> keys = newAnns.keys();
while (keys.hasNext()) {
String maId = (String) keys.next();
JSONArray annIds = newAnns.optJSONArray(maId);
if (annIds == null) continue;
for (int i = 0; i < annIds.length(); i++) {
String aid = annIds.optString(i, null);
if (aid == null) continue;
try {
Annotation ann = (Annotation) myShepherd.getPM().getObjectById(
myShepherd.getPM().newObjectIdInstance(Annotation.class, aid), true);
if (ann != null && validForIdentification(ann, myShepherd.getContext())) {
out.add(ann);
}
} catch (Exception ex) {
System.out.println("WARN: collectNewAnnotations miss " + aid + ": " + ex);
}
}
}
return out;
}

private static void kickoffIdentificationForAnnotations(
List<Annotation> anns, Shepherd myShepherd, Task rootTask) {
if (anns == null || anns.isEmpty()) {
System.out.println("[INFO]: No annotations suitable for identification.");
return;
}
Map<String, List<Annotation>> byEncounter = groupByEncounter(anns, myShepherd);
for (Map.Entry<String, List<Annotation>> e : byEncounter.entrySet()) {
String encUUID = e.getKey();
List<Annotation> encAnnots = e.getValue();
JSONObject taskParameters = new JSONObject();
JSONObject mf = new JSONObject();
Encounter enc = myShepherd.getEncounter(encUUID);
if (enc != null && enc.getLocationID() != null) {
ArrayList<String> locationIDs = new ArrayList<String>();
List<String> matchTheseLocationIDs = LocationID.getIDForParentAndChildren(
enc.getLocationID(), locationIDs, null);
mf.put("locationIds", matchTheseLocationIDs);
}
taskParameters.put("matchingSetFilter", mf);
Task subParentTask = new Task();
subParentTask.setParameters(taskParameters);
myShepherd.storeNewTask(subParentTask);
if (rootTask != null) rootTask.addChild(subParentTask);
myShepherd.updateDBTransaction();
Task childTask = IA.intakeAnnotations(myShepherd, encAnnots, subParentTask, false);
myShepherd.storeNewTask(childTask);
myShepherd.updateDBTransaction();
subParentTask.addChild(childTask);
myShepherd.updateDBTransaction();
}
}

private static JSONObject processCallbackDetect(String taskID,
ArrayList<IdentityServiceLog> logs, JSONObject resp, Shepherd myShepherd,
HttpServletRequest request) {
15 changes: 15 additions & 0 deletions src/main/java/org/ecocean/servlet/importer/ImportTask.java
@@ -192,6 +192,21 @@ public List<MediaAsset> getMediaAssets() {
return mas;
}

private static final Set<String> TERMINAL_DETECTION_STATUSES =
java.util.Collections.unmodifiableSet(new HashSet<>(
java.util.Arrays.asList("complete", "pending", "error")));

public boolean isAllDetectionTerminal() {
List<MediaAsset> assets = this.getMediaAssets();
if (assets == null || assets.isEmpty()) return true;
for (MediaAsset ma : assets) {
if (ma == null) continue;
String status = ma.getDetectionStatus();
if (status == null || !TERMINAL_DETECTION_STATUSES.contains(status)) return false;
}
return true;
}

public List<Occurrence> getOccurrences(Shepherd myShepherd) {
if (encounters == null) return null;
List<Occurrence> occs = new ArrayList<Occurrence>();
4 changes: 4 additions & 0 deletions src/main/resources/org/ecocean/ia/package.jdo
@@ -59,6 +59,10 @@ alter table "TASK" alter column "PARAMETERS" type text;
</field>
-->

<field name="bulkFinalIdentificationFired" persistence-modifier="persistent">
<column allows-null="true"/>
</field>

<field name="objectMediaAssets">
<collection element-type="org.ecocean.media.MediaAsset"/>
<join />