-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'origin/feature/STARCH-938/fixed_limit_o…
…utput_buffer' into feature/STARCH-963/cookie_forwarded_http
- Loading branch information
Showing
2 changed files
with
312 additions
and
0 deletions.
There are no files selected for viewing
177 changes: 177 additions & 0 deletions
177
base/src/main/java/edu/iu/IuFixedLimitOutputBuffer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
package edu.iu; | ||
|
||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Maintains a fixed buffer of streamed data, for normalizing the rate of output | ||
* and limiting output to a single source. | ||
*/ | ||
public class IuFixedLimitOutputBuffer { | ||
|
||
private final byte[] buffer; | ||
private final long maxSize; | ||
private long count; | ||
|
||
private int mark; | ||
private int pos; | ||
private byte[] overflow; | ||
private int overflowOffset; | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param maxSize maximum number of bytes to allow writing to the target | ||
*/ | ||
public IuFixedLimitOutputBuffer(long maxSize) { | ||
this(0, maxSize); | ||
} | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param count number of bytes previously written to the target | ||
* @param maxSize maximum number of bytes to allow writing to the target | ||
*/ | ||
public IuFixedLimitOutputBuffer(long count, long maxSize) { | ||
this(16384, count, maxSize); | ||
} | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param bufferSize fixed length buffer size | ||
* @param count number of bytes previously written to the target | ||
* @param maxSize maximum number of bytes to allow writing to the target | ||
*/ | ||
public IuFixedLimitOutputBuffer(int bufferSize, long count, long maxSize) { | ||
this.buffer = new byte[bufferSize]; | ||
this.count = count; | ||
this.maxSize = maxSize; | ||
} | ||
|
||
/** | ||
* Resets the count of bytes written to the target. | ||
*/ | ||
public void resetCount() { | ||
count = 0; | ||
} | ||
|
||
/** | ||
* Gets the number of bytes remaining in the target output quota. | ||
* | ||
* @return number of bytes remaining in the target output quota. | ||
*/ | ||
public int remaining() { | ||
final var remaining = maxSize - count; | ||
if (remaining > Integer.MAX_VALUE) | ||
return Integer.MAX_VALUE; | ||
else | ||
return (int) remaining; | ||
} | ||
|
||
/** | ||
* Writes to an {@link OutputStream}, as long as it source data is available, up | ||
* to the {@link #IuFixedLimitOutputBuffer(long) max size}. | ||
* | ||
* <p> | ||
* This method returns when: | ||
* </p> | ||
* <ul> | ||
* <li>{@link Supplier#get() dataSupplier.get()} returns null to indicate no | ||
* more source data is available.</li> | ||
* <li>The {@link #IuFixedLimitOutputBuffer(long) max size} has been reached. | ||
* {@link #remaining()} will return 0 and no more data will be written from this | ||
* buffer without first invoking {@link #resetCount()}.</li> | ||
* </ul> | ||
* | ||
* @param dataSupplier {@link Supplier} of source data | ||
* @param out {@link OutputStream} | ||
* @throws IOException If an error occurs writing to the stream. | ||
*/ | ||
public void write(Supplier<byte[]> dataSupplier, OutputStream out) throws IOException { | ||
int remaining; | ||
while ((pos > mark || // | ||
fill(dataSupplier)) && // | ||
(remaining = remaining()) > 0) { | ||
final var len = pos - mark; | ||
if (remaining < len) { | ||
out.write(buffer, mark, remaining); | ||
count += remaining; | ||
mark += remaining; | ||
} else { | ||
out.write(buffer, mark, len); | ||
count += len; | ||
mark = pos = 0; | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Fills the buffer with source data. | ||
* | ||
* @param dataSupplier {@link Supplier} of source data | ||
* @return true if data was added to the buffer; else false | ||
*/ | ||
public boolean fill(Supplier<byte[]> dataSupplier) { | ||
var available = available(); | ||
if (available <= 0) | ||
return false; | ||
|
||
final var opos = pos; | ||
|
||
if (overflow != null) { | ||
final var overflowRemaining = overflow.length - overflowOffset; | ||
|
||
if (overflowRemaining > available) { | ||
// fill available space from overflow | ||
System.arraycopy(overflow, overflowOffset, buffer, pos, available); | ||
pos += available; | ||
|
||
// increment overflow offset for next iteration | ||
overflowOffset += available; | ||
return true; | ||
} | ||
|
||
// else copy all overflow bytes into the buffer | ||
System.arraycopy(overflow, overflowOffset, buffer, pos, overflowRemaining); | ||
pos += overflowRemaining; | ||
overflow = null; | ||
overflowOffset = 0; | ||
} | ||
|
||
byte[] data; | ||
while ((data = dataSupplier.get()) != null) { | ||
available = available(); | ||
|
||
final var messageLength = data.length; | ||
if (messageLength > available) { | ||
// fill available space from data | ||
System.arraycopy(data, 0, buffer, pos, available); | ||
pos += available; | ||
|
||
// copy data reference as overflow buffer and set initial offset | ||
overflow = data; | ||
overflowOffset = available; | ||
return true; | ||
} | ||
|
||
// copy data into buffer and increment position | ||
System.arraycopy(data, 0, buffer, pos, messageLength); | ||
pos += messageLength; | ||
} | ||
|
||
return pos > opos; | ||
} | ||
|
||
/** | ||
* Gets the number of bytes available in the buffer. | ||
* | ||
* @return number of bytes available in the buffer. | ||
*/ | ||
int available() { | ||
return buffer.length - pos; | ||
} | ||
|
||
} |
135 changes: 135 additions & 0 deletions
135
base/src/test/java/edu/iu/IuFixedLimitOutputBufferTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package edu.iu; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertArrayEquals; | ||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.util.Arrays; | ||
import java.util.Queue; | ||
import java.util.Random; | ||
import java.util.concurrent.ConcurrentLinkedDeque; | ||
import java.util.function.Supplier; | ||
|
||
import org.junit.jupiter.api.Test; | ||
|
||
@SuppressWarnings("javadoc") | ||
public class IuFixedLimitOutputBufferTest { | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void testNothingToDo() { | ||
final var buf = new IuFixedLimitOutputBuffer(32); | ||
final var out = new ByteArrayOutputStream(); | ||
final Supplier<byte[]> dataSupplier = mock(Supplier.class); | ||
when(dataSupplier.get()).thenReturn((byte[]) null); | ||
assertDoesNotThrow(() -> buf.write(dataSupplier, out)); | ||
assertEquals(32, buf.remaining()); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void testExactFit() { | ||
final var data = IuText.utf8(IdGenerator.generateId()); | ||
final var buf = new IuFixedLimitOutputBuffer(data.length); | ||
final var out = new ByteArrayOutputStream(); | ||
final Supplier<byte[]> dataSupplier = mock(Supplier.class); | ||
when(dataSupplier.get()).thenReturn(data, (byte[]) null); | ||
assertDoesNotThrow(() -> buf.write(dataSupplier, out)); | ||
assertArrayEquals(data, out.toByteArray()); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void testOverflow() { | ||
final var data = IuText.utf8(IdGenerator.generateId()); | ||
final var buf = new IuFixedLimitOutputBuffer(8, data.length); | ||
final var out = new ByteArrayOutputStream(); | ||
final Supplier<byte[]> dataSupplier = mock(Supplier.class); | ||
when(dataSupplier.get()).thenReturn(data, (byte[]) null); | ||
assertDoesNotThrow(() -> buf.write(dataSupplier, out)); | ||
assertArrayEquals(Arrays.copyOf(data, 24), out.toByteArray()); | ||
assertEquals(0, buf.remaining()); | ||
|
||
out.reset(); | ||
buf.resetCount(); | ||
assertDoesNotThrow(() -> buf.write(dataSupplier, out)); | ||
assertArrayEquals(Arrays.copyOfRange(data, 24, 32), out.toByteArray()); | ||
assertEquals(24, buf.remaining()); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Test | ||
public void testUndersized() { | ||
final var data = IuText.utf8(IdGenerator.generateId()); | ||
final var buf = new IuFixedLimitOutputBuffer(8, 0, data.length); | ||
final var out = new ByteArrayOutputStream(); | ||
final Supplier<byte[]> dataSupplier = mock(Supplier.class); | ||
when(dataSupplier.get()).thenReturn(data, (byte[]) null); | ||
assertDoesNotThrow(() -> buf.write(dataSupplier, out)); | ||
assertArrayEquals(data, out.toByteArray()); | ||
} | ||
|
||
@Test | ||
public void testFillAvailable() { | ||
final var data = IuText.utf8(IdGenerator.generateId()); | ||
final var buf = new IuFixedLimitOutputBuffer(8, 0, data.length); | ||
buf.fill(() -> data); | ||
assertEquals(0, buf.available()); | ||
assertDoesNotThrow(() -> buf.fill(() -> null)); | ||
} | ||
|
||
@Test | ||
public void testBigRemaining() { | ||
final var buf = new IuFixedLimitOutputBuffer(((long) Integer.MAX_VALUE) + 1); | ||
assertEquals(Integer.MAX_VALUE, buf.remaining()); | ||
} | ||
|
||
@Test | ||
public void testBulk() { | ||
final var control = new ByteArrayOutputStream(); | ||
final Queue<byte[]> queue = new ConcurrentLinkedDeque<>(); | ||
final var feed = new Thread() { | ||
volatile boolean active = true; | ||
|
||
@Override | ||
public void run() { | ||
final var rand = new Random(); | ||
while (active) | ||
synchronized (this) { | ||
IuException.unchecked(() -> { | ||
final var length = rand.nextInt(16382) + 2; | ||
final var data = new byte[length]; | ||
rand.nextBytes(data); | ||
control.write(data); | ||
queue.offer(data); | ||
this.wait(100L); | ||
}); | ||
} | ||
} | ||
}; | ||
feed.start(); | ||
|
||
final var out = new ByteArrayOutputStream(); | ||
final var buf = new IuFixedLimitOutputBuffer(4096); | ||
for (var i = 0; i < 20; i++) { | ||
while (buf.remaining() > 0) { | ||
assertDoesNotThrow(() -> buf.write(queue::poll, out)); | ||
IuException.unchecked(() -> Thread.sleep(50L)); | ||
} | ||
assertEquals(4096, out.size()); | ||
assertArrayEquals(Arrays.copyOfRange(control.toByteArray(), i * 4096, (i + 1) * 4096), out.toByteArray()); | ||
|
||
out.reset(); | ||
buf.resetCount(); | ||
} | ||
|
||
synchronized (feed) { | ||
feed.active = false; | ||
feed.notifyAll(); | ||
} | ||
} | ||
|
||
} |