Skip to content

Commit

Permalink
Generate event IDs earlier so previousEventIds can be updated accurately
Browse files Browse the repository at this point in the history
Added flow graph components to provenance graph

Fix component nodes to update properties rather than a new node
  • Loading branch information
mattyb149 committed Oct 2, 2023
1 parent 6baca48 commit 2961349
Show file tree
Hide file tree
Showing 41 changed files with 544 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.nifi.provenance.search.SearchableField;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -42,8 +41,8 @@ public class NoOpProvenanceRepository implements ProvenanceRepository {

@Override
public void initialize(EventReporter eventReporter, Authorizer authorizer,
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {

}

Expand All @@ -69,13 +68,13 @@ public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) throws

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords)
throws IOException {
throws IOException {
return emptyList();
}

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId,
int maxRecords, NiFiUser niFiUser) throws IOException {
int maxRecords, NiFiUser niFiUser) throws IOException {
return emptyList();
}

Expand Down Expand Up @@ -156,7 +155,7 @@ public long getContainerUsableSpace(String s) throws IOException {

@Override
public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier,
final NiFiUser user) {
final NiFiUser user) {
return null;
}

Expand All @@ -169,14 +168,4 @@ public AsyncLineageSubmission submitExpandParents(final long eventId, final NiFi
public AsyncLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) {
return null;
}

@Override
public List<Long> getPreviousEventIds(String flowFileUUID) {
return Collections.emptyList();
}

@Override
public void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public interface ProvenanceEventRecord {
*/
List<Long> getPreviousEventIds();

void setPreviousEventIds(List<Long> previousEventIds);

/**
* @return the time at which this Provenance Event was created, as the
* number of milliseconds since epoch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,4 @@ public interface ProvenanceEventRepository {
* @throws IOException if failure closing repository
*/
void close() throws IOException;

/**
* Returns the previous provenance event IDs for the given FlowFile
* @param flowFileUUID the UUID of the FlowFile
* @return the previous event IDs for the given FlowFile
*/
List<Long> getPreviousEventIds(String flowFileUUID);

/**
* Updates the previous provenance event IDs for the given event
*
* @param record The record for which to update the previous event IDs
* @param previousIds the list of previous event IDs to set for the record, or null to remove
*/
void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds);

}
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,19 @@ public interface ProvenanceReporter {
*/
void create(FlowFile flowFile, String details);

/**
* Emits a Provenance Event of type
* {@link ProvenanceEventType#CREATE CREATE} that indicates that the given
* FlowFile was created by NiFi from data that was not received from an
* external entity. If the data was received from an external source, use
* the {@link #receive(FlowFile, String, String, long)} event instead
*
* @param flowFile the FlowFile that was created
* @param details any relevant details about the CREATE event
* @param relationship the relationship the created FlowFile was transferred to
*/
void create(FlowFile flowFile, String details, Relationship relationship);

/**
* @return the number of FlowFiles for which there was a RECEIVE event
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,10 @@
*/
package org.apache.nifi.provenance;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class AbstractProvenanceRepository implements ProvenanceRepository {
public interface UpdateableProvenanceEventRecord extends ProvenanceEventRecord {

protected final Map<String, List<Long>> previousEventIdsMap = new HashMap<>();

@Override
public List<Long> getPreviousEventIds(String flowFileUUID) {
return previousEventIdsMap.get(flowFileUUID);
}

@Override
public void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds) {
if (previousIds == null) {
previousEventIdsMap.remove(record.getFlowFileUuid());
} else {
previousEventIdsMap.put(record.getFlowFileUuid(), previousIds);
}
}
void setEventId(final long eventId);
void setPreviousEventIds(List<Long> previousEventIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
* A Provenance Event that is used to replace another Provenance Event when authorizations
* are not granted for the original Provenance Event
*/
public class PlaceholderProvenanceEvent implements ProvenanceEventRecord {
public class PlaceholderProvenanceEvent implements UpdateableProvenanceEventRecord {
private final String componentId;
private final long eventId;
private long eventId;
private List<Long> previousEventIds;
private final long eventTime;
private final String flowFileUuid;
Expand All @@ -45,6 +45,11 @@ public long getEventId() {
return eventId;
}

@Override
public void setEventId(long eventId) {
this.eventId = eventId;
}

@Override
public List<Long> getPreviousEventIds() {
return previousEventIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Holder for provenance relevant information
*/
public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
public class StandardProvenanceEventRecord implements UpdateableProvenanceEventRecord {

private final long eventTime;
private final long entryDate;
Expand Down Expand Up @@ -124,7 +124,8 @@ public long getStorageByteOffset() {
return storageByteOffset;
}

void setEventId(final long eventId) {
@Override
public void setEventId(final long eventId) {
this.eventId = eventId;
}

Expand All @@ -138,7 +139,6 @@ public List<Long> getPreviousEventIds() {
return previousEventIds;
}

@Override
public void setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}
Expand Down Expand Up @@ -430,7 +430,7 @@ public String toString() {
+ ", uuid=" + uuid
+ ", fileSize=" + contentSize
+ ", componentId=" + componentId
+ ", componentType" + componentType
+ ", componentType=" + componentType
+ ", transitUri=" + transitUri
+ ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
+ ", parentUuids=" + parentUuids
Expand Down Expand Up @@ -534,7 +534,6 @@ public Builder fromEvent(final ProvenanceEventRecord event) {
}

previousEventIds = event.getPreviousEventIds();

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class MockProvenanceRepository extends AbstractProvenanceRepository {
public class MockProvenanceRepository implements ProvenanceRepository {

private final List<ProvenanceEventRecord> records = new ArrayList<>();
private final AtomicLong idGenerator = new AtomicLong(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,18 @@ public void create(final FlowFile flowFile) {

@Override
public void create(final FlowFile flowFile, final String details) {
create(flowFile, details, null);
}

@Override
public void create(final FlowFile flowFile, final String details, final Relationship relationship) {
verifyFlowFileKnown(flowFile);

try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE).setDetails(details).build();
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE)
.setDetails(details)
.setRelationship(relationship)
.build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.UpdateableProvenanceEventRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleProvenanceRecord implements ProvenanceEventRecord {
public class SimpleProvenanceRecord implements UpdateableProvenanceEventRecord {
private long eventId;
private List<Long> previousEventIds;
private String componentId;
Expand Down Expand Up @@ -53,6 +54,7 @@ public void setEventId(long eventId) {
this.eventId = eventId;
}

@Override
public void setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}
Expand Down
Loading

0 comments on commit 2961349

Please sign in to comment.