Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement batched messages for Digiline Chests #87

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/chest.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,24 @@ The message is sent when user takes `<stack>` from `<output slot>` in the chest.
```
The message is sent when user puts `<stack>` to the chest to `<input slot>`

----------------------------

```
{
action = "batch",
messages = <messages>
}
```
The message contains an array of other messages (`<messages>`) which were emitted within a very short interval, and for this reason merged into a batch and sent as a single message. This is needed to prevent burning of connected Lua controllers in case of a large amount of messages within a very short time. This can happen in case of using the feature which allows to move all items of the same type between inventories at once by taking one of them, and clicking on another one while holding Shift. Another theoretical scenario - sending a lot of requests to the server associated with inventory operations on a Digiline chest using a hacked client or custom software. The decision whether to include a message into a batch is made based on the interval from the previous message. For this reason, a batch message is always preceded by a single message which wasn't included into the batch because there was enough time from the previous message, and we cannot know in advance when the next message will be sent to include the current one into the batch too.

### Fields used within the messages

| Field | Description |
| ----- | ----------- |
| `<stack>` | A table which contains data about the stack, and corresponds to the format returned by the :to_table() method of ItemStack (check the Minetest API documentation). |
| `<input slot>`, `<output slot>`, `<slot1>`, `<slot2>` | The index of the corresponding slot starting from 1. |
| `<side>` | A vector represented as a table of format `{ x = <x>, y = <y>, z = <z> }` which represent the direction from which the tube is connected to the chest. |
| `<messages>` | An array of messages which are emitted by Digiline chest in the same format. |

## Additional information

Expand Down
66 changes: 66 additions & 0 deletions inventory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,43 @@ local S = digilines.S

local pipeworks_enabled = minetest.get_modpath("pipeworks") ~= nil

-- Messages which will be sent in a single batch
local batched_messages = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and last_message_time_for_chest needs periodic cleanups (e.g. every 30 seconds) in cases where mapblocks are unloaded from memory without sending the message(s).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for late response. I'm not sure that it's the best solution because this means that the message will be lost that might be crucial in some cases and disrupt the logic of the factory. Also, I don't know how likely the situation is when the map region with a digiline chest is unloaded within 0.1 seconds after placing something to there in a batch (it might happen in case of delivering items to there by tubes in large amounts though). But even when it happens, I'm not sure how crucial is that we'll have some extra records in the structure for a while (maybe even until the server is shutting down). Another option would be to create such a timer, and if it detects such a situation, load the area and flush the signal. But in this case, as far as I understand, it'll activate the factory again that might send more items to the chest and trigger another batched message, and we're stuck in a vicious circle. Some players could theoretically even use it to keep their factories running when offline. I currently haven't come up with a solution of this dilemma.

Copy link
Member

@SmallJoker SmallJoker Oct 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapblocks can be unloaded from one server step to the next. This case won't occur often, but it can accumulate over time (unless the same mapblocks are loaded again).

Option 1: Clearing old entries periodically

-- TODO: change to 120 seconds if confirmed working
local cleanup_interval = 30
local function cleanup_batches_from_unloaded_blocks()
	local threshold_time = minetest.get_us_time() - 1E6 * (cleanup_interval + interval_to_batch)
	local pos_to_remove = {}

	for pos_hash, prev_time in pairs(last_message_time_for_chest) do
		if prev_time < threshold_time then
			-- Expired
			table.insert(pos_to_remove, pos_hash) 
		end
	end

	-- Remove all data assigned to this position
	for _, pos_hash in ipairs(pos_to_remove) do
		batched_messages[pos_hash] = nil
		last_message_time_for_chest[pos_hash] = nil
	end

	minetest.after(cleanup_interval, cleanup_batches_from_unloaded_blocks)
end

-- Short time to discover code issues early
minetest.after(5, cleanup_batches_from_unloaded_blocks)

Option 2: node name validity check

  1. get rid of node timers
  2. iterate through batched_messages periodically (minetest.register_globalstep)
  3. process the entry or remove it, depending on whether minetest.get_node(pos) returns the digiline node name.

Also: might there be nodes that switch between two states (e.g. active/inactive) ? AFAIK when using minetest.swap_node or equivalent, the node timer is removed but no on _destruct called. Judging from the C++ code, node timers appear to be untouched by swap node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems I've come up with the perfect solution: use batched messages only for inventory actions triggered by a user. In this case it will be very unlikely that the chest is unloaded after 0.1 seconds after interaction with a user. But even if it happens, it shouldn't be a problem to load the map region again, and flush the batched messages. Regarding actions triggered by tubes, it will be the chest's owner's responsibility to ensure that the incoming (or outcoming) items won't burn the controller.

Copy link
Contributor Author

@andriyndev andriyndev Oct 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And yes, use minetest.after() instead of node timers to trigger it, so that it'll be triggered even if the node is unloaded

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

-- Maximum interval from the previous message to include the current one into batch (in seconds)
local interval_to_batch = 0.1
-- Maximum number of messages in batch
local max_messages_in_batch = 100
-- Time of the last message for each chest
local last_message_time_for_chest = {}

-- Sends the current batch message of a Digiline chest
-- pos: the position of the Digilines chest node
-- channel: the channel to which the message will be sent
local function send_and_clear_batch(pos, channel)
local pos_hash = minetest.hash_node_position(pos)
if #batched_messages[pos_hash] == 1 then
-- If there is only one message is the batch, don't send it in a batch
digilines.receptor_send(pos, digilines.rules.default, channel,
batched_messages[pos_hash][1])
else
digilines.receptor_send(pos, digilines.rules.default, channel, {
action = "batch",
messages = batched_messages[pos_hash]
})
end
batched_messages[pos_hash] = nil
last_message_time_for_chest[pos_hash] = nil
end

-- Send all the batched messages for the chest if present
local function send_batch_for_chest(pos)
if not batched_messages[minetest.hash_node_position(pos)] then
return
end
local channel = minetest.get_meta(pos):get_string("channel")
send_and_clear_batch(pos, channel)
end

-- Sends a message onto the Digilines network.
-- pos: the position of the Digilines chest node.
-- action: the action string indicating what happened.
Expand All @@ -11,6 +48,7 @@ local pipeworks_enabled = minetest.get_modpath("pipeworks") ~= nil
-- side: which side of the chest the action occurred (optional).
local function send_message(pos, action, stack, from_slot, to_slot, side)
local channel = minetest.get_meta(pos):get_string("channel")

local msg = {
action = action,
stack = stack and stack:to_table(),
Expand All @@ -19,6 +57,26 @@ local function send_message(pos, action, stack, from_slot, to_slot, side)
-- Duplicate the vector in case the caller expects it not to change.
side = side and vector.new(side)
}

-- Check if we need to include the current message into batch
local pos_hash = minetest.hash_node_position(pos)
local prev_time = last_message_time_for_chest[pos_hash] or 0
local cur_time = minetest.get_us_time()
last_message_time_for_chest[pos_hash] = cur_time
if cur_time - prev_time < 1000000 * interval_to_batch then
batched_messages[pos_hash] = batched_messages[pos_hash] or {}
table.insert(batched_messages[pos_hash], msg)
local node_timer = minetest.get_node_timer(pos)
if #batched_messages[pos_hash] >= max_messages_in_batch then
-- Send the batch immediately if it's full
node_timer:stop()
send_and_clear_batch(pos, channel)
else
node_timer:start(interval_to_batch)
end
return
end

digilines.receptor_send(pos, digilines.rules.default, channel, msg)
end

Expand Down Expand Up @@ -186,6 +244,9 @@ minetest.register_node("digilines:chest", {
local inv = meta:get_inventory()
inv:set_size("main", 8*4)
end,
on_destruct = function(pos)
send_batch_for_chest(pos)
end,
after_place_node = tubescan,
after_dig_node = tubescan,
can_dig = function(pos)
Expand Down Expand Up @@ -321,6 +382,11 @@ minetest.register_node("digilines:chest", {
send_message(pos, "utake", stack, index)
check_empty(pos)
minetest.log("action", player:get_player_name().." takes stuff from chest at "..minetest.pos_to_string(pos))
end,
on_timer = function(pos, _)
-- Send all the batched messages when enough time since the last message passed
send_batch_for_chest(pos)
return false
end
})

Expand Down