Skip to content

Commit

Permalink
feat: added parallel handling for rows
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikolay Meleshchenko committed Dec 9, 2022
1 parent 1cdfa90 commit 1899c2b
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 222 deletions.
21 changes: 12 additions & 9 deletions src/main/java/com/github/boggard/universalreader/CSVReader.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
package com.github.boggard.universalreader;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import com.github.boggard.universalreader.contenthandler.RecordHandler;
import com.github.boggard.universalreader.util.ReaderUtil;
import lombok.experimental.UtilityClass;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;

import static com.github.boggard.universalreader.util.StringStreamUtil.stringStream;

@UtilityClass
public class CSVReader {

public static <R> R readRecords(FileSource inputStreamSource, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration)
public static <R, T> R readRecords(FileSource inputStreamSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException {
try (InputStream inputStream = inputStreamSource.getInputStream();
BufferedReader reader = ReaderUtil.inputStreamToBufferedReader(inputStream)) {

reader.lines().skip(configuration.getStartLineIndex()).forEach(line -> {
String[] lineRecords = line.split(configuration.getRecordsSeparator());
for (String lineRecord : lineRecords) {
ReaderUtil.readRecord(lineRecord, contentsHandler, configuration.getFieldsSeparator());
}
});
RecordHandler<T> recordHandler = contentsHandler.recordHandler();
Stream<T> resultStream = stringStream(reader, configuration).skip(configuration.getStartLineIndex())
.map(line -> line.split(configuration.getRecordsSeparator()))
.flatMap(Stream::of)
.map(line -> recordHandler.handleAndReturnResult(line, configuration.getFieldsSeparator()));

contentsHandler.endFile();

return contentsHandler.getResult();
return contentsHandler.getResult(resultStream);
}
}
}
56 changes: 56 additions & 0 deletions src/main/java/com/github/boggard/universalreader/ExcelReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.github.boggard.universalreader;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import com.github.boggard.universalreader.contenthandler.RecordHandler;
import lombok.experimental.UtilityClass;
import org.apache.poi.ss.usermodel.*;

import java.io.File;
import java.io.IOException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@UtilityClass
public class ExcelReader {

private static final DataFormatter DATA_FORMATTER = new DataFormatter();

public static <R, T> R readRecords(File sourceFile, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException {
RecordHandler<T> recordHandler = contentsHandler.recordHandler();
try (Workbook workbook = WorkbookFactory.create(sourceFile)) {
Stream<T> resultStream = StreamSupport.stream(workbook.spliterator(), configuration.isParallelProcessing())
.flatMap(sheet -> createRowStream(configuration, sheet))
.map(row -> handleRow(recordHandler, row));

contentsHandler.endFile();

return contentsHandler.getResult(resultStream);
}
}

private static Stream<Row> createRowStream(ReaderConfiguration configuration, Sheet sheet) {
return StreamSupport.stream(sheet.spliterator(), configuration.isParallelProcessing())
.skip(configuration.getStartLineIndex())
.filter(row -> row != null && row.getRowNum() < sheet.getLastRowNum() + 1);
}

private static <T> T handleRow(RecordHandler<T> recordHandler, Row row) {
T rec = recordHandler.startRecord();
row.cellIterator().forEachRemaining(cell -> {
String value;
if (cell.getCellType() == CellType.NUMERIC) {
CellStyle cellStyle = cell.getCellStyle();
value = DATA_FORMATTER.formatRawCellContents(cell.getNumericCellValue(),
cellStyle.getDataFormat(), cellStyle.getDataFormatString());
} else {
value = cell.getStringCellValue();
}

recordHandler.startField(rec, cell.getColumnIndex(), value);
}
);
return recordHandler.endRecord(rec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,27 @@ public class ReaderConfiguration {
private static final String DEFAULT_RECORDS_SEPARATOR = ";";
private static final String DEFAULT_FIELDS_SEPARATOR = ",";
private static final int DEFAULT_START_INDEX = 0;
private static final boolean DEFAULT_PARALLEL_PROCESSING = true;

private String recordsSeparator;
private String fieldsSeparator;
private int startLineIndex;
private boolean parallelProcessing;

private ReaderConfiguration() {
this(DEFAULT_START_INDEX);
}

private ReaderConfiguration(int startLineIndex) {
this(DEFAULT_RECORDS_SEPARATOR, DEFAULT_FIELDS_SEPARATOR, startLineIndex);
this(DEFAULT_RECORDS_SEPARATOR, DEFAULT_FIELDS_SEPARATOR, startLineIndex, DEFAULT_PARALLEL_PROCESSING);
}

private ReaderConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex) {
private ReaderConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex,
boolean parallelProcessing) {
this.recordsSeparator = recordsSeparator;
this.fieldsSeparator = fieldsSeparator;
this.startLineIndex = startLineIndex;
this.parallelProcessing = parallelProcessing;
}

public static ReaderConfiguration defaultReaderConfiguration() {
Expand All @@ -35,7 +39,8 @@ public static ReaderConfiguration withHeaderDefaultConfiguration() {
return new ReaderConfiguration(1);
}

public static ReaderConfiguration customConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex) {
return new ReaderConfiguration(recordsSeparator, fieldsSeparator, startLineIndex);
public static ReaderConfiguration customConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex,
boolean parallelProcessing) {
return new ReaderConfiguration(recordsSeparator, fieldsSeparator, startLineIndex, parallelProcessing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import lombok.experimental.UtilityClass;
import org.apache.poi.openxml4j.exceptions.OLE2NotOfficeXmlFileException;
import org.apache.poi.openxml4j.exceptions.OpenXML4JException;
import org.apache.poi.util.IOUtils;
import org.xml.sax.SAXException;
Expand All @@ -16,19 +15,13 @@
@UtilityClass
public class SpreadsheetReader {

public static <R> R readRecords(FileSource fileSource, ContentHandler<R> contentsHandler,
public static <R, T> R readRecords(FileSource fileSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException, ParserConfigurationException, SAXException, OpenXML4JException {
File tempFile = File.createTempFile("spreadsheet-temp", null);
writeToFile(fileSource, tempFile);

R result;

try {
result = XLSXReader.readRecords(tempFile, contentsHandler, configuration);
} catch (OLE2NotOfficeXmlFileException e) {
result = XLSReader.readRecords(tempFile, contentsHandler, configuration);
}
R result = ExcelReader.readRecords(tempFile, contentsHandler, configuration);

Files.delete(tempFile.toPath());

Expand Down
15 changes: 10 additions & 5 deletions src/main/java/com/github/boggard/universalreader/TXTReader.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
package com.github.boggard.universalreader;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import com.github.boggard.universalreader.contenthandler.RecordHandler;
import com.github.boggard.universalreader.util.ReaderUtil;
import lombok.experimental.UtilityClass;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;

import static com.github.boggard.universalreader.util.StringStreamUtil.stringStream;

@UtilityClass
public class TXTReader {

public static <R> R readRecords(FileSource fileSource, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration)
public static <R, T> R readRecords(FileSource fileSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException {
try (InputStream inputStream = fileSource.getInputStream();
BufferedReader reader = ReaderUtil.inputStreamToBufferedReader(inputStream)) {

reader.lines().skip(configuration.getStartLineIndex())
.forEach(line -> ReaderUtil.readRecord(line, contentsHandler, configuration.getFieldsSeparator()));
RecordHandler<T> recordHandler = contentsHandler.recordHandler();
Stream<T> resultStream = stringStream(reader, configuration).skip(configuration.getStartLineIndex())
.map(line -> recordHandler.handleAndReturnResult(line, configuration.getFieldsSeparator()));

contentsHandler.endFile();

return contentsHandler.getResult();
return contentsHandler.getResult(resultStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
@UtilityClass
public class UniversalReader {

public static <R> R processRecords(FileSource fileSource, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration) {
public static <R, T> R processRecords(FileSource fileSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration) {
try {
log.debug("Parsing file: " + fileSource.getFileName());

Expand Down
51 changes: 0 additions & 51 deletions src/main/java/com/github/boggard/universalreader/XLSReader.java

This file was deleted.

120 changes: 0 additions & 120 deletions src/main/java/com/github/boggard/universalreader/XLSXReader.java

This file was deleted.

Loading

0 comments on commit 1899c2b

Please sign in to comment.