Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary tcp support #15

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion defnet/http_server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ function M.create(port)

local request_handlers = {}

local ss = tcp_server.create(port, function() end)
local options = { binary = false }
local on_data = function() end
local on_client_connected = function() end
local on_client_disconnected = function() end
local ss = tcp_server.create(port, on_data, on_client_connected, on_client_disconnected, options)

-- Replace the underlying socket server's receive function
-- Read lines until end of request
Expand Down
32 changes: 14 additions & 18 deletions defnet/tcp_client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-- self.client.send("Sending this to the server\n")
-- end

local tcp_send_queue = require "defnet.tcp_send_queue"
local tcp_data_queue = require "defnet.tcp_data_queue"

local M = {}

Expand All @@ -40,7 +40,7 @@ local log = function(...) M.log(...) end
-- @param server_port
-- @param on_data Function to call when data is received from the server
-- @param on_disconnect Function to call when the connection to the server ends
-- @param options Table with options (keys: connection_timeout (s))
-- @param options Table with options (keys: connection_timeout (s), binary, chunk_size)
-- @return client
-- @return error
function M.create(server_ip, server_port, on_data, on_disconnect, options)
Expand All @@ -51,24 +51,26 @@ function M.create(server_ip, server_port, on_data, on_disconnect, options)

log("Creating TCP client", server_ip, server_port)

local client = {
pattern = "*l",
}
local client = {}

local client_socket = nil
local send_queue = nil
local data_queue = nil
local client_socket_table = nil
local connection_timeout = options and options.connection_timeout or nil
local data_queue_options = {
chunk_size = options and options.chunk_size or M.TCP_SEND_CHUNK_SIZE,
binary = options and options.binary or false,
}

local ok, err = pcall(function()
client_socket = socket.tcp()
assert(client_socket:settimeout(connection_timeout))
assert(client_socket:connect(server_ip, server_port))
assert(client_socket:settimeout(0))
client_socket_table = { client_socket }
send_queue = tcp_send_queue.create(client_socket, M.TCP_SEND_CHUNK_SIZE)
data_queue = tcp_data_queue.create(client_socket, data_queue_options)
end)
if not ok or not client_socket or not send_queue then
if not ok or not client_socket or not data_queue then
log("tcp_client.create() error", err)
return nil, ("Unable to connect to %s:%d"):format(server_ip, server_port)
end
Expand All @@ -82,11 +84,9 @@ function M.create(server_ip, server_port, on_data, on_disconnect, options)
end

client.send = function(data)
send_queue.add(data)
data_queue.add(data)
end

local loaded_data = ""

client.update = function()
if not client_socket then
return
Expand All @@ -96,7 +96,7 @@ function M.create(server_ip, server_port, on_data, on_disconnect, options)
local receivet, sendt = socket.select(client_socket_table, client_socket_table, 0)

if sendt[client_socket] then
local ok, err = send_queue.send()
local ok, err = data_queue.send()
if not ok and err == "closed" then
client.destroy()
on_disconnect()
Expand All @@ -106,13 +106,9 @@ function M.create(server_ip, server_port, on_data, on_disconnect, options)

if receivet[client_socket] then
while client_socket do
local data, err, partial = client_socket:receive(client.pattern or "*l")
if partial then
loaded_data = loaded_data..partial
end
local data, err = data_queue.receive()
if data then
local response = on_data(loaded_data..data)
loaded_data = ""
local response = on_data(data)
if response then
client.send(response)
end
Expand Down
148 changes: 148 additions & 0 deletions defnet/tcp_data_queue.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
local M = {}

local BxFF000000 = bit.lshift(255, 24)
local Bx00FF0000 = bit.lshift(255, 16)
local Bx0000FF00 = bit.lshift(255, 8)
local Bx000000FF = bit.lshift(255, 0)

local function encode_size(data)
local length = #data
-- split length into four bytes
local b1 = bit.rshift(bit.band(length, BxFF000000), 24)
local b2 = bit.rshift(bit.band(length, Bx00FF0000), 16)
local b3 = bit.rshift(bit.band(length, Bx0000FF00), 8)
local b4 = bit.rshift(bit.band(length, Bx000000FF), 0)
-- convert the four bytes to a string
return string.char(b1, b2, b3, b4)
end

local function decode_size(data)
local b1, b2, b3, b4 = data:byte(1, 4)
return bit.lshift(b1, 24) + bit.lshift(b2, 16) + bit.lshift(b3, 8) + b4
end

