-
Notifications
You must be signed in to change notification settings - Fork 10
Extending unit test coverage and fixing race condition #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
832c108
to
651bf7f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initial feedback, not reviewed test code yet
input-stream/src/main/java/com/amazon/connector/s3/S3SeekableInputStream.java
Show resolved
Hide resolved
input-stream/src/main/java/com/amazon/connector/s3/io/logical/impl/ParquetLogicalIOImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/com/amazon/connector/s3/io/physical/blockmanager/AutoClosingCircularBuffer.java
Outdated
Show resolved
Hide resolved
...rc/main/java/com/amazon/connector/s3/io/physical/blockmanager/BlockManagerConfiguration.java
Outdated
Show resolved
Hide resolved
input-stream/src/main/java/com/amazon/connector/s3/io/physical/blockmanager/IOBlock.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/amazon/connector/s3/io/physical/blockmanager/MultiObjectsBlockManager.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/amazon/connector/s3/io/physical/blockmanager/MultiObjectsBlockManager.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/amazon/connector/s3/io/physical/blockmanager/MultiObjectsBlockManager.java
Outdated
Show resolved
Hide resolved
651bf7f
to
1da9112
Compare
...rc/main/java/com/amazon/connector/s3/io/physical/blockmanager/AutoClosingCircularBuffer.java
Outdated
Show resolved
Hide resolved
...rc/main/java/com/amazon/connector/s3/io/physical/blockmanager/AutoClosingCircularBuffer.java
Outdated
Show resolved
Hide resolved
...rc/main/java/com/amazon/connector/s3/io/physical/blockmanager/AutoClosingCircularBuffer.java
Outdated
Show resolved
Hide resolved
65088bd
to
cee95e9
Compare
…toClosingCircularBuffer and moving reading of IOBlock inside it.
cee95e9
to
1e265ef
Compare
1e265ef
to
5988c66
Compare
...rc/main/java/com/amazon/connector/s3/io/physical/blockmanager/AutoClosingCircularBuffer.java
Show resolved
Hide resolved
...est/java/com/amazon/connector/s3/io/physical/blockmanager/AutoClosingCircularBufferTest.java
Show resolved
Hide resolved
...rc/main/java/com/amazon/connector/s3/io/physical/blockmanager/AutoClosingCircularBuffer.java
Outdated
Show resolved
Hide resolved
PhysicalIO physicalIO = new PhysicalIOImpl(new BlockManager(blockManager, s3URI)); | ||
LogicalIO logicalIO = | ||
new ParquetLogicalIOImpl(physicalIO, LogicalIOConfiguration.DEFAULT); | ||
SeekableInputStream stream = new S3SeekableInputStream(logicalIO); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also have multiple threads trying to prefetch at the same time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate what do you mean? My expectation was that each SeekableInputStream
with default configuration will issue request to prefetch footer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the minute you initiate S3SeekableInputStream
, it'll kick off the tail prefetch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 My bad, the default configuration is to prefetch the tail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Left some minor comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!!! the tests are great!
I just left some nits. but all good otherwise
for (int i = 0; i < len; i++) { | ||
buff[i] = blockContent[positionToOffset(pos) + i]; | ||
} | ||
public int read(byte[] buff, int offset, long len, long pos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
@@ -255,6 +246,8 @@ private IOBlock createBlock(long start, long end, S3URI s3URI, boolean isPrefetc | |||
.key(s3URI.getKey()) | |||
.range(new Range(OptionalLong.of(start), OptionalLong.of(end))) | |||
.build()); | |||
|
|||
LOG.info("Creating IOBlock {}:{} for {}", start, end, s3URI); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: instead of S3URI, should we just use s3URI.getKey() in our logs? I think that might make it easier to parse
PhysicalIO physicalIO = new PhysicalIOImpl(new BlockManager(blockManager, s3URI)); | ||
LogicalIO logicalIO = | ||
new ParquetLogicalIOImpl(physicalIO, LogicalIOConfiguration.DEFAULT); | ||
SeekableInputStream stream = new S3SeekableInputStream(logicalIO); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the minute you initiate S3SeekableInputStream
, it'll kick off the tail prefetch
e.printStackTrace(); | ||
} | ||
} | ||
boolean result = haveException.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: rename variable to "completedWithException"
and then do `assertFalse(completedWithException, )
new FakeObjectClient(sb.toString()), BlockManagerConfiguration.DEFAULT); | ||
AtomicBoolean haveException = new AtomicBoolean(false); | ||
|
||
// Create 20 threads to start multiple SeekableInputStream to read last 4 bytes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: update comment, we are also reading first 4 bytes.
also to confirm, when our IoBlock was not immutable, this reproduced the issue right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It was throwing one type of exception before changes in IOBlock - BufferUnderflowException. After changes in IOBlock, it start to throw ConcurrentModificationException.
@@ -307,4 +332,50 @@ void testMinusOneIsHandledProperly() throws IOException { | |||
assertEquals(-1, stream.read(b, 0, LEN)); | |||
assertEquals(INITIAL_POS, stream.getPos()); | |||
} | |||
|
|||
@Test | |||
void testMultiThreadUsage() throws IOException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
really like this test, thanks!
|
||
@Test | ||
void testMetadaWithNegativeContentLength() { | ||
// FakeObjectClient fakeObjectClient = new FakeObjectClient(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove commented code
|
||
int one = '1'; | ||
int zero = '0'; | ||
assertEquals(one, ioBlock.getByte(0 + offset)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry this test is a bit hard to follow for me, could you please explain what we are testing here exactly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is checking:
- we are correctly handling blocks that has start offset.
- we are returning exactly those bytes that were requested without messing with indexes
- if we receive request to read bytes outside of current block we throw excetpion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, merging
Description of changes:
Extend code coverage to return code coverage back to 95%. Especially by adding multi thread test for running
SeekaleInputStream
in concurrent environment.Update access pattern for data owned by
AutoClosingCircularBuffer
. Move logic for looking up for appropriateIOBlock
fromMultiObjectsBlockManager
insideAutoClosingCircularBuffer
and wrap it with sync block. That is need to prevent concurrent exception that happens whenThread A
is iterating through content ofAutoClosingCircularBuffer
andTread B
is modifying it at the same time. That cause iterator used byThread A
to become invalid.By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.