forked from sanic-org/sanic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_keep_alive_timeout.py
211 lines (174 loc) · 6.44 KB
/
test_keep_alive_timeout.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import asyncio
import platform
from asyncio import sleep as aio_sleep
from itertools import count
from os import environ
import pytest
from sanic_testing.reusable import ReusableClient
from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS
from sanic.response import text
CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}
MAX_LOOPS = 15
port_counter = count()
keep_alive_timeout_app_reuse = Sanic("test_ka_timeout_reuse")
keep_alive_app_client_timeout = Sanic("test_ka_client_timeout")
keep_alive_app_server_timeout = Sanic("test_ka_server_timeout")
keep_alive_app_context = Sanic("keep_alive_app_context")
keep_alive_timeout_app_reuse.config.update(CONFIG_FOR_TESTS)
keep_alive_app_client_timeout.config.update(CONFIG_FOR_TESTS)
keep_alive_app_server_timeout.config.update(CONFIG_FOR_TESTS)
keep_alive_app_context.config.update(CONFIG_FOR_TESTS)
@keep_alive_timeout_app_reuse.route("/1")
async def handler1(request):
return text("OK")
@keep_alive_app_client_timeout.route("/1")
async def handler2(request):
return text("OK")
@keep_alive_app_server_timeout.route("/1")
async def handler3(request):
return text("OK")
@keep_alive_app_context.post("/ctx")
def set_ctx(request):
request.conn_info.ctx.foo = "hello"
return text("OK")
@keep_alive_app_context.get("/ctx")
def get_ctx(request):
return text(request.conn_info.ctx.foo)
@pytest.mark.skipif(
bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
reason="Not testable with current client",
)
def test_keep_alive_timeout_reuse(port):
"""If the server keep-alive timeout and client keep-alive timeout are
both longer than the delay, the client _and_ server will successfully
reuse the existing connection."""
loops = 0
while True:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(
keep_alive_timeout_app_reuse, loop=loop, port=port
)
try:
with client:
headers = {"Connection": "keep-alive"}
request, response = client.get("/1", headers=headers)
assert response.status == 200
assert response.text == "OK"
assert request.protocol.state["requests_count"] == 1
loop.run_until_complete(aio_sleep(1))
request, response = client.get("/1")
assert response.status == 200
assert response.text == "OK"
assert request.protocol.state["requests_count"] == 2
except OSError:
loops += 1
if loops > MAX_LOOPS:
raise
continue
else:
break
@pytest.mark.skipif(
bool(environ.get("SANIC_NO_UVLOOP"))
or OS_IS_WINDOWS
or platform.system() != "Linux",
reason="Not testable with current client",
)
def test_keep_alive_client_timeout(port):
"""If the server keep-alive timeout is longer than the client
keep-alive timeout, client will try to create a new connection here."""
loops = 0
while True:
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(
keep_alive_app_client_timeout, loop=loop, port=port
)
with client:
headers = {"Connection": "keep-alive"}
request, response = client.get(
"/1", headers=headers, timeout=1
)
assert response.status == 200
assert response.text == "OK"
assert request.protocol.state["requests_count"] == 1
loop.run_until_complete(aio_sleep(2))
request, response = client.get("/1", timeout=1)
assert request.protocol.state["requests_count"] == 1
except OSError:
loops += 1
if loops > MAX_LOOPS:
raise
continue
else:
break
@pytest.mark.skipif(
bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
reason="Not testable with current client",
)
def test_keep_alive_server_timeout(port):
"""If the client keep-alive timeout is longer than the server
keep-alive timeout, the client will either a 'Connection reset' error
_or_ a new connection. Depending on how the event-loop handles the
broken server connection."""
loops = 0
while True:
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(
keep_alive_app_server_timeout, loop=loop, port=port
)
with client:
headers = {"Connection": "keep-alive"}
request, response = client.get(
"/1", headers=headers, timeout=60
)
assert response.status == 200
assert response.text == "OK"
assert request.protocol.state["requests_count"] == 1
loop.run_until_complete(aio_sleep(3))
request, response = client.get("/1", timeout=60)
assert request.protocol.state["requests_count"] == 1
except OSError:
loops += 1
if loops > MAX_LOOPS:
raise
continue
else:
break
@pytest.mark.skipif(
bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
reason="Not testable with current client",
)
def test_keep_alive_connection_context(port):
loops = 0
while True:
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(
keep_alive_app_context, loop=loop, port=port
)
with client:
headers = {"Connection": "keep-alive"}
request1, _ = client.post("/ctx", headers=headers)
loop.run_until_complete(aio_sleep(1))
request2, response = client.get("/ctx")
assert response.text == "hello"
assert id(request1.conn_info.ctx) == id(request2.conn_info.ctx)
assert (
request1.conn_info.ctx.foo
== request2.conn_info.ctx.foo
== "hello"
)
assert request2.protocol.state["requests_count"] == 2
except OSError:
loops += 1
if loops > MAX_LOOPS:
raise
continue
else:
break