diff --git a/docker-compose-test.yml b/docker-compose-test.yml index e80cd17..df11e3b 100644 --- a/docker-compose-test.yml +++ b/docker-compose-test.yml @@ -28,7 +28,7 @@ services: ports: - "8080:8080" - "8787:8787" - - "9990:9990" + - "9990:9990" ################## # Exporter Service @@ -53,9 +53,10 @@ services: #EXPORT_FTP_PORT: "21" #EXPORT_FTP_USER: "" #EXPORT_FTP_PASSWORD: "" + EXPORT_PATH: "/opt/jboss/wildfly/test-export" ports: - "8081:8080" - - "8788:8787" + - "8788:8787" - "9991:9990" volumes: - ./imixs-archive-exporter/docker/deployments:/opt/jboss/wildfly/standalone/deployments/ @@ -85,7 +86,7 @@ services: HEALTH_ENDPOINT: "http://localhost:9992/health" ports: - "8082:8080" - - "8789:8787" + - "8789:8787" - "9992:9990" volumes: - ./imixs-archive-backup/docker/deployments:/opt/jboss/wildfly/standalone/deployments/ diff --git a/imixs-archive-exporter/README.md b/imixs-archive-exporter/README.md index 336899f..407ee8f 100644 --- a/imixs-archive-exporter/README.md +++ b/imixs-archive-exporter/README.md @@ -13,15 +13,18 @@ The Exporter Service can be run in a container environment. To connect the expor WORKFLOW_SERVICE_PASSWORD: [PASSWORD] WORKFLOW_SERVICE_AUTHMETHOD: [AUTHMETHOD] + EXPORT_PATH: [DIRECTORY] EXPORT_FTP_HOST: [FTP-SERVER-ADDRESS] - EXPORT_FTP_PATH: [DIRECTORY] EXPORT_FTP_PORT: "21" EXPORT_FTP_USER: [USER] EXPORT_FTP_PASSWORD: [PASSWORD] + EVENTLOG_TOPIC : "file.export" (default topic) + EVENTLOG_DEADLOCK: 10000 (default timeout) + **Note:** The EXPORT-USER must have manager access. -The following envrionent variables are optional and depend on the Jakarta EE Runtime: +The following environment variables are optional and depend on the Jakarta EE Runtime: - METRICS_ENDPOINT = endpoint for metrics API (default = http://localhost:9990/metrics) - HEALTH_ENDPOINT = endpoint for health API (default = http://localhost:9990/health) diff --git a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/ExportApi.java b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/ExportApi.java index e86c94c..74ba300 100644 --- a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/ExportApi.java +++ b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/ExportApi.java @@ -17,8 +17,8 @@ public class ExportApi extends Application { public static final String WORKFLOW_SYNC_INITIALDELAY = "workflow.sync.initialdelay"; public static final String WORKFLOW_SYNC_DEADLOCK = "workflow.sync.deadlock"; - public static final String EVENTLOG_TOPIC = "file.export"; - public static final String EVENTLOG_DEADLOCK = "export.sync.deadlock"; + public static final String EVENTLOG_TOPIC = "eventlog.topic"; + public static final String EVENTLOG_DEADLOCK = "eventlog.deadlock"; public static final String EXPORT_PATH = "export.path"; diff --git a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/controller/ExportController.java b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/controller/ExportController.java index af78ef7..14f4304 100644 --- a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/controller/ExportController.java +++ b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/controller/ExportController.java @@ -162,7 +162,7 @@ public long getCounterByName(String name) { return entry.getValue().getCount(); } } - logger.warning("Metric Counter : " + name + " not found!"); + logger.fine("Metric Counter : " + name + " not found!"); return 0; } diff --git a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/FileService.java b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/FileService.java index b54ae78..3b613b3 100644 --- a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/FileService.java +++ b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/FileService.java @@ -96,16 +96,27 @@ public class FileService { * * @param fileData - the File Data object * @param path - optional sub directory + * @throws ExportException */ - public void writeFileData(FileData fileData, String path) { + public void writeFileData(FileData fileData, String path) throws ExportException { + String workingPath = ""; + if (fileData == null) { + throw new ExportException("EXPORT_EXCEPTION", "FileData object is null!"); + } + if (path == null) { + path = ""; + } + workingPath = fileData.getName(); // verify checksum if file exists try { + String newChecksum = fileData.generateMD5(); FileData oldFile = readFileData(fileData.getName(), path); if (oldFile != null && oldFile.generateMD5().equals(newChecksum)) { // content hast not changed. // no operation needed. + logger.info("file content unchanged: " + workingPath); return; } @@ -116,15 +127,16 @@ public void writeFileData(FileData fileData, String path) { } else { // write file to local file storage - String ftpWorkingPath = computeWorkingDirectory(path); - ftpWorkingPath = ftpWorkingPath + fileData.getName(); - logger.info("---> write file to: " + ftpWorkingPath); - Path nioPath = Paths.get(ftpWorkingPath); - Files.write(nioPath, fileData.getContent()); + workingPath = computeWorkingDirectory(path); + // create sub directories if not exits + Files.createDirectories(Paths.get(workingPath)); + workingPath = workingPath + fileData.getName(); + logger.info("write file content: " + workingPath); + Files.write(Paths.get(workingPath), fileData.getContent()); } - } catch (NoSuchAlgorithmException | ExportException | IOException e) { - e.printStackTrace(); + } catch (NoSuchAlgorithmException | IOException e) { + throw new ExportException("EXPORT_EXCEPTION", "Unable to write file: " + workingPath, e); } } @@ -133,12 +145,35 @@ public void writeFileData(FileData fileData, String path) { * Reads a fileData from a given export path (optional on a FTP Server).. * * @param path + * @throws ExportException */ - public FileData readFileData(String fileName, String path) { - - return null; + public FileData readFileData(String fileName, String path) throws ExportException { + FileData fileData = null; // do we have a ftp server? + if (ftpServer.isPresent()) { + // FTPSClient ftpClient = getFTPClient(); + fileData = ftpGet(fileName, path); + + } else { + // write file to local file storage + String workingPath = computeWorkingDirectory(path); + workingPath = workingPath + fileName; + logger.fine("...read file from: " + workingPath); + Path nioPath = Paths.get(workingPath); + + try { + if (Files.exists(nioPath)) { + byte[] data = Files.readAllBytes(nioPath); + fileData = new FileData(fileName, data, null, null); + } + + } catch (IOException e) { + throw new ExportException("EXPORT_EXCEPTION", "Unable to read file: " + workingPath, e); + } + } + + return fileData; } @@ -178,10 +213,7 @@ private void changeWorkingDirectory(FTPClient ftpClient, String subDirectory) th try { if (!ftpClient.changeWorkingDirectory(subDirectory)) { // try to create it.... - if (!ftpClient.makeDirectory(subDirectory)) { - throw new ExportException(FTP_ERROR, "FTP Error: unable to create sub-directory '" + subDirectory - + "' : " + ftpClient.getReplyString()); - } + makeDirectoryPath(ftpClient, subDirectory); ftpClient.changeWorkingDirectory(subDirectory); } } catch (IOException e) { @@ -189,6 +221,36 @@ private void changeWorkingDirectory(FTPClient ftpClient, String subDirectory) th } } + /** + * Helper method to create a sub directory path + * + * @param ftpClient + * @param dirPath + * @return + * @throws IOException + * @throws ExportException + */ + private static void makeDirectoryPath(FTPClient ftpClient, String dirPath) throws IOException, ExportException { + String[] pathElements = dirPath.split("/"); + if (pathElements != null && pathElements.length > 0) { + for (String singleDir : pathElements) { + if (singleDir.isEmpty()) { + continue; + } + boolean existed = ftpClient.changeWorkingDirectory(singleDir); + if (!existed) { + boolean created = ftpClient.makeDirectory(singleDir); + if (created) { + ftpClient.changeWorkingDirectory(singleDir); + } else { + throw new ExportException(FTP_ERROR, + "FTP file transfer failed - COULD NOT create directory: " + singleDir); + } + } + } + } + } + /** * This method transfers a single byte to a ftp server. * @@ -205,19 +267,16 @@ private void ftpPut(FileData fileData, String path) throws ExportException { InputStream writer = null; FTPClient ftpClient = null; try { - logger.finest("......put " + fileData.getName() + " to FTP server: " + ftpServer + "..."); + logger.info("put file content: " + fileData.getName() + " on FTP server..."); ftpClient = getFTPClient(); - // verify directories changeWorkingDirectory(ftpClient, ftpWorkingPath); // upload file to FTP server. writer = new ByteArrayInputStream(fileData.getContent()); - if (!ftpClient.storeFile(fileData.getName(), writer)) { throw new ExportException(FTP_ERROR, "FTP file transfer failed: unable to write '" + ftpWorkingPath + fileData.getName() + "' : " + ftpClient.getReplyString()); } - logger.finest("...." + ftpWorkingPath + fileData.getName() + " transferred successful to " + ftpServer); } catch (IOException e) { @@ -238,27 +297,6 @@ private void ftpPut(FileData fileData, String path) throws ExportException { } } - /** - * Helper method computes the full target file path - */ - private String computeWorkingDirectory(String path) { - // Compute file path - String ftpWorkingPath = filePath.orElse(""); - if (!ftpWorkingPath.startsWith("/")) { - ftpWorkingPath = "/" + ftpWorkingPath; - } - if (path != null && !path.isEmpty()) { - if (path.startsWith("/")) { - path = path.substring(1); - } - ftpWorkingPath = ftpWorkingPath + path; - } - if (!ftpWorkingPath.endsWith("/")) { - ftpWorkingPath = ftpWorkingPath + "/"; - } - return ftpWorkingPath; - } - /** * This method reads a snapshot form the current working directory * @@ -266,12 +304,23 @@ private String computeWorkingDirectory(String path) { * @throws ExportException * @return snapshot */ - private FileData ftpGet(FTPClient ftpClient, String fileName) throws ExportException { + private FileData ftpGet(String fileName, String path) throws ExportException { long l = System.currentTimeMillis(); - ByteArrayOutputStream bos = null; + if (!ftpServer.isPresent() || !filePath.isPresent()) { + throw new ExportException(FTP_ERROR, + "FTP file transfer failed: no ftp host provided (" + ExportApi.EXPORT_FTP_HOST + ")!"); + } + + String ftpWorkingPath = computeWorkingDirectory(path); + + ByteArrayOutputStream bos = null; + FTPClient ftpClient = null; try { + ftpClient = getFTPClient(); + changeWorkingDirectory(ftpClient, ftpWorkingPath); + logger.finest("......get " + fileName + "..."); bos = new ByteArrayOutputStream(); ftpClient.retrieveFile(fileName, bos); @@ -287,4 +336,24 @@ private FileData ftpGet(FTPClient ftpClient, String fileName) throws ExportExcep } } + /** + * Helper method computes the full target file path + */ + private String computeWorkingDirectory(String path) { + // Compute file path + String ftpWorkingPath = filePath.orElse(""); + if (!ftpWorkingPath.startsWith("/")) { + ftpWorkingPath = "/" + ftpWorkingPath; + } + if (path != null && !path.isEmpty()) { + if (!path.startsWith("/")) { + path = "/" + path; + } + ftpWorkingPath = ftpWorkingPath + path; + } + if (!ftpWorkingPath.endsWith("/")) { + ftpWorkingPath = ftpWorkingPath + "/"; + } + return ftpWorkingPath; + } } diff --git a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/SchedulerService.java b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/SchedulerService.java index e10e51d..f8c9c10 100644 --- a/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/SchedulerService.java +++ b/imixs-archive-exporter/src/main/java/org/imixs/archive/export/services/SchedulerService.java @@ -23,6 +23,7 @@ package org.imixs.archive.export.services; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.logging.Logger; @@ -75,6 +76,7 @@ public class SchedulerService { private static Logger logger = Logger.getLogger(SchedulerService.class.getName()); + public static String SNAPSHOTID = "$snapshotid"; // timeout interval in ms @Inject @@ -109,6 +111,11 @@ public class SchedulerService { @ConfigProperty(name = ExportApi.EXPORT_PATH) Optional filePath; + // default topic 'file.export' + @Inject + @ConfigProperty(name = ExportApi.EVENTLOG_TOPIC, defaultValue = "file.export") + String topic; + // deadlock timeout interval in ms @Inject @ConfigProperty(name = ExportApi.EVENTLOG_DEADLOCK, defaultValue = "60000") @@ -179,18 +186,17 @@ private boolean verifyConfiguration() { * The metric generated : * * executions count: count of method executions processing Time: - * application_org_imixs_archive_export_ExportService_executions_total event + * application_org_imixs_archive_export_SchedulerService_executions_total event * count: error count: - * application_org_imixs_archive_export_ExportService_errors_total + * application_org_imixs_archive_export_SchedulerService_errors_total * * * @throws RestAPIException **/ @Counted(name = "executions", description = "Counting the invocations of export service", displayName = "executions") - @SuppressWarnings("unused") + @SuppressWarnings("unchecked") @Timeout public void onTimeout(jakarta.ejb.Timer _timer) { - String topic = null; String id = null; String ref = null; int total = 0; @@ -219,23 +225,27 @@ public void onTimeout(jakarta.ejb.Timer _timer) { // max 100 entries per iteration eventLogClient.setPageSize(100); - List events = eventLogClient.searchEventLog(ExportApi.EVENTLOG_TOPIC); + List events = eventLogClient.searchEventLog(topic); for (ItemCollection eventLogEntry : events) { total++; - topic = eventLogEntry.getItemValueString("topic"); id = eventLogEntry.getItemValueString("id"); ref = eventLogEntry.getItemValueString("ref"); - String path = eventLogEntry.getItemValueString("path"); + String path = ""; + // test if we have a data structure with a path information + List dataList = eventLogEntry.getItemValue("data"); + if (dataList != null && dataList.size() > 0) { + ItemCollection dataItemCol = new ItemCollection(dataList.get(0)); + path = dataItemCol.getItemValueString("path"); + } try { // first try to lock the eventLog entry.... eventLogClient.lockEventLogEntry(id); // pull the snapshotEvent ... - ItemCollection snapshot = pullSnapshot(eventLogEntry, documentClient, eventLogClient); + List fileDataList = pullFileDataList(eventLogEntry, documentClient, eventLogClient); // iterate over all Files and export it - List fileDataList = snapshot.getFileData(); for (FileData fileData : fileDataList) { fileService.writeFileData(fileData, path); success++; @@ -243,14 +253,16 @@ public void onTimeout(jakarta.ejb.Timer _timer) { // finally remove the event log entry... eventLogClient.deleteEventLogEntry(id); - - metricRegistry.counter("application_org_imixs_archive_export_ExportService_events").inc(); + metricRegistry.counter("application_org_imixs_archive_export_services_SchedulerService_events") + .inc(); } catch (InvalidAccessException | EJBException | ExportException | RestAPIException e) { // we also catch EJBExceptions here because we do not want to cancel the // ManagedScheduledExecutorService logService.warning("ExportEvent " + id + " failed: " + e.getMessage()); - metricRegistry.counter("application_org_imixs_archive_export_ExportService_errors").inc(); + metricRegistry.counter("application_org_imixs_archive_export_SchedulerService_errors").inc(); errors++; + // release lock + eventLogClient.unlockEventLogEntry(id); } } @@ -263,7 +275,7 @@ public void onTimeout(jakarta.ejb.Timer _timer) { } catch (InvalidAccessException | EJBException | RestAPIException e) { logService.severe("processing EventLog failed: " + e.getMessage()); - metricRegistry.counter("application_org_imixs_archive_export_ExportService_errors").inc(); + metricRegistry.counter("application_org_imixs_archive_export_SchedulerService_errors").inc(); } } @@ -282,18 +294,20 @@ public void releaseDeadLocks(EventLogClient eventLogClient) throws RestAPIExcept logger.fine("...no eventLogClient available!"); return; } - eventLogClient.releaseDeadLocks(deadLockInterval, ExportApi.EVENTLOG_TOPIC); + eventLogClient.releaseDeadLocks(deadLockInterval, topic); } /** - * This method loads a snapshot from the workflow instance. + * This method loads a snapshot from the workflow instance based on a eventLog + * entry. In case the workflow instance does not have a $snapshotid, the + * worklfow instance itself is returned. . *

