diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 3877ac58f9..a8037af8c1 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -185,15 +185,15 @@ jobs: - name: Build FFI env: RUSTFLAGS: --cfg hyper_unstable_ffi - run: cargo rustc --features client,http1,http2,ffi --crate-type cdylib + run: cargo rustc --features ffi --crate-type cdylib --release - name: Make Examples - run: cd capi/examples && make client + run: cd capi/examples && make - name: Run FFI unit tests env: RUSTFLAGS: --cfg hyper_unstable_ffi - run: cargo test --features server,client,http1,http2,ffi --lib + run: cargo test --features ffi --lib ffi-header: name: Verify hyper.h is up to date diff --git a/Cargo.toml b/Cargo.toml index f569ba228c..cd28d9f4b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ want = { version = "0.3", optional = true } [dev-dependencies] form_urlencoded = "1" futures-channel = { version = "0.3", features = ["sink"] } -futures-util = { version = "0.3", default-features = false, features = ["alloc", "sink"] } +futures-util = { version = "0.3", default-features = false, features = ["sink"] } http-body-util = "0.1" pretty_env_logger = "0.5" spmc = "0.3" @@ -69,12 +69,7 @@ tokio-util = "0.7.10" default = [] # Easily turn it all on -full = [ - "client", - "http1", - "http2", - "server", -] +full = ["client", "http1", "http2", "server", "futures-util?/alloc"] # HTTP versions http1 = ["dep:futures-channel", "dep:futures-util", "dep:httparse", "dep:itoa"] @@ -85,7 +80,7 @@ client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"] server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec"] # C-API support (currently unstable (no semver)) -ffi = ["dep:libc", "dep:http-body-util", "futures-util?/alloc"] +ffi = ["dep:libc", "full", "dep:http-body-util", "futures-util?/alloc"] # Utilize tracing (currently unstable) tracing = ["dep:tracing"] diff --git a/benches/support/tokiort.rs b/benches/support/tokiort.rs index 0beea038cd..2c9fdc542e 100644 --- a/benches/support/tokiort.rs +++ b/benches/support/tokiort.rs @@ -1,14 +1,12 @@ #![allow(dead_code)] //! Various runtimes for hyper use std::{ - future::Future, pin::Pin, task::{Context, Poll}, time::{Duration, Instant}, }; use hyper::rt::{Sleep, Timer}; -use pin_project_lite::pin_project; #[derive(Clone)] /// An Executor that uses the tokio runtime. @@ -25,25 +23,20 @@ where } /// A Timer that uses the tokio runtime. - #[derive(Clone, Debug)] pub struct TokioTimer; impl Timer for TokioTimer { fn sleep(&self, duration: Duration) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep(duration), - }) + Box::pin(tokio::time::sleep(duration)) } fn sleep_until(&self, deadline: Instant) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep_until(deadline.into()), - }) + Box::pin(tokio::time::sleep_until(deadline.into())) } fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { - if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { + if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { sleep.reset(new_deadline.into()) } } @@ -56,32 +49,7 @@ impl TokioTimer { } } -// Use TokioSleep to get tokio::time::Sleep to implement Unpin. -// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html -pin_project! { - pub(crate) struct TokioSleep { - #[pin] - pub(crate) inner: tokio::time::Sleep, - } -} - -impl Future for TokioSleep { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) - } -} - -impl Sleep for TokioSleep {} - -impl TokioSleep { - pub fn reset(self: Pin<&mut Self>, deadline: Instant) { - self.project().inner.as_mut().reset(deadline.into()); - } -} - -pin_project! { +pin_project_lite::pin_project! { #[derive(Debug)] pub struct TokioIo { #[pin] diff --git a/capi/cbindgen.toml b/capi/cbindgen.toml index a4990840e1..b16c3abd85 100644 --- a/capi/cbindgen.toml +++ b/capi/cbindgen.toml @@ -7,7 +7,7 @@ header = """/* * * Full docs at: https://docs.rs/hyper/latest/hyper/ffi/index.html */""" -include_guard = "_HYPER_H" +pragma_once = true no_includes = true sys_includes = ["stdint.h", "stddef.h", "stdbool.h"] cpp_compat = true diff --git a/capi/examples/.clang-format b/capi/examples/.clang-format new file mode 100644 index 0000000000..6f387775e1 --- /dev/null +++ b/capi/examples/.clang-format @@ -0,0 +1,191 @@ +--- +Language: Cpp +AccessModifierOffset: -2 +AlignAfterOpenBracket: BlockIndent +AlignArrayOfStructures: None +AlignConsecutiveMacros: None +AlignConsecutiveAssignments: None +AlignConsecutiveBitFields: None +AlignConsecutiveDeclarations: None +AlignEscapedNewlines: Right +AlignOperands: Align +AlignTrailingComments: true +AllowAllArgumentsOnNextLine: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortEnumsOnASingleLine: true +AllowShortBlocksOnASingleLine: Never +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: All +AllowShortLambdasOnASingleLine: All +AllowShortIfStatementsOnASingleLine: Never +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +AlwaysBreakTemplateDeclarations: MultiLine +AttributeMacros: + - __capability +BinPackArguments: false +BinPackParameters: false +BraceWrapping: + AfterCaseLabel: false + AfterClass: false + AfterControlStatement: Never + AfterEnum: false + AfterFunction: false + AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + AfterExternBlock: false + BeforeCatch: false + BeforeElse: false + BeforeLambdaBody: false + BeforeWhile: false + IndentBraces: false + SplitEmptyFunction: true + SplitEmptyRecord: true + SplitEmptyNamespace: true +BreakBeforeBinaryOperators: None +BreakBeforeConceptDeclarations: true +BreakBeforeBraces: Attach +BreakBeforeInheritanceComma: false +BreakInheritanceList: BeforeColon +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BreakConstructorInitializers: BeforeColon +BreakAfterJavaFieldAnnotations: false +BreakStringLiterals: true +ColumnLimit: 100 +CommentPragmas: '^ IWYU pragma:' +QualifierAlignment: Leave +CompactNamespaces: false +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DeriveLineEnding: true +DerivePointerAlignment: false +DisableFormat: false +EmptyLineAfterAccessModifier: Never +EmptyLineBeforeAccessModifier: LogicalBlock +ExperimentalAutoDetectBinPacking: false +PackConstructorInitializers: BinPack +BasedOnStyle: '' +ConstructorInitializerAllOnOneLineOrOnePerLine: false +AllowAllConstructorInitializersOnNextLine: true +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IfMacros: + - KJ_IF_MAYBE +IncludeBlocks: Preserve +IncludeCategories: + - Regex: '^"(llvm|llvm-c|clang|clang-c)/' + Priority: 2 + SortPriority: 0 + CaseSensitive: false + - Regex: '^(<|"(gtest|gmock|isl|json)/)' + Priority: 3 + SortPriority: 0 + CaseSensitive: false + - Regex: '.*' + Priority: 1 + SortPriority: 0 + CaseSensitive: false +IncludeIsMainRegex: '(Test)?$' +IncludeIsMainSourceRegex: '' +IndentAccessModifiers: false +IndentCaseLabels: false +IndentCaseBlocks: false +IndentGotoLabels: true +IndentPPDirectives: None +IndentExternBlock: AfterExternBlock +IndentRequires: false +IndentWidth: 4 +IndentWrappedFunctionNames: false +InsertTrailingCommas: None +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: true +LambdaBodyIndentation: Signature +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBinPackProtocolList: Auto +ObjCBlockIndentWidth: 2 +ObjCBreakBeforeNestedBlockParam: true +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakOpenParenthesis: 0 +PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 1000 +PenaltyIndentedWhitespace: 0 +PointerAlignment: Right +PPIndentWidth: -1 +ReferenceAlignment: Pointer +ReflowComments: true +RemoveBracesLLVM: false +SeparateDefinitionBlocks: Leave +ShortNamespaceLines: 1 +SortIncludes: CaseSensitive +SortJavaStaticImport: Before +SortUsingDeclarations: true +SpaceAfterCStyleCast: false +SpaceAfterLogicalNot: false +SpaceAfterTemplateKeyword: true +SpaceBeforeAssignmentOperators: true +SpaceBeforeCaseColon: false +SpaceBeforeCpp11BracedList: false +SpaceBeforeCtorInitializerColon: true +SpaceBeforeInheritanceColon: true +SpaceBeforeParens: ControlStatements +SpaceBeforeParensOptions: + AfterControlStatements: true + AfterForeachMacros: true + AfterFunctionDefinitionName: false + AfterFunctionDeclarationName: false + AfterIfMacros: true + AfterOverloadedOperator: false + BeforeNonEmptyParentheses: false +SpaceAroundPointerQualifiers: Default +SpaceBeforeRangeBasedForLoopColon: true +SpaceInEmptyBlock: false +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 1 +SpacesInAngles: Never +SpacesInConditionalStatement: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInLineCommentPrefix: + Minimum: 1 + Maximum: -1 +SpacesInParentheses: false +SpacesInSquareBrackets: false +SpaceBeforeSquareBrackets: false +BitFieldColonSpacing: Both +Standard: Latest +StatementAttributeLikeMacros: + - Q_EMIT +StatementMacros: + - Q_UNUSED + - QT_REQUIRE_VERSION +TabWidth: 8 +UseCRLF: false +UseTab: Never +WhitespaceSensitiveMacros: + - STRINGIZE + - PP_STRINGIZE + - BOOST_PP_STRINGIZE + - NS_SWIFT_NAME + - CF_SWIFT_NAME +... + diff --git a/capi/examples/.gitignore b/capi/examples/.gitignore new file mode 100644 index 0000000000..d1da6f3056 --- /dev/null +++ b/capi/examples/.gitignore @@ -0,0 +1,4 @@ +/*.o +/server +/client +/upload diff --git a/capi/examples/Makefile b/capi/examples/Makefile index 951c99fe62..c668d43043 100644 --- a/capi/examples/Makefile +++ b/capi/examples/Makefile @@ -2,24 +2,22 @@ # Build the example client # -TARGET = client -TARGET2 = upload - -OBJS = client.o -OBJS2 = upload.o - -RPATH=$(PWD)/../../target/debug -CFLAGS = -I../include -LDFLAGS = -L$(RPATH) -Wl,-rpath,$(RPATH) +RPATH=$(PWD)/../../target/release +HYPER_CFLAGS += -I../include -ggdb3 -O2 +HYPER_LDFLAGS += -L$(RPATH) -Wl,-rpath,$(RPATH) LIBS = -lhyper -all: $(TARGET) $(TARGET2) +all: client upload server -$(TARGET): $(OBJS) - $(CC) -o $(TARGET) $(OBJS) $(LDFLAGS) $(LIBS) +%.o : %.c ../include/hyper.h + ${CC} -c -o $@ $< $(HYPER_CFLAGS) $(CFLAGS) -$(TARGET2): $(OBJS2) - $(CC) -o $(TARGET2) $(OBJS2) $(LDFLAGS) $(LIBS) +client: client.o + $(CC) -o $@ $< $(HYPER_LDFLAGS) $(LDFLAGS) $(LIBS) +upload: upload.o + $(CC) -o $@ $< $(HYPER_LDFLAGS) $(LDFLAGS) $(LIBS) +server: server.o + $(CC) -o $@ $< $(HYPER_LDFLAGS) $(LDFLAGS) $(LIBS) clean: - rm -f $(OBJS) $(TARGET) $(OBJS2) $(TARGET2) + rm -f *.o client server upload diff --git a/capi/examples/client.c b/capi/examples/client.c index 57a3e7b6c7..273369dc98 100644 --- a/capi/examples/client.c +++ b/capi/examples/client.c @@ -1,19 +1,18 @@ -#include +#include +#include +#include +#include +#include #include +#include +#include #include -#include -#include -#include -#include -#include +#include #include -#include -#include #include "hyper.h" - struct conn_data { int fd; hyper_waker *read_waker; @@ -112,12 +111,10 @@ static int connect_to(const char *host, const char *port) { return sfd; } -static int print_each_header(void *userdata, - const uint8_t *name, - size_t name_len, - const uint8_t *value, - size_t value_len) { - printf("%.*s: %.*s\n", (int) name_len, name, (int) value_len, value); +static int print_each_header( + void *userdata, const uint8_t *name, size_t name_len, const uint8_t *value, size_t value_len +) { + printf("%.*s: %.*s\n", (int)name_len, name, (int)value_len, value); return HYPER_ITER_CONTINUE; } @@ -135,7 +132,12 @@ typedef enum { EXAMPLE_HANDSHAKE, EXAMPLE_SEND, EXAMPLE_RESP_BODY -} example_id; +} example_state; + +typedef union example_id { + void *ptr; + example_state state; +} example_userdata; #define STR_ARG(XX) (uint8_t *)XX, strlen(XX) @@ -168,7 +170,7 @@ int main(int argc, char *argv[]) { // Hookup the IO hyper_io *io = hyper_io_new(); - hyper_io_set_userdata(io, (void *)conn); + hyper_io_set_userdata(io, (void *)conn, NULL); hyper_io_set_read(io, read_cb); hyper_io_set_write(io, write_cb); @@ -182,7 +184,7 @@ int main(int argc, char *argv[]) { hyper_clientconn_options_exec(opts, exec); hyper_task *handshake = hyper_clientconn_handshake(io, opts); - hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE); + hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE, NULL); // Let's wait for the handshake to finish... hyper_executor_push(exec, handshake); @@ -198,9 +200,8 @@ int main(int argc, char *argv[]) { if (!task) { break; } - switch ((example_id) hyper_task_userdata(task)) { + switch (((example_userdata)hyper_task_userdata(task)).state) { case EXAMPLE_HANDSHAKE: - ; if (hyper_task_type(task) == HYPER_TASK_ERROR) { printf("handshake error!\n"); err = hyper_task_value(task); @@ -225,11 +226,11 @@ int main(int argc, char *argv[]) { } hyper_headers *req_headers = hyper_request_headers(req); - hyper_headers_set(req_headers, STR_ARG("Host"), STR_ARG(host)); + hyper_headers_set(req_headers, STR_ARG("Host"), STR_ARG(host)); // Send it! hyper_task *send = hyper_clientconn_send(client, req); - hyper_task_set_userdata(send, (void *)EXAMPLE_SEND); + hyper_task_set_userdata(send, (void *)EXAMPLE_SEND, NULL); printf("sending ...\n"); hyper_executor_push(exec, send); @@ -238,7 +239,6 @@ int main(int argc, char *argv[]) { break; case EXAMPLE_SEND: - ; if (hyper_task_type(task) == HYPER_TASK_ERROR) { printf("send error!\n"); err = hyper_task_value(task); @@ -254,15 +254,15 @@ int main(int argc, char *argv[]) { const uint8_t *rp = hyper_response_reason_phrase(resp); size_t rp_len = hyper_response_reason_phrase_len(resp); - printf("\nResponse Status: %d %.*s\n", http_status, (int) rp_len, rp); + printf("\nResponse Status: %d %.*s\n", http_status, (int)rp_len, rp); hyper_headers *headers = hyper_response_headers(resp); hyper_headers_foreach(headers, print_each_header, NULL); printf("\n"); hyper_body *resp_body = hyper_response_body(resp); - hyper_task *foreach = hyper_body_foreach(resp_body, print_each_chunk, NULL); - hyper_task_set_userdata(foreach, (void *)EXAMPLE_RESP_BODY); + hyper_task *foreach = hyper_body_foreach(resp_body, print_each_chunk, NULL, NULL); + hyper_task_set_userdata(foreach, (void *)EXAMPLE_RESP_BODY, NULL); hyper_executor_push(exec, foreach); // No longer need the response @@ -270,7 +270,6 @@ int main(int argc, char *argv[]) { break; case EXAMPLE_RESP_BODY: - ; if (hyper_task_type(task) == HYPER_TASK_ERROR) { printf("body error!\n"); err = hyper_task_value(task); @@ -323,7 +322,6 @@ int main(int argc, char *argv[]) { hyper_waker_wake(conn->write_waker); conn->write_waker = NULL; } - } return 0; @@ -332,9 +330,9 @@ int main(int argc, char *argv[]) { if (err) { printf("error code: %d\n", hyper_error_code(err)); // grab the error details - char errbuf [256]; + unsigned char errbuf[256]; size_t errlen = hyper_error_print(err, errbuf, sizeof(errbuf)); - printf("details: %.*s\n", (int) errlen, errbuf); + printf("details: %.*s\n", (int)errlen, errbuf); // clean up the error hyper_error_free(err); diff --git a/capi/examples/server.c b/capi/examples/server.c new file mode 100644 index 0000000000..d442bce39e --- /dev/null +++ b/capi/examples/server.c @@ -0,0 +1,580 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "hyper.h" + +static const int MAX_EVENTS = 128; + +typedef struct conn_data_s { + int fd; + int epoll_fd; + uint32_t event_mask; + hyper_waker *read_waker; + hyper_waker *write_waker; +} conn_data; + +static int listen_on(const char *host, const char *port) { + struct addrinfo hints; + struct addrinfo *result; + + // Work out bind address + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + hints.ai_canonname = NULL; + hints.ai_addr = NULL; + hints.ai_next = NULL; + + int gai_rc = getaddrinfo(host, port, &hints, &result); + if (gai_rc != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(gai_rc)); + return -1; + } + + // Try each bind address until one works + int sock = -1; + for (struct addrinfo *resp = result; resp; resp = resp->ai_next) { + sock = socket(resp->ai_family, resp->ai_socktype, resp->ai_protocol); + if (sock < 0) { + perror("socket"); + continue; + } + + // Enable SO_REUSEADDR + int reuseaddr = 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) < 0) { + perror("setsockopt"); + } + + // Attempt to bind to the address + if (bind(sock, resp->ai_addr, resp->ai_addrlen) == 0) { + break; + } + perror("bind"); + + // Failed, tidy up + close(sock); + sock = -1; + } + + freeaddrinfo(result); + + if (sock < 0) { + return -1; + } + + // Non-blocking for async + if (fcntl(sock, F_SETFL, O_NONBLOCK) != 0) { + perror("fcntl(O_NONBLOCK) (listening)\n"); + return -1; + } + + // Close handle on exec(ve) + if (fcntl(sock, F_SETFD, FD_CLOEXEC) != 0) { + perror("fcntl(FD_CLOEXEC) (listening)\n"); + return 1; + } + + // Enable listening mode + if (listen(sock, 32) < 0) { + perror("listen"); + return -1; + } + + return sock; +} + +// Register interest in various termination signals. The returned fd can be +// polled with epoll. +static int register_signal_handler() { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGTERM); + sigaddset(&mask, SIGQUIT); + int signal_fd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); + if (signal_fd < 0) { + perror("signalfd"); + return 1; + } + sigaddset(&mask, SIGPIPE); + if (sigprocmask(SIG_BLOCK, &mask, NULL) < 0) { + perror("sigprocmask"); + return 1; + } + + return signal_fd; +} + +// Register connection FD with epoll, associated with this `conn` +static bool update_conn_data_registrations(conn_data *conn, bool create) { + struct epoll_event transport_event; + transport_event.events = conn->event_mask; + transport_event.data.ptr = conn; + if (epoll_ctl( + conn->epoll_fd, create ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, conn->fd, &transport_event + ) < 0) { + perror("epoll_ctl (transport)"); + return false; + } else { + return true; + } +} + +static size_t read_cb(void *userdata, hyper_context *ctx, uint8_t *buf, size_t buf_len) { + conn_data *conn = (conn_data *)userdata; + ssize_t ret = read(conn->fd, buf, buf_len); + + if (ret >= 0) { + // Normal (synchronous) read successful (or socket is closed) + return ret; + } + + if (errno != EAGAIN) { + // kaboom + return HYPER_IO_ERROR; + } + + // Otherwise this would block, so register interest and return pending + if (conn->read_waker != NULL) { + hyper_waker_free(conn->read_waker); + } + + if (!(conn->event_mask & EPOLLIN)) { + conn->event_mask |= EPOLLIN; + if (!update_conn_data_registrations(conn, false)) { + return HYPER_IO_ERROR; + } + } + + conn->read_waker = hyper_context_waker(ctx); + return HYPER_IO_PENDING; +} + +static size_t write_cb(void *userdata, hyper_context *ctx, const uint8_t *buf, size_t buf_len) { + conn_data *conn = (conn_data *)userdata; + ssize_t ret = write(conn->fd, buf, buf_len); + + if (ret >= 0) { + // Normal (synchronous) write successful (or socket is closed) + return ret; + } + + if (errno != EAGAIN) { + // kaboom + return HYPER_IO_ERROR; + } + + // Otherwise this would block, so register interest and return pending + if (conn->write_waker != NULL) { + hyper_waker_free(conn->write_waker); + } + + if (!(conn->event_mask & EPOLLOUT)) { + conn->event_mask |= EPOLLOUT; + if (!update_conn_data_registrations(conn, false)) { + return HYPER_IO_ERROR; + } + } + + conn->write_waker = hyper_context_waker(ctx); + return HYPER_IO_PENDING; +} + +static conn_data *create_conn_data(int epoll, int fd) { + conn_data *conn = malloc(sizeof(conn_data)); + conn->fd = fd; + conn->epoll_fd = epoll; + conn->event_mask = 0; + conn->read_waker = NULL; + conn->write_waker = NULL; + + if (!update_conn_data_registrations(conn, true)) { + free(conn); + return NULL; + } + + return conn; +} + +static void free_conn_data(void *userdata) { + conn_data *conn = (conn_data *)userdata; + + // Disassociate with the epoll + if (epoll_ctl(conn->epoll_fd, EPOLL_CTL_DEL, conn->fd, NULL) < 0) { + perror("epoll_ctl (transport, delete)"); + } + + // Drop any saved-off wakers + if (conn->read_waker) { + hyper_waker_free(conn->read_waker); + conn->read_waker = NULL; + } + if (conn->write_waker) { + hyper_waker_free(conn->write_waker); + conn->write_waker = NULL; + } + + // Shut down the socket connection + close(conn->fd); + + // ...and clean up + free(conn); +} + +static hyper_io *create_io(conn_data *conn) { + // Hookup the IO + hyper_io *io = hyper_io_new(); + hyper_io_set_userdata(io, (void *)conn, free_conn_data); + hyper_io_set_read(io, read_cb); + hyper_io_set_write(io, write_cb); + + return io; +} + +typedef struct service_userdata_s { + char host[128]; + char port[8]; + const hyper_executor *executor; +} service_userdata; + +static service_userdata *create_service_userdata() { + return (service_userdata *)calloc(1, sizeof(service_userdata)); +} + +static void free_service_userdata(void *userdata) { + service_userdata *cast_userdata = (service_userdata *)userdata; + free(cast_userdata); +} + +static int print_each_header( + void *userdata, const uint8_t *name, size_t name_len, const uint8_t *value, size_t value_len +) { + printf("%.*s: %.*s\n", (int)name_len, name, (int)value_len, value); + return HYPER_ITER_CONTINUE; +} + +static int print_body_chunk(void *userdata, const hyper_buf *chunk) { + const uint8_t *buf = hyper_buf_bytes(chunk); + size_t len = hyper_buf_len(chunk); + write(1, buf, len); + return HYPER_ITER_CONTINUE; +} + +static int send_each_body_chunk(void *userdata, hyper_context *ctx, hyper_buf **chunk) { + int *chunk_count = (int *)userdata; + if (*chunk_count > 0) { + unsigned char data[4096]; + memset(data, '0' + (*chunk_count % 10), sizeof(data)); + *chunk = hyper_buf_copy(data, sizeof(data)); + (*chunk_count)--; + } else { + *chunk = NULL; + } + return HYPER_POLL_READY; +} + +static void server_callback( + void *userdata, hyper_request *request, hyper_response_channel *channel +) { + service_userdata *service_data = (service_userdata *)userdata; + printf("Request from %s:%s\n", service_data->host, service_data->port); + + // Print out various properties of the request. + unsigned char scheme[16]; + size_t scheme_len = sizeof(scheme); + unsigned char authority[16]; + size_t authority_len = sizeof(authority); + unsigned char path_and_query[16]; + size_t path_and_query_len = sizeof(path_and_query); + if (hyper_request_uri_parts( + request, + scheme, + &scheme_len, + authority, + &authority_len, + path_and_query, + &path_and_query_len + ) == 0) { + printf("Request scheme was %.*s\n", (int)scheme_len, scheme); + printf("Request authority was %.*s\n", (int)authority_len, authority); + printf("Request path_and_query was %.*s\n", (int)path_and_query_len, path_and_query); + } + int version = hyper_request_version(request); + printf("Request version was %d\n", version); + unsigned char method[16]; + size_t method_len = sizeof(method); + if (hyper_request_method(request, method, &method_len) == 0) { + printf("Request method was %.*s\n", (int)method_len, method); + } + + // Print out all the headers from the request + hyper_headers *req_headers = hyper_request_headers(request); + hyper_headers_foreach(req_headers, print_each_header, NULL); + + if (!strcmp((char *)method, "POST") || !strcmp((char *)method, "PUT")) { + // ...consume the request body + hyper_body *body = hyper_request_body(request); + hyper_task *task = hyper_body_foreach(body, print_body_chunk, NULL, NULL); + hyper_executor_push(service_data->executor, task); + } + + // Tidy up + hyper_request_free(request); + + // Build a response + hyper_response *response = hyper_response_new(); + hyper_response_set_status(response, 200); + hyper_headers *rsp_headers = hyper_response_headers(response); + hyper_headers_set( + rsp_headers, (unsigned char *)"Cache-Control", 13, (unsigned char *)"no-cache", 8 + ); + + if (!strncmp((char *)method, "GET", method_len)) { + // ...add a body + hyper_body *body = hyper_body_new(); + hyper_body_set_data_func(body, send_each_body_chunk); + int *chunk_count = (int *)malloc(sizeof(int)); + *chunk_count = 10; + hyper_body_set_userdata(body, (void *)chunk_count, free); + hyper_response_set_body(response, body); + } + + // ...and send the response, completing the transaction + hyper_response_channel_send(channel, response); +} + +int main(int argc, char *argv[]) { + const char *host = argc > 1 ? argv[1] : "127.0.0.1"; + const char *port = argc > 2 ? argv[2] : "1234"; + printf("listening on port %s on %s...\n", port, host); + + // The main listening socket + int listen_fd = listen_on(host, port); + if (listen_fd < 0) { + return 1; + } + + int signal_fd = register_signal_handler(); + if (signal_fd < 0) { + return 1; + } + + // Use epoll cos' it's cool + int epoll = epoll_create1(EPOLL_CLOEXEC); + if (epoll < 0) { + perror("epoll"); + return 1; + } + + // Always await new connections from the listen socket + struct epoll_event listen_event; + listen_event.events = EPOLLIN; + listen_event.data.ptr = &listen_fd; + if (epoll_ctl(epoll, EPOLL_CTL_ADD, listen_fd, &listen_event) < 0) { + perror("epoll_ctl (add listening)"); + return 1; + } + + // Always await signals on the signal socket + struct epoll_event signal_event; + signal_event.events = EPOLLIN; + signal_event.data.ptr = &signal_fd; + if (epoll_ctl(epoll, EPOLL_CTL_ADD, signal_fd, &signal_event) < 0) { + perror("epoll_ctl (add signal)"); + return 1; + } + + printf("http handshake (hyper v%s) ...\n", hyper_version()); + + // We need an executor generally to poll futures + const hyper_executor *exec = hyper_executor_new(); + + // Configure the server HTTP/1 stack + hyper_http1_serverconn_options *http1_opts = hyper_http1_serverconn_options_new(exec); + hyper_http1_serverconn_options_header_read_timeout(http1_opts, 1000 * 5); // 5 seconds + + // Configure the server HTTP/2 stack + hyper_http2_serverconn_options *http2_opts = hyper_http2_serverconn_options_new(exec); + hyper_http2_serverconn_options_keep_alive_interval(http2_opts, 5); // 5 seconds + hyper_http2_serverconn_options_keep_alive_timeout(http2_opts, 5); // 5 seconds + + while (1) { + while (1) { + hyper_task *task = hyper_executor_poll(exec); + if (!task) { + break; + } + + if (hyper_task_type(task) == HYPER_TASK_ERROR) { + printf("hyper task failed with error!\n"); + + hyper_error *err = hyper_task_value(task); + printf("error code: %d\n", hyper_error_code(err)); + uint8_t errbuf[256]; + size_t errlen = hyper_error_print(err, errbuf, sizeof(errbuf)); + printf("details: %.*s\n", (int)errlen, errbuf); + + // clean up the error + hyper_error_free(err); + + // clean up the task + hyper_task_free(task); + + continue; + } + + if (hyper_task_type(task) == HYPER_TASK_EMPTY) { + printf("internal hyper task complete\n"); + hyper_task_free(task); + + continue; + } + + if (hyper_task_type(task) == HYPER_TASK_SERVERCONN) { + printf("server connection task complete\n"); + hyper_task_free(task); + + continue; + } + } + + int timeout = hyper_executor_next_timer_pop(exec); + + printf("Processed all tasks - polling for events (max %dms)\n", timeout); + + struct epoll_event events[MAX_EVENTS]; + + int nevents = epoll_wait(epoll, events, MAX_EVENTS, timeout); + if (nevents < 0) { + perror("epoll"); + return 1; + } + + printf("Poll reported %d events\n", nevents); + + for (int n = 0; n < nevents; n++) { + if (events[n].data.ptr == &listen_fd) { + // Incoming connection(s) on listen_fd + int new_fd; + struct sockaddr_storage remote_addr_storage; + struct sockaddr *remote_addr = (struct sockaddr *)&remote_addr_storage; + socklen_t remote_addr_len = sizeof(struct sockaddr_storage); + while ((new_fd = accept( + listen_fd, (struct sockaddr *)&remote_addr_storage, &remote_addr_len + )) >= 0) { + service_userdata *userdata = create_service_userdata(); + userdata->executor = exec; + if (getnameinfo( + remote_addr, + remote_addr_len, + userdata->host, + sizeof(userdata->host), + userdata->port, + sizeof(userdata->port), + NI_NUMERICHOST | NI_NUMERICSERV + ) < 0) { + perror("getnameinfo"); + printf("New incoming connection from (unknown)\n"); + } else { + printf( + "New incoming connection from (%s:%s)\n", userdata->host, userdata->port + ); + } + + // Set non-blocking + if (fcntl(new_fd, F_SETFL, O_NONBLOCK) != 0) { + perror("fcntl(O_NONBLOCK) (transport)\n"); + return 1; + } + + // Close handle on exec(ve) + if (fcntl(new_fd, F_SETFD, FD_CLOEXEC) != 0) { + perror("fcntl(FD_CLOEXEC) (transport)\n"); + return 1; + } + + // Wire up IO + conn_data *conn = create_conn_data(epoll, new_fd); + hyper_io *io = create_io(conn); + + // Ask hyper to drive this connection + hyper_service *service = hyper_service_new(server_callback); + hyper_service_set_userdata(service, userdata, free_service_userdata); + hyper_task *serverconn = + hyper_serve_httpX_connection(http1_opts, http2_opts, io, service); + hyper_executor_push(exec, serverconn); + } + + if (errno != EAGAIN) { + perror("accept"); + } + } else if (events[n].data.ptr == &signal_fd) { + struct signalfd_siginfo siginfo; + if (read(signal_fd, &siginfo, sizeof(struct signalfd_siginfo)) != + sizeof(struct signalfd_siginfo)) { + perror("read (signal_fd)"); + return 1; + } + + if (siginfo.ssi_signo == SIGINT) { + printf("Caught SIGINT... exiting\n"); + goto EXIT; + } else if (siginfo.ssi_signo == SIGTERM) { + printf("Caught SIGTERM... exiting\n"); + goto EXIT; + } else if (siginfo.ssi_signo == SIGQUIT) { + printf("Caught SIGQUIT... exiting\n"); + goto EXIT; + } else { + printf("Caught unexpected signal %d... ignoring\n", siginfo.ssi_signo); + } + } else { + // Existing transport socket, poke the wakers or close the socket + conn_data *conn = events[n].data.ptr; + if (events[n].events & EPOLLIN) { + if (conn->read_waker) { + hyper_waker_wake(conn->read_waker); + conn->read_waker = NULL; + } else { + conn->event_mask &= ~EPOLLIN; + if (!update_conn_data_registrations(conn, false)) { + epoll_ctl(conn->epoll_fd, EPOLL_CTL_DEL, conn->fd, NULL); + } + } + } + if (events[n].events & EPOLLOUT) { + if (conn->write_waker) { + hyper_waker_wake(conn->write_waker); + conn->write_waker = NULL; + } else { + conn->event_mask &= ~EPOLLOUT; + if (!update_conn_data_registrations(conn, false)) { + epoll_ctl(conn->epoll_fd, EPOLL_CTL_DEL, conn->fd, NULL); + } + } + } + } + } + } + +EXIT: + hyper_http1_serverconn_options_free(http1_opts); + hyper_http2_serverconn_options_free(http2_opts); + hyper_executor_free(exec); + + return 1; +} diff --git a/capi/examples/upload.c b/capi/examples/upload.c index 4bf44e6513..4da690f735 100644 --- a/capi/examples/upload.c +++ b/capi/examples/upload.c @@ -1,19 +1,18 @@ -#include +#include +#include +#include +#include +#include #include +#include +#include #include -#include -#include -#include -#include -#include +#include #include -#include -#include #include "hyper.h" - struct conn_data { int fd; hyper_waker *read_waker; @@ -114,14 +113,12 @@ static int connect_to(const char *host, const char *port) { struct upload_body { int fd; - char *buf; + unsigned char *buf; size_t len; }; -static int poll_req_upload(void *userdata, - hyper_context *ctx, - hyper_buf **chunk) { - struct upload_body* upload = userdata; +static int poll_req_upload(void *userdata, hyper_context *ctx, hyper_buf **chunk) { + struct upload_body *upload = userdata; ssize_t res = read(upload->fd, upload->buf, upload->len); if (res > 0) { @@ -140,12 +137,10 @@ static int poll_req_upload(void *userdata, return HYPER_POLL_ERROR; } -static int print_each_header(void *userdata, - const uint8_t *name, - size_t name_len, - const uint8_t *value, - size_t value_len) { - printf("%.*s: %.*s\n", (int) name_len, name, (int) value_len, value); +static int print_each_header( + void *userdata, const uint8_t *name, size_t name_len, const uint8_t *value, size_t value_len +) { + printf("%.*s: %.*s\n", (int)name_len, name, (int)value_len, value); return HYPER_ITER_CONTINUE; } @@ -160,7 +155,12 @@ typedef enum { EXAMPLE_HANDSHAKE, EXAMPLE_SEND, EXAMPLE_RESP_BODY -} example_id; +} example_state; + +typedef union example_id { + void *ptr; + example_state state; +} example_userdata; #define STR_ARG(XX) (uint8_t *)XX, strlen(XX) @@ -208,10 +208,9 @@ int main(int argc, char *argv[]) { conn->read_waker = NULL; conn->write_waker = NULL; - // Hookup the IO hyper_io *io = hyper_io_new(); - hyper_io_set_userdata(io, (void *)conn); + hyper_io_set_userdata(io, (void *)conn, NULL); hyper_io_set_read(io, read_cb); hyper_io_set_write(io, write_cb); @@ -225,7 +224,7 @@ int main(int argc, char *argv[]) { hyper_clientconn_options_exec(opts, exec); hyper_task *handshake = hyper_clientconn_handshake(io, opts); - hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE); + hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE, NULL); // Let's wait for the handshake to finish... hyper_executor_push(exec, handshake); @@ -243,9 +242,8 @@ int main(int argc, char *argv[]) { } hyper_task_return_type task_type = hyper_task_type(task); - switch ((example_id) hyper_task_userdata(task)) { + switch (((example_userdata)hyper_task_userdata(task)).state) { case EXAMPLE_HANDSHAKE: - ; if (task_type == HYPER_TASK_ERROR) { printf("handshake error!\n"); return 1; @@ -276,17 +274,17 @@ int main(int argc, char *argv[]) { // the body is sent immediately. This will just print if any // informational headers are received. printf(" with expect-continue ...\n"); - hyper_request_on_informational(req, print_informational, NULL); + hyper_request_on_informational(req, print_informational, NULL, NULL); // Prepare the req body hyper_body *body = hyper_body_new(); - hyper_body_set_userdata(body, &upload); + hyper_body_set_userdata(body, &upload, NULL); hyper_body_set_data_func(body, poll_req_upload); hyper_request_set_body(req, body); // Send it! hyper_task *send = hyper_clientconn_send(client, req); - hyper_task_set_userdata(send, (void *)EXAMPLE_SEND); + hyper_task_set_userdata(send, (void *)EXAMPLE_SEND, NULL); printf("sending ...\n"); hyper_executor_push(exec, send); @@ -295,7 +293,6 @@ int main(int argc, char *argv[]) { break; case EXAMPLE_SEND: - ; if (task_type == HYPER_TASK_ERROR) { printf("send error!\n"); return 1; @@ -318,7 +315,7 @@ int main(int argc, char *argv[]) { // Set us up to peel data from the body a chunk at a time hyper_task *body_data = hyper_body_data(resp_body); - hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY); + hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY, NULL); hyper_executor_push(exec, body_data); // No longer need the response @@ -326,7 +323,6 @@ int main(int argc, char *argv[]) { break; case EXAMPLE_RESP_BODY: - ; if (task_type == HYPER_TASK_ERROR) { printf("body error!\n"); return 1; @@ -339,7 +335,7 @@ int main(int argc, char *argv[]) { hyper_task_free(task); hyper_task *body_data = hyper_body_data(resp_body); - hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY); + hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY, NULL); hyper_executor_push(exec, body_data); break; @@ -395,6 +391,5 @@ int main(int argc, char *argv[]) { } } - return 0; } diff --git a/capi/gen_header.sh b/capi/gen_header.sh index 2696340b48..1abac92d5d 100755 --- a/capi/gen_header.sh +++ b/capi/gen_header.sh @@ -31,7 +31,7 @@ fi cp "$CAPI_DIR/include/hyper.h" "$header_file_backup" # Expand just the ffi module -if ! RUSTFLAGS='--cfg hyper_unstable_ffi' cargo expand --features client,http1,http2,ffi ::ffi 2> $WORK_DIR/expand_stderr.err > $WORK_DIR/expanded.rs; then +if ! RUSTFLAGS='--cfg hyper_unstable_ffi' cargo expand --features ffi,server,client,http1,http2 ::ffi 2> $WORK_DIR/expand_stderr.err > $WORK_DIR/expanded.rs; then cat $WORK_DIR/expand_stderr.err fi diff --git a/capi/include/hyper.h b/capi/include/hyper.h index b88bd39e9d..5f9fb56027 100644 --- a/capi/include/hyper.h +++ b/capi/include/hyper.h @@ -5,8 +5,7 @@ * Full docs at: https://docs.rs/hyper/latest/hyper/ffi/index.html */ -#ifndef _HYPER_H -#define _HYPER_H +#pragma once #include #include @@ -99,6 +98,10 @@ typedef enum hyper_code { The peer sent an HTTP message that could not be parsed. */ HYPERE_INVALID_PEER_MESSAGE, + /* + A provided buffer is too small to hold the value that would be written to it. + */ + HYPERE_INSUFFICIENT_SPACE, } hyper_code; /* @@ -125,6 +128,10 @@ typedef enum hyper_task_return_type { The value of this task is `hyper_buf *`. */ HYPER_TASK_BUF, + /* + The value of this task is null (the task was a server-side connection task) + */ + HYPER_TASK_SERVERCONN, } hyper_task_return_type; /* @@ -167,6 +174,16 @@ typedef struct hyper_executor hyper_executor; */ typedef struct hyper_headers hyper_headers; +/* + Configuration options for HTTP/1 server connections. + */ +typedef struct hyper_http1_serverconn_options hyper_http1_serverconn_options; + +/* + Configuration options for HTTP/2 server connections. + */ +typedef struct hyper_http2_serverconn_options hyper_http2_serverconn_options; + /* A read/write handle for a specific connection. */ @@ -182,6 +199,16 @@ typedef struct hyper_request hyper_request; */ typedef struct hyper_response hyper_response; +/* + A channel on which to send back a response to complete a transaction for a service. + */ +typedef struct hyper_response_channel hyper_response_channel; + +/* + A service that can serve a single server connection. + */ +typedef struct hyper_service hyper_service; + /* An async task. */ @@ -194,6 +221,11 @@ typedef struct hyper_waker hyper_waker; typedef int (*hyper_body_foreach_callback)(void*, const struct hyper_buf*); +/* + Many hyper entities can be given userdata to allow user callbacks to correlate work together. + */ +typedef void (*hyper_userdata_drop)(void*); + typedef int (*hyper_body_data_callback)(void*, struct hyper_context*, struct hyper_buf**); typedef void (*hyper_request_on_informational_callback)(void*, struct hyper_response*); @@ -204,6 +236,11 @@ typedef size_t (*hyper_io_read_callback)(void*, struct hyper_context*, uint8_t*, typedef size_t (*hyper_io_write_callback)(void*, struct hyper_context*, const uint8_t*, size_t); +/* + The main definition of a service. This callback will be invoked for each transaction on the + */ +typedef void (*hyper_service_callback)(void*, struct hyper_request*, struct hyper_response_channel*); + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -233,12 +270,13 @@ struct hyper_task *hyper_body_data(struct hyper_body *body); */ struct hyper_task *hyper_body_foreach(struct hyper_body *body, hyper_body_foreach_callback func, - void *userdata); + void *userdata, + hyper_userdata_drop drop); /* Set userdata on this body, which will be passed to callback functions. */ -void hyper_body_set_userdata(struct hyper_body *body, void *userdata); +void hyper_body_set_userdata(struct hyper_body *body, void *userdata, hyper_userdata_drop drop); /* Set the outgoing data callback for this body. @@ -352,6 +390,13 @@ enum hyper_code hyper_request_set_method(struct hyper_request *req, const uint8_t *method, size_t method_len); +/* + Get the HTTP Method of the request. + */ +enum hyper_code hyper_request_method(const struct hyper_request *req, + uint8_t *method, + size_t *method_len); + /* Set the URI of the request. */ @@ -370,11 +415,27 @@ enum hyper_code hyper_request_set_uri_parts(struct hyper_request *req, const uint8_t *path_and_query, size_t path_and_query_len); +/* + Get the URI of the request split into scheme, authority and path/query strings. + */ +enum hyper_code hyper_request_uri_parts(const struct hyper_request *req, + uint8_t *scheme, + size_t *scheme_len, + uint8_t *authority, + size_t *authority_len, + uint8_t *path_and_query, + size_t *path_and_query_len); + /* Set the preferred HTTP version of the request. */ enum hyper_code hyper_request_set_version(struct hyper_request *req, int version); +/* + Get the HTTP version used by this request. + */ +int hyper_request_version(const struct hyper_request *resp); + /* Gets a mutable reference to the HTTP headers of this request */ @@ -385,12 +446,23 @@ struct hyper_headers *hyper_request_headers(struct hyper_request *req); */ enum hyper_code hyper_request_set_body(struct hyper_request *req, struct hyper_body *body); +/* + Take ownership of the body of this request. + */ +struct hyper_body *hyper_request_body(struct hyper_request *req); + /* Set an informational (1xx) response callback. */ enum hyper_code hyper_request_on_informational(struct hyper_request *req, hyper_request_on_informational_callback callback, - void *data); + void *data, + hyper_userdata_drop drop); + +/* + Construct a new HTTP 200 Ok response + */ +struct hyper_response *hyper_response_new(void); /* Free an HTTP response. @@ -402,6 +474,11 @@ void hyper_response_free(struct hyper_response *resp); */ uint16_t hyper_response_status(const struct hyper_response *resp); +/* + Set the HTTP Status-Code of this response. + */ +void hyper_response_set_status(struct hyper_response *resp, uint16_t status); + /* Get a pointer to the reason-phrase of this response. */ @@ -412,6 +489,11 @@ const uint8_t *hyper_response_reason_phrase(const struct hyper_response *resp); */ size_t hyper_response_reason_phrase_len(const struct hyper_response *resp); +/* + Set the preferred HTTP version of the response. + */ +enum hyper_code hyper_response_set_version(struct hyper_response *req, int version); + /* Get the HTTP version used by this response. */ @@ -422,6 +504,11 @@ int hyper_response_version(const struct hyper_response *resp); */ struct hyper_headers *hyper_response_headers(struct hyper_response *resp); +/* + Set the body of the response. + */ +enum hyper_code hyper_response_set_body(struct hyper_response *rsp, struct hyper_body *body); + /* Take ownership of the body of this response. */ @@ -465,7 +552,12 @@ void hyper_io_free(struct hyper_io *io); /* Set the user data pointer for this IO to some value. */ -void hyper_io_set_userdata(struct hyper_io *io, void *data); +void hyper_io_set_userdata(struct hyper_io *io, void *data, hyper_userdata_drop drop_func); + +/* + Get the user data pointer for this IO value. + */ +void *hyper_io_get_userdata(struct hyper_io *io); /* Set the read function for this IO transport. @@ -477,6 +569,178 @@ void hyper_io_set_read(struct hyper_io *io, hyper_io_read_callback func); */ void hyper_io_set_write(struct hyper_io *io, hyper_io_write_callback func); +/* + Create a new HTTP/1 serverconn options object. + */ +struct hyper_http1_serverconn_options *hyper_http1_serverconn_options_new(const struct hyper_executor *exec); + +/* + Free a `hyper_http1_serverconn_options*`. + */ +void hyper_http1_serverconn_options_free(struct hyper_http1_serverconn_options *opts); + +/* + Set whether HTTP/1 connections should support half-closures. + */ +enum hyper_code hyper_http1_serverconn_options_half_close(struct hyper_http1_serverconn_options *opts, + bool enabled); + +/* + Enables or disables HTTP/1 keep-alive. + */ +enum hyper_code hyper_http1_serverconn_options_keep_alive(struct hyper_http1_serverconn_options *opts, + bool enabled); + +/* + Set whether HTTP/1 connections will write header names as title case at the socket level. + */ +enum hyper_code hyper_http1_serverconn_options_title_case_headers(struct hyper_http1_serverconn_options *opts, + bool enabled); + +/* + Set whether to support preserving original header cases. + */ +enum hyper_code hyper_http1_serverconn_options_preserve_header_case(struct hyper_http1_serverconn_options *opts, + bool enabled); + +/* + Set a timeout for reading client request headers. If a client does not + */ +enum hyper_code hyper_http1_serverconn_options_header_read_timeout(struct hyper_http1_serverconn_options *opts, + uint64_t millis); + +/* + Set whether HTTP/1 connections should try to use vectored writes, or always flatten into a + */ +enum hyper_code hyper_http1_serverconn_options_writev(struct hyper_http1_serverconn_options *opts, + bool enabled); + +/* + Set the maximum buffer size for the HTTP/1 connection. Must be no lower `8192`. + */ +enum hyper_code hyper_http1_serverconn_options_max_buf_size(struct hyper_http1_serverconn_options *opts, + uintptr_t max_buf_size); + +/* + Aggregates flushes to better support pipelined responses. + */ +enum hyper_code hyper_http1_serverconn_options_pipeline_flush(struct hyper_http1_serverconn_options *opts, + bool enabled); + +/* + Create a new HTTP/2 serverconn options object bound to the provided executor. + */ +struct hyper_http2_serverconn_options *hyper_http2_serverconn_options_new(const struct hyper_executor *exec); + +/* + Free a `hyper_http2_serverconn_options*`. + */ +void hyper_http2_serverconn_options_free(struct hyper_http2_serverconn_options *opts); + +/* + Sets the `SETTINGS_INITIAL_WINDOW_SIZE` option for HTTP/2 stream-level flow control. + */ +enum hyper_code hyper_http2_serverconn_options_initial_stream_window_size(struct hyper_http2_serverconn_options *opts, + unsigned int window_size); + +/* + Sets the max connection-level flow control for HTTP/2. + */ +enum hyper_code hyper_http2_serverconn_options_initial_connection_window_size(struct hyper_http2_serverconn_options *opts, + unsigned int window_size); + +/* + Sets whether to use an adaptive flow control. + */ +enum hyper_code hyper_http2_serverconn_options_adaptive_window(struct hyper_http2_serverconn_options *opts, + bool enabled); + +/* + Sets the maximum frame size to use for HTTP/2. + */ +enum hyper_code hyper_http2_serverconn_options_max_frame_size(struct hyper_http2_serverconn_options *opts, + unsigned int frame_size); + +/* + Sets the `SETTINGS_MAX_CONCURRENT_STREAMS` option for HTTP2 connections. + */ +enum hyper_code hyper_http2_serverconn_options_max_concurrent_streams(struct hyper_http2_serverconn_options *opts, + unsigned int max_streams); + +/* + Sets an interval for HTTP/2 Ping frames should be sent to keep a connection alive. + */ +enum hyper_code hyper_http2_serverconn_options_keep_alive_interval(struct hyper_http2_serverconn_options *opts, + uint64_t interval_seconds); + +/* + Sets a timeout for receiving an acknowledgement of the keep-alive ping. + */ +enum hyper_code hyper_http2_serverconn_options_keep_alive_timeout(struct hyper_http2_serverconn_options *opts, + uint64_t timeout_seconds); + +/* + Set the maximum write buffer size for each HTTP/2 stream. Must be no larger than + */ +enum hyper_code hyper_http2_serverconn_options_max_send_buf_size(struct hyper_http2_serverconn_options *opts, + uintptr_t max_buf_size); + +/* + Enables the extended `CONNECT` protocol. + */ +enum hyper_code hyper_http2_serverconn_options_enable_connect_protocol(struct hyper_http2_serverconn_options *opts); + +/* + Sets the max size of received header frames. + */ +enum hyper_code hyper_http2_serverconn_options_max_header_list_size(struct hyper_http2_serverconn_options *opts, + uint32_t max); + +/* + Create a service from a wrapped callback function. + */ +struct hyper_service *hyper_service_new(hyper_service_callback service_fn); + +/* + Register opaque userdata with the `hyper_service`. This userdata must be `Send` in a rust + */ +void hyper_service_set_userdata(struct hyper_service *service, + void *userdata, + hyper_userdata_drop drop); + +/* + Frees a hyper_service object if no longer needed + */ +void hyper_service_free(struct hyper_service *service); + +/* + Serve the provided `hyper_service *` as an HTTP/1 endpoint over the provided `hyper_io *` + */ +struct hyper_task *hyper_serve_http1_connection(struct hyper_http1_serverconn_options *serverconn_options, + struct hyper_io *io, + struct hyper_service *service); + +/* + Serve the provided `hyper_service *` as an HTTP/2 endpoint over the provided `hyper_io *` + */ +struct hyper_task *hyper_serve_http2_connection(struct hyper_http2_serverconn_options *serverconn_options, + struct hyper_io *io, + struct hyper_service *service); + +/* + Serve the provided `hyper_service *` as either an HTTP/1 or HTTP/2 (depending on what the + */ +struct hyper_task *hyper_serve_httpX_connection(struct hyper_http1_serverconn_options *http1_serverconn_options, + struct hyper_http2_serverconn_options *http2_serverconn_options, + struct hyper_io *io, + struct hyper_service *service); + +/* + Sends a `hyper_response*` back to the client. This function consumes the response and the + */ +void hyper_response_channel_send(struct hyper_response_channel *channel, + struct hyper_response *response); + /* Creates a new task executor. */ @@ -497,6 +761,11 @@ enum hyper_code hyper_executor_push(const struct hyper_executor *exec, struct hy */ struct hyper_task *hyper_executor_poll(const struct hyper_executor *exec); +/* + Returns the time until the executor will be able to make progress on tasks due to internal + */ +int hyper_executor_next_timer_pop(const struct hyper_executor *exec); + /* Free a task. */ @@ -515,7 +784,7 @@ enum hyper_task_return_type hyper_task_type(struct hyper_task *task); /* Set a user data pointer to be associated with this task. */ -void hyper_task_set_userdata(struct hyper_task *task, void *userdata); +void hyper_task_set_userdata(struct hyper_task *task, void *userdata, hyper_userdata_drop drop); /* Retrieve the userdata that has been set via `hyper_task_set_userdata`. @@ -540,5 +809,3 @@ void hyper_waker_wake(struct hyper_waker *waker); #ifdef __cplusplus } // extern "C" #endif // __cplusplus - -#endif /* _HYPER_H */ diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 1235202291..a6fc3729e3 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -198,12 +198,12 @@ impl OriginalHeaderOrder { /// in the original order received. /// /// # Examples - /// ```no_run + /// ```ignore /// use hyper::ext::OriginalHeaderOrder; /// use hyper::header::{HeaderName, HeaderValue, HeaderMap}; /// /// let mut h_order = OriginalHeaderOrder::default(); - /// let mut h_map = Headermap::new(); + /// let mut h_map = HeaderMap::new(); /// /// let name1 = b"Set-CookiE"; /// let value1 = b"a=b"; @@ -218,9 +218,9 @@ impl OriginalHeaderOrder { /// let name3 = b"SET-COOKIE"; /// let value3 = b"c=d"; /// h_map.append(name3, value3); - /// h_order.append(name3) + /// h_order.append(name3); /// - /// let mut iter = h_order.get_in_order() + /// let mut iter = h_order.get_in_order(); /// /// let (name, idx) = iter.next(); /// assert_eq!(b"a=b", h_map.get_all(name).nth(idx).unwrap()); diff --git a/src/ffi/body.rs b/src/ffi/body.rs index c412f219f6..375fb22ca7 100644 --- a/src/ffi/body.rs +++ b/src/ffi/body.rs @@ -7,7 +7,8 @@ use http_body_util::BodyExt as _; use libc::{c_int, size_t}; use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType}; -use super::{UserDataPointer, HYPER_ITER_CONTINUE}; +use super::userdata::{hyper_userdata_drop, Userdata}; +use super::HYPER_ITER_CONTINUE; use crate::body::{Bytes, Frame, Incoming as IncomingBody}; /// A streaming HTTP body. @@ -44,11 +45,11 @@ pub struct hyper_body(pub(super) IncomingBody); /// - hyper_buf_copy: Create a new hyper_buf * by copying the provided bytes. /// - hyper_buf_free: Free this buffer. /// - hyper_buf_len: Get the length of the bytes this buffer contains. -pub struct hyper_buf(pub(crate) Bytes); +pub struct hyper_buf(pub(super) Bytes); pub(crate) struct UserBody { data_func: hyper_body_data_callback, - userdata: *mut c_void, + userdata: Userdata, } // ===== Body ===== @@ -145,16 +146,16 @@ ffi_fn! { /// consumed. /// /// This will consume the `hyper_body *`, you shouldn't use it anymore or free it. - fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut hyper_task { + fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void, drop: hyper_userdata_drop) -> *mut hyper_task { let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut()); - let userdata = UserDataPointer(userdata); + let userdata = Userdata::new(userdata, drop); Box::into_raw(hyper_task::boxed(async move { let _ = &userdata; while let Some(item) = body.0.frame().await { let frame = item?; if let Ok(chunk) = frame.into_data() { - if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) { + if HYPER_ITER_CONTINUE != func(userdata.as_ptr(), &hyper_buf(chunk)) { return Err(crate::Error::new_user_aborted_by_callback()); } } @@ -166,9 +167,9 @@ ffi_fn! { ffi_fn! { /// Set userdata on this body, which will be passed to callback functions. - fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) { + fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void, drop: hyper_userdata_drop) { let b = non_null!(&mut *body ?= ()); - b.0.as_ffi_mut().userdata = userdata; + b.0.as_ffi_mut().userdata = Userdata::new(userdata, drop); } } @@ -204,7 +205,7 @@ impl UserBody { pub(crate) fn new() -> UserBody { UserBody { data_func: data_noop, - userdata: std::ptr::null_mut(), + userdata: Userdata::default(), } } @@ -213,7 +214,7 @@ impl UserBody { cx: &mut Context<'_>, ) -> Poll>>> { let mut out = std::ptr::null_mut(); - match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) { + match (self.data_func)(self.userdata.as_ptr(), hyper_context::wrap(cx), &mut out) { super::task::HYPER_POLL_READY => { if out.is_null() { Poll::Ready(None) diff --git a/src/ffi/client.rs b/src/ffi/client.rs index 975314b9be..cbd7a03bf1 100644 --- a/src/ffi/client.rs +++ b/src/ffi/client.rs @@ -1,3 +1,4 @@ +use std::pin::Pin; use std::ptr; use std::sync::Arc; @@ -108,7 +109,7 @@ ffi_fn! { .map(|(tx, conn)| { options.exec.execute(Box::pin(async move { let _ = conn.await; - })); + }) as Pin>); hyper_clientconn { tx: Tx::Http2(tx) } }); } @@ -123,7 +124,7 @@ ffi_fn! { .map(|(tx, conn)| { options.exec.execute(Box::pin(async move { let _ = conn.await; - })); + }) as Pin>); hyper_clientconn { tx: Tx::Http1(tx) } }) })) @@ -143,18 +144,18 @@ ffi_fn! { /// `hyper_task_free`, or taken ownership of by `hyper_executor_push` /// without subsequently being given back by `hyper_executor_poll`. fn hyper_clientconn_send(conn: *mut hyper_clientconn, req: *mut hyper_request) -> *mut hyper_task { - let mut req = non_null! { Box::from_raw(req) ?= ptr::null_mut() }; + let req = non_null! { Box::from_raw(req) ?= ptr::null_mut() }; // Update request with original-case map of headers - req.finalize_request(); + let req = (*req).finalize(); let fut = match non_null! { &mut *conn ?= ptr::null_mut() }.tx { - Tx::Http1(ref mut tx) => futures_util::future::Either::Left(tx.send_request(req.0)), - Tx::Http2(ref mut tx) => futures_util::future::Either::Right(tx.send_request(req.0)), + Tx::Http1(ref mut tx) => futures_util::future::Either::Left(tx.send_request(req)), + Tx::Http2(ref mut tx) => futures_util::future::Either::Right(tx.send_request(req)), }; let fut = async move { - fut.await.map(hyper_response::wrap) + fut.await.map(hyper_response::from) }; Box::into_raw(hyper_task::boxed(fut)) diff --git a/src/ffi/error.rs b/src/ffi/error.rs index b103b2f053..db24422722 100644 --- a/src/ffi/error.rs +++ b/src/ffi/error.rs @@ -33,6 +33,8 @@ pub enum hyper_code { HYPERE_FEATURE_NOT_ENABLED, /// The peer sent an HTTP message that could not be parsed. HYPERE_INVALID_PEER_MESSAGE, + /// A provided buffer is too small to hold the value that would be written to it. + HYPERE_INSUFFICIENT_SPACE, } // ===== impl hyper_error ===== diff --git a/src/ffi/http_types.rs b/src/ffi/http_types.rs index 2779cb191f..0fdf3d12b6 100644 --- a/src/ffi/http_types.rs +++ b/src/ffi/http_types.rs @@ -1,11 +1,13 @@ use bytes::Bytes; use libc::{c_int, size_t}; use std::ffi::c_void; +use std::sync::Arc; use super::body::hyper_body; use super::error::hyper_code; use super::task::{hyper_task_return_type, AsTaskType}; -use super::{UserDataPointer, HYPER_ITER_CONTINUE}; +use super::userdata::{hyper_userdata_drop, Userdata}; +use super::HYPER_ITER_CONTINUE; use crate::body::Incoming as IncomingBody; use crate::ext::{HeaderCaseMap, OriginalHeaderOrder, ReasonPhrase}; use crate::header::{HeaderName, HeaderValue}; @@ -27,12 +29,12 @@ use crate::{HeaderMap, Method, Request, Response, Uri}; /// - hyper_request_set_version: Set the preferred HTTP version of the request. /// - hyper_request_on_informational: Set an informational (1xx) response callback. /// - hyper_request_free: Free an HTTP request. -pub struct hyper_request(pub(super) Request); +pub struct hyper_request(Request); /// An HTTP response. /// /// Obtain one of these by making a request with `hyper_clientconn_send`, then -/// polling the executor unntil you get a `hyper_task` of type +/// polling the executor until you get a `hyper_task` of type /// `HYPER_TASK_RESPONSE`. To figure out which request this response /// corresponds to, check the userdata of the task, which you should /// previously have set to an application-specific identifier for the @@ -47,7 +49,7 @@ pub struct hyper_request(pub(super) Request); /// - hyper_response_headers: Gets a reference to the HTTP headers of this response. /// - hyper_response_body: Take ownership of the body of this response. /// - hyper_response_free: Free an HTTP response. -pub struct hyper_response(pub(super) Response); +pub struct hyper_response(Response); /// An HTTP header map. /// @@ -71,7 +73,7 @@ pub struct hyper_headers { #[derive(Clone)] pub(crate) struct OnInformational { func: hyper_request_on_informational_callback, - data: UserDataPointer, + userdata: Arc, } type hyper_request_on_informational_callback = extern "C" fn(*mut c_void, *mut hyper_response); @@ -87,7 +89,7 @@ ffi_fn! { /// To avoid a memory leak, the request must eventually be consumed by /// `hyper_request_free` or `hyper_clientconn_send`. fn hyper_request_new() -> *mut hyper_request { - Box::into_raw(Box::new(hyper_request(Request::new(IncomingBody::empty())))) + Box::into_raw(Box::new(hyper_request::from(Request::new(IncomingBody::empty())))) } ?= std::ptr::null_mut() } @@ -120,6 +122,30 @@ ffi_fn! { } } +ffi_fn! { + /// Get the HTTP Method of the request. + /// + /// `method` must be a pointer to a buffer that this function will populate with the HTTP + /// method of the request. The `header_len` argument must be a pointer to a `size_t` which, on + /// call, is populated with the maximum length of the `method` buffer and, on successful + /// response, will be set to the actual length of the value written into the buffer. + fn hyper_request_method(req: *const hyper_request, method: *mut u8, method_len: *mut size_t) -> hyper_code { + let req = non_null!(&*req ?= hyper_code::HYPERE_INVALID_ARG); + if method.is_null() { + return hyper_code::HYPERE_INVALID_ARG; + } + let req_method_str = req.0.method().as_str(); + unsafe { + if non_null!(*method_len ?= hyper_code::HYPERE_INVALID_ARG) < req_method_str.len() { + return hyper_code::HYPERE_INSUFFICIENT_SPACE; + } + std::ptr::copy_nonoverlapping(req_method_str.as_ptr(), method, req_method_str.len()); + *method_len = req_method_str.len(); + } + hyper_code::HYPERE_OK + } +} + ffi_fn! { /// Set the URI of the request. /// @@ -200,6 +226,74 @@ ffi_fn! { } } +ffi_fn! { + /// Get the URI of the request split into scheme, authority and path/query strings. + /// + /// Each of `scheme`, `authority` and `path_and_query` may be pointers to buffers that this + /// function will populate with the appropriate values from the request. If one of these + /// pointers is non-NULL then the associated `_len` field must be a pointer to a `size_t` + /// which, on call, is populated with the maximum length of the buffer and, on successful + /// response, will be set to the actual length of the value written into the buffer. + /// + /// If a buffer is passed as `NULL` then the `_len` field will be ignored and that component + /// will be skipped. + /// + /// This function may fail with `HYPERE_INSUFFICIENT_SPACE` if one of the provided buffers is + /// not long enough to hold the value from the request. + fn hyper_request_uri_parts( + req: *const hyper_request, + scheme: *mut u8, + scheme_len: *mut size_t, + authority: *mut u8, + authority_len: *mut size_t, + path_and_query: *mut u8, + path_and_query_len: *mut size_t + ) -> hyper_code { + let req = non_null!(&*req ?= hyper_code::HYPERE_INVALID_ARG); + let uri = req.0.uri(); + if !scheme.is_null() { + let req_scheme_str = match uri.scheme() { + Some(s) => s.as_str(), + None => "", + }; + unsafe { + if non_null!(*scheme_len ?= hyper_code::HYPERE_INVALID_ARG) < req_scheme_str.len() { + return hyper_code::HYPERE_INSUFFICIENT_SPACE; + } + std::ptr::copy_nonoverlapping(req_scheme_str.as_ptr(), scheme, req_scheme_str.len()); + *scheme_len = req_scheme_str.len(); + } + } + if !authority.is_null() { + let req_authority_str = match uri.authority() { + Some(s) => s.as_str(), + None => "", + }; + unsafe { + if non_null!(*authority_len ?= hyper_code::HYPERE_INVALID_ARG) < req_authority_str.len() { + return hyper_code::HYPERE_INSUFFICIENT_SPACE; + } + std::ptr::copy_nonoverlapping(req_authority_str.as_ptr(), authority, req_authority_str.len()); + *authority_len = req_authority_str.len(); + } + } + if !path_and_query.is_null() { + let req_path_and_query_str = match uri.path_and_query() { + Some(s) => s.as_str(), + None => "", + }; + unsafe { + if non_null!(*path_and_query_len ?= hyper_code::HYPERE_INVALID_ARG) < req_path_and_query_str.len() { + return hyper_code::HYPERE_INSUFFICIENT_SPACE; + } + std::ptr::copy_nonoverlapping(req_path_and_query_str.as_ptr(), path_and_query, req_path_and_query_str.len()); + *path_and_query_len = req_path_and_query_str.len(); + } + } + hyper_code::HYPERE_OK + } +} + ffi_fn! { /// Set the preferred HTTP version of the request. /// @@ -225,6 +319,27 @@ ffi_fn! { } } +ffi_fn! { + /// Get the HTTP version used by this request. + /// + /// The returned value could be: + /// + /// - `HYPER_HTTP_VERSION_1_0` + /// - `HYPER_HTTP_VERSION_1_1` + /// - `HYPER_HTTP_VERSION_2` + /// - `HYPER_HTTP_VERSION_NONE` if newer (or older). + fn hyper_request_version(resp: *const hyper_request) -> c_int { + use http::Version; + + match non_null!(&*resp ?= 0).0.version() { + Version::HTTP_10 => super::HYPER_HTTP_VERSION_1_0, + Version::HTTP_11 => super::HYPER_HTTP_VERSION_1_1, + Version::HTTP_2 => super::HYPER_HTTP_VERSION_2, + _ => super::HYPER_HTTP_VERSION_NONE, + } + } +} + ffi_fn! { /// Gets a mutable reference to the HTTP headers of this request /// @@ -250,6 +365,16 @@ ffi_fn! { } } +ffi_fn! { + /// Take ownership of the body of this request. + /// + /// It is safe to free the request even after taking ownership of its body. + fn hyper_request_body(req: *mut hyper_request) -> *mut hyper_body { + let body = std::mem::replace(non_null!(&mut *req ?= std::ptr::null_mut()).0.body_mut(), IncomingBody::empty()); + Box::into_raw(Box::new(hyper_body(body))) + } ?= std::ptr::null_mut() +} + ffi_fn! { /// Set an informational (1xx) response callback. /// @@ -266,10 +391,10 @@ ffi_fn! { /// NOTE: The `hyper_response *` is just borrowed data, and will not /// be valid after the callback finishes. You must copy any data you wish /// to persist. - fn hyper_request_on_informational(req: *mut hyper_request, callback: hyper_request_on_informational_callback, data: *mut c_void) -> hyper_code { + fn hyper_request_on_informational(req: *mut hyper_request, callback: hyper_request_on_informational_callback, data: *mut c_void, drop: hyper_userdata_drop) -> hyper_code { let ext = OnInformational { func: callback, - data: UserDataPointer(data), + userdata: Arc::new(Userdata::new(data, drop)), }; let req = non_null!(&mut *req ?= hyper_code::HYPERE_INVALID_ARG); req.0.extensions_mut().insert(ext); @@ -278,17 +403,46 @@ ffi_fn! { } impl hyper_request { - pub(super) fn finalize_request(&mut self) { + pub(super) fn finalize(mut self) -> Request { if let Some(headers) = self.0.extensions_mut().remove::() { *self.0.headers_mut() = headers.headers; self.0.extensions_mut().insert(headers.orig_casing); self.0.extensions_mut().insert(headers.orig_order); } + self.0 + } +} + +impl From> for hyper_request { + fn from(mut req: Request) -> Self { + let headers = std::mem::take(req.headers_mut()); + let orig_casing = req + .extensions_mut() + .remove::() + .unwrap_or_else(HeaderCaseMap::default); + let orig_order = req + .extensions_mut() + .remove::() + .unwrap_or_else(OriginalHeaderOrder::default); + req.extensions_mut().insert(hyper_headers { + headers, + orig_casing, + orig_order, + }); + + hyper_request(req) } } // ===== impl hyper_response ===== +ffi_fn! { + /// Construct a new HTTP 200 Ok response + fn hyper_response_new() -> *mut hyper_response { + Box::into_raw(Box::new(hyper_response(Response::new(IncomingBody::empty())))) + } ?= std::ptr::null_mut() +} + ffi_fn! { /// Free an HTTP response. /// @@ -307,6 +461,14 @@ ffi_fn! { } } +ffi_fn! { + /// Set the HTTP Status-Code of this response. + fn hyper_response_set_status(resp: *mut hyper_response, status: u16) { + let status = crate::StatusCode::from_u16(status).unwrap(); + *non_null!(&mut *resp ?= ()).0.status_mut() = status; + } +} + ffi_fn! { /// Get a pointer to the reason-phrase of this response. /// @@ -331,6 +493,31 @@ ffi_fn! { } } +ffi_fn! { + /// Set the preferred HTTP version of the response. + /// + /// The version value should be one of the `HYPER_HTTP_VERSION_` constants. + /// + /// Note that this won't change the major HTTP version of the connection, + /// since that is determined at the handshake step. + fn hyper_response_set_version(req: *mut hyper_response, version: c_int) -> hyper_code { + use http::Version; + + let req = non_null!(&mut *req ?= hyper_code::HYPERE_INVALID_ARG); + *req.0.version_mut() = match version { + super::HYPER_HTTP_VERSION_NONE => Version::HTTP_11, + super::HYPER_HTTP_VERSION_1_0 => Version::HTTP_10, + super::HYPER_HTTP_VERSION_1_1 => Version::HTTP_11, + super::HYPER_HTTP_VERSION_2 => Version::HTTP_2, + _ => { + // We don't know this version + return hyper_code::HYPERE_INVALID_ARG; + } + }; + hyper_code::HYPERE_OK + } +} + ffi_fn! { /// Get the HTTP version used by this response. /// @@ -362,6 +549,21 @@ ffi_fn! { } ?= std::ptr::null_mut() } +ffi_fn! { + /// Set the body of the response. + /// + /// The default is an empty body. + /// + /// This takes ownership of the `hyper_body *`, you must not use it or + /// free it after setting it on the request. + fn hyper_response_set_body(rsp: *mut hyper_response, body: *mut hyper_body) -> hyper_code { + let body = non_null!(Box::from_raw(body) ?= hyper_code::HYPERE_INVALID_ARG); + let rsp = non_null!(&mut *rsp ?= hyper_code::HYPERE_INVALID_ARG); + *rsp.0.body_mut() = body.0; + hyper_code::HYPERE_OK + } +} + ffi_fn! { /// Take ownership of the body of this response. /// @@ -376,35 +578,46 @@ ffi_fn! { } impl hyper_response { - pub(super) fn wrap(mut resp: Response) -> hyper_response { - let headers = std::mem::take(resp.headers_mut()); - let orig_casing = resp + fn reason_phrase(&self) -> &[u8] { + if let Some(reason) = self.0.extensions().get::() { + return reason.as_bytes(); + } + + if let Some(reason) = self.0.status().canonical_reason() { + return reason.as_bytes(); + } + + &[] + } + + pub(super) fn finalize(mut self) -> Response { + if let Some(headers) = self.0.extensions_mut().remove::() { + *self.0.headers_mut() = headers.headers; + self.0.extensions_mut().insert(headers.orig_casing); + self.0.extensions_mut().insert(headers.orig_order); + } + self.0 + } +} + +impl From> for hyper_response { + fn from(mut rsp: Response) -> Self { + let headers = std::mem::take(rsp.headers_mut()); + let orig_casing = rsp .extensions_mut() .remove::() .unwrap_or_else(HeaderCaseMap::default); - let orig_order = resp + let orig_order = rsp .extensions_mut() .remove::() .unwrap_or_else(OriginalHeaderOrder::default); - resp.extensions_mut().insert(hyper_headers { + rsp.extensions_mut().insert(hyper_headers { headers, orig_casing, orig_order, }); - hyper_response(resp) - } - - fn reason_phrase(&self) -> &[u8] { - if let Some(reason) = self.0.extensions().get::() { - return reason.as_bytes(); - } - - if let Some(reason) = self.0.status().canonical_reason() { - return reason.as_bytes(); - } - - &[] + hyper_response(rsp) } } @@ -568,8 +781,8 @@ unsafe fn raw_name_value( impl OnInformational { pub(crate) fn call(&mut self, resp: Response) { - let mut resp = hyper_response::wrap(resp); - (self.func)(self.data.0, &mut resp); + let mut resp = hyper_response::from(resp); + (self.func)(self.userdata.as_ptr(), &mut resp); } } diff --git a/src/ffi/io.rs b/src/ffi/io.rs index 1bf9aa7a97..c3fd22f990 100644 --- a/src/ffi/io.rs +++ b/src/ffi/io.rs @@ -6,6 +6,7 @@ use crate::rt::{Read, Write}; use libc::size_t; use super::task::hyper_context; +use super::userdata::{hyper_userdata_drop, Userdata}; /// Sentinel value to return from a read or write callback that the operation /// is pending. @@ -36,7 +37,7 @@ type hyper_io_write_callback = pub struct hyper_io { read: hyper_io_read_callback, write: hyper_io_write_callback, - userdata: *mut c_void, + userdata: Userdata, } ffi_fn! { @@ -56,7 +57,7 @@ ffi_fn! { Box::into_raw(Box::new(hyper_io { read: read_noop, write: write_noop, - userdata: std::ptr::null_mut(), + userdata: Userdata::default(), })) } ?= std::ptr::null_mut() } @@ -75,8 +76,28 @@ ffi_fn! { /// Set the user data pointer for this IO to some value. /// /// This value is passed as an argument to the read and write callbacks. - fn hyper_io_set_userdata(io: *mut hyper_io, data: *mut c_void) { - non_null!(&mut *io ?= ()).userdata = data; + /// + /// If passed, the `drop_func` will be called on the `userdata` when the + /// `hyper_io` is destroyed (either explicitly by `hyper_io_free` or + /// implicitly by an associated hyper task completing). + fn hyper_io_set_userdata( + io: *mut hyper_io, + data: *mut c_void, + drop_func: hyper_userdata_drop, + ) { + let io = non_null!(&mut *io? = ()); + io.userdata = Userdata::new(data, drop_func); + } +} + +ffi_fn! { + /// Get the user data pointer for this IO value. + /// + /// The userdata is still owned by the IO so must be treated as "borrowed" + /// + /// Returns NULL if no userdata has been set. + fn hyper_io_get_userdata(io: *mut hyper_io) -> *mut c_void { + non_null!(&mut *io ?= std::ptr::null_mut()).userdata.as_ptr() } } @@ -151,7 +172,12 @@ impl Read for hyper_io { let buf_ptr = unsafe { buf.as_mut() }.as_mut_ptr() as *mut u8; let buf_len = buf.remaining(); - match (self.read)(self.userdata, hyper_context::wrap(cx), buf_ptr, buf_len) { + match (self.read)( + self.userdata.as_ptr(), + hyper_context::wrap(cx), + buf_ptr, + buf_len, + ) { HYPER_IO_PENDING => Poll::Pending, HYPER_IO_ERROR => Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::Other, @@ -176,7 +202,12 @@ impl Write for hyper_io { let buf_ptr = buf.as_ptr(); let buf_len = buf.len(); - match (self.write)(self.userdata, hyper_context::wrap(cx), buf_ptr, buf_len) { + match (self.write)( + self.userdata.as_ptr(), + hyper_context::wrap(cx), + buf_ptr, + buf_len, + ) { HYPER_IO_PENDING => Poll::Pending, HYPER_IO_ERROR => Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::Other, diff --git a/src/ffi/macros.rs b/src/ffi/macros.rs index 022711baaa..b8937b0aa2 100644 --- a/src/ffi/macros.rs +++ b/src/ffi/macros.rs @@ -1,5 +1,5 @@ macro_rules! ffi_fn { - ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),*) -> $ret:ty $body:block ?= $default:expr) => { + ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),* $(,)? ) -> $ret:ty $body:block ?= $default:expr) => { $(#[$doc])* #[no_mangle] pub extern fn $name($($arg: $arg_ty),*) -> $ret { @@ -14,18 +14,18 @@ macro_rules! ffi_fn { } }; - ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),*) -> $ret:ty $body:block) => { + ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),* $(,)? ) -> $ret:ty $body:block) => { ffi_fn!($(#[$doc])* fn $name($($arg: $arg_ty),*) -> $ret $body ?= { eprintln!("panic unwind caught, aborting"); std::process::abort() }); }; - ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),*) $body:block ?= $default:expr) => { + ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),* $(,)? ) $body:block ?= $default:expr) => { ffi_fn!($(#[$doc])* fn $name($($arg: $arg_ty),*) -> () $body ?= $default); }; - ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),*) $body:block) => { + ($(#[$doc:meta])* fn $name:ident($($arg:ident: $arg_ty:ty),* $(,)? ) $body:block) => { ffi_fn!($(#[$doc])* fn $name($($arg: $arg_ty),*) -> () $body); }; } @@ -36,7 +36,13 @@ macro_rules! non_null { if $ptr.is_null() { return $err; } - unsafe { $eval } + #[allow(unused_unsafe)] + unsafe { + $eval + } + }}; + (*$ptr:ident ?= $err:expr) => {{ + non_null!($ptr, *$ptr, $err) }}; (&*$ptr:ident ?= $err:expr) => {{ non_null!($ptr, &*$ptr, $err) diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 664b6439d6..e8a261c261 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -2,8 +2,6 @@ #![allow(non_camel_case_types)] // fmt::Debug isn't helpful on FFI types #![allow(missing_debug_implementations)] -// unreachable_pub warns `#[no_mangle] pub extern fn` in private mod. -#![allow(unreachable_pub)] //! # hyper C API //! @@ -28,14 +26,6 @@ //! RUSTFLAGS="--cfg hyper_unstable_ffi" cargo rustc --crate-type cdylib --features client,http1,http2,ffi //! ``` -// We may eventually allow the FFI to be enabled without `client` or `http1`, -// that is why we don't auto enable them as `ffi = ["client", "http1"]` in -// the `Cargo.toml`. -// -// But for now, give a clear message that this compile error is expected. -#[cfg(not(all(feature = "client", feature = "http1")))] -compile_error!("The `ffi` feature currently requires the `client` and `http1` features."); - #[cfg(not(hyper_unstable_ffi))] compile_error!( "\ @@ -52,19 +42,23 @@ mod client; mod error; mod http_types; mod io; +mod server; mod task; +mod time; +mod userdata; pub use self::body::*; pub use self::client::*; pub use self::error::*; pub use self::http_types::*; pub use self::io::*; +pub use self::server::*; pub use self::task::*; +pub use self::userdata::hyper_userdata_drop; /// Return in iter functions to continue iterating. pub const HYPER_ITER_CONTINUE: libc::c_int = 0; /// Return in iter functions to stop iterating. -#[allow(unused)] pub const HYPER_ITER_BREAK: libc::c_int = 1; /// An HTTP Version that is unspecified. @@ -76,14 +70,6 @@ pub const HYPER_HTTP_VERSION_1_1: libc::c_int = 11; /// The HTTP/2 version. pub const HYPER_HTTP_VERSION_2: libc::c_int = 20; -#[derive(Clone)] -struct UserDataPointer(*mut std::ffi::c_void); - -// We don't actually know anything about this pointer, it's up to the user -// to do the right thing. -unsafe impl Send for UserDataPointer {} -unsafe impl Sync for UserDataPointer {} - /// cbindgen:ignore static VERSION_CSTR: &str = concat!(env!("CARGO_PKG_VERSION"), "\0"); diff --git a/src/ffi/server.rs b/src/ffi/server.rs new file mode 100644 index 0000000000..61b8a7d568 --- /dev/null +++ b/src/ffi/server.rs @@ -0,0 +1,617 @@ +use std::ffi::{c_uint, c_void}; +use std::ptr; +use std::sync::Arc; +use std::task::ready; + +use crate::body::Incoming as IncomingBody; +use crate::ffi::error::hyper_code; +use crate::ffi::http_types::{hyper_request, hyper_response}; +use crate::ffi::io::hyper_io; +use crate::ffi::task::{hyper_executor, hyper_task, WeakExec}; +use crate::ffi::userdata::{hyper_userdata_drop, Userdata}; +use crate::server::conn::http1; +use crate::server::conn::http2; + +/// Configuration options for HTTP/1 server connections. +pub struct hyper_http1_serverconn_options(http1::Builder); + +/// Configuration options for HTTP/2 server connections. +pub struct hyper_http2_serverconn_options(http2::Builder); + +/// A service that can serve a single server connection. +pub struct hyper_service { + service_fn: hyper_service_callback, + userdata: Userdata, +} + +/// A channel on which to send back a response to complete a transaction for a service. +pub struct hyper_response_channel(futures_channel::oneshot::Sender>); + +/// The main definition of a service. This callback will be invoked for each transaction on the +/// connection. +/// +/// The first argument contains the userdata registered with this service. +/// +/// The second argument contains the `hyper_request` that started this transaction. This request +/// is given to the callback which should free it when it is no longer needed (see +/// [crate::ffi::hyper_request_free]). +/// +/// The third argument contains a channel on which a single `hyper_response` must be sent in order +/// to conclude the transaction. This channel is given to the callback so the sending of the +/// response can be deferred (e.g. by passing it to a different thread, or waiting until other +/// async operations have completed). +pub type hyper_service_callback = + extern "C" fn(*mut c_void, *mut hyper_request, *mut hyper_response_channel); + +// ===== impl http1_serverconn_options ===== + +ffi_fn! { + /// Create a new HTTP/1 serverconn options object. + fn hyper_http1_serverconn_options_new( + exec: *const hyper_executor + ) -> *mut hyper_http1_serverconn_options { + let exec = non_null! { Arc::from_raw(exec) ?= ptr::null_mut() }; + let mut builder = http1::Builder::new(); + builder.timer(Arc::clone(exec.timer_heap())); + std::mem::forget(exec); // We never incremented the strong count in this function so can't + // drop our Arc. + Box::into_raw(Box::new(hyper_http1_serverconn_options( + builder + ))) + } +} + +ffi_fn! { + /// Free a `hyper_http1_serverconn_options*`. + fn hyper_http1_serverconn_options_free(opts: *mut hyper_http1_serverconn_options) { + let _ = non_null! { Box::from_raw(opts) ?= () }; + } +} + +ffi_fn! { + /// Set whether HTTP/1 connections should support half-closures. + /// + /// Clients can chose to shutdown their write-side while waiting for the server to respond. + /// Setting this to true will prevent closing the connection immediately if read detects an EOF + /// in the middle of a request. + /// + /// Default is `false` + fn hyper_http1_serverconn_options_half_close( + opts: *mut hyper_http1_serverconn_options, + enabled: bool, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.half_close(enabled); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Enables or disables HTTP/1 keep-alive. + /// + /// Default is `true`. + fn hyper_http1_serverconn_options_keep_alive( + opts: *mut hyper_http1_serverconn_options, + enabled: bool, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.keep_alive(enabled); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Set whether HTTP/1 connections will write header names as title case at the socket level. + /// + /// Default is `false`. + fn hyper_http1_serverconn_options_title_case_headers( + opts: *mut hyper_http1_serverconn_options, + enabled: bool, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.title_case_headers(enabled); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Set whether to support preserving original header cases. + /// + /// Currently, this will record the original cases received, and store them in a private + /// extension on the Request. It will also look for and use such an extension in any provided + /// Response. + /// + /// Since the relevant extension is still private, there is no way to interact with the + /// original cases. The only effect this can have now is to forward the cases in a proxy-like + /// fashion. + /// + /// Default is `false`. + fn hyper_http1_serverconn_options_preserve_header_case( + opts: *mut hyper_http1_serverconn_options, + enabled: bool, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.preserve_header_case(enabled); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Set a timeout for reading client request headers. If a client does not + /// transmit the entire header within this time, the connection is closed. + /// + /// Default is to have no timeout. + fn hyper_http1_serverconn_options_header_read_timeout( + opts: *mut hyper_http1_serverconn_options, + millis: u64, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.header_read_timeout(std::time::Duration::from_millis(millis)); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Set whether HTTP/1 connections should try to use vectored writes, or always flatten into a + /// single buffer. + /// + /// Note that setting this to false may mean more copies of body data, but may also improve + /// performance when an IO transport doesn’t support vectored writes well, such as most TLS + /// implementations. + /// + /// Setting this to true will force hyper to use queued strategy which may eliminate + /// unnecessary cloning on some TLS backends. + /// + /// Default is to automatically guess which mode to use, this function overrides the heuristic. + fn hyper_http1_serverconn_options_writev( + opts: *mut hyper_http1_serverconn_options, + enabled: bool, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.writev(enabled); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Set the maximum buffer size for the HTTP/1 connection. Must be no lower `8192`. + /// + /// Default is a sensible value. + fn hyper_http1_serverconn_options_max_buf_size( + opts: *mut hyper_http1_serverconn_options, + max_buf_size: usize, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.max_buf_size(max_buf_size); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Aggregates flushes to better support pipelined responses. + /// + /// Experimental, may have bugs. + /// + /// Default is `false`. + fn hyper_http1_serverconn_options_pipeline_flush( + opts: *mut hyper_http1_serverconn_options, + enabled: bool, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.pipeline_flush(enabled); + hyper_code::HYPERE_OK + } +} + +// ===== impl hyper_http2_serverconn_options ===== + +ffi_fn! { + /// Create a new HTTP/2 serverconn options object bound to the provided executor. + fn hyper_http2_serverconn_options_new( + exec: *const hyper_executor, + ) -> *mut hyper_http2_serverconn_options { + let exec = non_null! { Arc::from_raw(exec) ?= ptr::null_mut() }; + let weak = hyper_executor::downgrade(&exec); + let mut builder = http2::Builder::new(weak.clone()); + builder.timer(Arc::clone(exec.timer_heap())); + std::mem::forget(exec); // We never incremented the strong count in this function so can't + // drop our Arc. + Box::into_raw(Box::new(hyper_http2_serverconn_options( + builder + ))) + } +} + +ffi_fn! { + /// Free a `hyper_http2_serverconn_options*`. + fn hyper_http2_serverconn_options_free(opts: *mut hyper_http2_serverconn_options) { + let _ = non_null! { Box::from_raw(opts) ?= () }; + } +} + +ffi_fn! { + /// Sets the `SETTINGS_INITIAL_WINDOW_SIZE` option for HTTP/2 stream-level flow control. + /// + /// Passing `0` instructs hyper to use a sensible default value. + fn hyper_http2_serverconn_options_initial_stream_window_size( + opts: *mut hyper_http2_serverconn_options, + window_size: c_uint, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0 + .initial_stream_window_size(if window_size == 0 { + None + } else { + Some(window_size) + }); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Sets the max connection-level flow control for HTTP/2. + /// + /// Passing `0` instructs hyper to use a sensible default value. + fn hyper_http2_serverconn_options_initial_connection_window_size( + opts: *mut hyper_http2_serverconn_options, + window_size: c_uint, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0 + .initial_connection_window_size(if window_size == 0 { + None + } else { + Some(window_size) + }); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in http2_initial_stream_window_size and + /// http2_initial_connection_window_size. + /// + /// Default is `false`. + fn hyper_http2_serverconn_options_adaptive_window( + opts: *mut hyper_http2_serverconn_options, + enabled: bool, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.adaptive_window(enabled); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Sets the maximum frame size to use for HTTP/2. + /// + /// Passing `0` instructs hyper to use a sensible default value. + fn hyper_http2_serverconn_options_max_frame_size( + opts: *mut hyper_http2_serverconn_options, + frame_size: c_uint, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.max_frame_size(if frame_size == 0 { None } else { Some(frame_size) }); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Sets the `SETTINGS_MAX_CONCURRENT_STREAMS` option for HTTP2 connections. + /// + /// Default is no limit (`std::u32::MAX`). Passing `0` will use this default. + fn hyper_http2_serverconn_options_max_concurrent_streams( + opts: *mut hyper_http2_serverconn_options, + max_streams: c_uint, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.max_concurrent_streams(if max_streams == 0 { + None + } else { + Some(max_streams) + }); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Sets an interval for HTTP/2 Ping frames should be sent to keep a connection alive. + /// + /// Default is to not use keep-alive pings. Passing `0` will use this default. + fn hyper_http2_serverconn_options_keep_alive_interval( + opts: *mut hyper_http2_serverconn_options, + interval_seconds: u64, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.keep_alive_interval(if interval_seconds == 0 { + None + } else { + Some(std::time::Duration::from_secs(interval_seconds)) + }); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. + /// + /// If the ping is not acknowledged within the timeout, the connection will be closed. Does + /// nothing if `hyper_http2_serverconn_options_keep_alive_interval` is disabled. + /// + /// Default is 20 seconds. + fn hyper_http2_serverconn_options_keep_alive_timeout( + opts: *mut hyper_http2_serverconn_options, + timeout_seconds: u64, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.keep_alive_timeout(std::time::Duration::from_secs(timeout_seconds)); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Set the maximum write buffer size for each HTTP/2 stream. Must be no larger than + /// `u32::MAX`. + /// + /// Default is a sensible value. + fn hyper_http2_serverconn_options_max_send_buf_size( + opts: *mut hyper_http2_serverconn_options, + max_buf_size: usize, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.max_send_buf_size(max_buf_size); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Enables the extended `CONNECT` protocol. + fn hyper_http2_serverconn_options_enable_connect_protocol( + opts: *mut hyper_http2_serverconn_options, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.enable_connect_protocol(); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Sets the max size of received header frames. + /// + /// Default is a sensible value. + fn hyper_http2_serverconn_options_max_header_list_size( + opts: *mut hyper_http2_serverconn_options, + max: u32, + ) -> hyper_code { + let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; + opts.0.max_header_list_size(max); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Create a service from a wrapped callback function. + fn hyper_service_new(service_fn: hyper_service_callback) -> *mut hyper_service { + Box::into_raw(Box::new(hyper_service { + service_fn: service_fn, + userdata: Userdata::default(), + })) + } ?= ptr::null_mut() +} + +ffi_fn! { + /// Register opaque userdata with the `hyper_service`. This userdata must be `Send` in a rust + /// sense (i.e. can be passed between threads) though it doesn't have to be thread-safe (it + /// won't be accessed from multiple thread concurrently). + /// + /// The service takes ownership of the userdata and will call the `drop_userdata` callback when + /// the service task is complete. If the `drop_userdata` callback is `NULL` then the service + /// will instead borrow the userdata and forget it when the associated task is completed and + /// thus the calling code is responsible for cleaning up the userdata through some other + /// mechanism. + fn hyper_service_set_userdata(service: *mut hyper_service, userdata: *mut c_void, drop: hyper_userdata_drop) { + let s = non_null! { &mut *service ?= () }; + s.userdata = Userdata::new(userdata, drop); + } +} + +ffi_fn! { + /// Frees a hyper_service object if no longer needed + fn hyper_service_free(service: *mut hyper_service) { + drop(non_null! { Box::from_raw(service) ?= () }); + } +} + +ffi_fn! { + /// Serve the provided `hyper_service *` as an HTTP/1 endpoint over the provided `hyper_io *` + /// and configured as per the `hyper_http1_serverconn_options *`. + /// + /// Returns a `hyper_task*` which must be given to an executor to make progress. + /// + /// This function consumes the IO and Service objects and thus they should not be accessed + /// after this function is called. + fn hyper_serve_http1_connection( + serverconn_options: *mut hyper_http1_serverconn_options, + io: *mut hyper_io, + service: *mut hyper_service, + ) -> *mut hyper_task { + let serverconn_options = non_null! { &*serverconn_options ?= ptr::null_mut() }; + let io = non_null! { Box::from_raw(io) ?= ptr::null_mut() }; + let service = non_null! { Box::from_raw(service) ?= ptr::null_mut() }; + let task = hyper_task::boxed(serverconn_options.0.serve_connection(*io, *service)); + Box::into_raw(task) + } ?= ptr::null_mut() +} + +ffi_fn! { + /// Serve the provided `hyper_service *` as an HTTP/2 endpoint over the provided `hyper_io *` + /// and configured as per the `hyper_http2_serverconn_options *`. + /// + /// Returns a `hyper_task*` which must be given to an executor to make progress. + /// + /// This function consumes the IO and Service objects and thus they should not be accessed + /// after this function is called. + fn hyper_serve_http2_connection( + serverconn_options: *mut hyper_http2_serverconn_options, + io: *mut hyper_io, + service: *mut hyper_service, + ) -> *mut hyper_task { + let serverconn_options = non_null! { &*serverconn_options ?= ptr::null_mut() }; + let io = non_null! { Box::from_raw(io) ?= ptr::null_mut() }; + let service = non_null! { Box::from_raw(service) ?= ptr::null_mut() }; + let task = hyper_task::boxed(serverconn_options.0.serve_connection(*io, *service)); + Box::into_raw(task) + } ?= ptr::null_mut() +} + +ffi_fn! { + /// Serve the provided `hyper_service *` as either an HTTP/1 or HTTP/2 (depending on what the + /// client requests) endpoint over the provided `hyper_io *` and configured as per the + /// appropriate `hyper_httpX_serverconn_options *`. + /// + /// Returns a `hyper_task*` which must be given to an executor to make progress. + /// + /// This function consumes the IO and Service objects and thus they should not be accessed + /// after this function is called. + fn hyper_serve_httpX_connection( + http1_serverconn_options: *mut hyper_http1_serverconn_options, + http2_serverconn_options: *mut hyper_http2_serverconn_options, + io: *mut hyper_io, + service: *mut hyper_service, + ) -> *mut hyper_task { + let http1_serverconn_options = non_null! { &*http1_serverconn_options ?= ptr::null_mut() }; + let http2_serverconn_options = non_null! { &*http2_serverconn_options ?= ptr::null_mut() }; + let io = non_null! { Box::from_raw(io) ?= ptr::null_mut() }; + let service = non_null! { Box::from_raw(service) ?= ptr::null_mut() }; + let task = hyper_task::boxed( + AutoConnection::H1( + Some(( + http1_serverconn_options.0.serve_connection(*io, *service), + http2_serverconn_options.0.clone() + )) + ) + ); + Box::into_raw(task) + } ?= ptr::null_mut() +} + +ffi_fn! { + /// Sends a `hyper_response*` back to the client. This function consumes the response and the + /// channel. + /// + /// See [hyper_service_callback] for details. + fn hyper_response_channel_send( + channel: *mut hyper_response_channel, + response: *mut hyper_response, + ) { + let channel = non_null! { Box::from_raw(channel) ?= () }; + let response = non_null! { Box::from_raw(response) ?= () }; + let _ = channel.0.send(response); + } +} + +impl crate::service::Service> for hyper_service { + type Response = crate::Response; + type Error = crate::Error; + type Future = std::pin::Pin< + Box> + Send>, + >; + + fn call(&self, req: crate::Request) -> Self::Future { + let req_ptr = Box::into_raw(Box::new(hyper_request::from(req))); + + let (tx, rx) = futures_channel::oneshot::channel(); + let rsp_channel = Box::into_raw(Box::new(hyper_response_channel(tx))); + + (self.service_fn)(self.userdata.as_ptr(), req_ptr, rsp_channel); + + Box::pin(async move { + let rsp = rx.await.expect("Channel closed?"); + Ok(rsp.finalize()) + }) + } +} + +enum AutoConnection +where + Serv: crate::service::HttpService, +{ + // The internals are in an `Option` so they can be extracted during H1->H2 fallback. Otherwise + // this must always be `Some(h1, h2)` (and code is allowed to panic if that's not true). + H1(Option<(http1::Connection, http2::Builder)>), + H2(http2::Connection, Serv, Exec>), +} + +// Marker type so the `hyper_task` can be distinguished from internal timer/h2 tasks. +struct ServerConn; + +unsafe impl crate::ffi::task::AsTaskType for ServerConn { + fn as_task_type(&self) -> crate::ffi::task::hyper_task_return_type { + crate::ffi::task::hyper_task_return_type::HYPER_TASK_SERVERCONN + } +} + +impl std::future::Future for AutoConnection +where + IO: crate::rt::Read + crate::rt::Write + Unpin + 'static, + Serv: crate::service::HttpService, + Exec: crate::rt::Executor> + + Unpin + + Clone, + http1::Connection: std::future::Future> + Unpin, + http2::Connection, Serv, Exec>: + std::future::Future> + Unpin, +{ + type Output = crate::Result; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let zelf = std::pin::Pin::into_inner(self); + let (h1, h2) = match zelf { + AutoConnection::H1(inner) => { + match ready!(std::pin::Pin::new(&mut inner.as_mut().unwrap().0).poll(cx)) { + Ok(()) => return std::task::Poll::Ready(Ok(ServerConn)), + Err(e) => { + let kind = e.kind(); + if matches!( + kind, + crate::error::Kind::Parse(crate::error::Parse::VersionH2) + ) { + // Fallback - switching variant has to happen outside the match block since + // `self` is borrowed. + // + // This breaks the invariant of the H1 variant, so we _must_ fix up `zelf` + // before returning from this function. + inner.take().unwrap() + } else { + // Some other error, pass upwards + return std::task::Poll::Ready(Err(e)); + } + } + } + } + AutoConnection::H2(h2) => match ready!(std::pin::Pin::new(h2).poll(cx)) { + Ok(()) => return std::task::Poll::Ready(Ok(ServerConn)), + Err(e) => return std::task::Poll::Ready(Err(e)), + }, + }; + + // We've not returned already (for pending, success or "other" errors) so we must be + // switching to H2 - rewind the IO, build an H2 connection, update `zelf` to the H2 variant + // then re-schedule this future for mainline processing. + let http1::Parts { + io, + read_buf, + service, + .. + } = h1.into_parts(); + let rewind = crate::common::io::Rewind::new_buffered(io, read_buf); + let h2 = h2.serve_connection(rewind, service); + *zelf = AutoConnection::H2(h2); + std::pin::Pin::new(zelf).poll(cx) + } +} diff --git a/src/ffi/task.rs b/src/ffi/task.rs index f53a7b1f5a..dd81a33b44 100644 --- a/src/ffi/task.rs +++ b/src/ffi/task.rs @@ -12,7 +12,7 @@ use futures_util::stream::{FuturesUnordered, Stream}; use libc::c_int; use super::error::hyper_code; -use super::UserDataPointer; +use super::userdata::{hyper_userdata_drop, Userdata}; type BoxFuture = Pin + Send>>; type BoxAny = Box; @@ -69,10 +69,14 @@ pub struct hyper_executor { /// This is used to track when a future calls `wake` while we are within /// `hyper_executor::poll_next`. is_woken: Arc, + + /// The heap of programmed timers, these will be progressed at the start of + /// `hyper_executor_poll` + timers: Arc>, } #[derive(Clone)] -pub(crate) struct WeakExec(Weak); +pub(super) struct WeakExec(Weak); struct ExecWaker(AtomicBool); @@ -114,7 +118,7 @@ struct ExecWaker(AtomicBool); pub struct hyper_task { future: BoxFuture, output: Option, - userdata: UserDataPointer, + userdata: Userdata, } struct TaskFuture { @@ -169,13 +173,15 @@ pub enum hyper_task_return_type { HYPER_TASK_RESPONSE, /// The value of this task is `hyper_buf *`. HYPER_TASK_BUF, + /// The value of this task is null (the task was a server-side connection task) + HYPER_TASK_SERVERCONN, } -pub(crate) unsafe trait AsTaskType { +pub(super) unsafe trait AsTaskType { fn as_task_type(&self) -> hyper_task_return_type; } -pub(crate) trait IntoDynTaskType { +pub(super) trait IntoDynTaskType { fn into_dyn_task_type(self) -> BoxAny; } @@ -187,13 +193,18 @@ impl hyper_executor { driver: Mutex::new(FuturesUnordered::new()), spawn_queue: Mutex::new(Vec::new()), is_woken: Arc::new(ExecWaker(AtomicBool::new(false))), + timers: Arc::new(Mutex::new(crate::ffi::time::TimerHeap::new())), }) } - pub(crate) fn downgrade(exec: &Arc) -> WeakExec { + pub(super) fn downgrade(exec: &Arc) -> WeakExec { WeakExec(Arc::downgrade(exec)) } + pub(super) fn timer_heap(&self) -> &Arc> { + &self.timers + } + fn spawn(&self, task: Box) { self.spawn_queue .lock() @@ -202,37 +213,39 @@ impl hyper_executor { } fn poll_next(&self) -> Option> { - // Drain the queue first. + // Move any new tasks to the runnable queue self.drain_queue(); + // Wake all popped timers + self.pop_timers(); + let waker = futures_util::task::waker_ref(&self.is_woken); let mut cx = Context::from_waker(&waker); loop { - { - // Scope the lock on the driver to ensure it is dropped before - // calling drain_queue below. - let mut driver = self.driver.lock().unwrap(); - match Pin::new(&mut *driver).poll_next(&mut cx) { - Poll::Ready(val) => return val, - Poll::Pending => {} - }; - } - - // poll_next returned Pending. - // Check if any of the pending tasks tried to spawn - // some new tasks. If so, drain into the driver and loop. - if self.drain_queue() { - continue; - } - - // If the driver called `wake` while we were polling, - // we should poll again immediately! - if self.is_woken.0.swap(false, Ordering::SeqCst) { - continue; + let poll = Pin::new(&mut *self.driver.lock().unwrap()).poll_next(&mut cx); + match poll { + Poll::Ready(val) => return val, + Poll::Pending => { + // Time has progressed while polling above, so fire any wakers for timers that + // have popped in that window. + self.pop_timers(); + + // Check if any of the pending tasks tried to spawn some new tasks. If so, + // drain into the driver and loop. + if self.drain_queue() { + continue; + } + + // If the driver called `wake` while we were polling or any timers have popped, + // we should poll again immediately! + if self.is_woken.0.swap(false, Ordering::SeqCst) { + continue; + } + + return None; + } } - - return None; } } @@ -252,6 +265,12 @@ impl hyper_executor { true } + + // Walk the timer heap waking active timers and discarding cancelled ones. + fn pop_timers(&self) { + let mut heap = self.timers.lock().unwrap(); + heap.process_timers(); + } } impl futures_util::task::ArcWake for ExecWaker { @@ -263,7 +282,7 @@ impl futures_util::task::ArcWake for ExecWaker { // ===== impl WeakExec ===== impl WeakExec { - pub(crate) fn new() -> Self { + pub(super) fn new() -> Self { WeakExec(Weak::new()) } } @@ -338,10 +357,29 @@ ffi_fn! { } ?= ptr::null_mut() } +ffi_fn! { + /// Returns the time until the executor will be able to make progress on tasks due to internal + /// timers popping. The executor should be polled soon after this time (if not earlier due to + /// IO operations becoming available). + /// + /// Returns the time in milliseconds - a return value of -1 means there's no configured timers + /// and the executor doesn't need polling until there's IO work available. + fn hyper_executor_next_timer_pop(exec: *const hyper_executor) -> std::ffi::c_int { + let exec = non_null!(&*exec ?= -1); + match exec.timers.lock().unwrap().next_timer_pop() { + Some(duration) => { + let micros = duration.as_micros(); + ((micros + 999) / 1000) as _ + } + None => -1 + } + } +} + // ===== impl hyper_task ===== impl hyper_task { - pub(crate) fn boxed(fut: F) -> Box + pub(super) fn boxed(fut: F) -> Box where F: Future + Send + 'static, F::Output: IntoDynTaskType + Send + Sync + 'static, @@ -349,7 +387,7 @@ impl hyper_task { Box::new(hyper_task { future: Box::pin(async move { fut.await.into_dyn_task_type() }), output: None, - userdata: UserDataPointer(ptr::null_mut()), + userdata: Userdata::default(), }) } @@ -433,19 +471,16 @@ ffi_fn! { /// /// This is useful for telling apart tasks for different requests that are /// running on the same executor. - fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) { - if task.is_null() { - return; - } - - unsafe { (*task).userdata = UserDataPointer(userdata) }; + fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void, drop: hyper_userdata_drop) { + let task = non_null!(&mut*task ?= ()); + task.userdata = Userdata::new(userdata, drop); } } ffi_fn! { /// Retrieve the userdata that has been set via `hyper_task_set_userdata`. fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void { - non_null!(&*task ?= ptr::null_mut()).userdata.0 + non_null!(&*task ?= ptr::null_mut()).userdata.as_ptr() } ?= ptr::null_mut() } @@ -499,7 +534,7 @@ where // ===== impl hyper_context ===== impl hyper_context<'_> { - pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> { + pub(super) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> { // A struct with only one field has the same layout as that field. unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) } } diff --git a/src/ffi/time.rs b/src/ffi/time.rs new file mode 100644 index 0000000000..f3f2747752 --- /dev/null +++ b/src/ffi/time.rs @@ -0,0 +1,158 @@ +use std::collections::binary_heap::{BinaryHeap, PeekMut}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use std::time::{Duration, Instant}; + +/// A heap of timer entries with their associated wakers, backing `TimerFuture` instances. +pub(super) struct TimerHeap(BinaryHeap); + +/// The entry in the timer heap for a programmed timer. The heap should expire the timer at +/// `wake_at` and wake any waker it finds in the `shared` state. +struct TimerEntry { + shared: Arc>, + wake_at: Instant, +} + +/// A future that completes at `wake_at`. Requires that the associated `TimerHeap` is driven +/// in order to make progress. +struct TimerFuture { + heap: Arc>, + wake_at: Instant, + // This is None when the timer isn't programmed in the heap + shared: Option>>, +} + +/// Shared between the timer future and the timer heap. If the heap expires a timer it should wake +/// the associated waker if one is present (if not, that indicates that the timer has been cancelled +/// and can be discarded). +struct TimerShared { + waker: Option, +} + +// ===== impl TimerEntry ===== + +// Consistency with `Ord` requires us to report `TimerEntry`s with the same `wake_at` as equal. +impl std::cmp::PartialEq for TimerEntry { + fn eq(&self, other: &Self) -> bool { + self.wake_at.eq(&other.wake_at) + } +} +impl std::cmp::Eq for TimerEntry {} + +// BinaryHeap is a max-heap and we want the top of the heap to be the nearest to popping timer +// so we want the "bigger" timer to have the earlier `wake_at` time. We achieve this by flipping +// the sides of the comparisons in `Ord` and implementing `PartialOrd` in terms of `Ord`. +impl std::cmp::PartialOrd for TimerEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl std::cmp::Ord for TimerEntry { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Note flipped order + other.wake_at.cmp(&self.wake_at) + } +} + +// ===== impl TimerFuture ===== + +impl std::future::Future for TimerFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let now = Instant::now(); + + if self.wake_at <= now { + return Poll::Ready(()); + } + + match &self.shared { + Some(shared) => { + // Timer was already programmed, update the waker + shared.lock().unwrap().waker = Some(cx.waker().clone()); + } + None => { + // Need to program the timer into the heap + let shared = Arc::new(Mutex::new(TimerShared { + waker: Some(cx.waker().clone()), + })); + { + let mut heap = self.heap.lock().unwrap(); + let t = TimerEntry { + shared: Arc::clone(&shared), + wake_at: self.wake_at, + }; + heap.0.push(t); + } + self.shared = Some(shared); + } + } + + return Poll::Pending; + } +} + +impl std::ops::Drop for TimerFuture { + fn drop(&mut self) { + if let Some(shared) = &self.shared { + let _ = shared.lock().unwrap().waker.take(); + } + } +} + +// ===== impl TimerHeap ===== + +impl crate::rt::Timer for Arc> { + fn sleep(&self, duration: Duration) -> Pin> { + self.sleep_until(Instant::now() + duration) + } + + fn sleep_until(&self, instant: Instant) -> Pin> { + Box::pin(TimerFuture { + heap: Arc::clone(self), + wake_at: instant, + shared: None, + }) + } +} + +impl TimerHeap { + pub(super) fn new() -> Self { + Self(BinaryHeap::new()) + } + + /// Walk the timer heap waking active timers and discarding cancelled ones. + pub(super) fn process_timers(&mut self) { + let now = Instant::now(); + while let Some(timer) = self.0.peek_mut() { + if let Some(waker) = &mut timer.shared.lock().unwrap().waker { + if timer.wake_at < now { + waker.wake_by_ref(); + } else { + break; + } + } + // This time was for the past so pop it now. + let _ = PeekMut::pop(timer); + } + } + + /// Returns the time until the executor will be able to make progress on tasks due to internal + /// timers popping. The executor should be polled soon after this time (if not earlier due to + /// IO operations becoming available). + /// + /// If no timers are currently programmed, returns `None`. + pub(super) fn next_timer_pop(&mut self) -> Option { + let now = Instant::now(); + while let Some(timer) = self.0.peek_mut() { + if timer.shared.lock().unwrap().waker.is_some() { + return Some(timer.wake_at - now); + } else { + PeekMut::pop(timer); + } + } + + return None; + } +} diff --git a/src/ffi/userdata.rs b/src/ffi/userdata.rs new file mode 100644 index 0000000000..78132cd176 --- /dev/null +++ b/src/ffi/userdata.rs @@ -0,0 +1,50 @@ +use std::ffi::c_void; + +/// Many hyper entities can be given userdata to allow user callbacks to correlate work together. +/// Since much of hyper is asynchronous it's often useful to treat these userdata objects as +/// "owned" by the hyper entity (and hence to be cleaned up when that entity is dropped). +/// +/// To achieve this a `hyper_userdata_drop` callback is passed by calling code alongside the +/// userdata to register a cleanup function. +/// +/// This function may be provided as NULL if the calling code wants to manage memory lifetimes +/// itself, in which case the hyper object will logically consider the userdata "borrowed" until +/// the hyper entity is dropped. +pub type hyper_userdata_drop = Option; + +/// A handle to a user-provided arbitrary object, along with an optional drop callback for the +/// object. +pub(crate) struct Userdata { + data: *mut c_void, + drop: hyper_userdata_drop, +} + +impl Userdata { + pub(crate) fn new(data: *mut c_void, drop: hyper_userdata_drop) -> Self { + Self { data, drop } + } + + pub(crate) fn as_ptr(&self) -> *mut c_void { + self.data + } +} + +impl Default for Userdata { + fn default() -> Self { + Self { + data: std::ptr::null_mut(), + drop: None, + } + } +} + +unsafe impl Sync for Userdata {} +unsafe impl Send for Userdata {} + +impl Drop for Userdata { + fn drop(&mut self) { + if let Some(drop) = self.drop { + drop(self.data); + } + } +} diff --git a/src/rt/timer.rs b/src/rt/timer.rs index c6a6f1dbc0..0f42d5fe12 100644 --- a/src/rt/timer.rs +++ b/src/rt/timer.rs @@ -1,6 +1,7 @@ //! Provides a timer trait with timer-like functions //! //! Example using tokio timer: +//! //! ```rust //! use std::{ //! future::Future, @@ -17,46 +18,19 @@ //! //! impl Timer for TokioTimer { //! fn sleep(&self, duration: Duration) -> Pin> { -//! Box::pin(TokioSleep { -//! inner: tokio::time::sleep(duration), -//! }) +//! Box::pin(tokio::time::sleep(duration)) //! } //! //! fn sleep_until(&self, deadline: Instant) -> Pin> { -//! Box::pin(TokioSleep { -//! inner: tokio::time::sleep_until(deadline.into()), -//! }) +//! Box::pin(tokio::time::sleep_until(deadline.into())) //! } //! //! fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { -//! if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { +//! if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { //! sleep.reset(new_deadline.into()) //! } //! } //! } -//! -//! pin_project! { -//! pub(crate) struct TokioSleep { -//! #[pin] -//! pub(crate) inner: tokio::time::Sleep, -//! } -//! } -//! -//! impl Future for TokioSleep { -//! type Output = (); -//! -//! fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -//! self.project().inner.poll(cx) -//! } -//! } -//! -//! impl Sleep for TokioSleep {} -//! -//! impl TokioSleep { -//! pub fn reset(self: Pin<&mut Self>, deadline: Instant) { -//! self.project().inner.as_mut().reset(deadline.into()); -//! } -//! } //! ``` use std::{ @@ -91,6 +65,7 @@ pub trait Sleep: Send + Sync + Future { TypeId::of::() } } +impl Sleep for T where T: Send + Sync + Future {} impl dyn Sleep { //! This is a re-implementation of downcast methods from std::any::Any