Skip to content

Commit

Permalink
Merge pull request #15 from pei-jikui/release-1.0.0
Browse files Browse the repository at this point in the history
format the file
  • Loading branch information
pei-jikui authored May 9, 2020
2 parents 1c1f3ad + 9878a6d commit bdca28b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 31 deletions.
64 changes: 45 additions & 19 deletions src/stream/ngx_stream_alg_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@

static ngx_int_t ngx_stream_alg_init(ngx_conf_t *cf);
static ngx_int_t ngx_stream_alg_handler(ngx_stream_session_t *s);
static char * ngx_stream_alg_alg(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_event_handler_pt ngx_stream_alg_get_stream_handler(ngx_stream_session_t *s,ngx_event_handler_pt handler, ngx_int_t up_down);
static char * ngx_stream_alg_alg(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_event_handler_pt ngx_stream_alg_get_stream_handler(
ngx_stream_session_t *s,
ngx_event_handler_pt handler,
ngx_int_t up_down);
static void * ngx_stream_alg_create_srv_conf(ngx_conf_t *cf);
static void * ngx_stream_alg_create_main_conf(ngx_conf_t *cf);

Expand Down Expand Up @@ -59,7 +63,8 @@ ngx_module_t ngx_stream_alg_module = {


static ngx_int_t
ngx_stream_alg_ftp_get_peer_addr(ngx_stream_session_t *s, u_char *addr_info, ssize_t size)
ngx_stream_alg_ftp_get_peer_addr(ngx_stream_session_t *s, u_char *addr_info,
ssize_t size)
{
ngx_stream_alg_ctx_t *ctx;
ngx_stream_upstream_resolved_t *peer = NULL;
Expand All @@ -79,15 +84,17 @@ ngx_stream_alg_ftp_get_peer_addr(ngx_stream_session_t *s, u_char *addr_info, ssi
return NGX_ERROR;
}

if (sscanf((const char*)addr_info,"%u,%u,%u,%u,%u,%u",&addr1,&addr2,&addr3,&addr4,&port1,&port2) != 6){
if (sscanf((const char*)addr_info,"%u,%u,%u,%u,%u,%u",&addr1,&addr2,&addr3,
&addr4,&port1,&port2) != 6){
return NGX_ERROR;
}

server_addr = ngx_pcalloc(c->pool,INET_ADDRSTRLEN+1);
if (server_addr == NULL ){
return NGX_ERROR;
}
ngx_snprintf(server_addr,INET_ADDRSTRLEN,"%ud.%ud.%ud.%ud",addr1,addr2,addr3,addr4);
ngx_snprintf(server_addr,INET_ADDRSTRLEN,"%ud.%ud.%ud.%ud",addr1,addr2,
addr3,addr4);

peer = ngx_pcalloc(c->pool,sizeof(ngx_stream_upstream_resolved_t));
if (peer == NULL) {
Expand Down Expand Up @@ -156,7 +163,8 @@ static ngx_int_t ngx_stream_alg_create_listening_port(ngx_stream_session_t *s)
ls->addr_text.data = ngx_pcalloc(s->connection->pool,ls->addr_text.len);
ngx_memset(ls->addr_text.data,0,ls->addr_text.len);
ngx_log_debug2(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
"original listening socket work id %ud : current work id %ud",ls->worker,ngx_worker);
"original listening socket work id %ud : current work id %ud",
ls->worker,ngx_worker);
if (ngx_open_one_listening_socket(ls) == NGX_ERROR) {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
"Failed to create listening socket on port number.");
Expand All @@ -167,7 +175,8 @@ static ngx_int_t ngx_stream_alg_create_listening_port(ngx_stream_session_t *s)

if ( ngx_event_one_listening_init(ls) == NGX_ERROR) {
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
"Failed to initialize listening socket on port number: %ud",port_num);
"Failed to initialize listening socket on port number: \
%ud", port_num);
ngx_pfree(s->connection->pool,ls);
ngx_pfree(s->connection->pool,p);
return NGX_ERROR;
Expand Down Expand Up @@ -232,7 +241,8 @@ ngx_stream_alg_ftp_process_handler(ngx_stream_session_t *s,ngx_buf_t* buffer)
if (ngx_strstrn(command+total_len-2,CRLF,2) == NULL ) {

ngx_log_debug2(NGX_LOG_DEBUG_STREAM,s->connection->log,0,
"%s Don't find a full sentence %s with \"\\r\\n\"",__func__,command);
"%s Don't find a full sentence %s with \"\\r\\n\"",
__func__,command);
ngx_pfree(c->pool,command);
return NGX_AGAIN;
}
Expand Down Expand Up @@ -272,9 +282,11 @@ ngx_stream_alg_ftp_process_handler(ngx_stream_session_t *s,ngx_buf_t* buffer)
ngx_uint_t try_times = 0;
left_brace += 1;
right_brace -= 1;
if (ngx_stream_alg_ftp_get_peer_addr(s,left_brace,right_brace-left_brace+1) < 0){
if (ngx_stream_alg_ftp_get_peer_addr(s,left_brace,
right_brace-left_brace+1) < 0){
ngx_log_debug1(NGX_LOG_DEBUG_STREAM,s->connection->log,0,
"%s:Doesn't contain the right pattern for ip and port.",__func__);
"%s:Doesn't contain the right pattern for ip and port.",
__func__);
ngx_pfree(c->pool,command);
return NGX_OK;
}
Expand All @@ -283,20 +295,26 @@ ngx_stream_alg_ftp_process_handler(ngx_stream_session_t *s,ngx_buf_t* buffer)
ngx_pfree(c->pool,command);
return NGX_ERROR;
}
ngx_inet_ntop(sockaddr.sin_family,(struct sockaddr *)&sockaddr.sin_addr,addr_str,INET_ADDRSTRLEN);
ngx_inet_ntop(sockaddr.sin_family,
(struct sockaddr *)&sockaddr.sin_addr, addr_str,
INET_ADDRSTRLEN);
ngx_log_debug2(NGX_LOG_DEBUG_STREAM,s->connection->log,0,
"%s the address is %s.",__func__,addr_str);
number = sscanf((const char *)addr_str,"%u.%u.%u.%u",&addr1,&addr2,&addr3,&addr4);
number = sscanf((const char *)addr_str,"%u.%u.%u.%u",&addr1,
&addr2,&addr3,&addr4);
}else {
fd = s->upstream ->peer.connection->fd;
if (getsockname(fd, (struct sockaddr *)&sockaddr,&socklen) == -1) {
ngx_pfree(c->pool,command);
return NGX_ERROR;
}
ngx_inet_ntop(sockaddr.sin_family,(struct sockaddr *)&sockaddr.sin_addr,addr_str,INET_ADDRSTRLEN);
ngx_inet_ntop(sockaddr.sin_family,
(struct sockaddr *)&sockaddr.sin_addr,
addr_str,INET_ADDRSTRLEN);
ngx_log_debug2(NGX_LOG_DEBUG_STREAM,s->connection->log,0,
"%s the address is %s.",__func__,addr_str);
number = sscanf((const char *)addr_str,"%u.%u.%u.%u",&addr1,&addr2,&addr3,&addr4);
number = sscanf((const char *)addr_str,"%u.%u.%u.%u",&addr1,&addr2,
&addr3,&addr4);

}
if(number != 4 ) {
Expand All @@ -308,13 +326,15 @@ ngx_stream_alg_ftp_process_handler(ngx_stream_session_t *s,ngx_buf_t* buffer)
}
if (try_times >= 5 ) {
ngx_log_debug1(NGX_LOG_DEBUG_STREAM,s->connection->log,0,
"%s allocate a new socket for data session failed.",__func__);
"%s allocate a new socket for data session failed.",
__func__);
ngx_pfree(c->pool,command);
return NGX_ERROR;
}
ngx_memset(buffer->pos,0,total_len);
if (entering_alg == 1) {
ngx_snprintf(buffer->pos,80,"227 Entering Passive Mode (%ud,%ud,%ud,%ud,%ud,%ud).\r\n",
ngx_snprintf(buffer->pos,80,"227 Entering Passive Mode \
(%ud,%ud,%ud,%ud,%ud,%ud).\r\n",
addr1,addr2,addr3,addr4,port_num/256,port_num%256);
}else {
ngx_snprintf(buffer->pos,80,"PORT %ud,%ud,%ud,%ud,%ud,%ud\r\n",
Expand Down Expand Up @@ -362,6 +382,7 @@ ngx_stream_alg_handler(ngx_stream_session_t *s)
ctx->alg_resolved_peer = NULL;
}
}

if ( c->buffer == NULL ) {
return NGX_DECLINED;
}
Expand Down Expand Up @@ -426,7 +447,8 @@ ngx_stream_alg_init(ngx_conf_t *cf)
return NGX_OK;
}

