Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/echo_ssl/gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ version = "0.1.0"
# links = [{ title = "Website", href = "https://gleam.run" }]

[dependencies]
gleam_stdlib = "~> 0.36"
glisten = { path = "../../" }
gleam_erlang = "~> 0.24"
gleam_otp = "~> 0.9"
gleam_stdlib = ">= 0.68.1 and < 1.0.0"
gleam_erlang = ">= 1.3.0 and < 2.0.0"
gleam_otp = ">= 1.2.0 and < 2.0.0"
logging = ">= 1.3.0 and < 2.0.0"

[dev-dependencies]
Expand Down
17 changes: 8 additions & 9 deletions examples/echo_ssl/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@
# You typically do not need to edit this file

packages = [
{ name = "gleam_erlang", version = "1.0.0-rc1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "6E0CF4E1F66E2C9226B7554589544F00F12CE14858440EB1BF7EFDACDE1BBC64" },
{ name = "gleam_otp", version = "1.0.0-rc2", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "27138425316D9AACC4881CE56A8F2BDE98AC9FB669755BEF712F37CB79090EAF" },
{ name = "gleam_stdlib", version = "0.60.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "621D600BB134BC239CB2537630899817B1A42E60A1D46C5E9F3FAE39F88C800B" },
{ name = "gleeunit", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D33B7736CF0766ED3065F64A1EBB351E72B2E8DE39BAFC8ADA0E35E92A6A934F" },
{ name = "glisten", version = "8.0.0-rc1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_otp", "gleam_stdlib", "logging", "telemetry"], source = "local", path = "../.." },
{ name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" },
{ name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" },
{ name = "gleam_stdlib", version = "0.68.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "F7FAEBD8EF260664E86A46C8DBA23508D1D11BB3BCC6EE1B89B3BC3E5C83FF1E" },
{ name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" },
{ name = "glisten", version = "9.0.0-rc1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_otp", "gleam_stdlib", "logging"], source = "local", path = "../.." },
{ name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" },
{ name = "telemetry", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "telemetry", source = "hex", outer_checksum = "7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6" },
]

[requirements]
gleam_erlang = { version = "~> 0.24" }
gleam_otp = { version = "~> 0.9" }
gleam_stdlib = { version = "~> 0.36" }
gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" }
gleam_otp = { version = ">= 1.2.0 and < 2.0.0" }
gleam_stdlib = { version = ">= 0.68.1 and < 1.0.0" }
gleeunit = { version = "~> 1.0" }
glisten = { path = "../../" }
logging = { version = ">= 1.3.0 and < 2.0.0" }
3 changes: 2 additions & 1 deletion examples/echo_ssl/src/echo_server.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub fn main() {
glisten.continue(state)
})
|> glisten.with_tls(certfile: "localhost.crt", keyfile: "localhost.key")
|> glisten.start_with_listener_name(0, listener_name)
|> glisten.with_listener_name(listener_name)
|> glisten.start(0)

let info = glisten.get_server_info(listener_name, 5000)

Expand Down
19 changes: 19 additions & 0 deletions src/glisten.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ pub opaque type Builder(state, user_message) {
factory.Message(Socket, Subject(handler.Message(user_message))),
),
),
active_state: options.ActiveState,
)
}

Expand Down Expand Up @@ -288,6 +289,7 @@ pub fn new(
tls_options: None,
listener_name: None,
connection_factory_name: None,
active_state: options.Once,
)
}

Expand Down Expand Up @@ -354,6 +356,22 @@ pub fn with_tls(
Builder(..builder, tls_options: Some(options.CertKeyFiles(cert, key)))
}

/// Set the server's `ActiveState` for flow control of received packets.
/// Default is `Once`. Allowed are `Once`, `Active` and `Count(n)` where n > 1.
pub fn with_active_state(
builder: Builder(state, user_message),
active_state: options.ActiveState,
) -> Builder(state, user_message) {
case active_state {
options.Once | options.Active ->
Builder(..builder, active_state: active_state)
options.Count(n) if n > 1 -> Builder(..builder, active_state: active_state)
options.Count(_) -> panic as "Count shall be greater than 1"
options.Passive ->
panic as "You cannot set the server's `ActiveState` to `Passive`"
}
}

@internal
pub fn with_listener_name(
builder: Builder(state, user_message),
Expand Down Expand Up @@ -412,6 +430,7 @@ pub fn start(
on_init: convert_on_init(builder.on_init),
on_close: builder.on_close,
transport:,
active_state: builder.active_state,
)
|> acceptor.start_pool(transport, port, options, listener_name)
}
Expand Down
2 changes: 2 additions & 0 deletions src/glisten/internal/acceptor.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub type Pool(data, user_message) {
#(data, Option(Selector(user_message))),
on_close: Option(fn(data) -> Nil),
transport: Transport,
active_state: options.ActiveState,
)
}

Expand Down Expand Up @@ -149,6 +150,7 @@ pub fn start_pool(
on_init: pool.on_init,
on_close: pool.on_close,
transport: pool.transport,
active_state: pool.active_state,
))
})
|> factory.named(pool.name)
Expand Down
84 changes: 56 additions & 28 deletions src/glisten/internal/handler.gleam
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/atom
import gleam/erlang/process.{type Selector, type Subject}
import gleam/option.{type Option, None, Some}
import gleam/otp/actor
import gleam/result
import gleam/string
import glisten/socket.{type Socket}
import glisten/socket/options.{type IpAddress}
import glisten/socket.{type Socket, type SocketReason}
import glisten/socket/options.{type ActiveState, type IpAddress}
import glisten/transport.{type Transport}
import logging