* The method returns null if the snapshot no longer exists. In this case the * method automatically deletes the outdated event log entry. * * @throws ExportException */ - public ItemCollection pullSnapshot(ItemCollection eventLogEntry, DocumentClient documentClient, + public List pullFileDataList(ItemCollection eventLogEntry, DocumentClient documentClient, EventLogClient eventLogClient) throws ExportException { if (eventLogEntry == null || documentClient == null || eventLogClient == null) { @@ -301,27 +315,29 @@ public ItemCollection pullSnapshot(ItemCollection eventLogEntry, DocumentClient logger.fine("...no eventLogClient available!"); return null; } - String ref = eventLogEntry.getItemValueString("ref"); - String id = eventLogEntry.getItemValueString("id"); - logger.finest("......pullSnapshot ref " + ref + "..."); - // lookup the snapshot... - ItemCollection snapshot; + try { - snapshot = documentClient.getDocument(ref); + + String id = eventLogEntry.getItemValueString("id"); + logger.finest("......fileData ref=" + ref + "..."); + + // first load the document... + ItemCollection workitem = documentClient.getDocument(ref); + // now test if we have a $snapshotid? + if (workitem.getItemValueString(SNAPSHOTID).isEmpty()) { + // no snapshot exists, return file list + return workitem.getFileData(); + } + + // load the snapshot + + ItemCollection snapshot = documentClient.getDocument(workitem.getItemValueString(SNAPSHOTID)); if (snapshot != null) { - logger.finest("...write snapshot into export store..."); - return snapshot; + return snapshot.getFileData(); } } catch (RestAPIException e) { - logService.warning("Snapshot " + ref + " pull failed: " + e.getMessage()); - // now we need to remove the batch event - logService.warning("EventLogEntry " + id + " will be removed!"); - try { - eventLogClient.deleteEventLogEntry(id); - } catch (RestAPIException e1) { - throw new ExportException("REMOTE_EXCEPTION", "Unable to delete eventLogEntry: " + id, e1); - } + throw new ExportException("TIMER_EXCEPTION", "Snapshot " + ref + " pull failed", e); } return null; } diff --git a/imixs-archive-exporter/src/main/webapp/index.xhtml b/imixs-archive-exporter/src/main/webapp/index.xhtml index 07f6e37..572e8d5 100644 --- a/imixs-archive-exporter/src/main/webapp/index.xhtml +++ b/imixs-archive-exporter/src/main/webapp/index.xhtml @@ -60,15 +60,25 @@ - FTP Host: - #{exportController.ftpServer}:#{exportController.ftpPort} + Export Path: + #{exportController.filePath} + - Export Space: - #{exportController.filePath} + FTP Host: + + + - none - + + + #{exportController.ftpServer}:#{exportController.ftpPort} + + + + @@ -96,7 +106,7 @@

- #{exportController.getCounterByName('eventProcessed')} Events + #{exportController.getCounterByName('events')} Events

Number of processed export events since last start.