Skip to content

Commit

Permalink
feat: added parallel handling for rows (besides XLSX)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikolay Meleshchenko committed Dec 8, 2022
1 parent 1cdfa90 commit 1ff5102
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 79 deletions.
21 changes: 12 additions & 9 deletions src/main/java/com/github/boggard/universalreader/CSVReader.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
package com.github.boggard.universalreader;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import com.github.boggard.universalreader.contenthandler.RecordHandler;
import com.github.boggard.universalreader.util.ReaderUtil;
import lombok.experimental.UtilityClass;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;

import static com.github.boggard.universalreader.util.StringStreamUtil.stringStream;

@UtilityClass
public class CSVReader {

public static <R> R readRecords(FileSource inputStreamSource, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration)
public static <R, T> R readRecords(FileSource inputStreamSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException {
try (InputStream inputStream = inputStreamSource.getInputStream();
BufferedReader reader = ReaderUtil.inputStreamToBufferedReader(inputStream)) {

reader.lines().skip(configuration.getStartLineIndex()).forEach(line -> {
String[] lineRecords = line.split(configuration.getRecordsSeparator());
for (String lineRecord : lineRecords) {
ReaderUtil.readRecord(lineRecord, contentsHandler, configuration.getFieldsSeparator());
}
});
RecordHandler<T> recordHandler = contentsHandler.recordHandler();
Stream<T> resultStream = stringStream(reader, configuration).skip(configuration.getStartLineIndex())
.map(line -> line.split(configuration.getRecordsSeparator()))
.flatMap(Stream::of)
.map(line -> recordHandler.handleAndReturnResult(line, configuration.getFieldsSeparator()));

contentsHandler.endFile();

return contentsHandler.getResult();
return contentsHandler.getResult(resultStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,27 @@ public class ReaderConfiguration {
private static final String DEFAULT_RECORDS_SEPARATOR = ";";
private static final String DEFAULT_FIELDS_SEPARATOR = ",";
private static final int DEFAULT_START_INDEX = 0;
private static final boolean DEFAULT_PARALLEL_PROCESSING = true;

private String recordsSeparator;
private String fieldsSeparator;
private int startLineIndex;
private boolean parallelProcessing;

/**
 * Creates a configuration by delegating to the single-argument constructor
 * with {@code DEFAULT_START_INDEX} as the start line index.
 */
private ReaderConfiguration() {
this(DEFAULT_START_INDEX);
}

private ReaderConfiguration(int startLineIndex) {
this(DEFAULT_RECORDS_SEPARATOR, DEFAULT_FIELDS_SEPARATOR, startLineIndex);
this(DEFAULT_RECORDS_SEPARATOR, DEFAULT_FIELDS_SEPARATOR, startLineIndex, DEFAULT_PARALLEL_PROCESSING);
}

private ReaderConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex) {
private ReaderConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex,
boolean parallelProcessing) {
this.recordsSeparator = recordsSeparator;
this.fieldsSeparator = fieldsSeparator;
this.startLineIndex = startLineIndex;
this.parallelProcessing = parallelProcessing;
}

public static ReaderConfiguration defaultReaderConfiguration() {
Expand All @@ -35,7 +39,8 @@ public static ReaderConfiguration withHeaderDefaultConfiguration() {
return new ReaderConfiguration(1);
}

public static ReaderConfiguration customConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex) {
return new ReaderConfiguration(recordsSeparator, fieldsSeparator, startLineIndex);
public static ReaderConfiguration customConfiguration(String recordsSeparator, String fieldsSeparator, int startLineIndex,
boolean parallelProcessing) {
return new ReaderConfiguration(recordsSeparator, fieldsSeparator, startLineIndex, parallelProcessing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@UtilityClass
public class SpreadsheetReader {

public static <R> R readRecords(FileSource fileSource, ContentHandler<R> contentsHandler,
public static <R, T> R readRecords(FileSource fileSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException, ParserConfigurationException, SAXException, OpenXML4JException {
File tempFile = File.createTempFile("spreadsheet-temp", null);
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/com/github/boggard/universalreader/TXTReader.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
package com.github.boggard.universalreader;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import com.github.boggard.universalreader.contenthandler.RecordHandler;
import com.github.boggard.universalreader.util.ReaderUtil;
import lombok.experimental.UtilityClass;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;

import static com.github.boggard.universalreader.util.StringStreamUtil.stringStream;

@UtilityClass
public class TXTReader {

public static <R> R readRecords(FileSource fileSource, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration)
public static <R, T> R readRecords(FileSource fileSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException {
try (InputStream inputStream = fileSource.getInputStream();
BufferedReader reader = ReaderUtil.inputStreamToBufferedReader(inputStream)) {

reader.lines().skip(configuration.getStartLineIndex())
.forEach(line -> ReaderUtil.readRecord(line, contentsHandler, configuration.getFieldsSeparator()));
RecordHandler<T> recordHandler = contentsHandler.recordHandler();
Stream<T> resultStream = stringStream(reader, configuration).skip(configuration.getStartLineIndex())
.map(line -> recordHandler.handleAndReturnResult(line, configuration.getFieldsSeparator()));

contentsHandler.endFile();

return contentsHandler.getResult();
return contentsHandler.getResult(resultStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
@UtilityClass
public class UniversalReader {

public static <R> R processRecords(FileSource fileSource, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration) {
public static <R, T> R processRecords(FileSource fileSource, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration) {
try {
log.debug("Parsing file: " + fileSource.getFileName());

Expand Down
59 changes: 32 additions & 27 deletions src/main/java/com/github/boggard/universalreader/XLSReader.java
Original file line number Diff line number Diff line change
@@ -1,51 +1,56 @@
package com.github.boggard.universalreader;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import com.github.boggard.universalreader.util.ReaderUtil;
import com.github.boggard.universalreader.contenthandler.RecordHandler;
import lombok.experimental.UtilityClass;
import org.apache.poi.ss.usermodel.*;

import java.io.File;
import java.io.IOException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@UtilityClass
public class XLSReader {

private static final DataFormatter DATA_FORMATTER = new DataFormatter();

public static <R> R readRecords(File sourceFile, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration)
public static <R, T> R readRecords(File sourceFile, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException {
RecordHandler<T> recordHandler = contentsHandler.recordHandler();
try (Workbook workbook = WorkbookFactory.create(sourceFile)) {
workbook.sheetIterator().forEachRemaining(sheet -> {
for (int i = configuration.getStartLineIndex(); i < sheet.getLastRowNum() + 1; i++) {
Row row = sheet.getRow(i);
Stream<T> resultStream = StreamSupport.stream(workbook.spliterator(), configuration.isParallelProcessing())
.flatMap(sheet -> createRowStream(configuration, sheet))
.map(row -> handleRow(recordHandler, row));

if (row != null) {
contentsHandler.startRecord();

row.cellIterator().forEachRemaining(cell -> {
String value;
contentsHandler.endFile();

if (cell.getCellType() == CellType.NUMERIC) {
CellStyle cellStyle = cell.getCellStyle();
value = DATA_FORMATTER.formatRawCellContents(cell.getNumericCellValue(),
cellStyle.getDataFormat(), cellStyle.getDataFormatString());
} else {
value = cell.getStringCellValue();
}
return contentsHandler.getResult(resultStream);
}
}

contentsHandler.startField(cell.getColumnIndex(), value);
});
/**
 * Builds the stream of rows of {@code sheet} that should be passed to the record handler.
 *
 * <p>Rows are selected by their row index ({@code Row.getRowNum()}), not by their position
 * in the iteration. A sheet iterates only over rows that physically exist, so counting
 * elements with {@code skip(n)} would drop a data row whenever a row before the start
 * index is missing from the sheet. Selecting by index matches the index-based loop this
 * method replaced.
 *
 * <p>No upper bound on the row index is applied: every row produced by the sheet's
 * iterator already has an index not greater than {@code sheet.getLastRowNum()}.
 *
 * @param configuration supplies the first row index to read ({@code getStartLineIndex()})
 *                      and whether the stream is created as parallel
 * @param sheet         sheet whose rows are streamed
 * @return the rows whose index is greater than or equal to the configured start line index
 */
private static Stream<Row> createRowStream(ReaderConfiguration configuration, Sheet sheet) {
    // Read once so the lambda captures a plain int rather than re-querying the configuration per row.
    int firstRowIndex = configuration.getStartLineIndex();
    return StreamSupport.stream(sheet.spliterator(), configuration.isParallelProcessing())
            .filter(row -> row != null && row.getRowNum() >= firstRowIndex);
}

ReaderUtil.readRecord(contentsHandler);
private static <T> T handleRow(RecordHandler<T> recordHandler, Row row) {
T rec = recordHandler.startRecord();
row.cellIterator().forEachRemaining(cell -> {
String value;
if (cell.getCellType() == CellType.NUMERIC) {
CellStyle cellStyle = cell.getCellStyle();
value = DATA_FORMATTER.formatRawCellContents(cell.getNumericCellValue(),
cellStyle.getDataFormat(), cellStyle.getDataFormatString());
} else {
value = cell.getStringCellValue();
}
}
});

contentsHandler.endFile();

return contentsHandler.getResult();
}
recordHandler.startField(rec, cell.getColumnIndex(), value);
}
);
return recordHandler.endRecord(rec);
}
}
23 changes: 14 additions & 9 deletions src/main/java/com/github/boggard/universalreader/XLSXReader.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.github.boggard.universalreader;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import com.github.boggard.universalreader.util.ReaderUtil;
import com.github.boggard.universalreader.contenthandler.RecordHandler;
import lombok.RequiredArgsConstructor;
import lombok.experimental.UtilityClass;
import org.apache.poi.ooxml.util.SAXHelper;
Expand All @@ -21,20 +21,23 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@UtilityClass
public class XLSXReader {

public static <R> R readRecords(File sourceFile, ContentHandler<R> contentsHandler,
ReaderConfiguration configuration)
public static <R, T> R readRecords(File sourceFile, ContentHandler<R, T> contentsHandler,
ReaderConfiguration configuration)
throws IOException, OpenXML4JException, ParserConfigurationException, SAXException {
try (OPCPackage pkg = OPCPackage.open(sourceFile)) {
XSSFReader reader = new XSSFReader(pkg);

SheetContentsHandlerImpl<R> sheetContentsHandler = new SheetContentsHandlerImpl<>(contentsHandler, configuration);
SheetContentsHandlerImpl<T> sheetContentsHandler = new SheetContentsHandlerImpl<>(
contentsHandler.recordHandler(), configuration);
XMLReader parser = fetchSheetParser(reader.getStylesTable(), reader.getSharedStringsTable(),
sheetContentsHandler);

Expand All @@ -43,7 +46,7 @@ public static <R> R readRecords(File sourceFile, ContentHandler<R> contentsHandl

contentsHandler.endFile();

return contentsHandler.getResult();
return contentsHandler.getResult(sheetContentsHandler.resultList.stream());
}
}

Expand All @@ -69,32 +72,34 @@ private void processSheet(XMLReader parser, InputStream sheet) {
private static class SheetContentsHandlerImpl<R> implements SheetContentsHandler {

private static final Pattern CELL_REFERENCE_PATTERN = Pattern.compile("[A-Z]+");
private final ContentHandler<R> contentsHandler;
private final RecordHandler<R> recordHandler;
private final ReaderConfiguration configuration;

private int currentRowNum = -1;
private R curRowResult;
private final List<R> resultList = new ArrayList<>();

@Override
public void startRow(int rowNum) {
currentRowNum = rowNum;

if (isContentShouldBeHandled()) {
contentsHandler.startRecord();
curRowResult = recordHandler.startRecord();
}
}

@Override
public void endRow(int rowNum) {
if (isContentShouldBeHandled()) {
ReaderUtil.readRecord(contentsHandler);
resultList.add(recordHandler.endRecord(curRowResult));
}
}

@Override
public void cell(String cellReference, String formattedValue, XSSFComment comment) {
if (isContentShouldBeHandled()) {
int index = cellReferenceToIndex(cellReference);
contentsHandler.startField(index, formattedValue);
recordHandler.startField(curRowResult, index, formattedValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.github.boggard.universalreader.contenthandler;

public interface ContentHandler<R> {
import java.util.stream.Stream;

void startRecord();
public interface ContentHandler<R, T> {

void startField(int index, String field);

void endRecord();
RecordHandler<T> recordHandler();

/**
 * Hook for end-of-file processing; the default implementation does nothing.
 * The readers shown alongside this interface call it once per file, immediately
 * before asking the handler for its result.
 */
default void endFile() {
//by default do nothing
}

R getResult();
R getResult(Stream<T> resultStream);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.boggard.universalreader.contenthandler;

/**
 * Callback contract for building one record of type {@code R} from its fields.
 *
 * <p>A record is built in three steps: {@link #startRecord()} creates the object that
 * represents the record in progress, {@link #startField(Object, int, String)} is called
 * once per field with that object, and {@link #endRecord(Object)} turns it into the
 * final per-record result.
 *
 * <p>NOTE(review): the readers may call these methods from a parallel stream when parallel
 * processing is enabled, so implementations presumably must not rely on shared mutable
 * state between records. Confirm against the callers.
 *
 * @param <R> type of the object that represents a single record
 */
public interface RecordHandler<R> {

    /**
     * Starts a new record.
     *
     * @return the object that will receive the fields of this record
     */
    R startRecord();

    /**
     * Delivers one field of the record in progress.
     *
     * @param record the object previously returned by {@link #startRecord()}
     * @param index  zero-based position of the field within the record
     * @param field  textual value of the field
     */
    void startField(R record, int index, String field);

    /**
     * Completes the record in progress.
     *
     * @param record the object previously returned by {@link #startRecord()}
     * @return the finished record
     */
    R endRecord(R record);

    /**
     * Splits a textual record into fields and runs the full start/field/end cycle on it.
     *
     * <p>The separator is passed to {@link String#split(String)}, so it is interpreted as a
     * regular expression and trailing empty fields are not reported.
     *
     * @param record          raw text of one record
     * @param fieldsSeparator regular expression that separates the fields
     * @return the value returned by {@link #endRecord(Object)} for this record
     */
    default R handleAndReturnResult(String record, String fieldsSeparator) {
        // The record object is created before splitting, as in the original call order.
        final R current = startRecord();

        int position = 0;
        for (String value : record.split(fieldsSeparator)) {
            startField(current, position, value);
            position++;
        }

        return endRecord(current);
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.boggard.universalreader.util;

import com.github.boggard.universalreader.contenthandler.ContentHandler;
import lombok.experimental.UtilityClass;
import org.mozilla.universalchardet.UnicodeBOMInputStream;

Expand All @@ -11,21 +10,6 @@
@UtilityClass
public class ReaderUtil {

public static <T, E> void readRecord(String record, ContentHandler<E> contentsHandler, String recordsSeparator) {
contentsHandler.startRecord();

String[] fields = record.split(recordsSeparator);
for (int i = 0; i < fields.length; i++) {
contentsHandler.startField(i, fields[i]);
}

readRecord(contentsHandler);
}

public static <T, E> void readRecord(ContentHandler<E> contentsHandler) {
contentsHandler.endRecord();
}

public static BufferedReader inputStreamToBufferedReader(InputStream inputStream)
throws IOException {
Charset defaultCharset = StandardCharsets.UTF_8;
Expand Down
Loading

0 comments on commit 1ff5102

Please sign in to comment.