static ngx_int_t ngx_stream_stream_handler(ngx_event_t *ev, ngx_int_t stream_direction)
static ngx_int_t ngx_stream_stream_handler(ngx_event_t *ev,
ngx_int_t stream_direction)
{
ngx_connection_t *c;
ngx_stream_session_t *s;
Expand Down Expand Up @@ -537,7 +559,8 @@ static ngx_int_t ngx_stream_stream_handler(ngx_event_t *ev, ngx_int_t stream_dir
c->buffer->pos = c->buffer->last;
return NGX_OK;
}
static void ngx_stream_alg_stream_handler(ngx_event_t *ev, ngx_int_t stream_direction)
static void ngx_stream_alg_stream_handler(ngx_event_t *ev,
ngx_int_t stream_direction)
{
ngx_stream_alg_main_conf_t *amcf;
ngx_int_t rc;
Expand Down Expand Up @@ -576,7 +599,10 @@ static void ngx_stream_alg_downstream_handler(ngx_event_t *ev)
return;
}

static ngx_event_handler_pt ngx_stream_alg_get_stream_handler(ngx_stream_session_t *s,ngx_event_handler_pt pre_handler, ngx_int_t up_down)
static ngx_event_handler_pt ngx_stream_alg_get_stream_handler(
ngx_stream_session_t *s,
ngx_event_handler_pt pre_handler,
ngx_int_t up_down)
{
ngx_event_handler_pt handler = NULL;
ngx_stream_alg_main_conf_t *amcf;
Expand Down
10 changes: 7 additions & 3 deletions src/stream/ngx_stream_alg_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
#define NGX_STREAM_ALG_DOWNSTREAM 0
#define NGX_STREAM_ALG_UPSTREAM 1

typedef ngx_int_t (*ngx_stream_alg_handler_pt)(ngx_stream_session_t *s,u_char *buf,ssize_t ssize);
typedef ngx_int_t (*ngx_stream_alg_process_handler_pt)(ngx_stream_session_t *s);
typedef ngx_int_t (*ngx_stream_alg_handler_pt)(ngx_stream_session_t *s,
u_char *buf,ssize_t ssize);
typedef ngx_int_t (*ngx_stream_alg_process_handler_pt)(
ngx_stream_session_t *s);
typedef void (*ngx_stream_ream_handler)(ngx_event_t *ev);
typedef ngx_event_handler_pt (*ngx_stream_alg_get_handler)(ngx_stream_session_t *s, ngx_event_handler_pt handler,ngx_int_t up_down);
typedef ngx_event_handler_pt (*ngx_stream_alg_get_handler)(
ngx_stream_session_t *s, ngx_event_handler_pt handler,
ngx_int_t up_down);

typedef struct {
ngx_flag_t alg_ftp;
Expand Down
31 changes: 22 additions & 9 deletions src/stream/ngx_stream_proxy_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,12 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
if (!parent) {
amcf = ngx_stream_get_module_main_conf(s,ngx_stream_alg_module);
if (amcf) {
c->write->handler = (amcf->alg_get_stream_handler)(s,ngx_stream_proxy_downstream_handler,NGX_STREAM_ALG_DOWNSTREAM);
c->read->handler = (amcf->alg_get_stream_handler)(s,ngx_stream_proxy_downstream_handler,NGX_STREAM_ALG_DOWNSTREAM);
c->write->handler = (amcf->alg_get_stream_handler)(s,
ngx_stream_proxy_downstream_handler,
NGX_STREAM_ALG_DOWNSTREAM);
c->read->handler = (amcf->alg_get_stream_handler)(s,
ngx_stream_proxy_downstream_handler,
NGX_STREAM_ALG_DOWNSTREAM);
}
}
}
Expand Down Expand Up @@ -459,17 +463,20 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
ngx_stream_alg_ctx_t *ctx;
ctx = ngx_stream_get_module_ctx(parent, ngx_stream_alg_module);
if (ctx && ctx->alg_resolved_peer) {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, "Alg data connection, don't need to select server.");
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
"Alg data connection, don't need to select server.");
u->resolved = ctx->alg_resolved_peer;
goto resolved;
} else {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, "ctx or alg resolved peer is invalidate.");
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
"ctx or alg resolved peer is invalidate.");
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
return;
/*error*/
}
} else {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, "Don't find the parent session.");
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
"Don't find the parent session.");
}
#endif
if (pscf->upstream_value) {
Expand Down Expand Up @@ -951,8 +958,12 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
if (!parent) {
amcf = ngx_stream_get_module_main_conf(s,ngx_stream_alg_module);
if (amcf) {
pc->write->handler = (amcf->alg_get_stream_handler)(s,ngx_stream_proxy_upstream_handler,NGX_STREAM_ALG_UPSTREAM);
pc->read->handler = (amcf->alg_get_stream_handler)(s,ngx_stream_proxy_upstream_handler,NGX_STREAM_ALG_UPSTREAM);
pc->write->handler = (amcf->alg_get_stream_handler)(s,
ngx_stream_proxy_upstream_handler,
NGX_STREAM_ALG_UPSTREAM);
pc->read->handler = (amcf->alg_get_stream_handler)(s,
ngx_stream_proxy_upstream_handler,
NGX_STREAM_ALG_UPSTREAM);
}
}
}
Expand Down Expand Up @@ -1283,7 +1294,8 @@ ngx_stream_proxy_ssl_name(ngx_stream_session_t *s)
static void
ngx_stream_proxy_downstream_handler(ngx_event_t *ev)
{
ngx_log_debug1(NGX_LOG_DEBUG_STREAM,ev->log,0,"downstream handler %s.",ev->write? "write":"read");
ngx_log_debug1(NGX_LOG_DEBUG_STREAM,ev->log,0,"downstream handler %s.",
ev->write? "write":"read");
ngx_stream_proxy_process_connection(ev, ev->write);
}

Expand Down Expand Up @@ -1360,7 +1372,8 @@ ngx_stream_proxy_resolve_handler(ngx_resolver_ctx_t *ctx)
static void
ngx_stream_proxy_upstream_handler(ngx_event_t *ev)
{
ngx_log_debug1(NGX_LOG_DEBUG_STREAM,ev->log,0,"upstream handler %s.",ev->write? "write":"read");
ngx_log_debug1(NGX_LOG_DEBUG_STREAM,ev->log,0,"upstream handler %s.",
ev->write? "write":"read");
ngx_stream_proxy_process_connection(ev, !ev->write);
}

Expand Down

0 comments on commit bdca28b

Please sign in to comment.