Skip to content

Commit

Permalink
Refactor instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Aug 5, 2019
1 parent 10d9342 commit cc47f5c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Takes ByteBuffer datums from a BlockingQueue and writes to a DataFileWriter.
*/
public class AvroWriter implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class);
Expand All @@ -46,18 +49,13 @@ public AvroWriter(
public void run() {
LOGGER.debug("AvroWriter started");
try {
int c = 0;
while (true) {
final ByteBuffer datum = queue.take();
if (datum.capacity() == 0) {
this.dataFileWriter.sync();
return;
} else {
this.dataFileWriter.appendEncoded(datum);
c++;
}
if ((c % 100000) == 0) {
LOGGER.info("Size={} remainingCapacity={}", queue.size(), queue.remainingCapacity());
}
}
} catch (InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void write(String query) throws Exception {
final long startMs = metering.startWriteMeter();
convertAllResultSet(resultSet, JdbcAvroRecordConverter.create(resultSet));
queue.put(ByteBuffer.allocate(0)); // write final record, so that consumer stops
final long startMs2 = metering.startWriteMeter();
final long startMs2 = System.currentTimeMillis();
future.get();
executorService.shutdown();
LOGGER.info(String.format("jdbcavroio : Waited %5.2f seconds for finishing write operation",
Expand Down

0 comments on commit cc47f5c

Please sign in to comment.