Skip to content

Commit 2c67417

Browse files
committed
feature: enabled the 'tcpsock:receiveany()' API for upstream TCP cosockets.
Ported from ngx_meta_lua #3018ec8 and ngx_http_lua #b5ffb11.
1 parent dc91fd7 commit 2c67417

4 files changed

+534
-7
lines changed

src/ngx_stream_lua_socket_tcp.c

+95-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ static int ngx_stream_lua_socket_tcp_connect(lua_State *L);
3333
static int ngx_stream_lua_socket_tcp_sslhandshake(lua_State *L);
3434
#endif
3535
static int ngx_stream_lua_socket_tcp_receive(lua_State *L);
36+
static int ngx_stream_lua_socket_tcp_receiveany(lua_State *L);
3637
static int ngx_stream_lua_socket_tcp_send(lua_State *L);
3738
static int ngx_stream_lua_socket_tcp_close(lua_State *L);
3839
static int ngx_stream_lua_socket_tcp_setoption(lua_State *L);
@@ -107,6 +108,7 @@ static int ngx_stream_lua_socket_write_error_retval_handler(
107108
static ngx_int_t ngx_stream_lua_socket_read_all(void *data, ssize_t bytes);
108109
static ngx_int_t ngx_stream_lua_socket_read_until(void *data, ssize_t bytes);
109110
static ngx_int_t ngx_stream_lua_socket_read_chunk(void *data, ssize_t bytes);
111+
static ngx_int_t ngx_stream_lua_socket_read_any(void *data, ssize_t bytes);
110112
static int ngx_stream_lua_socket_tcp_receiveuntil(lua_State *L);
111113
static int ngx_stream_lua_socket_receiveuntil_iterator(lua_State *L);
112114
static ngx_int_t ngx_stream_lua_socket_compile_pattern(u_char *data, size_t len,
@@ -316,7 +318,7 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
316318
/* {{{tcp object metatable */
317319
lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
318320
tcp_socket_metatable_key));
319-
lua_createtable(L, 0 /* narr */, 13 /* nrec */);
321+
lua_createtable(L, 0 /* narr */, 14 /* nrec */);
320322

321323
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_connect);
322324
lua_setfield(L, -2, "connect");
@@ -334,6 +336,9 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
334336
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveuntil);
335337
lua_setfield(L, -2, "receiveuntil");
336338

339+
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveany);
340+
lua_setfield(L, -2, "receiveany");
341+
337342
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_send);
338343
lua_setfield(L, -2, "send");
339344

@@ -358,7 +363,6 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
358363
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_shutdown);
359364
lua_setfield(L, -2, "shutdown");
360365

361-
362366
lua_pushvalue(L, -1);
363367
lua_setfield(L, -2, "__index");
364368
lua_rawset(L, LUA_REGISTRYINDEX);
@@ -2366,6 +2370,75 @@ ngx_stream_lua_socket_tcp_receive_helper(ngx_stream_lua_request_t *r,
23662370
}
23672371

23682372

2373+
static int
2374+
ngx_stream_lua_socket_tcp_receiveany(lua_State *L)
2375+
{
2376+
int n;
2377+
lua_Integer bytes;
2378+
ngx_stream_lua_request_t *r;
2379+
ngx_stream_lua_loc_conf_t *llcf;
2380+
2381+
ngx_stream_lua_socket_tcp_upstream_t *u;
2382+
2383+
n = lua_gettop(L);
2384+
if (n != 2) {
2385+
return luaL_error(L, "expecting 2 arguments "
2386+
"(including the object), but got %d", n);
2387+
}
2388+
2389+
r = ngx_stream_lua_get_req(L);
2390+
if (r == NULL) {
2391+
return luaL_error(L, "no request found");
2392+
}
2393+
2394+
luaL_checktype(L, 1, LUA_TTABLE);
2395+
2396+
lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
2397+
u = lua_touserdata(L, -1);
2398+
2399+
if (u == NULL || u->peer.connection == NULL || u->read_closed) {
2400+
2401+
llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
2402+
2403+
if (llcf->log_socket_errors) {
2404+
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
2405+
"attempt to receive data on a closed "
2406+
"socket: u:%p, c:%p, ft:%d eof:%d",
2407+
u, u ? u->peer.connection : NULL,
2408+
u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
2409+
}
2410+
2411+
lua_pushnil(L);
2412+
lua_pushliteral(L, "closed");
2413+
return 2;
2414+
}
2415+
2416+
if (u->request != r) {
2417+
return luaL_error(L, "bad request");
2418+
}
2419+
2420+
ngx_stream_lua_socket_check_busy_connecting(r, u, L);
2421+
ngx_stream_lua_socket_check_busy_reading(r, u, L);
2422+
2423+
if (!lua_isnumber(L, 2)) {
2424+
return luaL_argerror(L, 2, "bad max argument");
2425+
}
2426+
2427+
bytes = lua_tointeger(L, 2);
2428+
if (bytes <= 0) {
2429+
return luaL_argerror(L, 2, "bad max argument");
2430+
}
2431+
2432+
u->input_filter = ngx_stream_lua_socket_read_any;
2433+
u->rest = (size_t) bytes;
2434+
u->length = u->rest;
2435+
2436+
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
2437+
"stream lua tcp socket calling receiveany() "
2438+
"method to read at most %uz bytes", u->rest);
2439+
2440+
return ngx_stream_lua_socket_tcp_receive_helper(r, u, L);
2441+
}
23692442

23702443

23712444
static int
@@ -2406,8 +2479,8 @@ ngx_stream_lua_socket_tcp_receive(lua_State *L)
24062479

24072480
if (llcf->log_socket_errors) {
24082481
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
2409-
"attempt to receive data on a closed socket: u:%p, "
2410-
"c:%p, ft:%d eof:%d",
2482+
"stream attempt to receive data on a closed "
2483+
"socket: u:%p, c:%p, ft:%d eof:%d",
24112484
u, u ? u->peer.connection : NULL,
24122485
u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
24132486
}
@@ -2550,6 +2623,24 @@ ngx_stream_lua_socket_read_line(void *data, ssize_t bytes)
25502623
}
25512624

25522625

2626+
static ngx_int_t
2627+
ngx_stream_lua_socket_read_any(void *data, ssize_t bytes)
2628+
{
2629+
ngx_int_t rc;
2630+
ngx_stream_lua_socket_tcp_upstream_t *u = data;
2631+
2632+
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0,
2633+
"stream lua tcp socket read any");
2634+
2635+
rc = ngx_stream_lua_read_any(&u->buffer, u->buf_in, &u->rest, bytes,
2636+
u->request->connection->log);
2637+
if (rc == NGX_ERROR) {
2638+
u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_CLOSED;
2639+
return NGX_ERROR;
2640+
}
2641+
2642+
return rc;
2643+
}
25532644

25542645

25552646
static ngx_int_t

0 commit comments

Comments
 (0)