Skip to content

Commit

Permalink
Fix tasks scheduling with cron-like intervals:
Browse files Browse the repository at this point in the history
- Self Repair
- Node Shared Key Renewal
- BeaconSlot Timer
  • Loading branch information
Samuel Manzanera committed Aug 11, 2020
1 parent cc6d0ec commit fa77c75
Show file tree
Hide file tree
Showing 42 changed files with 601 additions and 573 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ erl_crash.dump
# Self signed cert are used for local and dev purposes
/priv/cert

/assets/node_modules
/assets/node_modules

log_*
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use Mix.Config

# Configures Elixir's Logger
config :logger, :console,
format: "$time $metadata[$level] $message\n",
format: "$date $time $metadata[$level] $message\n",
metadata: [:request_id]

config :logger,
Expand Down
18 changes: 11 additions & 7 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use Mix.Config

# Do not include metadata nor timestamps in development logs
config :logger, :console, format: "[$level] $message\n"
config :logger,
backends: [:console, {LoggerFileBackend, :file_log}]

config :logger, :file_log, path: "log_#{System.get_env("UNIRIS_CRYPTO_SEED")}"

# Set a higher stacktrace during development. Avoid configuring such
# in production as building large stacktraces may be expensive.
Expand Down Expand Up @@ -64,16 +66,18 @@ config :uniris, Uniris.Bootstrap.NetworkInit,
]

config :uniris, Uniris.BeaconSlotTimer,
interval: 60_000,
trigger_offset: 2_000
interval: "* * * * * *",
# Trigger it 5 seconds before
trigger_offset: 5

config :uniris, Uniris.SharedSecrets.NodeRenewal,
interval: 60_000,
trigger_offset: 10_000
interval: "* * * * * *",
# Trigger it 20 seconds before
trigger_offset: 20

config :uniris, Uniris.SelfRepair,
last_sync_file: "priv/p2p/last_sync/#{System.get_env("UNIRIS_CRYPTO_SEED")}",
interval: 60_000,
interval: "* * * * * *",
network_startup_date: %DateTime{
year: DateTime.utc_now().year,
month: DateTime.utc_now().month,
Expand Down
12 changes: 7 additions & 5 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ config :uniris, Uniris.Bootstrap.NetworkInit,
]

config :uniris, Uniris.BeaconSlotTimer,
interval: 600_000,
trigger_offset: 2_000
interval: "10 * * * * *",
# Trigger it 5 minute before
trigger_offset: 300

config :uniris, Uniris.SharedSecrets.NodeRenewal,
interval: 86_400_000,
trigger_offset: 10_000
interval: "* 0 * * * *",
# Trigger it 10 minute before
trigger_offset: 600

