Skip to content

Commit

Permalink
Generate event IDs earlier so previousEventIds can be updated accurately
Browse files Browse the repository at this point in the history
Added flow graph components to provenance graph

Fix component nodes to update properties rather than a new node

Add capabilities (some placeholders) for creating the database and indexes, added relationship property to 'outgoing' provenance events

Added relationships to all provenance events except clone and join, also some Cypher improvements

Added some fine-grained exceptions for better error handling/reporting

Split query generation API from GraphClientService into finer-grained methods

Replace Collections.emptyList() with ArrayList<>[0] for modifiability

Fix copyright headers
  • Loading branch information
mattyb149 committed Oct 17, 2023
1 parent 8eb3ee4 commit eb99335
Show file tree
Hide file tree
Showing 284 changed files with 2,159 additions and 1,203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.nifi.provenance.search.SearchableField;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -42,8 +41,8 @@ public class NoOpProvenanceRepository implements ProvenanceRepository {

@Override
public void initialize(EventReporter eventReporter, Authorizer authorizer,
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {

}

Expand All @@ -69,13 +68,13 @@ public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) throws

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords)
throws IOException {
throws IOException {
return emptyList();
}

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId,
int maxRecords, NiFiUser niFiUser) throws IOException {
int maxRecords, NiFiUser niFiUser) throws IOException {
return emptyList();
}

Expand Down Expand Up @@ -156,7 +155,7 @@ public long getContainerUsableSpace(String s) throws IOException {

@Override
public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier,
final NiFiUser user) {
final NiFiUser user) {
return null;
}

Expand All @@ -169,14 +168,4 @@ public AsyncLineageSubmission submitExpandParents(final long eventId, final NiFi
public AsyncLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) {
return null;
}

@Override
public List<Long> getPreviousEventIds(String flowFileUUID) {
return Collections.emptyList();
}

@Override
public void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public interface ProvenanceEventRecord {
*/
List<Long> getPreviousEventIds();

void setPreviousEventIds(List<Long> previousEventIds);

/**
* @return the time at which this Provenance Event was created, as the
* number of milliseconds since epoch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,4 @@ public interface ProvenanceEventRepository {
* @throws IOException if failure closing repository
*/
void close() throws IOException;

/**
* Returns the previous provenance event IDs for the given FlowFile
* @param flowFileUUID the UUID of the FlowFile
* @return the previous event IDs for the given FlowFile
*/
List<Long> getPreviousEventIds(String flowFileUUID);

/**
* Updates the previous provenance event IDs for the given event
*
* @param record The record for which to update the previous event IDs
* @param previousIds the list of previous event IDs to set for the record, or null to remove
*/
void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds);

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,10 @@
*/
package org.apache.nifi.provenance;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class AbstractProvenanceRepository implements ProvenanceRepository {
public interface UpdateableProvenanceEventRecord extends ProvenanceEventRecord {

protected final Map<String, List<Long>> previousEventIdsMap = new HashMap<>();

@Override
public List<Long> getPreviousEventIds(String flowFileUUID) {
return previousEventIdsMap.get(flowFileUUID);
}

@Override
public void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds) {
if (previousIds == null) {
previousEventIdsMap.remove(record.getFlowFileUuid());
} else {
previousEventIdsMap.put(record.getFlowFileUuid(), previousIds);
}
}
void setEventId(final long eventId);
void setPreviousEventIds(List<Long> previousEventIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
* A Provenance Event that is used to replace another Provenance Event when authorizations
* are not granted for the original Provenance Event
*/
public class PlaceholderProvenanceEvent implements ProvenanceEventRecord {
public class PlaceholderProvenanceEvent implements UpdateableProvenanceEventRecord {
private final String componentId;
private final long eventId;
private long eventId;
private List<Long> previousEventIds;
private final long eventTime;
private final String flowFileUuid;
Expand All @@ -45,6 +45,11 @@ public long getEventId() {
return eventId;
}

@Override
public void setEventId(long eventId) {
this.eventId = eventId;
}

@Override
public List<Long> getPreviousEventIds() {
return previousEventIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Holder for provenance relevant information
*/
public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
public class StandardProvenanceEventRecord implements UpdateableProvenanceEventRecord {

private final long eventTime;
private final long entryDate;
Expand Down Expand Up @@ -124,7 +124,8 @@ public long getStorageByteOffset() {
return storageByteOffset;
}

void setEventId(final long eventId) {
@Override
public void setEventId(final long eventId) {
this.eventId = eventId;
}

Expand All @@ -138,7 +139,6 @@ public List<Long> getPreviousEventIds() {
return previousEventIds;
}

@Override
public void setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}
Expand Down Expand Up @@ -315,7 +315,8 @@ public int hashCode() {
}

return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
+ (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode
//+ (relationship == null ? 0 : 47 * relationship.hashCode())
+ 44 * eventTypeCode
+ 47 * getChildUuids().hashCode() + 47 * getParentUuids().hashCode();
}

Expand Down Expand Up @@ -362,10 +363,6 @@ public boolean equals(final Object obj) {
return false;
}

if (different(relationship, other.relationship)) {
return false;
}

return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime());
}

Expand Down Expand Up @@ -430,7 +427,7 @@ public String toString() {
+ ", uuid=" + uuid
+ ", fileSize=" + contentSize
+ ", componentId=" + componentId
+ ", componentType" + componentType
+ ", componentType=" + componentType
+ ", transitUri=" + transitUri
+ ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
+ ", parentUuids=" + parentUuids
Expand Down Expand Up @@ -534,7 +531,6 @@ public Builder fromEvent(final ProvenanceEventRecord event) {
}

previousEventIds = event.getPreviousEventIds();

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class MockProvenanceRepository extends AbstractProvenanceRepository {
public class MockProvenanceRepository implements ProvenanceRepository {

private final List<ProvenanceEventRecord> records = new ArrayList<>();
private final AtomicLong idGenerator = new AtomicLong(0L);
Expand Down
Loading

0 comments on commit eb99335

Please sign in to comment.