Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Server Sent Events #28

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions examples/sse/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>SSE</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<h1>Hello, SSE!</h1>
<p>Time from the server: <span id="time"></span></p>
<p>Other events: <span id="other"></span></p>
<script>
const evtSource = new EventSource("http://localhost:8080");

const time = document.getElementById("time");
const other = document.getElementById("other");

evtSource.onmessage = (event) => {
console.log(event);
other.innerHTML = event.data;
};

evtSource.onerror = (event) => {
console.warn(event);
other.innerHTML = "error";
};

evtSource.addEventListener("tick", (event) => {
console.log(event);
time.innerHTML = event.data;
});

</script>
</body>
</html>
64 changes: 64 additions & 0 deletions examples/sse/main.odin
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package sse_example

import "core:fmt"
import "core:log"
import "core:net"
import "core:time"

import http "../.."
import "../../nbio"

/*
Responds to any requests with a 200 OK that starts an event stream (aka server sent events).
The first event sent is a general "Hello, World!",
then it sends one event every second with the current time.

All this is done without spawning any extra threads by using the underlying nbio (non-blocking IO) package.
*/
main :: proc() {
context.logger = log.create_console_logger(.Debug)

s: http.Server

handler := http.handler( proc(_: ^http.Request, res: ^http.Response) {
http.headers_set_unsafe(&res.headers, "access-control-allow-origin", "*")

sse: http.Sse
http.sse_init(&sse, res)
http.sse_start(&sse)

http.sse_event(&sse, {data = "Hello, World!"})

tick :: proc(sse: rawptr, now: Maybe(time.Time) = nil) {
sse := cast(^http.Sse)sse
i := uintptr(sse.user_data)

// If you were using a custom allocator:
// the temp_allocator is automatically free'd after the response is sent and the connection is closed.
// if sse.state == .Close do free(sse)

if sse.state > .Ending do return

// Queue next tick.
nbio.timeout(&http.td.io, time.Second, sse, tick)

http.sse_event(sse, {
event = "tick",
data = http.date_string(now.? or_else time.now()),
})

// End after a minute.
if i > uintptr(time.Second * 60) {
http.sse_end(sse)
}

sse.user_data = rawptr(i + 1)
}
tick(&sse)
},
)

http.server_shutdown_on_interrupt(&s)

fmt.printf("Server stopped: %s", http.listen_and_serve(&s, handler))
}
273 changes: 273 additions & 0 deletions sse.odin
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
package http

import "core:bytes"
import "core:container/queue"
import "core:log"
import "core:net"
import "core:strings"

import "nbio"

// TODO: might make sense as its own package (sse).

// TODO: shutdown doesn't work.

Sse :: struct {
user_data: rawptr,
on_err: Maybe(Sse_On_Error),

r: ^Response,

// State should be considered read-only by users.
state: Sse_State,

_events: queue.Queue(Sse_Event),

_buf: strings.Builder,
_sent: int,
}

Sse_Event :: struct {
event: Maybe(string),
data: Maybe(string),
id: Maybe(string),
retry: Maybe(int),
comment: Maybe(string),
}

Sse_State :: enum {
Pre_Start,

// The initial HTTP response is being sent over the connection (status code&headers) before
// we can start sending events.
Starting,

// No events are being sent over the connection but it is ready to.
Idle,

// An event is being sent over the connection.
Sending,

// Set to when sse_end is called when there are still events in the queue.
// The events in the queue will be processed and then closed.
Ending,

// Either done ending or forced ending.
// Every callback will return immediately, nothing else is processed.
Close,
}

/*
A handler that is called when there is an error (client disconnected for example) or when sse_end is called.
This will always be called in a cycle, and only once, so cleaning up after yourself is easily done here.
If this is called after a sse_end call the err is nil.
This is called before the connection is closed.
*/
Sse_On_Error :: #type proc(sse: ^Sse, err: net.Network_Error)

/*
Initializes an sse struct with the given arguments.
*/
sse_init :: proc(
sse: ^Sse,
r: ^Response,
user_data: rawptr = nil,
on_error: Maybe(Sse_On_Error) = nil,
allocator := context.temp_allocator,
) {
sse.r = r
sse.user_data = user_data
sse.on_err = on_error

queue.init(&sse._events, allocator = allocator)
strings.builder_init(&sse._buf, allocator)

// Set the status and content type if they haven't been changed by the user.
if r.status == .Not_Found do r.status = .OK
if !headers_has_unsafe(r.headers, "content-type") {
headers_set_unsafe(&r.headers, "content-type", "text/event-stream")
}
}

