Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement batched messages for Digiline Chests #87

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/chest.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,24 @@ The message is sent when user takes `<stack>` from `<output slot>` in the chest.
```
The message is sent when user puts `<stack>` to the chest to `<input slot>`

----------------------------

```
{
action = "batch",
messages = <messages>
}
```
The message contains an array of other messages (`<messages>`) which were emitted within a very short interval, and for this reason merged into a batch and sent as a single message. This is needed to prevent burning of connected Lua controllers in case of a large amount of messages within a very short time. This can happen in case of using the feature which allows to move all items of the same type between inventories at once by taking one of them, and clicking on another one while holding Shift. Another theoretical scenario - sending a lot of requests to the server associated with inventory operations on a Digiline chest using a hacked client or custom software. The decision whether to include a message into a batch is made based on the interval from the previous message. For this reason, a batch message is always preceded by a single message which wasn't included into the batch because there was enough time from the previous message, and we cannot know in advance when the next message will be sent to include the current one into the batch too. Only messages with the following actions can be included into a batch: "uput", "utake", "uswap", "umove", "empty", "full"; all the other messages are related to tube events. This restriction was added because, when we send a batch, the area might become unloaded, so we need to load it back, and the possibility to include messages of all types into a batch might make a possibility to create mechanisms which keep the area loaded by causing a large amount of events to a Digiline chest. Messages with actions "empty" and "full" are included into a batch only if the batch is already not empty. If the chest tries to send a message which cannot be batched while its current batch is not empty, the batch is sent immediately to preserve the original order of messages.

### Fields used within the messages

| Field | Description |
| ----- | ----------- |
| `<stack>` | A table which contains data about the stack, and corresponds to the format returned by the :to_table() method of ItemStack (check the Minetest API documentation). |
| `<input slot>`, `<output slot>`, `<slot1>`, `<slot2>` | The index of the corresponding slot starting from 1. |
| `<side>` | A vector represented as a table of format `{ x = <x>, y = <y>, z = <z> }` which represent the direction from which the tube is connected to the chest. |
| `<messages>` | An array of messages which are emitted by Digiline chest in the same format. |

## Additional information

Expand Down
87 changes: 87 additions & 0 deletions inventory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,55 @@ local S = digilines.S

local pipeworks_enabled = minetest.get_modpath("pipeworks") ~= nil

-- Messages which will be sent in a single batch
local batches = {}
andriyndev marked this conversation as resolved.
Show resolved Hide resolved
-- Maximum interval from the previous message to include the current one into batch (in seconds)
local interval_to_batch = 0.1
-- Maximum number of messages in batch
local max_messages_in_batch = 100
-- Time of the last message for each chest
local last_message_time_for_chest = {}

-- Messages which can be included into batch
local can_be_batched = {
["empty"] = true, ["full"] = true, ["umove"] = true,
["uswap"] = true, ["utake"] = true, ["uput"] = true
}

-- Messages which shouldn't be included into batch when the batch is empty
local dont_batch_when_empty = { ["empty"] = true, ["full"] = true }

-- Sends the current batch message of a Digiline chest
-- pos: the position of the Digilines chest node
-- channel: the channel to which the message will be sent
local function send_and_clear_batch(pos, channel)
local pos_hash = minetest.hash_node_position(pos)
if #batches[pos_hash].messages == 1 then
-- If there is only one message is the batch, don't send it in a batch
digilines.receptor_send(pos, digilines.rules.default, channel,
batches[pos_hash].messages[1])
else
digilines.receptor_send(pos, digilines.rules.default, channel, {
action = "batch",
messages = batches[pos_hash].messages
})
end
batches[pos_hash] = nil
last_message_time_for_chest[pos_hash] = nil
end

-- Send all the batched messages on timer expiration for the chest if present
local function send_batch_on_timer(pos)
if not batches[minetest.hash_node_position(pos)] then
return
end
if minetest.get_node(pos).name == "ignore" then
minetest.load_area(pos)
end
local channel = minetest.get_meta(pos):get_string("channel")
send_and_clear_batch(pos, channel)
end

-- Sends a message onto the Digilines network.
-- pos: the position of the Digilines chest node.
-- action: the action string indicating what happened.
Expand All @@ -11,6 +60,7 @@ local pipeworks_enabled = minetest.get_modpath("pipeworks") ~= nil
-- side: which side of the chest the action occurred (optional).
local function send_message(pos, action, stack, from_slot, to_slot, side)
local channel = minetest.get_meta(pos):get_string("channel")

local msg = {
action = action,
stack = stack and stack:to_table(),
Expand All @@ -19,6 +69,33 @@ local function send_message(pos, action, stack, from_slot, to_slot, side)
-- Duplicate the vector in case the caller expects it not to change.
side = side and vector.new(side)
}

-- Check if we need to include the current message into batch
local pos_hash = minetest.hash_node_position(pos)
if can_be_batched[msg.action] and (batches[pos_hash] or not dont_batch_when_empty[msg.action]) then
local prev_time = last_message_time_for_chest[pos_hash] or 0
local cur_time = minetest.get_us_time()
last_message_time_for_chest[pos_hash] = cur_time
if cur_time - prev_time < 1000000 * interval_to_batch or batches[pos_hash] then
batches[pos_hash] = batches[pos_hash] or { messages = {} }
table.insert(batches[pos_hash].messages, msg)
if #batches[pos_hash].messages >= max_messages_in_batch then
-- Send the batch immediately if it's full
send_and_clear_batch(pos, channel)
elseif not batches[pos_hash].timer then
batches[pos_hash].timer = minetest.after(interval_to_batch, send_batch_on_timer, pos)
end

return
end
else
-- If the current message cannot be batched, flush the current batch to preserve order
if batches[pos_hash] then
batches[pos_hash].timer:cancel()
send_and_clear_batch(pos, channel)
end
end

digilines.receptor_send(pos, digilines.rules.default, channel, msg)
end

Expand Down Expand Up @@ -186,6 +263,16 @@ minetest.register_node("digilines:chest", {
local inv = meta:get_inventory()
inv:set_size("main", 8*4)
end,
on_destruct = function(pos)
local pos_hash = minetest.hash_node_position(pos)
if not batches[pos_hash] then
return
end

batches[pos_hash].timer:cancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

job:cancel() was added in 5.4.0. Increase the required version in README.md accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, got it. However I'm not sure what's the best: increase the supported version or make a workaround and not use job:cancel()

local channel = minetest.get_meta(pos):get_string("channel")
send_and_clear_batch(pos, channel)
end,
after_place_node = tubescan,
after_dig_node = tubescan,
can_dig = function(pos)
Expand Down