From 61dc6eb2d1eaa100e907ce4d6aedccb25625225b Mon Sep 17 00:00:00 2001 From: Laytan Date: Sun, 17 Sep 2023 02:17:15 +0200 Subject: [PATCH 01/16] server sent events start --- examples/sse/main.odin | 51 +++++++++ sse.odin | 235 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 examples/sse/main.odin create mode 100644 sse.odin diff --git a/examples/sse/main.odin b/examples/sse/main.odin new file mode 100644 index 0000000..dc03881 --- /dev/null +++ b/examples/sse/main.odin @@ -0,0 +1,51 @@ +package sse_example + +import "core:fmt" +import "core:log" +import "core:time" +import "core:net" + +import http "../.." +import "../../nbio" + +// Minimal server that listens on 127.0.0.1:8080 and responds to every request with 200 Ok. +main :: proc() { + context.logger = log.create_console_logger(.Debug) + + s: http.Server + + handler := http.handler(proc(_: ^http.Request, res: ^http.Response) { + sse := new(http.Sse) + + http.sse_start(sse, res, rawptr(uintptr(0)), proc(sse: ^http.Sse, err: net.Network_Error) { + log.errorf("sse error: %v", err) + }) + + http.sse_event(sse, {data = "Hello, World!"}) + + tick :: proc(sse: rawptr, now: Maybe(time.Time)) { + sse := cast(^http.Sse)sse + + if sse.state > .Ending do return + + nbio.timeout(&http.td.io, time.Second, sse, tick) + + http.sse_event(sse, { + id = int(uintptr(sse.user_data)), + event = "tick", + data = http.date_string(now.? or_else time.now()), + }) + + if uintptr(sse.user_data) > 10 { + http.sse_end(sse) + } + + sse.user_data = rawptr(uintptr(sse.user_data) + 1) + } + tick(sse, nil) + }) + + http.server_shutdown_on_interrupt(&s) + + fmt.printf("Server stopped: %s", http.listen_and_serve(&s, handler)) +} diff --git a/sse.odin b/sse.odin new file mode 100644 index 0000000..54536ae --- /dev/null +++ b/sse.odin @@ -0,0 +1,235 @@ +package http + +import "core:container/queue" +import "core:net" +import "core:strings" +import "core:bytes" + +import "nbio" + +Sse :: struct { + user_data: rawptr, + on_err: Maybe(Sse_On_Error), + + r: ^Response, + events: queue.Queue(Sse_Event), + state: Sse_State, +} + +Sse_Event :: struct { + event: Maybe(string), + data: Maybe(string), + id: Maybe(int), + retry: Maybe(int), + comment: Maybe(string), + + _buf: strings.Builder, + _sent: int, +} + +Sse_State :: enum { + Uninitialized, + + // The initial HTTP response is being sent over the connection (status code&headers) before + // we can start sending events. + Starting, + + // No events are being sent over the connection but it is ready to. + Idle, + + // An event is being sent over the connection. + Sending, + + // Set to when sse_end is called when there are still events in the queue. + // The events in the queue will be processed and then closed. + Ending, + + // Either done ending or forced ending. + // Every callback will return immediately, nothing else is processed. + Close, +} + +/* +A handler that is called when there is an error or when sse_end is called. +This will always be called in a cycle, and only once. +If this is called after a sse_end call the err is nil. +This is called before the connection is closed. +*/ +Sse_On_Error :: #type proc(sse: ^Sse, err: net.Network_Error) + +/* +Initialize the Sse struct, and start sending the status code and headers. +*/ +sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil) { + sse.r = r + sse.user_data = user_data + sse.on_err = on_error + sse.state = .Starting + + r.status = .OK + r.headers["cache-control"] = "no-store" + r.headers["content-type"] = "text/event-stream" + _response_write_heading(r, -1) + + // TODO: use other response logic from response_send proc, have a way to send a response without + // actually cleaning up the request, and a way to hook into when that is done. + + on_start_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { + sse := cast(^Sse)sse + + if err != nil { + _sse_err(sse, err) + return + } + + res := &sse.r._conn.loop.inflight.(Response_Inflight) + + res.sent += n + if len(res.buf) != res.sent { + nbio.send(&td.io, sse.r._conn.socket, res.buf[res.sent:], sse, on_start_send) + return + } + + _sse_process(sse) + } + + buf := bytes.buffer_to_bytes(&r._buf) + r._conn.loop.inflight = Response_Inflight { + buf = buf, + } + nbio.send(&td.io, r._conn.socket, buf, sse, on_start_send) +} + +/* +Queues an event to be sent over the connection. +You must call `sse_start` first, this is a no-op when end has been called or an error has occurred. +*/ +sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { + switch sse.state { + case .Starting, .Sending, .Ending, .Idle: + queue.push_back(&sse.events, ev) + _sse_event_prepare(queue.peek_back(&sse.events)) + + case .Uninitialized: + panic("sse_start must be called first", loc) + + case .Close: + } + + if sse.state == .Idle { + _sse_process(sse) + } +} + +/* +Ends the event stream without sending all queued events. +*/ +sse_force_end :: proc(sse: ^Sse) { + sse.state = .Close + if cb, ok := sse.on_err.?; ok do cb(sse, nil) + connection_close(sse.r._conn) +} + +/* +Ends the event stream as soon as all queued events are sent. +*/ +sse_end :: proc(sse: ^Sse) { + if sse.state >= .Ending do return + + if sse.state == .Sending { + sse.state = .Ending + return + } + + if cb, ok := sse.on_err.?; ok do cb(sse, nil) + connection_close(sse.r._conn) +} + +_sse_err :: proc(sse: ^Sse, err: net.Network_Error) { + if sse.state >= .Ending do return + + sse.state = .Close + + if cb, ok := sse.on_err.?; ok do cb(sse, err) + connection_close(sse.r._conn) +} + +_sse_process :: proc(sse: ^Sse) { + if sse.state == .Close do return + + if queue.len(sse.events) == 0 { + #partial switch sse.state { + // We have sent all events in the queue, complete the ending if we are. + case .Ending: sse_force_end(sse) + case: sse.state = .Idle + } + return + } + + ev := queue.peek_front(&sse.events) + + #partial switch sse.state { + case .Ending: // noop + case: sse.state = .Sending + } + + nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[:], sse, _sse_on_send) +} + +_sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { + sse := cast(^Sse)sse + ev := queue.peek_front(&sse.events) + + if err != nil { + _sse_err(sse, err) + return + } + + if sse.state == .Close do return + + ev._sent += n + if len(ev._buf.buf) > ev._sent { + nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[ev._sent:], sse, _sse_on_send) + return + } + + queue.pop_front(&sse.events) + _sse_process(sse) +} + +// TODO :doesn't handle multiline values +_sse_event_prepare :: proc(ev: ^Sse_Event) { + b := &ev._buf + + if name, ok := ev.event.?; ok { + strings.write_string(b, "event: ") + strings.write_string(b, name) + strings.write_string(b, "\r\n") + } + + if cmnt, ok := ev.comment.?; ok { + strings.write_string(b, "; ") + strings.write_string(b, cmnt) + strings.write_string(b, "\r\n") + } + + if id, ok := ev.id.?; ok { + strings.write_string(b, "id: ") + strings.write_int(b, id) + strings.write_string(b, "\r\n") + } + + if retry, ok := ev.retry.?; ok { + strings.write_string(b, "retry: ") + strings.write_int(b, retry) + strings.write_string(b, "\r\n") + } + + if data, ok := ev.data.?; ok { + strings.write_string(b, "data: ") + strings.write_string(b, data) + strings.write_string(b, "\r\n") + } + + strings.write_string(b, "\r\n") +} From c196418df2c94de32186840daa3a9dd7cdd44c69 Mon Sep 17 00:00:00 2001 From: Laytan Date: Sun, 17 Sep 2023 02:27:53 +0200 Subject: [PATCH 02/16] todos --- sse.odin | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sse.odin b/sse.odin index 54536ae..b267b6c 100644 --- a/sse.odin +++ b/sse.odin @@ -7,6 +7,14 @@ import "core:bytes" import "nbio" +// TODO: memory management. + +// TODO: browser example, maybe a clock ran on server time (high frequency)? + +// TODO: is there a way to say we are done to the client (in the spec), afaik the browser will always reconnect. + +// TODO: might make sense as its own package (sse). + Sse :: struct { user_data: rawptr, on_err: Maybe(Sse_On_Error), @@ -23,6 +31,7 @@ Sse_Event :: struct { retry: Maybe(int), comment: Maybe(string), + // TODO: put buf on Sse, we only need one because we sent one at a time. _buf: strings.Builder, _sent: int, } @@ -50,8 +59,8 @@ Sse_State :: enum { } /* -A handler that is called when there is an error or when sse_end is called. -This will always be called in a cycle, and only once. +A handler that is called when there is an error (client disconnected for example) or when sse_end is called. +This will always be called in a cycle, and only once, so cleaning up after yourself is easily done here. If this is called after a sse_end call the err is nil. This is called before the connection is closed. */ @@ -67,6 +76,7 @@ sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Ma sse.state = .Starting r.status = .OK + // TODO: do we need this header? r.headers["cache-control"] = "no-store" r.headers["content-type"] = "text/event-stream" _response_write_heading(r, -1) From 47a27ae3c0562ae31de94aed99e503b9b4df58ac Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:29:30 +0200 Subject: [PATCH 03/16] server sent events: add example with web page, one buffer, memory management --- examples/sse/index.html | 35 +++++++++++ examples/sse/main.odin | 25 ++++---- sse.odin | 129 +++++++++++++++++++++++++--------------- 3 files changed, 131 insertions(+), 58 deletions(-) create mode 100644 examples/sse/index.html diff --git a/examples/sse/index.html b/examples/sse/index.html new file mode 100644 index 0000000..5a422e2 --- /dev/null +++ b/examples/sse/index.html @@ -0,0 +1,35 @@ + + + + SSE + + + + +

