Skip to content

Commit

Permalink
Generate event IDs earlier so previousEventIds can be updated accurately
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Sep 29, 2023
1 parent 921749e commit ed3357d
Show file tree
Hide file tree
Showing 18 changed files with 220 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.nifi.provenance.search.SearchableField;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -42,8 +41,8 @@ public class NoOpProvenanceRepository implements ProvenanceRepository {

@Override
public void initialize(EventReporter eventReporter, Authorizer authorizer,
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {

}

Expand All @@ -69,13 +68,13 @@ public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) throws

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords)
throws IOException {
throws IOException {
return emptyList();
}

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId,
int maxRecords, NiFiUser niFiUser) throws IOException {
int maxRecords, NiFiUser niFiUser) throws IOException {
return emptyList();
}

Expand Down Expand Up @@ -156,7 +155,7 @@ public long getContainerUsableSpace(String s) throws IOException {

@Override
public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier,
final NiFiUser user) {
final NiFiUser user) {
return null;
}

Expand All @@ -169,14 +168,4 @@ public AsyncLineageSubmission submitExpandParents(final long eventId, final NiFi
public AsyncLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) {
return null;
}

@Override
public List<Long> getPreviousEventIds(String flowFileUUID) {
return Collections.emptyList();
}

@Override
public void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public interface ProvenanceEventRecord {
*/
List<Long> getPreviousEventIds();

void setPreviousEventIds(List<Long> previousEventIds);

/**
* @return the time at which this Provenance Event was created, as the
* number of milliseconds since epoch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,4 @@ public interface ProvenanceEventRepository {
* @throws IOException if failure closing repository
*/
void close() throws IOException;

/**
* Returns the previous provenance event IDs for the given FlowFile
* @param flowFileUUID the UUID of the FlowFile
* @return the previous event IDs for the given FlowFile
*/
List<Long> getPreviousEventIds(String flowFileUUID);

/**
* Updates the previous provenance event IDs for the given event
*
* @param record The record for which to update the previous event IDs
* @param previousIds the list of previous event IDs to set for the record, or null to remove
*/
void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,10 @@
*/
package org.apache.nifi.provenance;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class AbstractProvenanceRepository implements ProvenanceRepository {
public interface UpdateableProvenanceEventRecord extends ProvenanceEventRecord {

protected final Map<String, List<Long>> previousEventIdsMap = new HashMap<>();

@Override
public List<Long> getPreviousEventIds(String flowFileUUID) {
return previousEventIdsMap.get(flowFileUUID);
}

@Override
public void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds) {
if (previousIds == null) {
previousEventIdsMap.remove(record.getFlowFileUuid());
} else {
previousEventIdsMap.put(record.getFlowFileUuid(), previousIds);
}
}
void setEventId(final long eventId);
void setPreviousEventIds(List<Long> previousEventIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
* A Provenance Event that is used to replace another Provenance Event when authorizations
* are not granted for the original Provenance Event
*/
public class PlaceholderProvenanceEvent implements ProvenanceEventRecord {
public class PlaceholderProvenanceEvent implements UpdateableProvenanceEventRecord {
private final String componentId;
private final long eventId;
private long eventId;
private List<Long> previousEventIds;
private final long eventTime;
private final String flowFileUuid;
Expand All @@ -45,6 +45,11 @@ public long getEventId() {
return eventId;
}

@Override
public void setEventId(long eventId) {
this.eventId = eventId;
}

@Override
public List<Long> getPreviousEventIds() {
return previousEventIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Holder for provenance relevant information
*/
public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
public class StandardProvenanceEventRecord implements UpdateableProvenanceEventRecord {

private final long eventTime;
private final long entryDate;
Expand Down Expand Up @@ -124,7 +124,8 @@ public long getStorageByteOffset() {
return storageByteOffset;
}

void setEventId(final long eventId) {
@Override
public void setEventId(final long eventId) {
this.eventId = eventId;
}

Expand All @@ -138,7 +139,6 @@ public List<Long> getPreviousEventIds() {
return previousEventIds;
}

@Override
public void setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class MockProvenanceRepository extends AbstractProvenanceRepository {
public class MockProvenanceRepository implements ProvenanceRepository {

private final List<ProvenanceEventRecord> records = new ArrayList<>();
private final AtomicLong idGenerator = new AtomicLong(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.UpdateableProvenanceEventRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleProvenanceRecord implements ProvenanceEventRecord {
public class SimpleProvenanceRecord implements UpdateableProvenanceEventRecord {
private long eventId;
private List<Long> previousEventIds;
private String componentId;
Expand Down Expand Up @@ -53,6 +54,7 @@ public void setEventId(long eventId) {
this.eventId = eventId;
}

@Override
public void setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}
Expand Down
Loading

0 comments on commit ed3357d

Please sign in to comment.