/*
Start by sending the status code and headers.
*/
sse_start :: proc(sse: ^Sse) {
sse.state = .Starting
_response_write_heading(sse.r, -1)

// TODO: use other response logic from response_send proc, have a way to send a response without
// actually cleaning up the request, and a way to hook into when that is done.

on_start_send :: proc(sse: rawptr, n: int, err: net.Network_Error) {
sse := cast(^Sse)sse

if err != nil {
_sse_err(sse, err)
return
}

_sse_process(sse)
}

buf := bytes.buffer_to_bytes(&sse.r._buf)
nbio.send_all(&td.io, sse.r._conn.socket, buf, sse, on_start_send)
}

/*
Queues an event to be sent over the connection.
You must call `sse_start` first, this is a no-op when end has been called or an error has occurred.
*/
sse_event :: proc(sse: ^Sse, ev: Sse_Event, loc := #caller_location) {
assert_has_td(loc)

switch sse.state {
case .Starting, .Sending, .Ending, .Idle:
queue.push_back(&sse._events, ev)

case .Pre_Start:
panic("sse_start must be called first", loc)

case .Close:
}

if sse.state == .Idle {
_sse_process(sse)
}
}

/*
Ends the event stream without sending all queued events.
*/
sse_end_force :: proc(sse: ^Sse) {
sse.state = .Close

_sse_call_on_err(sse, nil)
sse_destroy(sse)
connection_close(sse.r._conn)
}

/*
Ends the event stream as soon as all queued events are sent.
*/
sse_end :: proc(sse: ^Sse) {
if sse.state >= .Ending do return

if sse.state == .Sending {
sse.state = .Ending
return
}

sse.state = .Close

_sse_call_on_err(sse, nil)
sse_destroy(sse)
connection_close(sse.r._conn)
}

/*
Destroys any memory allocated, and if `sse_new` was used, frees the sse struct.
This is usually not a call you need to make, it is automatically called after an error or `sse_end`/`sse_end_force`.
*/
sse_destroy :: proc(sse: ^Sse) {
strings.builder_destroy(&sse._buf)
queue.destroy(&sse._events)
}

_sse_err :: proc(sse: ^Sse, err: net.Network_Error) {
if sse.state >= .Ending do return

sse.state = .Close

_sse_call_on_err(sse, err)
sse_destroy(sse)
connection_close(sse.r._conn)
}

_sse_call_on_err :: proc(sse: ^Sse, err: net.Network_Error) {
if cb, ok := sse.on_err.?; ok {
cb(sse, err)
} else if err != nil {
// Most likely that the client closed the connection.
log.infof("Server Sent Event error: %v", err)
}
}

_sse_process :: proc(sse: ^Sse) {
if sse.state == .Close do return

if queue.len(sse._events) == 0 {
#partial switch sse.state {
// We have sent all events in the queue, complete the ending if we are.
case .Ending:
sse_end_force(sse)
case:
sse.state = .Idle
}
return
}

#partial switch sse.state {
case .Ending: // noop
case:
sse.state = .Sending
}

_sse_event_prepare(sse)
nbio.send_all(&td.io, sse.r._conn.socket, sse._buf.buf[:], sse, _sse_on_send)
}

_sse_on_send :: proc(sse: rawptr, n: int, err: net.Network_Error) {
sse := cast(^Sse)sse

if err != nil {
_sse_err(sse, err)
return
}

if sse.state == .Close do return

queue.pop_front(&sse._events)
_sse_process(sse)
}

// TODO :doesn't handle multiline values
_sse_event_prepare :: proc(sse: ^Sse) {
ev := queue.peek_front(&sse._events)
b := &sse._buf

strings.builder_reset(b)
sse._sent = 0

if name, ok := ev.event.?; ok {
strings.write_string(b, "event: ")
strings.write_string(b, name)
strings.write_string(b, "\r\n")
}

if cmnt, ok := ev.comment.?; ok {
strings.write_string(b, "; ")
strings.write_string(b, cmnt)
strings.write_string(b, "\r\n")
}

if id, ok := ev.id.?; ok {
strings.write_string(b, "id: ")
strings.write_string(b, id)
strings.write_string(b, "\r\n")
}

if retry, ok := ev.retry.?; ok {
strings.write_string(b, "retry: ")
strings.write_int(b, retry)
strings.write_string(b, "\r\n")
}

if data, ok := ev.data.?; ok {
strings.write_string(b, "data: ")
strings.write_string(b, data)
strings.write_string(b, "\r\n")
}

strings.write_string(b, "\r\n")
}