--- Create a TCP data queue instance
-- @param client The TCP client used when sending and receiving data
-- @param options Table with data options. Accepted options:
-- * chunk_size - The maximum size of any data that will be
-- sent. Defaults to 10000. If data is added that is larger than this value
-- it will be split into multiple "chunks". Note that there is no guarantee
-- that all data in a chunk is sent in a single call. Individual chunks may
-- still be split into multiple TCP send calls.
-- * binary - Data that is received and sent will be prefixed with the data
-- length. Use this mode when working with binary data (including 0x00).
-- @return The created data queue instance
function M.create(client, options)
assert(client, "You must provide a TCP client")

local chunk_size = options and options.chunk_size or 10000
local binary = options and options.binary or false

local instance = {}

local send_queue = {}
local received_data = ""
local received_data_size = nil

function instance.clear()
send_queue = {}
end

function instance.add(data)
assert(data, "You must provide some data")
if binary then
data = encode_size(data) .. data
else
data = data
end
for i=1,#data,chunk_size do
table.insert(send_queue, { data = data:sub(i, i + chunk_size - 1), sent_index = 0 })
end
end

function instance.send()
while true do
local first = send_queue[1]
if not first then
return true
end

local sent_index, err, sent_index_on_err = client:send(first.data, first.sent_index + 1, #first.data)
if err then
first.sent_index = sent_index_on_err
return false, err
end

first.sent_index = sent_index
if first.sent_index == #first.data then
table.remove(send_queue, 1)
end
end
end


local function receive_binary()
-- calculate number of bytes to receive
-- 1) size: read 4 bytes to get the size of the data
-- 2) data: read the data itself
local n = nil
if not received_data_size then
n = 4 - #received_data
else
n = received_data_size - #received_data
end

-- receive some bytes (partially or all)
local data, err, partial = client:receive(n)
if partial then
received_data = received_data .. partial
end

-- exepected number of bytes received
if data then
data = received_data .. data
received_data = ""
if not received_data_size then
received_data_size = decode_size(data)
else
received_data_size = nil
return data, nil
end
elseif err == "closed" then
return nil, err
end
return nil, nil
end

local function receive_lines()
-- receive some bytes (partially or all)
local data, err, partial = client:receive("*l")
if partial then
received_data = received_data .. partial
end

-- all bytes received?
if data then
print("received ALL data")
data = received_data .. data
received_data = ""
return data, nil
elseif err == "closed" then
return nil, err
end
return nil, nil
end

function instance.receive()
if binary then
return receive_binary()
else
return receive_lines()
end
end

return instance
end


return M
55 changes: 0 additions & 55 deletions defnet/tcp_send_queue.lua

This file was deleted.

17 changes: 11 additions & 6 deletions defnet/tcp_server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
-- end
--

local tcp_send_queue = require "defnet.tcp_send_queue"
local tcp_data_queue = require "defnet.tcp_data_queue"
local socket = require "builtins.scripts.socket"

local M = {}
Expand All @@ -51,8 +51,9 @@ local log = function(...) M.log(...) end
-- connected. The function will be called with the following args: ip
-- @param on_client_disconnected Function to call when a client has
-- disconnected. The function will be called with the following args: ip
-- @param options Table with options (keys: binary, chunk_size)
-- @return Server instance
function M.create(port, on_data, on_client_connected, on_client_disconnected)
function M.create(port, on_data, on_client_connected, on_client_disconnected, options)
assert(port, "You must provide a port")
assert(on_data, "You must provide an on_data function")

Expand All @@ -65,6 +66,10 @@ function M.create(port, on_data, on_client_connected, on_client_disconnected)

local clients = {}
local queues = {}
local queue_options = {
chunk_size = options and options.chunk_size or M.TCP_SEND_CHUNK_SIZE,
binary = options and options.binary or false,
}

local function remove_client(connection_to_remove)
for i,connection in pairs(clients) do
Expand Down Expand Up @@ -118,11 +123,11 @@ function M.create(port, on_data, on_client_connected, on_client_disconnected)
client:close()
end
end

server.receive = function(client)
return client:receive("*l")
return queues[client].receive()
end

server.broadcast = function(data)
log("Broadcasting")
for client,queue in pairs(queues) do
Expand Down Expand Up @@ -150,7 +155,7 @@ function M.create(port, on_data, on_client_connected, on_client_disconnected)
if client then
client:settimeout(0)
table.insert(clients, client)
queues[client] = tcp_send_queue.create(client, M.TCP_SEND_CHUNK_SIZE)
queues[client] = tcp_data_queue.create(client, queue_options)
if on_client_connected then
local client_ip, client_port = client:getpeername()
on_client_connected(client_ip, client_port, client)
Expand Down
Loading