Expand All @@ -20,8 +19,9 @@ pub type InternalMessage {
Close
Ready
ReceiveMessage(BitArray)
SslClosed
TcpClosed
Closed
Passive
SocketError(SocketReason)
}

pub type Message(user_message) {
Expand All @@ -44,6 +44,7 @@ pub type LoopState(state, user_message) {
sender: Subject(Message(user_message)),
transport: Transport,
state: state,
active_state: ActiveState,
)
}

Expand Down Expand Up @@ -96,6 +97,7 @@ pub type Handler(state, user_message) {
#(state, Option(Selector(user_message))),
on_close: Option(fn(state) -> Nil),
transport: Transport,
active_state: ActiveState,
)
}

Expand All @@ -119,26 +121,24 @@ pub fn start(
let selector =
process.new_selector()
|> process.select_record(atom.create("tcp"), 2, fn(record) {
{
use data <- decode.field(2, decode.bit_array)
decode.success(ReceiveMessage(data))
}
|> decode.run(record, _)
|> result.unwrap(ReceiveMessage(<<>>))
ReceiveMessage(socket_data(record))
})
|> process.select_record(atom.create("ssl"), 2, fn(record) {
{
use data <- decode.field(2, decode.bit_array)
decode.success(ReceiveMessage(data))
}
|> decode.run(record, _)
|> result.unwrap(ReceiveMessage(<<>>))
ReceiveMessage(socket_data(record))
})
|> process.select_record(atom.create("ssl_closed"), 1, fn(_nil) { Closed })
|> process.select_record(atom.create("tcp_closed"), 1, fn(_nil) { Closed })
|> process.select_record(atom.create("ssl_passive"), 1, fn(_nil) {
Passive
})
|> process.select_record(atom.create("ssl_closed"), 1, fn(_nil) {
SslClosed
|> process.select_record(atom.create("tcp_passive"), 1, fn(_nil) {
Passive
})
|> process.select_record(atom.create("tcp_closed"), 1, fn(_nil) {
TcpClosed
|> process.select_record(atom.create("tcp_error"), 2, fn(record) {
SocketError(socket_error(record))
})
|> process.select_record(atom.create("ssl_error"), 2, fn(record) {
SocketError(socket_error(record))
})
|> process.map_selector(Internal)
|> process.merge_selector(base_selector)
Expand All @@ -155,6 +155,7 @@ pub fn start(
sender: subject,
transport: handler.transport,
state: initial_state,
active_state: handler.active_state,
)
|> actor.initialised()
|> actor.selecting(selector)
Expand All @@ -170,7 +171,7 @@ pub fn start(
sender: state.sender,
)
case msg {
Internal(TcpClosed) | Internal(SslClosed) | Internal(Close) ->
Internal(Closed) | Internal(Close) ->
case transport.close(state.transport, state.socket) {
Ok(Nil) -> {
let _ = case handler.on_close {
Expand All @@ -194,8 +195,10 @@ pub fn start(
logging.log(logging.Warning, err)
}
}

let options = [options.ActiveMode(options.Once)]
// Note that the active_state must set to Passive at start of
// Listener/Accept and not changed until the Ready message is
// received.
let options = [options.ActiveMode(state.active_state)]
case transport.set_opts(state.transport, state.socket, options) {
Ok(_) -> actor.continue(state)
Error(_) -> actor.stop_abnormal("Failed to set socket active")
Expand All @@ -206,16 +209,20 @@ pub fn start(
let msg = Custom(msg)
let res = rescue(fn() { handler.loop(state.state, msg, connection) })
case res {
Ok(Continue(next_state, _selector)) -> {
Ok(Continue(next_state, _selector))
if state.active_state == options.Once
-> {
case
transport.set_opts(state.transport, state.socket, [
options.ActiveMode(options.Once),
])
{
Ok(Nil) -> actor.continue(LoopState(..state, state: next_state))
Error(Nil) -> actor.stop()
Error(_) -> actor.stop()
}
}
Ok(Continue(next_state, _selector)) ->
actor.continue(LoopState(..state, state: next_state))
Ok(NormalStop) -> actor.stop()
Ok(AbnormalStop(reason)) -> actor.stop_abnormal(reason)
Error(reason) -> {
Expand All @@ -231,16 +238,20 @@ pub fn start(
let msg = Packet(msg)
let res = rescue(fn() { handler.loop(state.state, msg, connection) })
case res {
Ok(Continue(next_state, _selector)) -> {
Ok(Continue(next_state, _selector))
if state.active_state == options.Once
-> {
case
transport.set_opts(state.transport, state.socket, [
options.ActiveMode(options.Once),
])
{
Ok(Nil) -> actor.continue(LoopState(..state, state: next_state))
Error(Nil) -> actor.stop()
Error(_) -> actor.stop()
}
}
Ok(Continue(next_state, _selector)) ->
actor.continue(LoopState(..state, state: next_state))
Ok(NormalStop) -> actor.stop()
Ok(AbnormalStop(reason)) -> actor.stop_abnormal(reason)
Error(reason) -> {
Expand All @@ -252,7 +263,24 @@ pub fn start(
}
}
}
Internal(Passive) -> {
let options = [
options.ActiveMode(state.active_state),
]
case transport.set_opts(state.transport, state.socket, options) {
Ok(_) -> actor.continue(state)
Error(_) -> actor.stop_abnormal("Failed to set socket active")
}
}
Internal(SocketError(reason)) ->
actor.stop_abnormal("Received socket error " <> string.inspect(reason))
}
})
|> actor.start()
}

@external(erlang, "glisten_ffi", "socket_data")
fn socket_data(record: Dynamic) -> BitArray

@external(erlang, "glisten_ffi", "socket_data")
fn socket_error(record: Dynamic) -> SocketReason
Loading