From 8b690aafe5aa716064067eeeb7941e36fe2a00dc Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Fri, 3 Jan 2025 13:27:40 -0700 Subject: [PATCH] Get more server tests working --- src/HTTP.jl | 1 + src/access_log.jl | 20 ++-- src/client/request.jl | 2 +- src/requestresponse.jl | 11 ++- src/server.jl | 203 ++++++++++++++++++++++++++--------------- test/server.jl | 95 +++++++++++++++++-- 6 files changed, 238 insertions(+), 94 deletions(-) diff --git a/src/HTTP.jl b/src/HTTP.jl index 9b532f87..78281f75 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -3,6 +3,7 @@ module HTTP using CodecZlib, URIs, Mmap, Base64 using LibAwsCommon, LibAwsIO, LibAwsHTTP +export @logfmt_str, common_logfmt, combined_logfmt export WebSockets include("utils.jl") diff --git a/src/access_log.jl b/src/access_log.jl index f9e86101..bbef3b9c 100644 --- a/src/access_log.jl +++ b/src/access_log.jl @@ -59,14 +59,14 @@ function symbol_mapping(s::Symbol) str = string(s) if (m = match(r"^http_(.+)$", str); m !== nothing) hdr = replace(String(m[1]), '_' => '-') - :(HTTP.header(http.message, $hdr, "-")) + :(something(HTTP.getheader(http.request.headers, $hdr), "-")) elseif (m = match(r"^sent_http_(.+)$", str); m !== nothing) hdr = replace(String(m[1]), '_' => '-') - :(HTTP.header(http.message.response, $hdr, "-")) + :(something(HTTP.getheader(http.response.headers, $hdr), "-")) elseif s === :remote_addr - :(http.stream.peerip) + :(HTTP.remote_address(http.connection)) elseif s === :remote_port - :(http.stream.peerport) + :(HTTP.remote_port(http.connection)) elseif s === :remote_user :("-") # TODO: find from Basic auth... elseif s === :time_iso8601 @@ -89,17 +89,17 @@ function symbol_mapping(s::Symbol) m = symbol_mapping(:request_method) t = symbol_mapping(:request_uri) p = symbol_mapping(:server_protocol) - (m, " ", t, " ", p...) + (m, " ", t, " ", p) elseif s === :request_method - :(http.message.method) + :(http.request.method) elseif s === :request_uri - :(http.message.target) + :(http.request.target) elseif s === :server_protocol - ("HTTP/", :(http.message.version.major), ".", :(http.message.version.minor)) + :(HTTP.http_version(http.connection)) elseif s === :status - :(http.message.response.status) + :(http.response.status) elseif s === :body_bytes_sent - return :(max(0, http.nwritten)) + :(HTTP.bodylen(http.response)) else error("unknown variable in logfmt: $s") end diff --git a/src/client/request.jl b/src/client/request.jl index ca118f11..a3589eaf 100644 --- a/src/client/request.jl +++ b/src/client/request.jl @@ -24,7 +24,7 @@ function with_request(f::Function, client::Client, method, path, headers=nothing setheader(h, "host", client.settings.host) end setheaderifabsent(h, "accept", "*/*") - setheaderifabsent(h, "user-agent", USER_AGENT[]) + setheaderifabsent(h, "user-agent", something(USER_AGENT[], "-")) if decompress === nothing || decompress setheaderifabsent(h, "accept-encoding", "gzip") end diff --git a/src/requestresponse.jl b/src/requestresponse.jl index a1d15bea..b5db0a08 100644 --- a/src/requestresponse.jl +++ b/src/requestresponse.jl @@ -141,6 +141,9 @@ function InputStream(allocator::Ptr{aws_allocator}, body::RequestBodyTypes) end function setinputstream!(msg::Message, body) + aws_http_message_set_body_stream(msg.ptr, C_NULL) + msg.inputstream = nothing + body === nothing && return input_stream = InputStream(msg.allocator, body) setfield!(msg, :inputstream, input_stream) if input_stream.ptr != C_NULL @@ -158,7 +161,7 @@ end mutable struct Request <: Message allocator::Ptr{aws_allocator} ptr::Ptr{aws_http_message} - inputstream::InputStream # used for outgoing request body + inputstream::Union{Nothing, InputStream} # used for outgoing request body # only set in server-side request handlers body::Union{Nothing, Vector{UInt8}} route::Union{Nothing, String} @@ -181,6 +184,7 @@ mutable struct Request <: Message end req = new(allocator, ptr) req.body = nothing + req.inputstream = nothing req.route = nothing req.params = nothing req.cookies = nothing @@ -291,7 +295,7 @@ RequestMetrics() = RequestMetrics(0, 0, 0, nothing) mutable struct Response <: Message allocator::Ptr{aws_allocator} ptr::Ptr{aws_http_message} - inputstream::InputStream + inputstream::Union{Nothing, InputStream} body::Union{Nothing, Vector{UInt8}} # only set for client-side response body when no user-provided response_body metrics::RequestMetrics request::Request @@ -311,6 +315,7 @@ mutable struct Response <: Message end resp = new(allocator, ptr) resp.body = nothing + resp.inputstream = nothing body !== nothing && setinputstream!(resp, body) return finalizer(_ -> aws_http_message_release(ptr), resp) catch @@ -324,6 +329,8 @@ Response(body=nothing) = Response(0, nothing, body) Response(status::Integer, body) = Response(status, nothing, Vector{UInt8}(string(body))) Response(status::Integer) = Response(status, nothing, nothing) +bodylen(m::Message) = isdefined(m, :inputstream) && m.inputstream !== nothing ? m.inputstream.bodylen : 0 + function Base.getproperty(x::Response, s::Symbol) if s == :status ref = Ref{Cint}() diff --git a/src/server.jl b/src/server.jl index e56ed9e2..e9fa438e 100644 --- a/src/server.jl +++ b/src/server.jl @@ -20,6 +20,28 @@ end Base.hash(c::Connection, h::UInt) = hash(c.connection, h) +function remote_address(c::Connection) + socket_ptr = aws_http_connection_get_remote_endpoint(c.connection) + addr = unsafe_load(socket_ptr).address + bytes = Vector{UInt8}(undef, length(addr)) + nul_i = 0 + for i in eachindex(bytes) + b = addr[i] + @inbounds bytes[i] = b + if b == 0x00 + nul_i = i + break + end + end + resize!(bytes, nul_i == 0 ? length(addr) : nul_i - 1) + return String(bytes) +end +remote_port(c::Connection) = Int(unsafe_load(aws_http_connection_get_remote_endpoint(c.connection)).port) +function http_version(c::Connection) + v = aws_http_connection_get_version(c.connection) + return v == AWS_HTTP_VERSION_2 ? "HTTP/2" : "HTTP/1.1" +end + mutable struct Server{F} const f::F const fut::Future{Symbol} @@ -30,6 +52,8 @@ mutable struct Server{F} const connections_lock::ReentrantLock const connections::Set{Connection} const closed::Threads.Event + const access_log::Union{Nothing, Function} + const logstate::Base.CoreLogging.LogState @atomic state::Symbol # :initializing, :running, :closed server::Ptr{aws_http_server} server_options::aws_http_server_options @@ -44,18 +68,22 @@ mutable struct Server{F} connections_lock::ReentrantLock, connections::Set{Connection}, closed::Threads.Event, + access_log::Union{Nothing, Function}, + logstate::Base.CoreLogging.LogState, state::Symbol, - ) where {F} = new{F}(f, fut, allocator, endpoint, socket_options, tls_options, connections_lock, connections, closed, state) + ) where {F} = new{F}(f, fut, allocator, endpoint, socket_options, tls_options, connections_lock, connections, closed, access_log, logstate, state) end Base.wait(s::Server) = wait(s.closed) - ftype(::Server{F}) where {F} = F +port(s::Server) = Int(s.endpoint.port) function serve!(f, host="127.0.0.1", port=8080; allocator=default_aws_allocator(), bootstrap::Ptr{aws_server_bootstrap}=default_aws_server_bootstrap(), endpoint=nothing, + listenany::Bool=false, + access_log::Union{Nothing, Function}=nothing, # socket options socket_options=nothing, socket_domain=:ipv4, @@ -74,6 +102,9 @@ function serve!(f, host="127.0.0.1", port=8080; ssl_alpn_list="h2;http/1.1", initial_window_size=typemax(UInt64), ) + if listenany + @warn "listenany not yet supported; only trying port = $port" + end server = Server{typeof(f)}( f, # RequestHandler Future{Symbol}(), @@ -101,6 +132,8 @@ function serve!(f, host="127.0.0.1", port=8080; ReentrantLock(), # connections_lock Set{Connection}(), # connections Threads.Event(), # closed + access_log, + Base.CoreLogging.current_logstate(), :initializing, # state ) server.server_options = aws_http_server_options( @@ -126,80 +159,86 @@ const on_incoming_connection = Ref{Ptr{Cvoid}}(C_NULL) function c_on_incoming_connection(aws_server, aws_conn, error_code, server_ptr) server = unsafe_pointer_to_objref(server_ptr) - if error_code != 0 - @error "incoming connection error" exception=(aws_error(error_code), Base.backtrace()) - return - end - conn = Connection( - server, - server.allocator, - aws_conn, - ) - conn.connection_options = aws_http_server_connection_options( - 1, - pointer_from_objref(conn), - on_incoming_request[], - on_connection_shutdown[] - ) - if aws_http_connection_configure_server( - aws_conn, - FieldRef(conn, :connection_options) - ) != 0 - @error "failed to configure connection" exception=(aws_error(), Base.backtrace()) + Base.CoreLogging.with_logstate(server.logstate) do + if error_code != 0 + @error "incoming connection error" exception=(aws_error(error_code), Base.backtrace()) + return + end + conn = Connection( + server, + server.allocator, + aws_conn, + ) + conn.connection_options = aws_http_server_connection_options( + 1, + pointer_from_objref(conn), + on_incoming_request[], + on_connection_shutdown[] + ) + if aws_http_connection_configure_server( + aws_conn, + FieldRef(conn, :connection_options) + ) != 0 + @error "failed to configure connection" exception=(aws_error(), Base.backtrace()) + return + end + @lock server.connections_lock begin + push!(server.connections, conn) + end return end - @lock server.connections_lock begin - push!(server.connections, conn) - end - return end const on_connection_shutdown = Ref{Ptr{Cvoid}}(C_NULL) function c_on_connection_shutdown(aws_conn, error_code, conn_ptr) conn = unsafe_pointer_to_objref(conn_ptr) - if error_code != 0 - @error "connection shutdown error" exception=(aws_error(error_code), Base.backtrace()) - end - @lock conn.server.connections_lock begin - delete!(conn.server.connections, conn) + Base.CoreLogging.with_logstate(conn.server.logstate) do + if error_code != 0 + @error "connection shutdown error" exception=(aws_error(error_code), Base.backtrace()) + end + @lock conn.server.connections_lock begin + delete!(conn.server.connections, conn) + end + return end - return end const on_incoming_request = Ref{Ptr{Cvoid}}(C_NULL) function c_on_incoming_request(aws_conn, conn_ptr) conn = unsafe_pointer_to_objref(conn_ptr) - stream = Stream{typeof(conn)}( - conn.allocator, - false, # decompress - aws_http_connection_get_version(aws_conn) == AWS_HTTP_VERSION_2 # http2 - ) - stream.connection = conn - stream.request_handler_options = aws_http_request_handler_options( - 1, - aws_conn, - pointer_from_objref(stream), - on_request_headers[], - on_request_header_block_done[], - on_request_body[], - on_request_done[], - on_server_stream_complete[], - on_destroy[] - ) - stream.request = Request("", "") - stream.ptr = aws_http_stream_new_server_request_handler( - FieldRef(stream, :request_handler_options) - ) - if stream.ptr == C_NULL - @error "failed to create stream" exception=(aws_error(), Base.backtrace()) - else - @lock conn.streams_lock begin - push!(conn.streams, stream) + Base.CoreLogging.with_logstate(conn.server.logstate) do + stream = Stream{typeof(conn)}( + conn.allocator, + false, # decompress + aws_http_connection_get_version(aws_conn) == AWS_HTTP_VERSION_2 # http2 + ) + stream.connection = conn + stream.request_handler_options = aws_http_request_handler_options( + 1, + aws_conn, + pointer_from_objref(stream), + on_request_headers[], + on_request_header_block_done[], + on_request_body[], + on_request_done[], + on_server_stream_complete[], + on_destroy[] + ) + stream.request = Request("", "") + stream.ptr = aws_http_stream_new_server_request_handler( + FieldRef(stream, :request_handler_options) + ) + if stream.ptr == C_NULL + @error "failed to create stream" exception=(aws_error(), Base.backtrace()) + else + @lock conn.streams_lock begin + push!(conn.streams, stream) + end end + return stream.ptr end - return stream.ptr end const on_request_headers = Ref{Ptr{Cvoid}}(C_NULL) @@ -247,34 +286,48 @@ const on_request_done = Ref{Ptr{Cvoid}}(C_NULL) function c_on_request_done(aws_stream_ptr, stream_ptr) stream = unsafe_pointer_to_objref(stream_ptr) - try - stream.response = stream.connection.server.f(stream.request)::Response - #TODO: is it possible to stream the response body? - #TODO: support transfer-encoding: gzip + Base.CoreLogging.with_logstate(stream.connection.server.logstate) do + try + stream.response = Base.invokelatest(stream.connection.server.f, stream.request)::Response + if stream.request.method == "HEAD" + setinputstream!(stream.response, nothing) + end + #TODO: is it possible to stream the response body? + #TODO: support transfer-encoding: gzip + catch e + @error "Request handler error; sending 500" exception=(e, catch_backtrace()) + stream.response = Response(500) + end ret = aws_http_stream_send_response(aws_stream_ptr, stream.response.ptr) if ret != 0 @error "failed to send response" exception=(aws_error(ret), Base.backtrace()) - return Cint(-1) + return Cint(AWS_ERROR_HTTP_UNKNOWN) end - catch e - @error "failed to process request" exception=(e, catch_backtrace()) - #TODO: send 500 here? - return Cint(-1) + return Cint(0) end - return Cint(0) end const on_server_stream_complete = Ref{Ptr{Cvoid}}(C_NULL) function c_on_server_stream_complete(aws_stream_ptr, error_code, stream_ptr) stream = unsafe_pointer_to_objref(stream_ptr) - if error_code != 0 - @error "server complete error" exception=(aws_error(error_code), Base.backtrace()) - end - @lock stream.connection.streams_lock begin - delete!(stream.connection.streams, stream) + Base.CoreLogging.with_logstate(stream.connection.server.logstate) do + if error_code != 0 + @error "server complete error" exception=(aws_error(error_code), Base.backtrace()) + end + if stream.connection.server.access_log !== nothing + try + @info sprint(stream.connection.server.access_log, stream) _group=:access + catch e + @show e + @error "access log error" exception=(e, catch_backtrace()) + end + end + @lock stream.connection.streams_lock begin + delete!(stream.connection.streams, stream) + end + return Cint(0) end - return Cint(0) end const on_destroy_complete = Ref{Ptr{Cvoid}}(C_NULL) diff --git a/test/server.jl b/test/server.jl index bd2bb9cf..535c314b 100644 --- a/test/server.jl +++ b/test/server.jl @@ -1,9 +1,92 @@ -using Test, HTTP +using Test, HTTP, Logging @testset "HTTP.serve" begin - server = HTTP.serve!(req -> HTTP.Response(200, "Hello, World!")) - @test server.state == :running - resp = HTTP.get("http://127.0.0.1:8080") - @test resp.status == 200 - @test String(resp.body) == "Hello, World!" + server = HTTP.serve!(req -> HTTP.Response(200, "Hello, World!"); listenany=true) + try + @test server.state == :running + resp = HTTP.get("http://127.0.0.1:8080") + @test resp.status == 200 + @test String(resp.body) == "Hello, World!" + finally + close(server) + end +end + +@testset "access logging" begin + local handler = (req) -> begin + if req.target == "/internal-error" + error("internal error") + end + if req.target == "/close" + return HTTP.Response(444, ["content-type" => "text/plain"], nothing) + end + return HTTP.Response(200, ["content-type" => "text/plain"], "hello, world") + end + function with_testserver(f, fmt) + logger = Test.TestLogger() + with_logger(logger) do + server = HTTP.serve!(handler; listenany=true, access_log=fmt) + port = HTTP.port(server) + try + f(port) + finally + close(server) + end + end + return filter!(x -> x.group == :access, logger.logs) + end + + # Common Log Format + logs = with_testserver(common_logfmt) do port + HTTP.get("http://127.0.0.1:$port") + HTTP.get("http://127.0.0.1:$port/index.html") + HTTP.get("http://127.0.0.1:$port/index.html?a=b") + HTTP.head("http://127.0.0.1:$port") + HTTP.get("http://127.0.0.1:$port/internal-error"; status_exception=false) + # sleep(1) # necessary to properly forget the closed connection from the previous call + try HTTP.get("http://127.0.0.1:$port/close"; retry=false) catch end + HTTP.get("http://127.0.0.1:$port", ["Connection" => "close"]) + sleep(1) # we want to make sure the server has time to finish logging before checking logs + end + @test length(logs) == 7 + @test all(x -> x.group === :access, logs) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET / HTTP/1.1\" 200 12$", logs[1].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET /index.html HTTP/1.1\" 200 12$", logs[2].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET /index.html\?a=b HTTP/1.1\" 200 12$", logs[3].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"HEAD / HTTP/1.1\" 200 0$", logs[4].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET /internal-error HTTP/1.1\" 500 \d+$", logs[5].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET /close HTTP/1.1\" 444 0$", logs[6].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET / HTTP/1.1\" 200 12$", logs[7].message) + + # Combined Log Format + logs = with_testserver(combined_logfmt) do port + HTTP.get("http://127.0.0.1:$port", ["Referer" => "julialang.org"]) + HTTP.get("http://127.0.0.1:$port/index.html") + useragent = HTTP.USER_AGENT[] + HTTP.setuseragent!(nothing) + HTTP.get("http://127.0.0.1:$port/index.html?a=b") + HTTP.setuseragent!(useragent) + HTTP.head("http://127.0.0.1:$port") + end + @test length(logs) == 4 + @test all(x -> x.group === :access, logs) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET / HTTP/1.1\" 200 12 \"julialang\.org\" \"HTTP\.jl/.*\"$", logs[1].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET /index.html HTTP/1.1\" 200 12 \"-\" \"HTTP\.jl/.*\"$", logs[2].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"GET /index.html\?a=b HTTP/1.1\" 200 12 \"-\" \"-\"$", logs[3].message) + @test occursin(r"^127.0.0.1 - - \[(\d{2})/.*/(\d{4}):\d{2}:\d{2}:\d{2}.*\] \"HEAD / HTTP/1.1\" 200 0 \"-\" \"HTTP\.jl/.*\"$", logs[4].message) + + # Custom log format + fmt = logfmt"$http_accept $sent_http_content_type $request $request_method $request_uri $remote_addr $remote_port $remote_user $server_protocol $time_iso8601 $time_local $status $body_bytes_sent" + logs = with_testserver(fmt) do port + HTTP.get("http://127.0.0.1:$port", ["Accept" => "application/json"]) + HTTP.get("http://127.0.0.1:$port/index.html") + HTTP.get("http://127.0.0.1:$port/index.html?a=b") + HTTP.head("http://127.0.0.1:$port") + end + @test length(logs) == 4 + @test all(x -> x.group === :access, logs) + @test occursin(r"^application/json text/plain GET / HTTP/1\.1 GET / 127\.0\.0\.1 \d+ - HTTP/1\.1 \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.* \d+/.*/\d{4}:\d{2}:\d{2}:\d{2}.* 200 12$", logs[1].message) + @test occursin(r"^\*/\* text/plain GET /index\.html HTTP/1\.1 GET /index\.html 127\.0\.0\.1 \d+ - HTTP/1\.1 \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.* \d+/.*/\d{4}:\d{2}:\d{2}:\d{2}.* 200 12$", logs[2].message) + @test occursin(r"^\*/\* text/plain GET /index\.html\?a=b HTTP/1\.1 GET /index\.html\?a=b 127\.0\.0\.1 \d+ - HTTP/1\.1 \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.* \d+/.*/\d{4}:\d{2}:\d{2}:\d{2}.* 200 12$", logs[3].message) + @test occursin(r"^\*/\* text/plain HEAD / HTTP/1\.1 HEAD / 127\.0\.0\.1 \d+ - HTTP/1\.1 \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.* \d+/.*/\d{4}:\d{2}:\d{2}:\d{2}.* 200 0$", logs[4].message) end \ No newline at end of file