Hello, SSE!

+

Time from the server:

+

Other events:

+ + + diff --git a/examples/sse/main.odin b/examples/sse/main.odin index dc03881..e762aaa 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -15,34 +15,39 @@ main :: proc() { s: http.Server handler := http.handler(proc(_: ^http.Request, res: ^http.Response) { - sse := new(http.Sse) + res.headers["access-control-allow-origin"] = "*" - http.sse_start(sse, res, rawptr(uintptr(0)), proc(sse: ^http.Sse, err: net.Network_Error) { - log.errorf("sse error: %v", err) - }) + sse: http.Sse + http.sse_init(&sse, res) + http.sse_start(&sse) - http.sse_event(sse, {data = "Hello, World!"}) + http.sse_event(&sse, {data = "Hello, World!"}) - tick :: proc(sse: rawptr, now: Maybe(time.Time)) { + tick :: proc(sse: rawptr, now: Maybe(time.Time) = nil) { sse := cast(^http.Sse)sse + i := uintptr(sse.user_data) + + // If you were using a custom allocator: + // the temp_allocator is automatically free'd after the response is sent and the connection is closed. + // if sse.state == .Close do free(sse) if sse.state > .Ending do return nbio.timeout(&http.td.io, time.Second, sse, tick) http.sse_event(sse, { - id = int(uintptr(sse.user_data)), event = "tick", data = http.date_string(now.? or_else time.now()), }) - if uintptr(sse.user_data) > 10 { + // End after a minute. + if i > uintptr(time.Second * 60) { http.sse_end(sse) } - sse.user_data = rawptr(uintptr(sse.user_data) + 1) + sse.user_data = rawptr(i + 1) } - tick(sse, nil) + tick(&sse) }) http.server_shutdown_on_interrupt(&s) diff --git a/sse.odin b/sse.odin index b267b6c..59f18fd 100644 --- a/sse.odin +++ b/sse.odin @@ -1,43 +1,42 @@ package http +import "core:bytes" import "core:container/queue" +import "core:log" import "core:net" import "core:strings" -import "core:bytes" import "nbio" -// TODO: memory management. - -// TODO: browser example, maybe a clock ran on server time (high frequency)? - -// TODO: is there a way to say we are done to the client (in the spec), afaik the browser will always reconnect. - // TODO: might make sense as its own package (sse). +// TODO: shutdown doesn't work. + Sse :: struct { user_data: rawptr, on_err: Maybe(Sse_On_Error), r: ^Response, - events: queue.Queue(Sse_Event), + + // State should be considered read-only by users. state: Sse_State, + + _events: queue.Queue(Sse_Event), + + _buf: strings.Builder, + _sent: int, } Sse_Event :: struct { event: Maybe(string), data: Maybe(string), - id: Maybe(int), + id: Maybe(string), retry: Maybe(int), comment: Maybe(string), - - // TODO: put buf on Sse, we only need one because we sent one at a time. - _buf: strings.Builder, - _sent: int, } Sse_State :: enum { - Uninitialized, + Pre_Start, // The initial HTTP response is being sent over the connection (status code&headers) before // we can start sending events. @@ -67,19 +66,27 @@ This is called before the connection is closed. Sse_On_Error :: #type proc(sse: ^Sse, err: net.Network_Error) /* -Initialize the Sse struct, and start sending the status code and headers. +Initializes an sse struct with the given arguments. */ -sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil) { +sse_init :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil, allocator := context.temp_allocator) { sse.r = r sse.user_data = user_data sse.on_err = on_error - sse.state = .Starting - r.status = .OK - // TODO: do we need this header? - r.headers["cache-control"] = "no-store" - r.headers["content-type"] = "text/event-stream" - _response_write_heading(r, -1) + queue.init(&sse._events, allocator = allocator) + strings.builder_init(&sse._buf, allocator) + + // Set the status and content type if they haven't been changed by the user. + if r.status == .Not_Found do r.status = .OK + if "content-type" not_in r.headers do r.headers["content-type"] = "text/event-stream" +} + +/* +Start by sending the status code and headers. +*/ +sse_start :: proc(sse: ^Sse) { + sse.state = .Starting + _response_write_heading(sse.r, -1) // TODO: use other response logic from response_send proc, have a way to send a response without // actually cleaning up the request, and a way to hook into when that is done. @@ -103,11 +110,11 @@ sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Ma _sse_process(sse) } - buf := bytes.buffer_to_bytes(&r._buf) - r._conn.loop.inflight = Response_Inflight { + buf := bytes.buffer_to_bytes(&sse.r._buf) + sse.r._conn.loop.inflight = Response_Inflight { buf = buf, } - nbio.send(&td.io, r._conn.socket, buf, sse, on_start_send) + nbio.send(&td.io, sse.r._conn.socket, buf, sse, on_start_send) } /* @@ -117,10 +124,9 @@ You must call `sse_start` first, this is a no-op when end has been called or an sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { switch sse.state { case .Starting, .Sending, .Ending, .Idle: - queue.push_back(&sse.events, ev) - _sse_event_prepare(queue.peek_back(&sse.events)) + queue.push_back(&sse._events, ev) - case .Uninitialized: + case .Pre_Start: panic("sse_start must be called first", loc) case .Close: @@ -134,10 +140,12 @@ sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { /* Ends the event stream without sending all queued events. */ -sse_force_end :: proc(sse: ^Sse) { +sse_end_force :: proc(sse: ^Sse) { sse.state = .Close - if cb, ok := sse.on_err.?; ok do cb(sse, nil) - connection_close(sse.r._conn) + + _sse_call_on_err(sse, nil) + sse_destroy(sse) + connection_close(sse.r._conn) } /* @@ -151,8 +159,20 @@ sse_end :: proc(sse: ^Sse) { return } - if cb, ok := sse.on_err.?; ok do cb(sse, nil) - connection_close(sse.r._conn) + sse.state = .Close + + _sse_call_on_err(sse, nil) + sse_destroy(sse) + connection_close(sse.r._conn) +} + +/* +Destroys any memory allocated, and if `sse_new` was used, frees the sse struct. +This is usually not a call you need to make, it is automatically called after an error or `sse_end`/`sse_end_force`. +*/ +sse_destroy :: proc(sse: ^Sse) { + strings.builder_destroy(&sse._buf) + queue.destroy(&sse._events) } _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { @@ -160,35 +180,44 @@ _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { sse.state = .Close - if cb, ok := sse.on_err.?; ok do cb(sse, err) - connection_close(sse.r._conn) + _sse_call_on_err(sse, err) + sse_destroy(sse) + connection_close(sse.r._conn) +} + +_sse_call_on_err :: proc(sse: ^Sse, err: net.Network_Error) { + if cb, ok := sse.on_err.?; ok { + cb(sse, err) + } else if err != nil { + // Most likely that the client closed the connection. + log.infof("Server Sent Event error: %v", err) + } } _sse_process :: proc(sse: ^Sse) { if sse.state == .Close do return - if queue.len(sse.events) == 0 { + if queue.len(sse._events) == 0 { #partial switch sse.state { // We have sent all events in the queue, complete the ending if we are. - case .Ending: sse_force_end(sse) + case .Ending: sse_end_force(sse) case: sse.state = .Idle } return } - ev := queue.peek_front(&sse.events) - #partial switch sse.state { case .Ending: // noop case: sse.state = .Sending } - nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[:], sse, _sse_on_send) + ev := queue.peek_front(&sse._events) + _sse_event_prepare(sse) + nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { sse := cast(^Sse)sse - ev := queue.peek_front(&sse.events) if err != nil { _sse_err(sse, err) @@ -197,19 +226,23 @@ _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { if sse.state == .Close do return - ev._sent += n - if len(ev._buf.buf) > ev._sent { - nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[ev._sent:], sse, _sse_on_send) + sse._sent += n + if len(sse._buf.buf) > sse._sent { + nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[sse._sent:], sse, _sse_on_send) return } - queue.pop_front(&sse.events) + queue.pop_front(&sse._events) _sse_process(sse) } // TODO :doesn't handle multiline values -_sse_event_prepare :: proc(ev: ^Sse_Event) { - b := &ev._buf +_sse_event_prepare :: proc(sse: ^Sse) { + ev := queue.peek_front(&sse._events) + b := &sse._buf + + strings.builder_reset(b) + sse._sent = 0 if name, ok := ev.event.?; ok { strings.write_string(b, "event: ") @@ -225,7 +258,7 @@ _sse_event_prepare :: proc(ev: ^Sse_Event) { if id, ok := ev.id.?; ok { strings.write_string(b, "id: ") - strings.write_int(b, id) + strings.write_string(b, id) strings.write_string(b, "\r\n") } From 681694a34467ac2a0d548f5e36c8cc4dd8808bb2 Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:35:08 +0200 Subject: [PATCH 04/16] server sent events: comments --- examples/sse/main.odin | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/sse/main.odin b/examples/sse/main.odin index e762aaa..cdf4239 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -8,7 +8,13 @@ import "core:net" import http "../.." import "../../nbio" -// Minimal server that listens on 127.0.0.1:8080 and responds to every request with 200 Ok. +/* +Responds to any requests with a 200 OK that starts an event stream (aka server sent events). +The first event sent is a general "Hello, World!", +then it sends one event every second with the current time. + +All this is done without spawning any extra threads by using the underlying nbio (non-blocking IO) package. +*/ main :: proc() { context.logger = log.create_console_logger(.Debug) @@ -33,6 +39,7 @@ main :: proc() { if sse.state > .Ending do return + // Queue next tick. nbio.timeout(&http.td.io, time.Second, sse, tick) http.sse_event(sse, { From 63bf85c4788f294006da18ff4cf3736db80dcddf Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:39:23 +0200 Subject: [PATCH 05/16] server sent events: formatting --- examples/sse/main.odin | 7 +++--- sse.odin | 57 ++++++++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/examples/sse/main.odin b/examples/sse/main.odin index cdf4239..e6fd00a 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -2,8 +2,8 @@ package sse_example import "core:fmt" import "core:log" -import "core:time" import "core:net" +import "core:time" import http "../.." import "../../nbio" @@ -20,7 +20,7 @@ main :: proc() { s: http.Server - handler := http.handler(proc(_: ^http.Request, res: ^http.Response) { + handler := http.handler( proc(_: ^http.Request, res: ^http.Response) { res.headers["access-control-allow-origin"] = "*" sse: http.Sse @@ -55,7 +55,8 @@ main :: proc() { sse.user_data = rawptr(i + 1) } tick(&sse) - }) + }, + ) http.server_shutdown_on_interrupt(&s) diff --git a/sse.odin b/sse.odin index 59f18fd..e4b8b1e 100644 --- a/sse.odin +++ b/sse.odin @@ -18,7 +18,7 @@ Sse :: struct { r: ^Response, - // State should be considered read-only by users. + // State should be considered read-only by users. state: Sse_State, _events: queue.Queue(Sse_Event), @@ -68,16 +68,22 @@ Sse_On_Error :: #type proc(sse: ^Sse, err: net.Network_Error) /* Initializes an sse struct with the given arguments. */ -sse_init :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil, allocator := context.temp_allocator) { - sse.r = r +sse_init :: proc( + sse: ^Sse, + r: ^Response, + user_data: rawptr = nil, + on_error: Maybe(Sse_On_Error) = nil, + allocator := context.temp_allocator, +) { + sse.r = r sse.user_data = user_data - sse.on_err = on_error + sse.on_err = on_error - queue.init(&sse._events, allocator = allocator) - strings.builder_init(&sse._buf, allocator) + queue.init(&sse._events, allocator = allocator) + strings.builder_init(&sse._buf, allocator) - // Set the status and content type if they haven't been changed by the user. - if r.status == .Not_Found do r.status = .OK + // Set the status and content type if they haven't been changed by the user. + if r.status == .Not_Found do r.status = .OK if "content-type" not_in r.headers do r.headers["content-type"] = "text/event-stream" } @@ -144,8 +150,8 @@ sse_end_force :: proc(sse: ^Sse) { sse.state = .Close _sse_call_on_err(sse, nil) - sse_destroy(sse) - connection_close(sse.r._conn) + sse_destroy(sse) + connection_close(sse.r._conn) } /* @@ -162,8 +168,8 @@ sse_end :: proc(sse: ^Sse) { sse.state = .Close _sse_call_on_err(sse, nil) - sse_destroy(sse) - connection_close(sse.r._conn) + sse_destroy(sse) + connection_close(sse.r._conn) } /* @@ -171,8 +177,8 @@ Destroys any memory allocated, and if `sse_new` was used, frees the sse struct. This is usually not a call you need to make, it is automatically called after an error or `sse_end`/`sse_end_force`. */ sse_destroy :: proc(sse: ^Sse) { - strings.builder_destroy(&sse._buf) - queue.destroy(&sse._events) + strings.builder_destroy(&sse._buf) + queue.destroy(&sse._events) } _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { @@ -181,8 +187,8 @@ _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { sse.state = .Close _sse_call_on_err(sse, err) - sse_destroy(sse) - connection_close(sse.r._conn) + sse_destroy(sse) + connection_close(sse.r._conn) } _sse_call_on_err :: proc(sse: ^Sse, err: net.Network_Error) { @@ -200,19 +206,22 @@ _sse_process :: proc(sse: ^Sse) { if queue.len(sse._events) == 0 { #partial switch sse.state { // We have sent all events in the queue, complete the ending if we are. - case .Ending: sse_end_force(sse) - case: sse.state = .Idle + case .Ending: + sse_end_force(sse) + case: + sse.state = .Idle } return } #partial switch sse.state { case .Ending: // noop - case: sse.state = .Sending + case: + sse.state = .Sending } ev := queue.peek_front(&sse._events) - _sse_event_prepare(sse) + _sse_event_prepare(sse) nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } @@ -238,11 +247,11 @@ _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { // TODO :doesn't handle multiline values _sse_event_prepare :: proc(sse: ^Sse) { - ev := queue.peek_front(&sse._events) - b := &sse._buf + ev := queue.peek_front(&sse._events) + b := &sse._buf - strings.builder_reset(b) - sse._sent = 0 + strings.builder_reset(b) + sse._sent = 0 if name, ok := ev.event.?; ok { strings.write_string(b, "event: ") From 922d82c1b7dccf2c27f2ce75a3350a1ab74304c1 Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:41:16 +0200 Subject: [PATCH 06/16] server-sent-events: unused var --- sse.odin | 1 - 1 file changed, 1 deletion(-) diff --git a/sse.odin b/sse.odin index e4b8b1e..758a469 100644 --- a/sse.odin +++ b/sse.odin @@ -220,7 +220,6 @@ _sse_process :: proc(sse: ^Sse) { sse.state = .Sending } - ev := queue.peek_front(&sse._events) _sse_event_prepare(sse) nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } From 622008dc27b742060ab2478cbd329eafbe8db1bc Mon Sep 17 00:00:00 2001 From: Laytan Date: Tue, 31 Oct 2023 20:02:21 +0100 Subject: [PATCH 07/16] server-sent-events: use nbio.send_all --- sse.odin | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/sse.odin b/sse.odin index 758a469..d44913d 100644 --- a/sse.odin +++ b/sse.odin @@ -105,22 +105,11 @@ sse_start :: proc(sse: ^Sse) { return } - res := &sse.r._conn.loop.inflight.(Response_Inflight) - - res.sent += n - if len(res.buf) != res.sent { - nbio.send(&td.io, sse.r._conn.socket, res.buf[res.sent:], sse, on_start_send) - return - } - _sse_process(sse) } buf := bytes.buffer_to_bytes(&sse.r._buf) - sse.r._conn.loop.inflight = Response_Inflight { - buf = buf, - } - nbio.send(&td.io, sse.r._conn.socket, buf, sse, on_start_send) + nbio.send_all(&td.io, sse.r._conn.socket, buf, sse, on_start_send) } /* @@ -221,7 +210,7 @@ _sse_process :: proc(sse: ^Sse) { } _sse_event_prepare(sse) - nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) + nbio.send_all(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { @@ -234,12 +223,6 @@ _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { if sse.state == .Close do return - sse._sent += n - if len(sse._buf.buf) > sse._sent { - nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[sse._sent:], sse, _sse_on_send) - return - } - queue.pop_front(&sse._events) _sse_process(sse) } From f6517be00bfe6cbaa730b6fa233d5aeddd3eb2fb Mon Sep 17 00:00:00 2001 From: Laytan Laats Date: Wed, 20 Mar 2024 18:51:09 +0100 Subject: [PATCH 08/16] server-sent-events: rebase --- examples/sse/main.odin | 2 +- sse.odin | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/sse/main.odin b/examples/sse/main.odin index e6fd00a..8c735dc 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -21,7 +21,7 @@ main :: proc() { s: http.Server handler := http.handler( proc(_: ^http.Request, res: ^http.Response) { - res.headers["access-control-allow-origin"] = "*" + http.headers_set_unsafe(&res.headers, "access-control-allow-origin", "*") sse: http.Sse http.sse_init(&sse, res) diff --git a/sse.odin b/sse.odin index d44913d..cc5709f 100644 --- a/sse.odin +++ b/sse.odin @@ -84,7 +84,9 @@ sse_init :: proc( // Set the status and content type if they haven't been changed by the user. if r.status == .Not_Found do r.status = .OK - if "content-type" not_in r.headers do r.headers["content-type"] = "text/event-stream" + if !headers_has_unsafe(r.headers, "content-type") { + headers_set_unsafe(&r.headers, "content-type", "text/event-stream") + } } /* @@ -117,6 +119,8 @@ Queues an event to be sent over the connection. You must call `sse_start` first, this is a no-op when end has been called or an error has occurred. */ sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { + assert_has_td(loc) + switch sse.state { case .Starting, .Sending, .Ending, .Idle: queue.push_back(&sse._events, ev) From 7bdd1db616290c2af8688b839c6ad6b3325bc36f Mon Sep 17 00:00:00 2001 From: Laytan Date: Sun, 17 Sep 2023 02:17:15 +0200 Subject: [PATCH 09/16] server sent events start --- examples/sse/main.odin | 51 +++++++++ sse.odin | 235 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 examples/sse/main.odin create mode 100644 sse.odin diff --git a/examples/sse/main.odin b/examples/sse/main.odin new file mode 100644 index 0000000..dc03881 --- /dev/null +++ b/examples/sse/main.odin @@ -0,0 +1,51 @@ +package sse_example + +import "core:fmt" +import "core:log" +import "core:time" +import "core:net" + +import http "../.." +import "../../nbio" + +// Minimal server that listens on 127.0.0.1:8080 and responds to every request with 200 Ok. +main :: proc() { + context.logger = log.create_console_logger(.Debug) + + s: http.Server + + handler := http.handler(proc(_: ^http.Request, res: ^http.Response) { + sse := new(http.Sse) + + http.sse_start(sse, res, rawptr(uintptr(0)), proc(sse: ^http.Sse, err: net.Network_Error) { + log.errorf("sse error: %v", err) + }) + + http.sse_event(sse, {data = "Hello, World!"}) + + tick :: proc(sse: rawptr, now: Maybe(time.Time)) { + sse := cast(^http.Sse)sse + + if sse.state > .Ending do return + + nbio.timeout(&http.td.io, time.Second, sse, tick) + + http.sse_event(sse, { + id = int(uintptr(sse.user_data)), + event = "tick", + data = http.date_string(now.? or_else time.now()), + }) + + if uintptr(sse.user_data) > 10 { + http.sse_end(sse) + } + + sse.user_data = rawptr(uintptr(sse.user_data) + 1) + } + tick(sse, nil) + }) + + http.server_shutdown_on_interrupt(&s) + + fmt.printf("Server stopped: %s", http.listen_and_serve(&s, handler)) +} diff --git a/sse.odin b/sse.odin new file mode 100644 index 0000000..54536ae --- /dev/null +++ b/sse.odin @@ -0,0 +1,235 @@ +package http + +import "core:container/queue" +import "core:net" +import "core:strings" +import "core:bytes" + +import "nbio" + +Sse :: struct { + user_data: rawptr, + on_err: Maybe(Sse_On_Error), + + r: ^Response, + events: queue.Queue(Sse_Event), + state: Sse_State, +} + +Sse_Event :: struct { + event: Maybe(string), + data: Maybe(string), + id: Maybe(int), + retry: Maybe(int), + comment: Maybe(string), + + _buf: strings.Builder, + _sent: int, +} + +Sse_State :: enum { + Uninitialized, + + // The initial HTTP response is being sent over the connection (status code&headers) before + // we can start sending events. + Starting, + + // No events are being sent over the connection but it is ready to. + Idle, + + // An event is being sent over the connection. + Sending, + + // Set to when sse_end is called when there are still events in the queue. + // The events in the queue will be processed and then closed. + Ending, + + // Either done ending or forced ending. + // Every callback will return immediately, nothing else is processed. + Close, +} + +/* +A handler that is called when there is an error or when sse_end is called. +This will always be called in a cycle, and only once. +If this is called after a sse_end call the err is nil. +This is called before the connection is closed. +*/ +Sse_On_Error :: #type proc(sse: ^Sse, err: net.Network_Error) + +/* +Initialize the Sse struct, and start sending the status code and headers. +*/ +sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil) { + sse.r = r + sse.user_data = user_data + sse.on_err = on_error + sse.state = .Starting + + r.status = .OK + r.headers["cache-control"] = "no-store" + r.headers["content-type"] = "text/event-stream" + _response_write_heading(r, -1) + + // TODO: use other response logic from response_send proc, have a way to send a response without + // actually cleaning up the request, and a way to hook into when that is done. + + on_start_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { + sse := cast(^Sse)sse + + if err != nil { + _sse_err(sse, err) + return + } + + res := &sse.r._conn.loop.inflight.(Response_Inflight) + + res.sent += n + if len(res.buf) != res.sent { + nbio.send(&td.io, sse.r._conn.socket, res.buf[res.sent:], sse, on_start_send) + return + } + + _sse_process(sse) + } + + buf := bytes.buffer_to_bytes(&r._buf) + r._conn.loop.inflight = Response_Inflight { + buf = buf, + } + nbio.send(&td.io, r._conn.socket, buf, sse, on_start_send) +} + +/* +Queues an event to be sent over the connection. +You must call `sse_start` first, this is a no-op when end has been called or an error has occurred. +*/ +sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { + switch sse.state { + case .Starting, .Sending, .Ending, .Idle: + queue.push_back(&sse.events, ev) + _sse_event_prepare(queue.peek_back(&sse.events)) + + case .Uninitialized: + panic("sse_start must be called first", loc) + + case .Close: + } + + if sse.state == .Idle { + _sse_process(sse) + } +} + +/* +Ends the event stream without sending all queued events. +*/ +sse_force_end :: proc(sse: ^Sse) { + sse.state = .Close + if cb, ok := sse.on_err.?; ok do cb(sse, nil) + connection_close(sse.r._conn) +} + +/* +Ends the event stream as soon as all queued events are sent. +*/ +sse_end :: proc(sse: ^Sse) { + if sse.state >= .Ending do return + + if sse.state == .Sending { + sse.state = .Ending + return + } + + if cb, ok := sse.on_err.?; ok do cb(sse, nil) + connection_close(sse.r._conn) +} + +_sse_err :: proc(sse: ^Sse, err: net.Network_Error) { + if sse.state >= .Ending do return + + sse.state = .Close + + if cb, ok := sse.on_err.?; ok do cb(sse, err) + connection_close(sse.r._conn) +} + +_sse_process :: proc(sse: ^Sse) { + if sse.state == .Close do return + + if queue.len(sse.events) == 0 { + #partial switch sse.state { + // We have sent all events in the queue, complete the ending if we are. + case .Ending: sse_force_end(sse) + case: sse.state = .Idle + } + return + } + + ev := queue.peek_front(&sse.events) + + #partial switch sse.state { + case .Ending: // noop + case: sse.state = .Sending + } + + nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[:], sse, _sse_on_send) +} + +_sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { + sse := cast(^Sse)sse + ev := queue.peek_front(&sse.events) + + if err != nil { + _sse_err(sse, err) + return + } + + if sse.state == .Close do return + + ev._sent += n + if len(ev._buf.buf) > ev._sent { + nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[ev._sent:], sse, _sse_on_send) + return + } + + queue.pop_front(&sse.events) + _sse_process(sse) +} + +// TODO :doesn't handle multiline values +_sse_event_prepare :: proc(ev: ^Sse_Event) { + b := &ev._buf + + if name, ok := ev.event.?; ok { + strings.write_string(b, "event: ") + strings.write_string(b, name) + strings.write_string(b, "\r\n") + } + + if cmnt, ok := ev.comment.?; ok { + strings.write_string(b, "; ") + strings.write_string(b, cmnt) + strings.write_string(b, "\r\n") + } + + if id, ok := ev.id.?; ok { + strings.write_string(b, "id: ") + strings.write_int(b, id) + strings.write_string(b, "\r\n") + } + + if retry, ok := ev.retry.?; ok { + strings.write_string(b, "retry: ") + strings.write_int(b, retry) + strings.write_string(b, "\r\n") + } + + if data, ok := ev.data.?; ok { + strings.write_string(b, "data: ") + strings.write_string(b, data) + strings.write_string(b, "\r\n") + } + + strings.write_string(b, "\r\n") +} From bc5582f3a6e052bc382a168bc9e2002b8e5296fd Mon Sep 17 00:00:00 2001 From: Laytan Date: Sun, 17 Sep 2023 02:27:53 +0200 Subject: [PATCH 10/16] todos --- sse.odin | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sse.odin b/sse.odin index 54536ae..b267b6c 100644 --- a/sse.odin +++ b/sse.odin @@ -7,6 +7,14 @@ import "core:bytes" import "nbio" +// TODO: memory management. + +// TODO: browser example, maybe a clock ran on server time (high frequency)? + +// TODO: is there a way to say we are done to the client (in the spec), afaik the browser will always reconnect. + +// TODO: might make sense as its own package (sse). + Sse :: struct { user_data: rawptr, on_err: Maybe(Sse_On_Error), @@ -23,6 +31,7 @@ Sse_Event :: struct { retry: Maybe(int), comment: Maybe(string), + // TODO: put buf on Sse, we only need one because we sent one at a time. _buf: strings.Builder, _sent: int, } @@ -50,8 +59,8 @@ Sse_State :: enum { } /* -A handler that is called when there is an error or when sse_end is called. -This will always be called in a cycle, and only once. +A handler that is called when there is an error (client disconnected for example) or when sse_end is called. +This will always be called in a cycle, and only once, so cleaning up after yourself is easily done here. If this is called after a sse_end call the err is nil. This is called before the connection is closed. */ @@ -67,6 +76,7 @@ sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Ma sse.state = .Starting r.status = .OK + // TODO: do we need this header? r.headers["cache-control"] = "no-store" r.headers["content-type"] = "text/event-stream" _response_write_heading(r, -1) From 38ede7b431b59b82efdcab92e1f71f2c9febc85c Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:29:30 +0200 Subject: [PATCH 11/16] server sent events: add example with web page, one buffer, memory management --- examples/sse/index.html | 35 +++++++++++ examples/sse/main.odin | 25 ++++---- sse.odin | 129 +++++++++++++++++++++++++--------------- 3 files changed, 131 insertions(+), 58 deletions(-) create mode 100644 examples/sse/index.html diff --git a/examples/sse/index.html b/examples/sse/index.html new file mode 100644 index 0000000..5a422e2 --- /dev/null +++ b/examples/sse/index.html @@ -0,0 +1,35 @@ + + + + SSE + + + + +

Hello, SSE!

+

Time from the server:

+

Other events:

+ + + diff --git a/examples/sse/main.odin b/examples/sse/main.odin index dc03881..e762aaa 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -15,34 +15,39 @@ main :: proc() { s: http.Server handler := http.handler(proc(_: ^http.Request, res: ^http.Response) { - sse := new(http.Sse) + res.headers["access-control-allow-origin"] = "*" - http.sse_start(sse, res, rawptr(uintptr(0)), proc(sse: ^http.Sse, err: net.Network_Error) { - log.errorf("sse error: %v", err) - }) + sse: http.Sse + http.sse_init(&sse, res) + http.sse_start(&sse) - http.sse_event(sse, {data = "Hello, World!"}) + http.sse_event(&sse, {data = "Hello, World!"}) - tick :: proc(sse: rawptr, now: Maybe(time.Time)) { + tick :: proc(sse: rawptr, now: Maybe(time.Time) = nil) { sse := cast(^http.Sse)sse + i := uintptr(sse.user_data) + + // If you were using a custom allocator: + // the temp_allocator is automatically free'd after the response is sent and the connection is closed. + // if sse.state == .Close do free(sse) if sse.state > .Ending do return nbio.timeout(&http.td.io, time.Second, sse, tick) http.sse_event(sse, { - id = int(uintptr(sse.user_data)), event = "tick", data = http.date_string(now.? or_else time.now()), }) - if uintptr(sse.user_data) > 10 { + // End after a minute. + if i > uintptr(time.Second * 60) { http.sse_end(sse) } - sse.user_data = rawptr(uintptr(sse.user_data) + 1) + sse.user_data = rawptr(i + 1) } - tick(sse, nil) + tick(&sse) }) http.server_shutdown_on_interrupt(&s) diff --git a/sse.odin b/sse.odin index b267b6c..59f18fd 100644 --- a/sse.odin +++ b/sse.odin @@ -1,43 +1,42 @@ package http +import "core:bytes" import "core:container/queue" +import "core:log" import "core:net" import "core:strings" -import "core:bytes" import "nbio" -// TODO: memory management. - -// TODO: browser example, maybe a clock ran on server time (high frequency)? - -// TODO: is there a way to say we are done to the client (in the spec), afaik the browser will always reconnect. - // TODO: might make sense as its own package (sse). +// TODO: shutdown doesn't work. + Sse :: struct { user_data: rawptr, on_err: Maybe(Sse_On_Error), r: ^Response, - events: queue.Queue(Sse_Event), + + // State should be considered read-only by users. state: Sse_State, + + _events: queue.Queue(Sse_Event), + + _buf: strings.Builder, + _sent: int, } Sse_Event :: struct { event: Maybe(string), data: Maybe(string), - id: Maybe(int), + id: Maybe(string), retry: Maybe(int), comment: Maybe(string), - - // TODO: put buf on Sse, we only need one because we sent one at a time. - _buf: strings.Builder, - _sent: int, } Sse_State :: enum { - Uninitialized, + Pre_Start, // The initial HTTP response is being sent over the connection (status code&headers) before // we can start sending events. @@ -67,19 +66,27 @@ This is called before the connection is closed. Sse_On_Error :: #type proc(sse: ^Sse, err: net.Network_Error) /* -Initialize the Sse struct, and start sending the status code and headers. +Initializes an sse struct with the given arguments. */ -sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil) { +sse_init :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil, allocator := context.temp_allocator) { sse.r = r sse.user_data = user_data sse.on_err = on_error - sse.state = .Starting - r.status = .OK - // TODO: do we need this header? - r.headers["cache-control"] = "no-store" - r.headers["content-type"] = "text/event-stream" - _response_write_heading(r, -1) + queue.init(&sse._events, allocator = allocator) + strings.builder_init(&sse._buf, allocator) + + // Set the status and content type if they haven't been changed by the user. + if r.status == .Not_Found do r.status = .OK + if "content-type" not_in r.headers do r.headers["content-type"] = "text/event-stream" +} + +/* +Start by sending the status code and headers. +*/ +sse_start :: proc(sse: ^Sse) { + sse.state = .Starting + _response_write_heading(sse.r, -1) // TODO: use other response logic from response_send proc, have a way to send a response without // actually cleaning up the request, and a way to hook into when that is done. @@ -103,11 +110,11 @@ sse_start :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Ma _sse_process(sse) } - buf := bytes.buffer_to_bytes(&r._buf) - r._conn.loop.inflight = Response_Inflight { + buf := bytes.buffer_to_bytes(&sse.r._buf) + sse.r._conn.loop.inflight = Response_Inflight { buf = buf, } - nbio.send(&td.io, r._conn.socket, buf, sse, on_start_send) + nbio.send(&td.io, sse.r._conn.socket, buf, sse, on_start_send) } /* @@ -117,10 +124,9 @@ You must call `sse_start` first, this is a no-op when end has been called or an sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { switch sse.state { case .Starting, .Sending, .Ending, .Idle: - queue.push_back(&sse.events, ev) - _sse_event_prepare(queue.peek_back(&sse.events)) + queue.push_back(&sse._events, ev) - case .Uninitialized: + case .Pre_Start: panic("sse_start must be called first", loc) case .Close: @@ -134,10 +140,12 @@ sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { /* Ends the event stream without sending all queued events. */ -sse_force_end :: proc(sse: ^Sse) { +sse_end_force :: proc(sse: ^Sse) { sse.state = .Close - if cb, ok := sse.on_err.?; ok do cb(sse, nil) - connection_close(sse.r._conn) + + _sse_call_on_err(sse, nil) + sse_destroy(sse) + connection_close(sse.r._conn) } /* @@ -151,8 +159,20 @@ sse_end :: proc(sse: ^Sse) { return } - if cb, ok := sse.on_err.?; ok do cb(sse, nil) - connection_close(sse.r._conn) + sse.state = .Close + + _sse_call_on_err(sse, nil) + sse_destroy(sse) + connection_close(sse.r._conn) +} + +/* +Destroys any memory allocated, and if `sse_new` was used, frees the sse struct. +This is usually not a call you need to make, it is automatically called after an error or `sse_end`/`sse_end_force`. +*/ +sse_destroy :: proc(sse: ^Sse) { + strings.builder_destroy(&sse._buf) + queue.destroy(&sse._events) } _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { @@ -160,35 +180,44 @@ _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { sse.state = .Close - if cb, ok := sse.on_err.?; ok do cb(sse, err) - connection_close(sse.r._conn) + _sse_call_on_err(sse, err) + sse_destroy(sse) + connection_close(sse.r._conn) +} + +_sse_call_on_err :: proc(sse: ^Sse, err: net.Network_Error) { + if cb, ok := sse.on_err.?; ok { + cb(sse, err) + } else if err != nil { + // Most likely that the client closed the connection. + log.infof("Server Sent Event error: %v", err) + } } _sse_process :: proc(sse: ^Sse) { if sse.state == .Close do return - if queue.len(sse.events) == 0 { + if queue.len(sse._events) == 0 { #partial switch sse.state { // We have sent all events in the queue, complete the ending if we are. - case .Ending: sse_force_end(sse) + case .Ending: sse_end_force(sse) case: sse.state = .Idle } return } - ev := queue.peek_front(&sse.events) - #partial switch sse.state { case .Ending: // noop case: sse.state = .Sending } - nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[:], sse, _sse_on_send) + ev := queue.peek_front(&sse._events) + _sse_event_prepare(sse) + nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { sse := cast(^Sse)sse - ev := queue.peek_front(&sse.events) if err != nil { _sse_err(sse, err) @@ -197,19 +226,23 @@ _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { if sse.state == .Close do return - ev._sent += n - if len(ev._buf.buf) > ev._sent { - nbio.send(&td.io, sse.r._conn.socket, ev._buf.buf[ev._sent:], sse, _sse_on_send) + sse._sent += n + if len(sse._buf.buf) > sse._sent { + nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[sse._sent:], sse, _sse_on_send) return } - queue.pop_front(&sse.events) + queue.pop_front(&sse._events) _sse_process(sse) } // TODO :doesn't handle multiline values -_sse_event_prepare :: proc(ev: ^Sse_Event) { - b := &ev._buf +_sse_event_prepare :: proc(sse: ^Sse) { + ev := queue.peek_front(&sse._events) + b := &sse._buf + + strings.builder_reset(b) + sse._sent = 0 if name, ok := ev.event.?; ok { strings.write_string(b, "event: ") @@ -225,7 +258,7 @@ _sse_event_prepare :: proc(ev: ^Sse_Event) { if id, ok := ev.id.?; ok { strings.write_string(b, "id: ") - strings.write_int(b, id) + strings.write_string(b, id) strings.write_string(b, "\r\n") } From 58f3c9804c9bfbf17c27d7450ec5696739bd5ce9 Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:35:08 +0200 Subject: [PATCH 12/16] server sent events: comments --- examples/sse/main.odin | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/sse/main.odin b/examples/sse/main.odin index e762aaa..cdf4239 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -8,7 +8,13 @@ import "core:net" import http "../.." import "../../nbio" -// Minimal server that listens on 127.0.0.1:8080 and responds to every request with 200 Ok. +/* +Responds to any requests with a 200 OK that starts an event stream (aka server sent events). +The first event sent is a general "Hello, World!", +then it sends one event every second with the current time. + +All this is done without spawning any extra threads by using the underlying nbio (non-blocking IO) package. +*/ main :: proc() { context.logger = log.create_console_logger(.Debug) @@ -33,6 +39,7 @@ main :: proc() { if sse.state > .Ending do return + // Queue next tick. nbio.timeout(&http.td.io, time.Second, sse, tick) http.sse_event(sse, { From b30c16a6df389b7ee865bbcc04b8e3704395d7e3 Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:39:23 +0200 Subject: [PATCH 13/16] server sent events: formatting --- examples/sse/main.odin | 7 +++--- sse.odin | 57 ++++++++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/examples/sse/main.odin b/examples/sse/main.odin index cdf4239..e6fd00a 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -2,8 +2,8 @@ package sse_example import "core:fmt" import "core:log" -import "core:time" import "core:net" +import "core:time" import http "../.." import "../../nbio" @@ -20,7 +20,7 @@ main :: proc() { s: http.Server - handler := http.handler(proc(_: ^http.Request, res: ^http.Response) { + handler := http.handler( proc(_: ^http.Request, res: ^http.Response) { res.headers["access-control-allow-origin"] = "*" sse: http.Sse @@ -55,7 +55,8 @@ main :: proc() { sse.user_data = rawptr(i + 1) } tick(&sse) - }) + }, + ) http.server_shutdown_on_interrupt(&s) diff --git a/sse.odin b/sse.odin index 59f18fd..e4b8b1e 100644 --- a/sse.odin +++ b/sse.odin @@ -18,7 +18,7 @@ Sse :: struct { r: ^Response, - // State should be considered read-only by users. + // State should be considered read-only by users. state: Sse_State, _events: queue.Queue(Sse_Event), @@ -68,16 +68,22 @@ Sse_On_Error :: #type proc(sse: ^Sse, err: net.Network_Error) /* Initializes an sse struct with the given arguments. */ -sse_init :: proc(sse: ^Sse, r: ^Response, user_data: rawptr = nil, on_error: Maybe(Sse_On_Error) = nil, allocator := context.temp_allocator) { - sse.r = r +sse_init :: proc( + sse: ^Sse, + r: ^Response, + user_data: rawptr = nil, + on_error: Maybe(Sse_On_Error) = nil, + allocator := context.temp_allocator, +) { + sse.r = r sse.user_data = user_data - sse.on_err = on_error + sse.on_err = on_error - queue.init(&sse._events, allocator = allocator) - strings.builder_init(&sse._buf, allocator) + queue.init(&sse._events, allocator = allocator) + strings.builder_init(&sse._buf, allocator) - // Set the status and content type if they haven't been changed by the user. - if r.status == .Not_Found do r.status = .OK + // Set the status and content type if they haven't been changed by the user. + if r.status == .Not_Found do r.status = .OK if "content-type" not_in r.headers do r.headers["content-type"] = "text/event-stream" } @@ -144,8 +150,8 @@ sse_end_force :: proc(sse: ^Sse) { sse.state = .Close _sse_call_on_err(sse, nil) - sse_destroy(sse) - connection_close(sse.r._conn) + sse_destroy(sse) + connection_close(sse.r._conn) } /* @@ -162,8 +168,8 @@ sse_end :: proc(sse: ^Sse) { sse.state = .Close _sse_call_on_err(sse, nil) - sse_destroy(sse) - connection_close(sse.r._conn) + sse_destroy(sse) + connection_close(sse.r._conn) } /* @@ -171,8 +177,8 @@ Destroys any memory allocated, and if `sse_new` was used, frees the sse struct. This is usually not a call you need to make, it is automatically called after an error or `sse_end`/`sse_end_force`. */ sse_destroy :: proc(sse: ^Sse) { - strings.builder_destroy(&sse._buf) - queue.destroy(&sse._events) + strings.builder_destroy(&sse._buf) + queue.destroy(&sse._events) } _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { @@ -181,8 +187,8 @@ _sse_err :: proc(sse: ^Sse, err: net.Network_Error) { sse.state = .Close _sse_call_on_err(sse, err) - sse_destroy(sse) - connection_close(sse.r._conn) + sse_destroy(sse) + connection_close(sse.r._conn) } _sse_call_on_err :: proc(sse: ^Sse, err: net.Network_Error) { @@ -200,19 +206,22 @@ _sse_process :: proc(sse: ^Sse) { if queue.len(sse._events) == 0 { #partial switch sse.state { // We have sent all events in the queue, complete the ending if we are. - case .Ending: sse_end_force(sse) - case: sse.state = .Idle + case .Ending: + sse_end_force(sse) + case: + sse.state = .Idle } return } #partial switch sse.state { case .Ending: // noop - case: sse.state = .Sending + case: + sse.state = .Sending } ev := queue.peek_front(&sse._events) - _sse_event_prepare(sse) + _sse_event_prepare(sse) nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } @@ -238,11 +247,11 @@ _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { // TODO :doesn't handle multiline values _sse_event_prepare :: proc(sse: ^Sse) { - ev := queue.peek_front(&sse._events) - b := &sse._buf + ev := queue.peek_front(&sse._events) + b := &sse._buf - strings.builder_reset(b) - sse._sent = 0 + strings.builder_reset(b) + sse._sent = 0 if name, ok := ev.event.?; ok { strings.write_string(b, "event: ") From ccad27330c8f6cdcc1503e6793ad20ba3747a82a Mon Sep 17 00:00:00 2001 From: Laytan Date: Mon, 18 Sep 2023 20:41:16 +0200 Subject: [PATCH 14/16] server-sent-events: unused var --- sse.odin | 1 - 1 file changed, 1 deletion(-) diff --git a/sse.odin b/sse.odin index e4b8b1e..758a469 100644 --- a/sse.odin +++ b/sse.odin @@ -220,7 +220,6 @@ _sse_process :: proc(sse: ^Sse) { sse.state = .Sending } - ev := queue.peek_front(&sse._events) _sse_event_prepare(sse) nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } From 23db38c30726eb2fa545449fae55c466b5c84e8c Mon Sep 17 00:00:00 2001 From: Laytan Date: Tue, 31 Oct 2023 20:02:21 +0100 Subject: [PATCH 15/16] server-sent-events: use nbio.send_all --- sse.odin | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/sse.odin b/sse.odin index 758a469..d44913d 100644 --- a/sse.odin +++ b/sse.odin @@ -105,22 +105,11 @@ sse_start :: proc(sse: ^Sse) { return } - res := &sse.r._conn.loop.inflight.(Response_Inflight) - - res.sent += n - if len(res.buf) != res.sent { - nbio.send(&td.io, sse.r._conn.socket, res.buf[res.sent:], sse, on_start_send) - return - } - _sse_process(sse) } buf := bytes.buffer_to_bytes(&sse.r._buf) - sse.r._conn.loop.inflight = Response_Inflight { - buf = buf, - } - nbio.send(&td.io, sse.r._conn.socket, buf, sse, on_start_send) + nbio.send_all(&td.io, sse.r._conn.socket, buf, sse, on_start_send) } /* @@ -221,7 +210,7 @@ _sse_process :: proc(sse: ^Sse) { } _sse_event_prepare(sse) - nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) + nbio.send_all(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send) } _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { @@ -234,12 +223,6 @@ _sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) { if sse.state == .Close do return - sse._sent += n - if len(sse._buf.buf) > sse._sent { - nbio.send(&td.io, sse.r._conn.socket, sse._buf.buf[sse._sent:], sse, _sse_on_send) - return - } - queue.pop_front(&sse._events) _sse_process(sse) } From c4f038bd6f8abeae4a4607c59f596591b105a924 Mon Sep 17 00:00:00 2001 From: Laytan Laats Date: Wed, 20 Mar 2024 18:51:09 +0100 Subject: [PATCH 16/16] server-sent-events: rebase --- examples/sse/main.odin | 2 +- sse.odin | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/sse/main.odin b/examples/sse/main.odin index e6fd00a..8c735dc 100644 --- a/examples/sse/main.odin +++ b/examples/sse/main.odin @@ -21,7 +21,7 @@ main :: proc() { s: http.Server handler := http.handler( proc(_: ^http.Request, res: ^http.Response) { - res.headers["access-control-allow-origin"] = "*" + http.headers_set_unsafe(&res.headers, "access-control-allow-origin", "*") sse: http.Sse http.sse_init(&sse, res) diff --git a/sse.odin b/sse.odin index d44913d..cc5709f 100644 --- a/sse.odin +++ b/sse.odin @@ -84,7 +84,9 @@ sse_init :: proc( // Set the status and content type if they haven't been changed by the user. if r.status == .Not_Found do r.status = .OK - if "content-type" not_in r.headers do r.headers["content-type"] = "text/event-stream" + if !headers_has_unsafe(r.headers, "content-type") { + headers_set_unsafe(&r.headers, "content-type", "text/event-stream") + } } /* @@ -117,6 +119,8 @@ Queues an event to be sent over the connection. You must call `sse_start` first, this is a no-op when end has been called or an error has occurred. */ sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) { + assert_has_td(loc) + switch sse.state { case .Starting, .Sending, .Ending, .Idle: queue.push_back(&sse._events, ev)