diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java index 32dfd1f..47de9b8 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java @@ -30,6 +30,7 @@ */ class Metadata { static final String HEADER_PATH = "file.path"; + static final String HEADER_PARENT_DIR_NAME = "file.parent.dir.name"; static final String HEADER_NAME = "file.name"; static final String HEADER_NAME_WITHOUT_EXTENSION = "file.name.without.extension"; static final String HEADER_LAST_MODIFIED = "file.last.modified"; @@ -41,12 +42,14 @@ class Metadata { final String nameWithoutExtension; final Date lastModified; final long length; + String parentDirName = null; public static final Map HEADER_DESCRIPTIONS; static { Map result = new LinkedHashMap<>(); result.put(HEADER_PATH, "The absolute path to the file ingested."); + result.put(HEADER_PARENT_DIR_NAME, "The parent directory name of the file ingested"); result.put(HEADER_NAME, "The name part of the file ingested."); result.put(HEADER_NAME_WITHOUT_EXTENSION, "The file name without the extension part of the file."); result.put(HEADER_LAST_MODIFIED, "The last modified date of the file."); @@ -78,6 +81,10 @@ public Metadata(File file) { this.lastModified = new Date(file.lastModified()); this.length = file.length(); this.nameWithoutExtension = Files.getNameWithoutExtension(this.name); + + if (file.getParentFile() != null) { + this.parentDirName = file.getParentFile().getName(); + } } /** @@ -90,6 +97,7 @@ public Headers headers(long offset) { headers.addString(HEADER_NAME, this.name); headers.addString(HEADER_NAME_WITHOUT_EXTENSION, this.nameWithoutExtension); headers.addString(HEADER_PATH, this.path); + headers.addString(HEADER_PARENT_DIR_NAME, this.parentDirName); headers.addLong(HEADER_LENGTH, this.length); headers.addLong(HEADER_OFFSET, offset); headers.addTimestamp(HEADER_LAST_MODIFIED, this.lastModified); diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java index 4db2caf..48078b2 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java @@ -151,7 +151,8 @@ protected void poll(final String packageName, TestCase testCase) throws Interrup Metadata.HEADER_LAST_MODIFIED, Metadata.HEADER_PATH, Metadata.HEADER_LENGTH, - Metadata.HEADER_NAME_WITHOUT_EXTENSION + Metadata.HEADER_NAME_WITHOUT_EXTENSION, + Metadata.HEADER_PARENT_DIR_NAME ); for (int i = 0; i < testCase.expected.size(); i++) {