config :uniris, Uniris.SelfRepair,
interval: 86_400_000,
interval: "* 0 * * * *",
last_sync_file: "priv/p2p/last_sync",
# TODO: specify the real network startup date
network_startup_date: %DateTime{
Expand Down
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use Mix.Config

# Print only warnings and errors during test
config :logger, level: :info
config :logger, level: :warning

config :uniris, Uniris.Crypto, keystore: MockCrypto

Expand Down
77 changes: 41 additions & 36 deletions lib/uniris/beacon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ defmodule Uniris.Beacon do
to retrieve the beacon storage nodes involved.
"""

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

alias Uniris.Crypto
alias Uniris.Election

Expand All @@ -21,8 +24,6 @@ defmodule Uniris.Beacon do
alias Uniris.Transaction.ValidationStamp
alias Uniris.Transaction.ValidationStamp.LedgerOperations

alias Uniris.Utils

@doc """
List of all transaction subsets (255 subsets for a byte capacity)
"""
Expand All @@ -39,33 +40,41 @@ defmodule Uniris.Beacon do
@spec get_pools(DateTime.t()) :: list({subset :: binary(), nodes: list(Node.t())})
def get_pools(last_sync_date = %DateTime{}) do
slot_interval = BeaconSlotTimer.slot_interval()
sync_offset_time = DateTime.diff(DateTime.utc_now(), last_sync_date, :millisecond)
sync_times = trunc(sync_offset_time / slot_interval)

slot_times =
Enum.map(0..sync_times, fn i ->
last_sync_date
|> DateTime.add(i * slot_interval, :millisecond)
|> Utils.truncate_datetime()
end)

Flow.from_enumerable(BeaconSubsets.all())
|> Flow.partition(stages: 256)
|> Flow.reduce(fn -> %{} end, fn subset, acc ->
slot_times
|> Enum.map(fn slot_time -> {slot_time, get_pool(subset, slot_time)} end)
|> Enum.reject(fn {_, nodes} -> nodes == [] end)
|> case do
[] ->
acc

nodes ->
Map.update(acc, subset, nodes, &Enum.uniq(&1 ++ nodes))
slot_times = previous_slot_times(slot_interval, last_sync_date)
nodes_by_subset(BeaconSubsets.all(), slot_times, [])
end

defp previous_slot_times(slot_interval, last_sync_date) do
slot_interval
|> CronParser.parse!()
|> CronScheduler.get_previous_run_dates(DateTime.utc_now())
|> Stream.transform([], fn date, acc ->
utc_date = DateTime.from_naive!(date, "Etc/UTC")

case DateTime.compare(utc_date, last_sync_date) do
:gt ->
{[utc_date], acc}

_ ->
{:halt, acc}
end
end)
|> Enum.to_list()
end

defp nodes_by_subset([subset | rest], slots_times, acc) do
nodes =
slots_times
|> Enum.map(fn slot_time -> get_pool(subset, slot_time) end)
|> Enum.reject(fn nodes -> nodes == [] end)
|> :lists.flatten()
|> Enum.uniq_by(& &1.first_public_key)

nodes_by_subset(rest, slots_times, [{subset, nodes} | acc])
end

defp nodes_by_subset([], _slots_times, acc), do: acc

@doc """
Retrieve the beacon storage nodes from a given subset and datetime
"""
Expand All @@ -85,22 +94,18 @@ defmodule Uniris.Beacon do
end

defp next_slot(date) do
last_slot_time = BeaconSlotTimer.last_slot_time()

if DateTime.diff(date, last_slot_time) > 0 do
DateTime.add(date, BeaconSlotTimer.slot_interval())
else
last_slot_time
end
|> Utils.truncate_datetime()
BeaconSlotTimer.slot_interval()
|> CronParser.parse!()
|> CronScheduler.get_next_run_date!(DateTime.to_naive(date))
|> DateTime.from_naive!("Etc/UTC")
end

@doc """
Get the last informations regarding a beacon subset slot before the last synchronized dates for the given subset.
Get the last informations from a beacon subset slot before the last synchronized date
"""
@spec previous_slots(subset :: <<_::8>>, dates :: list(DateTime.t())) :: list(BeaconSlot.t())
def previous_slots(subset, dates) when is_binary(subset) and is_list(dates) do
BeaconSubset.previous_slots(subset, dates)
@spec previous_slots(subset :: <<_::8>>, last_sync_date :: DateTime.t()) :: list(BeaconSlot.t())
def previous_slots(subset, last_sync_date = %DateTime{}) when is_binary(subset) do
BeaconSubset.previous_slots(subset, last_sync_date)
end

@doc """
Expand Down
22 changes: 14 additions & 8 deletions lib/uniris/beacon/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Uniris.BeaconSlotTimer do
end

def init(interval: interval, trigger_offset: trigger_offset) do
schedule_new_slot(Utils.time_offset(interval - trigger_offset))
Task.start(fn -> schedule_new_slot(next_slot(interval, trigger_offset)) end)

{:ok,
%{
Expand All @@ -45,27 +45,33 @@ defmodule Uniris.BeaconSlotTimer do
def handle_info(
:new_slot,
state = %{
last_slot_time: last_slot_time,
interval: interval,
trigger_offset: trigger_offset
}
) do
slot_time = DateTime.add(last_slot_time, interval, :millisecond)
Task.start(fn -> schedule_new_slot(next_slot(interval, trigger_offset)) end)

slot_time = DateTime.utc_now()

BeaconSubsets.all()
|> Enum.each(fn subset ->
[{pid, _}] = Registry.lookup(BeaconSubsetRegistry, subset)
send(pid, {:create_slot, slot_time})
end)

schedule_new_slot(interval - trigger_offset)

{:noreply, Map.put(state, :last_slot_time, slot_time)}
end

defp schedule_new_slot(0), do: :ok
defp schedule_new_slot(interval) when is_integer(interval) and interval >= 0 do
Process.send_after(__MODULE__, :new_slot, interval * 1000)
end

defp schedule_new_slot(interval) do
Process.send_after(__MODULE__, :new_slot, interval)
defp next_slot(interval, trigger_offset) do
if Utils.time_offset(interval) - trigger_offset <= 0 do
Process.sleep(Utils.time_offset(interval) * 1000)
Utils.time_offset(interval) - trigger_offset
else
Utils.time_offset(interval) - trigger_offset
end
end
end
19 changes: 12 additions & 7 deletions lib/uniris/beacon/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Uniris.BeaconSubset do
{:reply, :ok, state}
else
Logger.info(
"Transaction #{inspect(tx_info)} added to the beacon chain (subset #{
"Transaction #{Base.encode16(tx_info.address)} added to the beacon chain (subset #{
Base.encode16(state.subset)
})"
)
Expand All @@ -61,10 +61,10 @@ defmodule Uniris.BeaconSubset do
{:reply, :ok, Map.update!(state, :current_slot, &BeaconSlot.add_node_info(&1, node_info))}
end

def handle_call({:previous_slots, dates}, _, state = %{slots: slots}) do
def handle_call({:previous_slots, last_sync_date}, _, state = %{slots: slots}) do
previous_slots =
slots
|> Enum.filter(fn {time, _} -> Enum.any?(dates, &(DateTime.compare(time, &1) == :gt)) end)
|> Enum.filter(fn {time, _} -> DateTime.compare(time, last_sync_date) == :gt end)
|> Enum.sort_by(fn {time, _} -> time end)
|> Enum.map(fn {_, %Transaction{data: %{content: content}}} ->
content
Expand Down Expand Up @@ -109,7 +109,12 @@ defmodule Uniris.BeaconSubset do
|> Map.put(:current_slot, %BeaconSlot{})
|> put_in([:slots, slot_time], tx)

Logger.info("Beacon slot created with #{inspect(current_slot)} at #{inspect(slot_time)}")
Logger.info(
"Beacon slot created with #{Enum.map(current_slot.transactions, &Base.encode16(&1.address))} at #{
inspect(slot_time)
}"
)

{:noreply, new_state}
end

Expand Down Expand Up @@ -170,11 +175,11 @@ defmodule Uniris.BeaconSubset do
GenServer.call(via_tuple(subset), {:add_node_info, node_info})
end

@spec previous_slots(binary(), dates :: list(DateTime.t())) :: list(BeaconSlot.t())
def previous_slots(subset, dates) when is_list(dates) do
@spec previous_slots(binary(), last_sync_date :: DateTime.t()) :: list(BeaconSlot.t())
def previous_slots(subset, last_sync_date = %DateTime{}) do
subset
|> via_tuple
|> GenServer.call({:previous_slots, dates})
|> GenServer.call({:previous_slots, last_sync_date})
end

defp via_tuple(subset) do
Expand Down
10 changes: 9 additions & 1 deletion lib/uniris/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,15 @@ defmodule Uniris.Bootstrap do
else
if require_node_update?(ip, port, last_sync_date) do
Logger.info("Update node chain...")
update_node(ip, port, patch, bootstraping_seeds)

case bootstraping_seeds do
[] ->
Logger.warn("Not enought nodes in the network. No node update")
:ok

_ ->
update_node(ip, port, patch, bootstraping_seeds)
end
else
:ok
end
Expand Down
2 changes: 1 addition & 1 deletion lib/uniris/crypto/transaction_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Uniris.Crypto.TransactionLoader do
Process.send_after(
self(),
{:set_daily_nonce, encrypted_daily_nonce_seed, encrypted_key},
renewal_offset
renewal_offset * 1000
)

new_state = Map.put(state, :ref_daily_nonce_scheduler, ref_daily_nonce_scheduler)
Expand Down
2 changes: 2 additions & 0 deletions lib/uniris/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ defmodule Uniris.Mining do

def start(tx = %Transaction{}, _, [_ | []]) do
Task.start(fn ->
Logger.info("Mining transaction #{Base.encode16(tx.address)}")

tx =
%Transaction{validation_stamp: %ValidationStamp{ledger_operations: ledger_ops}} =
NetworkInit.self_validation!(tx, Context.fetch_history(%Context{}, tx))
Expand Down
Loading

0 comments on commit fa77c75

Please sign in to comment.