Skip to content

Commit 4e4ea3c

Browse files
committed
Merge branch 'main' into gyh
2 parents 63ea4db + 5537435 commit 4e4ea3c

File tree

12 files changed

+105
-24
lines changed

12 files changed

+105
-24
lines changed

core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ final boolean tryInitialize() {
187187
}
188188

189189
this.session = session;
190-
originalRes.setId(id);
190+
originalRes.setStreamId(streamId());
191191
responseWrapper = responseDecoder.addResponse(this, id, originalRes, ctx, ch.eventLoop());
192192

193193
if (timeoutMillis > 0) {

core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
527527

528528
final int id = 0;
529529
res.init(responseDecoder.inboundTrafficController());
530-
res.setId(id);
530+
res.setStreamId(1);
531531
res.subscribe(new Subscriber<HttpObject>() {
532532

533533
private boolean notified;

core/src/main/java/com/linecorp/armeria/internal/client/DecodedHttpResponse.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public final class DecodedHttpResponse extends DefaultHttpResponse {
3333
@Nullable
3434
private InboundTrafficController inboundTrafficController;
3535
private long writtenBytes;
36-
private int id = -1;
36+
private int streamId = -1;
3737

3838
public DecodedHttpResponse(EventLoop eventLoop) {
3939
this.eventLoop = eventLoop;
@@ -43,8 +43,8 @@ public void init(InboundTrafficController inboundTrafficController) {
4343
this.inboundTrafficController = inboundTrafficController;
4444
}
4545

46-
public void setId(int id) {
47-
this.id = id;
46+
public void setStreamId(int streamId) {
47+
this.streamId = streamId;
4848
}
4949

5050
public long writtenBytes() {
@@ -68,7 +68,11 @@ public boolean tryWrite(HttpObject obj) {
6868
if (published && obj instanceof HttpData) {
6969
final int length = ((HttpData) obj).length();
7070
assert inboundTrafficController != null;
71-
inboundTrafficController.inc(length);
71+
if (streamId == 1) {
72+
// The stream ID 1 is used for an HTTP/1 upgrade request.
73+
} else {
74+
inboundTrafficController.inc(length);
75+
}
7276
writtenBytes += length;
7377
}
7478
return published;
@@ -79,8 +83,12 @@ protected void onRemoval(HttpObject obj) {
7983
if (obj instanceof HttpData) {
8084
final int length = ((HttpData) obj).length();
8185
assert inboundTrafficController != null;
82-
assert id > -1 : "id must be set before consuming HttpData";
83-
inboundTrafficController.dec(id, length);
86+
assert streamId > -1 : "id must be set before consuming HttpData";
87+
if (streamId == 1) {
88+
// The stream ID 1 is used for an HTTP/1 upgrade request.
89+
} else {
90+
inboundTrafficController.dec(streamId, length);
91+
}
8492
}
8593
}
8694
}

core/src/main/java/com/linecorp/armeria/internal/common/InboundTrafficController.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,13 @@ public void inc(int numProducedBytes) {
100100
}
101101
}
102102

103-
public void dec(int id, int numConsumedBytes) {
103+
public void dec(int streamId, int numConsumedBytes) {
104104
if (decoder != null) {
105105
assert channel != null;
106106
if (channel.eventLoop().inEventLoop()) {
107-
consumeHttp2Bytes(id, numConsumedBytes);
107+
consumeHttp2Bytes(streamId, numConsumedBytes);
108108
} else {
109-
channel.eventLoop().execute(() -> consumeHttp2Bytes(id, numConsumedBytes));
109+
channel.eventLoop().execute(() -> consumeHttp2Bytes(streamId, numConsumedBytes));
110110
}
111111
}
112112
final int oldValue = getAndAdd(-numConsumedBytes);
@@ -119,8 +119,7 @@ public void dec(int id, int numConsumedBytes) {
119119
}
120120
}
121121

122-
private void consumeHttp2Bytes(int id, int numConsumedBytes) {
123-
final int streamId = streamId(id);
122+
private void consumeHttp2Bytes(int streamId, int numConsumedBytes) {
124123
assert decoder != null && channel != null;
125124
final Http2Stream stream = decoder.connection().stream(streamId);
126125
if (stream != null) {
@@ -147,10 +146,6 @@ public boolean isSuspended() {
147146
return suspended;
148147
}
149148

150-
private static int streamId(int id) {
151-
return (id << 1) + 1;
152-
}
153-
154149
@Override
155150
public String toString() {
156151
return MoreObjects.toStringHelper(this)

core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ final class Http2RequestDecoder extends Http2EventAdapter {
7676
private final KeepAliveHandler keepAliveHandler;
7777
private final Http2GoAwayHandler goAwayHandler;
7878
private final IntObjectMap<@Nullable DecodedHttpRequest> requests = new IntObjectHashMap<>();
79-
private int nextId;
8079

8180
Http2RequestDecoder(ServerConfig cfg, Channel channel, AsciiString scheme,
8281
KeepAliveHandler keepAliveHandler, Http2ConnectionDecoder decoder) {
@@ -208,7 +207,8 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
208207
}
209208
}
210209

211-
final int id = ++nextId;
210+
// Derive the request ID from the stream ID.
211+
final int id = streamIdToId(streamId);
212212
final EventLoop eventLoop = ctx.channel().eventLoop();
213213
req = DecodedHttpRequest.of(endOfStream, eventLoop, id, streamId, headers, true,
214214
inboundTrafficController, routingCtx);
@@ -248,6 +248,10 @@ public void onHeadersRead(
248248
onHeadersRead(ctx, streamId, headers, padding, endOfStream);
249249
}
250250

251+
private static int streamIdToId(int streamId) {
252+
return (streamId - 1) >>> 1;
253+
}
254+
251255
private boolean handle100Continue(int streamId, Http2Headers headers, HttpMethod method) {
252256
final CharSequence expectValue = headers.get(HttpHeaderNames.EXPECT);
253257
if (expectValue == null) {

core/src/main/java/com/linecorp/armeria/server/StreamingDecodedHttpRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public boolean tryWrite(HttpObject obj) {
172172
protected void onRemoval(HttpObject obj) {
173173
if (obj instanceof HttpData) {
174174
final int length = ((HttpData) obj).length();
175-
inboundTrafficController.dec(id, length);
175+
inboundTrafficController.dec(streamId, length);
176176
}
177177
}
178178

core/src/main/resources/com/linecorp/armeria/public_suffixes.txt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ ap.leg.br
609609
ap.ngrok.io
610610
aparecida.br
611611
apartments
612+
api.br
612613
api.gov.uk
613614
api.lp.dev
614615
api.stdlib.com
@@ -3706,6 +3707,7 @@ i.ng
37063707
i.ph
37073708
i.se
37083709
i234.me
3710+
ia.br
37093711
ia.us
37103712
iamallama.com
37113713
ibara.okayama.jp
@@ -4803,7 +4805,6 @@ lib.ca.us
48034805
lib.co.us
48044806
lib.ct.us
48054807
lib.dc.us
4806-
lib.de.us
48074808
lib.ee
48084809
lib.fl.us
48094810
lib.ga.us
@@ -6996,6 +6997,7 @@ readymade.jp
69966997
realestate
69976998
realestate.pl
69986999
realm.cz
7000+
realtime.supabase.co
69997001
realtor
70007002
realty
70017003
rebun.hokkaido.jp
@@ -7977,6 +7979,7 @@ soc.srcf.net
79777979
soccer
79787980
sochi.su
79797981
social
7982+
social.br
79807983
soctrang.vn
79817984
sodegaura.chiba.jp
79827985
soeda.fukuoka.jp
@@ -8093,6 +8096,7 @@ stockholm
80938096
stokke.no
80948097
stor-elvdal.no
80958098
storage
8099+
storage.supabase.co
80968100
storage.yandexcloud.net
80978101
stord.no
80988102
stordal.no
@@ -9098,6 +9102,7 @@ wakayama.wakayama.jp
90989102
wake.okayama.jp
90999103
wakkanai.hokkaido.jp
91009104
wakuya.miyagi.jp
9105+
wal.app
91019106
walbrzych.pl
91029107
wales
91039108
walmart
@@ -9762,6 +9767,7 @@ xx.kg
97629767
xxx
97639768
xxx.ec
97649769
xyz
9770+
xyz.br
97659771
xz.cn
97669772
y.bg
97679773
y.se

core/src/test/java/com/linecorp/armeria/client/HttpResponseWrapperTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private static HttpResponseWrapper httpResponseWrapper(DecodedHttpResponse res)
160160
final TestHttpResponseDecoder decoder = new TestHttpResponseDecoder(channel, controller);
161161

162162
final int id = 1;
163-
res.setId(id);
163+
res.setStreamId(id);
164164
res.init(controller);
165165
return decoder.addResponse(null, id, res, cctx, cctx.eventLoop());
166166
}

core/src/test/java/com/linecorp/armeria/server/HttpServerFlowControlTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.atomic.AtomicInteger;
2626

27+
import org.assertj.core.data.Percentage;
2728
import org.hamcrest.Matchers;
2829
import org.junit.jupiter.api.BeforeEach;
2930
import org.junit.jupiter.api.Test;
@@ -36,6 +37,7 @@
3637
import com.linecorp.armeria.common.AggregatedHttpResponse;
3738
import com.linecorp.armeria.common.ExchangeType;
3839
import com.linecorp.armeria.common.HttpData;
40+
import com.linecorp.armeria.common.HttpHeaderNames;
3941
import com.linecorp.armeria.common.HttpRequest;
4042
import com.linecorp.armeria.common.HttpResponse;
4143
import com.linecorp.armeria.common.HttpStatus;
@@ -66,9 +68,15 @@ public class HttpServerFlowControlTest {
6668

6769
@RegisterExtension
6870
static final ServerExtension server = new ServerExtension() {
71+
@Override
72+
protected boolean runForEachTest() {
73+
return true;
74+
}
75+
6976
@Override
7077
protected void configure(ServerBuilder sb) {
7178
sb.decorator(LoggingService.newDecorator());
79+
sb.http2StreamWindowUpdateRatio(0.9f);
7280
sb.http2InitialConnectionWindowSize(CONNECTION_WINDOW);
7381
sb.http2InitialStreamWindowSize(STREAM_WINDOW);
7482
sb.service(PATH, (ctx, req) -> {
@@ -99,6 +107,16 @@ protected void configure(ServerBuilder sb) {
99107
return HttpResponse.of(future);
100108
});
101109

110+
sb.service("/stream", (ctx, req) -> {
111+
final CompletableFuture<HttpResponse> future = CompletableFuture.supplyAsync(() -> {
112+
final AggregatedHttpRequest aggReq = req.aggregate().join();
113+
return HttpResponse.of(HttpStatus.OK, MediaType.PLAIN_TEXT,
114+
"Received: " + aggReq.content().length());
115+
}, ctx.blockingTaskExecutor());
116+
117+
return HttpResponse.of(future);
118+
});
119+
102120
sb.service("/aggregate", new HttpService() {
103121

104122
@Override
@@ -162,6 +180,39 @@ void flowControl() throws Exception {
162180
assertThat(flowController.windowSize(connection.connectionStream())).isGreaterThan(0);
163181
}
164182

183+
@Test
184+
void flowControl_with_rejected_streams() throws Exception {
185+
final WebClient client = WebClient.of(server.uri(SessionProtocol.H2C));
186+
for (int i = 0; i < 10; i++) {
187+
final CompletableFuture<AggregatedHttpResponse> res1 =
188+
client.post("/stream", HttpData.wrap(new byte[DATA_SIZE])).aggregate();
189+
190+
final CompletableFuture<AggregatedHttpResponse> res2 =
191+
client.prepare()
192+
.post("/stream")
193+
.content(MediaType.OCTET_STREAM, HttpData.wrap(new byte[DATA_SIZE]))
194+
// Make the request fail before increasing the request ID.
195+
.header(HttpHeaderNames.EXPECT, "invalid")
196+
.execute()
197+
.aggregate();
198+
199+
final AggregatedHttpResponse aggRes1 = res1.join();
200+
assertThat(aggRes1.status()).isEqualTo(HttpStatus.OK);
201+
assertThat(aggRes1.contentUtf8()).isEqualTo("Received: " + DATA_SIZE);
202+
203+
final AggregatedHttpResponse aggRes2 = res2.join();
204+
assertThat(aggRes2.status()).isEqualTo(HttpStatus.EXPECTATION_FAILED);
205+
}
206+
207+
final ServiceRequestContext sctx = server.requestContextCaptor().take();
208+
final StreamingDecodedHttpRequest decodedServerReq = (StreamingDecodedHttpRequest) sctx.request();
209+
final Http2ConnectionDecoder decoder = decodedServerReq.inboundTrafficController().decoder();
210+
final Http2LocalFlowController flowController = decoder.flowController();
211+
final Http2Connection connection = decoder.connection();
212+
assertThat(flowController.windowSize(connection.connectionStream()))
213+
.isCloseTo(CONNECTION_WINDOW, Percentage.withPercentage(10));
214+
}
215+
165216
@Test
166217
void ignoreFlowControlForNonStreamRequest() throws Exception {
167218
final WebClient client = WebClient.of(server.uri(SessionProtocol.H2C));

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group=com.linecorp.armeria
2-
version=1.33.2-SNAPSHOT
2+
version=1.33.3-SNAPSHOT
33
projectName=Armeria
44
projectUrl=https://armeria.dev/
55
projectDescription=Asynchronous HTTP/2 RPC/REST client/server library built on top of Java 8, Netty, Thrift and gRPC

0 commit comments

Comments
 (0)