From 1899c2bb23dd5d08eed527cd93ebb62f66c4bb74 Mon Sep 17 00:00:00 2001 From: Nikolay Meleshchenko Date: Thu, 8 Dec 2022 15:15:26 +0300 Subject: [PATCH] feat: added parallel handling for rows --- .../boggard/universalreader/CSVReader.java | 21 +-- .../boggard/universalreader/ExcelReader.java | 56 ++++++++ .../universalreader/ReaderConfiguration.java | 13 +- .../universalreader/SpreadsheetReader.java | 11 +- .../boggard/universalreader/TXTReader.java | 15 ++- .../universalreader/UniversalReader.java | 4 +- .../boggard/universalreader/XLSReader.java | 51 -------- .../boggard/universalreader/XLSXReader.java | 120 ------------------ .../contenthandler/ContentHandler.java | 10 +- .../contenthandler/RecordHandler.java | 21 +++ .../universalreader/util/ReaderUtil.java | 16 --- .../util/StringStreamUtil.java | 19 +++ 12 files changed, 135 insertions(+), 222 deletions(-) create mode 100644 src/main/java/com/github/boggard/universalreader/ExcelReader.java delete mode 100644 src/main/java/com/github/boggard/universalreader/XLSReader.java delete mode 100644 src/main/java/com/github/boggard/universalreader/XLSXReader.java create mode 100644 src/main/java/com/github/boggard/universalreader/contenthandler/RecordHandler.java create mode 100644 src/main/java/com/github/boggard/universalreader/util/StringStreamUtil.java diff --git a/src/main/java/com/github/boggard/universalreader/CSVReader.java b/src/main/java/com/github/boggard/universalreader/CSVReader.java index e38a463..d770b27 100644 --- a/src/main/java/com/github/boggard/universalreader/CSVReader.java +++ b/src/main/java/com/github/boggard/universalreader/CSVReader.java @@ -1,32 +1,35 @@ package com.github.boggard.universalreader; import com.github.boggard.universalreader.contenthandler.ContentHandler; +import com.github.boggard.universalreader.contenthandler.RecordHandler; import com.github.boggard.universalreader.util.ReaderUtil; import lombok.experimental.UtilityClass; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.util.stream.Stream; + +import static com.github.boggard.universalreader.util.StringStreamUtil.stringStream; @UtilityClass public class CSVReader { - public static R readRecords(FileSource inputStreamSource, ContentHandler contentsHandler, - ReaderConfiguration configuration) + public static R readRecords(FileSource inputStreamSource, ContentHandler contentsHandler, + ReaderConfiguration configuration) throws IOException { try (InputStream inputStream = inputStreamSource.getInputStream(); BufferedReader reader = ReaderUtil.inputStreamToBufferedReader(inputStream)) { - reader.lines().skip(configuration.getStartLineIndex()).forEach(line -> { - String[] lineRecords = line.split(configuration.getRecordsSeparator()); - for (String lineRecord : lineRecords) { - ReaderUtil.readRecord(lineRecord, contentsHandler, configuration.getFieldsSeparator()); - } - }); + RecordHandler recordHandler = contentsHandler.recordHandler(); + Stream resultStream = stringStream(reader, configuration).skip(configuration.getStartLineIndex()) + .map(line -> line.split(configuration.getRecordsSeparator())) + .flatMap(Stream::of) + .map(line -> recordHandler.handleAndReturnResult(line, configuration.getFieldsSeparator())); contentsHandler.endFile(); - return contentsHandler.getResult(); + return contentsHandler.getResult(resultStream); } } } diff --git a/src/main/java/com/github/boggard/universalreader/ExcelReader.java b/src/main/java/com/github/boggard/universalreader/ExcelReader.java new file mode 100644 index 0000000..4bbd2be --- /dev/null +++ b/src/main/java/com/github/boggard/universalreader/ExcelReader.java @@ -0,0 +1,56 @@ +package com.github.boggard.universalreader; + +import com.github.boggard.universalreader.contenthandler.ContentHandler; +import com.github.boggard.universalreader.contenthandler.RecordHandler; +import lombok.experimental.UtilityClass; +import org.apache.poi.ss.usermodel.*; + +import java.io.File; +import java.io.IOException; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +@UtilityClass +public class ExcelReader { + + private static final DataFormatter DATA_FORMATTER = new DataFormatter(); + + public static R readRecords(File sourceFile, ContentHandler contentsHandler, + ReaderConfiguration configuration) + throws IOException { + RecordHandler recordHandler = contentsHandler.recordHandler(); + try (Workbook workbook = WorkbookFactory.create(sourceFile)) { + Stream resultStream = StreamSupport.stream(workbook.spliterator(), configuration.isParallelProcessing()) + .flatMap(sheet -> createRowStream(configuration, sheet)) + .map(row -> handleRow(recordHandler, row)); + + contentsHandler.endFile(); + + return contentsHandler.getResult(resultStream); + } + } + + private static Stream createRowStream(ReaderConfiguration configuration, Sheet sheet) { + return StreamSupport.stream(sheet.spliterator(), configuration.isParallelProcessing()) + .skip(configuration.getStartLineIndex()) + .filter(row -> row != null && row.getRowNum() < sheet.getLastRowNum() + 1); + } + + private static T handleRow(RecordHandler recordHandler, Row row) { + T rec = recordHandler.startRecord(); + row.cellIterator().forEachRemaining(cell -> { + String value; + if (cell.getCellType() == CellType.NUMERIC) { + CellStyle cellStyle = cell.getCellStyle(); + value = DATA_FORMATTER.formatRawCellContents(cell.getNumericCellValue(), + cellStyle.getDataFormat(), cellStyle.getDataFormatString()); + } else { + value = cell.getStringCellValue(); + } + + recordHandler.startField(rec, cell.getColumnIndex(), value); + } + ); + return recordHandler.endRecord(rec); + } +} diff --git a/src/main/java/com/github/boggard/universalreader/ReaderConfiguration.java b/src/main/java/com/github/boggard/universalreader/ReaderConfiguration.java index f127e61..85ccab3 100644 --- a/src/main/java/com/github/boggard/universalreader/ReaderConfiguration.java +++ b/src/main/java/com/github/boggard/universalreader/ReaderConfiguration.java @@ -8,23 +8,27 @@ public class ReaderConfiguration { private static final String DEFAULT_RECORDS_SEPARATOR = ";"; private static final String DEFAULT_FIELDS_SEPARATOR = ","; private static final int DEFAULT_START_INDEX = 0; + private static final boolean DEFAULT_PARALLEL_PROCESSING = true; private String recordsSeparator; private String fieldsSeparator; private int startLineIndex; + private boolean parallelProcessing; private ReaderConfiguration() { this(DEFAULT_START_INDEX); } private ReaderConfiguration(int startLineIndex) { - this(DEFAULT_RECORDS_SEPARATOR, DEFAULT_FIELDS_SEPARATOR, startLineIndex); + this(DEFAULT_RECORDS_SEPARATOR, DEFAULT_FIELDS_SEPARATOR, startLineIndex, DEFAULT_PARALLEL_PROCESSING); } - private ReaderConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex) { + private ReaderConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex, + boolean parallelProcessing) { this.recordsSeparator = recordsSeparator; this.fieldsSeparator = fieldsSeparator; this.startLineIndex = startLineIndex; + this.parallelProcessing = parallelProcessing; } public static ReaderConfiguration defaultReaderConfiguration() { @@ -35,7 +39,8 @@ public static ReaderConfiguration withHeaderDefaultConfiguration() { return new ReaderConfiguration(1); } - public static ReaderConfiguration customConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex) { - return new ReaderConfiguration(recordsSeparator, fieldsSeparator, startLineIndex); + public static ReaderConfiguration customConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex, + boolean parallelProcessing) { + return new ReaderConfiguration(recordsSeparator, fieldsSeparator, startLineIndex, parallelProcessing); } } diff --git a/src/main/java/com/github/boggard/universalreader/SpreadsheetReader.java b/src/main/java/com/github/boggard/universalreader/SpreadsheetReader.java index 6d92e08..c35bfd1 100644 --- a/src/main/java/com/github/boggard/universalreader/SpreadsheetReader.java +++ b/src/main/java/com/github/boggard/universalreader/SpreadsheetReader.java @@ -2,7 +2,6 @@ import com.github.boggard.universalreader.contenthandler.ContentHandler; import lombok.experimental.UtilityClass; -import org.apache.poi.openxml4j.exceptions.OLE2NotOfficeXmlFileException; import org.apache.poi.openxml4j.exceptions.OpenXML4JException; import org.apache.poi.util.IOUtils; import org.xml.sax.SAXException; @@ -16,19 +15,13 @@ @UtilityClass public class SpreadsheetReader { - public static R readRecords(FileSource fileSource, ContentHandler contentsHandler, + public static R readRecords(FileSource fileSource, ContentHandler contentsHandler, ReaderConfiguration configuration) throws IOException, ParserConfigurationException, SAXException, OpenXML4JException { File tempFile = File.createTempFile("spreadsheet-temp", null); writeToFile(fileSource, tempFile); - R result; - - try { - result = XLSXReader.readRecords(tempFile, contentsHandler, configuration); - } catch (OLE2NotOfficeXmlFileException e) { - result = XLSReader.readRecords(tempFile, contentsHandler, configuration); - } + R result = ExcelReader.readRecords(tempFile, contentsHandler, configuration); Files.delete(tempFile.toPath()); diff --git a/src/main/java/com/github/boggard/universalreader/TXTReader.java b/src/main/java/com/github/boggard/universalreader/TXTReader.java index b5273a5..e579fed 100644 --- a/src/main/java/com/github/boggard/universalreader/TXTReader.java +++ b/src/main/java/com/github/boggard/universalreader/TXTReader.java @@ -1,28 +1,33 @@ package com.github.boggard.universalreader; import com.github.boggard.universalreader.contenthandler.ContentHandler; +import com.github.boggard.universalreader.contenthandler.RecordHandler; import com.github.boggard.universalreader.util.ReaderUtil; import lombok.experimental.UtilityClass; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.util.stream.Stream; + +import static com.github.boggard.universalreader.util.StringStreamUtil.stringStream; @UtilityClass public class TXTReader { - public static R readRecords(FileSource fileSource, ContentHandler contentsHandler, - ReaderConfiguration configuration) + public static R readRecords(FileSource fileSource, ContentHandler contentsHandler, + ReaderConfiguration configuration) throws IOException { try (InputStream inputStream = fileSource.getInputStream(); BufferedReader reader = ReaderUtil.inputStreamToBufferedReader(inputStream)) { - reader.lines().skip(configuration.getStartLineIndex()) - .forEach(line -> ReaderUtil.readRecord(line, contentsHandler, configuration.getFieldsSeparator())); + RecordHandler recordHandler = contentsHandler.recordHandler(); + Stream resultStream = stringStream(reader, configuration).skip(configuration.getStartLineIndex()) + .map(line -> recordHandler.handleAndReturnResult(line, configuration.getFieldsSeparator())); contentsHandler.endFile(); - return contentsHandler.getResult(); + return contentsHandler.getResult(resultStream); } } } diff --git a/src/main/java/com/github/boggard/universalreader/UniversalReader.java b/src/main/java/com/github/boggard/universalreader/UniversalReader.java index 6379f02..a87d597 100644 --- a/src/main/java/com/github/boggard/universalreader/UniversalReader.java +++ b/src/main/java/com/github/boggard/universalreader/UniversalReader.java @@ -13,8 +13,8 @@ @UtilityClass public class UniversalReader { - public static R processRecords(FileSource fileSource, ContentHandler contentsHandler, - ReaderConfiguration configuration) { + public static R processRecords(FileSource fileSource, ContentHandler contentsHandler, + ReaderConfiguration configuration) { try { log.debug("Parsing file: " + fileSource.getFileName()); diff --git a/src/main/java/com/github/boggard/universalreader/XLSReader.java b/src/main/java/com/github/boggard/universalreader/XLSReader.java deleted file mode 100644 index 45072f1..0000000 --- a/src/main/java/com/github/boggard/universalreader/XLSReader.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.github.boggard.universalreader; - -import com.github.boggard.universalreader.contenthandler.ContentHandler; -import com.github.boggard.universalreader.util.ReaderUtil; -import lombok.experimental.UtilityClass; -import org.apache.poi.ss.usermodel.*; - -import java.io.File; -import java.io.IOException; - -@UtilityClass -public class XLSReader { - - private static final DataFormatter DATA_FORMATTER = new DataFormatter(); - - public static R readRecords(File sourceFile, ContentHandler contentsHandler, - ReaderConfiguration configuration) - throws IOException { - try (Workbook workbook = WorkbookFactory.create(sourceFile)) { - workbook.sheetIterator().forEachRemaining(sheet -> { - for (int i = configuration.getStartLineIndex(); i < sheet.getLastRowNum() + 1; i++) { - Row row = sheet.getRow(i); - - if (row != null) { - contentsHandler.startRecord(); - - row.cellIterator().forEachRemaining(cell -> { - String value; - - if (cell.getCellType() == CellType.NUMERIC) { - CellStyle cellStyle = cell.getCellStyle(); - value = DATA_FORMATTER.formatRawCellContents(cell.getNumericCellValue(), - cellStyle.getDataFormat(), cellStyle.getDataFormatString()); - } else { - value = cell.getStringCellValue(); - } - - contentsHandler.startField(cell.getColumnIndex(), value); - }); - - ReaderUtil.readRecord(contentsHandler); - } - } - }); - - contentsHandler.endFile(); - - return contentsHandler.getResult(); - } - } -} diff --git a/src/main/java/com/github/boggard/universalreader/XLSXReader.java b/src/main/java/com/github/boggard/universalreader/XLSXReader.java deleted file mode 100644 index 056f505..0000000 --- a/src/main/java/com/github/boggard/universalreader/XLSXReader.java +++ /dev/null @@ -1,120 +0,0 @@ -package com.github.boggard.universalreader; - -import com.github.boggard.universalreader.contenthandler.ContentHandler; -import com.github.boggard.universalreader.util.ReaderUtil; -import lombok.RequiredArgsConstructor; -import lombok.experimental.UtilityClass; -import org.apache.poi.ooxml.util.SAXHelper; -import org.apache.poi.openxml4j.exceptions.OpenXML4JException; -import org.apache.poi.openxml4j.opc.OPCPackage; -import org.apache.poi.xssf.eventusermodel.XSSFReader; -import org.apache.poi.xssf.eventusermodel.XSSFSheetXMLHandler; -import org.apache.poi.xssf.eventusermodel.XSSFSheetXMLHandler.SheetContentsHandler; -import org.apache.poi.xssf.model.SharedStrings; -import org.apache.poi.xssf.model.Styles; -import org.apache.poi.xssf.usermodel.XSSFComment; -import org.xml.sax.InputSource; -import org.xml.sax.SAXException; -import org.xml.sax.XMLReader; - -import javax.xml.parsers.ParserConfigurationException; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.Iterator; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -@UtilityClass -public class XLSXReader { - - public static R readRecords(File sourceFile, ContentHandler contentsHandler, - ReaderConfiguration configuration) - throws IOException, OpenXML4JException, ParserConfigurationException, SAXException { - try (OPCPackage pkg = OPCPackage.open(sourceFile)) { - XSSFReader reader = new XSSFReader(pkg); - - SheetContentsHandlerImpl sheetContentsHandler = new SheetContentsHandlerImpl<>(contentsHandler, configuration); - XMLReader parser = fetchSheetParser(reader.getStylesTable(), reader.getSharedStringsTable(), - sheetContentsHandler); - - Iterator sheetsData = reader.getSheetsData(); - sheetsData.forEachRemaining(sheet -> processSheet(parser, sheet)); - - contentsHandler.endFile(); - - return contentsHandler.getResult(); - } - } - - private XMLReader fetchSheetParser(Styles styles, SharedStrings sharedStrings, SheetContentsHandler contentsHandler) - throws SAXException, ParserConfigurationException { - XMLReader parser = SAXHelper.newXMLReader(); - XSSFSheetXMLHandler handler = new XSSFSheetXMLHandler(styles, sharedStrings, contentsHandler, true); - parser.setContentHandler(handler); - return parser; - } - - private void processSheet(XMLReader parser, InputStream sheet) { - try { - InputSource sheetSource = new InputSource(sheet); - parser.parse(sheetSource); - sheet.close(); - } catch (SAXException | IOException ex) { - throw new IllegalStateException(ex); - } - } - - @RequiredArgsConstructor - private static class SheetContentsHandlerImpl implements SheetContentsHandler { - - private static final Pattern CELL_REFERENCE_PATTERN = Pattern.compile("[A-Z]+"); - private final ContentHandler contentsHandler; - private final ReaderConfiguration configuration; - - private int currentRowNum = -1; - - @Override - public void startRow(int rowNum) { - currentRowNum = rowNum; - - if (isContentShouldBeHandled()) { - contentsHandler.startRecord(); - } - } - - @Override - public void endRow(int rowNum) { - if (isContentShouldBeHandled()) { - ReaderUtil.readRecord(contentsHandler); - } - } - - @Override - public void cell(String cellReference, String formattedValue, XSSFComment comment) { - if (isContentShouldBeHandled()) { - int index = cellReferenceToIndex(cellReference); - contentsHandler.startField(index, formattedValue); - } - } - - private static int cellReferenceToIndex(String cellReference) { - Matcher matcher = CELL_REFERENCE_PATTERN.matcher(cellReference); - - if (matcher.find()) { - String cellLetter = matcher.group(); - return findPosition(cellLetter.charAt(0)) * cellLetter.length(); - } else { - return -1; - } - } - - private static int findPosition(char inputLetter) { - return Character.toLowerCase(inputLetter) - 'a'; - } - - private boolean isContentShouldBeHandled() { - return currentRowNum >= configuration.getStartLineIndex(); - } - } -} diff --git a/src/main/java/com/github/boggard/universalreader/contenthandler/ContentHandler.java b/src/main/java/com/github/boggard/universalreader/contenthandler/ContentHandler.java index 5e283fb..a09a2ef 100644 --- a/src/main/java/com/github/boggard/universalreader/contenthandler/ContentHandler.java +++ b/src/main/java/com/github/boggard/universalreader/contenthandler/ContentHandler.java @@ -1,16 +1,14 @@ package com.github.boggard.universalreader.contenthandler; -public interface ContentHandler { +import java.util.stream.Stream; - void startRecord(); +public interface ContentHandler { - void startField(int index, String field); - - void endRecord(); + RecordHandler recordHandler(); default void endFile() { //by default do nothing } - R getResult(); + R getResult(Stream resultStream); } diff --git a/src/main/java/com/github/boggard/universalreader/contenthandler/RecordHandler.java b/src/main/java/com/github/boggard/universalreader/contenthandler/RecordHandler.java new file mode 100644 index 0000000..daa8575 --- /dev/null +++ b/src/main/java/com/github/boggard/universalreader/contenthandler/RecordHandler.java @@ -0,0 +1,21 @@ +package com.github.boggard.universalreader.contenthandler; + +public interface RecordHandler { + + default R handleAndReturnResult(String record, String fieldsSeparator) { + R rec = startRecord(); + + String[] fields = record.split(fieldsSeparator); + for (int i = 0; i < fields.length; i++) { + startField(rec, i, fields[i]); + } + + return endRecord(rec); + } + + R startRecord(); + + void startField(R record, int index, String field); + + R endRecord(R record); +} diff --git a/src/main/java/com/github/boggard/universalreader/util/ReaderUtil.java b/src/main/java/com/github/boggard/universalreader/util/ReaderUtil.java index 246cb3b..ed83522 100644 --- a/src/main/java/com/github/boggard/universalreader/util/ReaderUtil.java +++ b/src/main/java/com/github/boggard/universalreader/util/ReaderUtil.java @@ -1,6 +1,5 @@ package com.github.boggard.universalreader.util; -import com.github.boggard.universalreader.contenthandler.ContentHandler; import lombok.experimental.UtilityClass; import org.mozilla.universalchardet.UnicodeBOMInputStream; @@ -11,21 +10,6 @@ @UtilityClass public class ReaderUtil { - public static void readRecord(String record, ContentHandler contentsHandler, String recordsSeparator) { - contentsHandler.startRecord(); - - String[] fields = record.split(recordsSeparator); - for (int i = 0; i < fields.length; i++) { - contentsHandler.startField(i, fields[i]); - } - - readRecord(contentsHandler); - } - - public static void readRecord(ContentHandler contentsHandler) { - contentsHandler.endRecord(); - } - public static BufferedReader inputStreamToBufferedReader(InputStream inputStream) throws IOException { Charset defaultCharset = StandardCharsets.UTF_8; diff --git a/src/main/java/com/github/boggard/universalreader/util/StringStreamUtil.java b/src/main/java/com/github/boggard/universalreader/util/StringStreamUtil.java new file mode 100644 index 0000000..61d0fce --- /dev/null +++ b/src/main/java/com/github/boggard/universalreader/util/StringStreamUtil.java @@ -0,0 +1,19 @@ +package com.github.boggard.universalreader.util; + +import com.github.boggard.universalreader.ReaderConfiguration; +import lombok.experimental.UtilityClass; + +import java.io.BufferedReader; +import java.util.stream.Stream; + +@UtilityClass +public class StringStreamUtil { + + public static Stream stringStream(BufferedReader reader, ReaderConfiguration configuration) { + if (configuration.isParallelProcessing()) { + return reader.lines().parallel(); + } else { + return reader.lines(); + } + } +}