Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/lasso/chain_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule Lasso.RPC.ChainSupervisor do

%{
id: pp.provider_id,
name: get_in(instance_config, [:canonical_config, :name]) || pp.provider_id,
name: pp[:name] || pp.provider_id,
config: instance_config,
status: health.status,
http_status: health.http_status,
Expand Down
4 changes: 2 additions & 2 deletions lib/lasso/config/profile_validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ defmodule Lasso.Config.ProfileValidator do
400

iex> ProfileValidator.error_to_http_status(:profile_not_found)
503
404
"""
@spec error_to_http_status(validation_error()) :: integer()
def error_to_http_status(:profile_nil), do: 400
def error_to_http_status(:profile_invalid_type), do: 400
def error_to_http_status(:profile_empty), do: 400
def error_to_http_status(:profile_not_found), do: 503
def error_to_http_status(:profile_not_found), do: 404

@doc """
Converts validation error types to JSON-RPC error codes.
Expand Down
63 changes: 62 additions & 1 deletion lib/lasso/core/block_sync/strategies/http_strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do
Polls `eth_blockNumber` at a configurable interval and reports block heights
to the parent Worker. Uses a reference profile and Channel for auth injection.

## Health Writes

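Each successful poll writes `http_status: :healthy` to `{:health, instance_id}` in
ETS and signals circuit breaker recovery. Failed polls write `http_status` as
`:degraded` (2 or more consecutive failures) or `:unhealthy` (5 or more); below
that, the stored `http_status` is kept (defaulting to `:healthy` when none is
stored). Both paths also refresh `last_health_check`. Top-level `status`,
`consecutive_failures`, and `last_error` are left to ProbeCoordinator and
Observability.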

## Circuit Breaker Integration

Goes through the shared HTTP circuit breaker keyed by `{instance_id, :http}`.
Expand All @@ -22,6 +29,8 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do
@default_poll_interval_ms 15_000
@default_timeout_ms 3_000
@max_consecutive_failures 3
@degraded_threshold 2
@unhealthy_threshold 5

defstruct [
:instance_id,
Expand Down Expand Up @@ -109,6 +118,12 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do
execute_poll(state)
end

@doc """
Returns `state` with its `poll_interval_ms` field replaced by `interval_ms`.

Only the stored interval is updated by this function; it does not schedule or
cancel anything itself. `interval_ms` must be a positive integer, otherwise no
clause matches and a `FunctionClauseError` is raised.
"""
@spec set_poll_interval(t(), pos_integer()) :: t()
def set_poll_interval(%__MODULE__{} = state, interval_ms)
    when is_integer(interval_ms) and interval_ms > 0,
    do: struct!(state, poll_interval_ms: interval_ms)

## Private Functions

defp schedule_poll(state, delay_ms) do
Expand All @@ -125,6 +140,8 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do
case result do
{:ok, height} ->
send(state.parent, {:block_height, state.instance_id, height, %{latency_ms: latency_ms}})
write_health_success(state.instance_id)
CircuitBreaker.signal_recovery_cast({state.instance_id, :http})

new_state = %{
state
Expand All @@ -144,6 +161,7 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do

{:error, reason} ->
failures = state.consecutive_failures + 1
write_health_failure(state.instance_id, failures, reason)

if failures == @max_consecutive_failures do
Logger.warning("HTTP polling degraded",
Expand All @@ -164,6 +182,50 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do
end
end

# Records a successful poll for `instance_id`: sets `http_status` to `:healthy`
# and refreshes `last_health_check` in the `{:health, instance_id}` entry.
# All other keys already stored in that entry are preserved. Always returns `:ok`.
defp write_health_success(instance_id) do
  existing = read_existing_health(instance_id)
  store_http_health(instance_id, existing, :healthy)
end

# Records a failed poll for `instance_id`. The written `http_status` depends on
# `consecutive_failures` (see `failure_http_status/2`); `last_health_check` is
# refreshed either way. The failure reason is accepted for call-site symmetry
# but is not persisted here. Always returns `:ok`.
defp write_health_failure(instance_id, consecutive_failures, _reason) do
  existing = read_existing_health(instance_id)
  http_status = failure_http_status(consecutive_failures, existing)
  store_http_health(instance_id, existing, http_status)
end

# Maps a consecutive-failure count to an `http_status`. Below the degraded
# threshold the previously stored status is kept (`:healthy` when none is stored).
defp failure_http_status(failures, _existing) when failures >= @unhealthy_threshold,
  do: :unhealthy

defp failure_http_status(failures, _existing) when failures >= @degraded_threshold,
  do: :degraded

defp failure_http_status(_failures, existing), do: Map.get(existing, :http_status, :healthy)

# Merges `http_status` and a fresh `last_health_check` timestamp (milliseconds)
# into `existing` and stores the result under `{:health, instance_id}`.
#
# Best-effort by design: an `ArgumentError` from ETS (such as the table not
# existing) is swallowed so a health write cannot crash the poller.
#
# NOTE(review): the read of `existing` and this insert are two separate ETS
# operations. If another process writes other keys of the same entry in between,
# this insert would overwrite them with the stale copy — confirm whether
# concurrent writers to this key exist.
defp store_http_health(instance_id, existing, http_status) do
  merged =
    Map.merge(existing, %{
      http_status: http_status,
      last_health_check: System.system_time(:millisecond)
    })

  :ets.insert(:lasso_instance_state, {{:health, instance_id}, merged})
  :ok
rescue
  ArgumentError -> :ok
end

# Returns the value stored under `{:health, instance_id}` in the
# `:lasso_instance_state` ETS table, or `%{}` when no entry exists.
# An `ArgumentError` raised by the lookup (such as the table not existing)
# is treated the same as "no entry".
defp read_existing_health(instance_id) do
  health_key = {:health, instance_id}
  rows = :ets.lookup(:lasso_instance_state, health_key)

  case rows do
    [] -> %{}
    [{_key, stored}] -> stored
  end
rescue
  ArgumentError -> %{}
end

defp do_poll(instance_id, chain) do
cb_id = {instance_id, :http}

Expand Down Expand Up @@ -191,7 +253,6 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do
end
end

# Use a reference profile + Channel for auth header injection
defp do_poll_request(instance_id, chain) do
case resolve_channel(instance_id, chain) do
{:ok, channel} ->
Expand Down
72 changes: 59 additions & 13 deletions lib/lasso/core/block_sync/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ defmodule Lasso.BlockSync.Worker do
The Worker operates in one of two modes:
- `:http_only` - HTTP polling only (WS unavailable or disabled)
- `:http_with_ws` - HTTP polling + WS subscription (real-time layer)

## WS-Aware Polling

When WS subscription is active, HTTP polling is slowed down (the poll interval is
3x normal) to conserve RPC usage while maintaining connection warmth and WS liveness
detection. The normal interval is restored when WS degrades or disconnects.
"""

use GenServer
Expand All @@ -24,11 +30,13 @@ defmodule Lasso.BlockSync.Worker do
alias Lasso.Providers.Catalog

@reconnect_delay_ms 5_000
@ws_active_poll_multiplier 3

@type mode :: :http_only | :http_with_ws
@type config :: %{
subscribe_new_heads: boolean(),
poll_interval_ms: pos_integer(),
ws_active_poll_interval_ms: pos_integer(),
staleness_threshold_ms: pos_integer()
}

Expand All @@ -39,7 +47,8 @@ defmodule Lasso.BlockSync.Worker do
ws_strategy: pid() | nil,
http_strategy: pid() | nil,
config: config(),
ws_retry_count: non_neg_integer()
ws_retry_count: non_neg_integer(),
http_reduced: boolean()
}

defstruct [
Expand All @@ -49,7 +58,8 @@ defmodule Lasso.BlockSync.Worker do
:ws_strategy,
:http_strategy,
:config,
:ws_retry_count
:ws_retry_count,
http_reduced: false
]

## Client API
Expand All @@ -75,10 +85,7 @@ defmodule Lasso.BlockSync.Worker do

@impl true
def init({chain, instance_id}) do
# Instance-scoped WS connection events
Phoenix.PubSub.subscribe(Lasso.PubSub, "ws:conn:instance:#{instance_id}")

# Chain-scoped manager restart topic
Phoenix.PubSub.subscribe(Lasso.PubSub, "instance_sub_manager:restarted:#{chain}")

config = load_config(instance_id, chain)
Expand All @@ -90,7 +97,8 @@ defmodule Lasso.BlockSync.Worker do
ws_strategy: nil,
http_strategy: nil,
config: config,
ws_retry_count: 0
ws_retry_count: 0,
http_reduced: false
}

Process.send_after(self(), :start_strategies, 2_000)
Expand All @@ -111,7 +119,8 @@ defmodule Lasso.BlockSync.Worker do
ws_status: if(state.ws_strategy, do: WsStrategy.get_status(state.ws_strategy), else: nil),
http_status:
if(state.http_strategy, do: HttpStrategy.get_status(state.http_strategy), else: nil),
config: state.config
config: state.config,
http_reduced: state.http_reduced
}

{:reply, {:ok, status}, state}
Expand Down Expand Up @@ -262,6 +271,7 @@ defmodule Lasso.BlockSync.Worker do
default_config = %{
subscribe_new_heads: has_ws,
poll_interval_ms: 15_000,
ws_active_poll_interval_ms: 15_000 * @ws_active_poll_multiplier,
staleness_threshold_ms: 35_000
}

Expand All @@ -271,9 +281,12 @@ defmodule Lasso.BlockSync.Worker do
resolve_subscribe_new_heads(chain_config, ref_profile, chain, instance_id) or
Enum.any?(refs, &check_profile_subscribe_new_heads(&1, chain, instance_id))

poll_interval_ms = chain_config.monitoring.probe_interval_ms

%{
subscribe_new_heads: subscribe_new_heads and has_ws,
poll_interval_ms: chain_config.monitoring.probe_interval_ms,
poll_interval_ms: poll_interval_ms,
ws_active_poll_interval_ms: poll_interval_ms * @ws_active_poll_multiplier,
staleness_threshold_ms: chain_config.websocket.new_heads_timeout_ms
}
else
Expand Down Expand Up @@ -374,7 +387,8 @@ defmodule Lasso.BlockSync.Worker do
end

defp handle_strategy_status(state, :ws, :active) do
%{state | mode: :http_with_ws, ws_retry_count: 0}
state = %{state | mode: :http_with_ws, ws_retry_count: 0}
reduce_http_polling(state)
end

defp handle_strategy_status(state, :ws, :failed) do
Expand All @@ -383,16 +397,16 @@ defmodule Lasso.BlockSync.Worker do
instance_id: state.instance_id
)

state
restore_http_polling(state)
end

defp handle_strategy_status(state, :ws, status) when status in [:stale, :degraded] do
Logger.info("WS subscription #{status}, HTTP polling continues",
Logger.info("WS subscription #{status}, restoring normal HTTP polling",
chain: state.chain,
instance_id: state.instance_id
)

state
restore_http_polling(state)
end

defp handle_strategy_status(state, :http, :degraded) do
Expand All @@ -417,6 +431,38 @@ defmodule Lasso.BlockSync.Worker do
state
end

# Switches the HTTP strategy to the WS-active poll interval
# (`config.ws_active_poll_interval_ms`) and marks the state as reduced.
# No-op when there is no HTTP strategy or polling is already reduced.
defp reduce_http_polling(%{http_strategy: nil} = state), do: state
defp reduce_http_polling(%{http_reduced: true} = state), do: state

defp reduce_http_polling(state) do
  ws_active_interval_ms = state.config.ws_active_poll_interval_ms

  slowed_strategy =
    HttpStrategy.set_poll_interval(state.http_strategy, ws_active_interval_ms)

  Logger.debug("HTTP polling reduced (WS active)",
    chain: state.chain,
    instance_id: state.instance_id,
    poll_interval_ms: ws_active_interval_ms
  )

  %{state | http_reduced: true, http_strategy: slowed_strategy}
end

# Puts the HTTP strategy back on the normal poll interval
# (`config.poll_interval_ms`) and clears the reduced flag.
# No-op when there is no HTTP strategy or polling is not currently reduced.
defp restore_http_polling(%{http_strategy: nil} = state), do: state
defp restore_http_polling(%{http_reduced: false} = state), do: state

defp restore_http_polling(state) do
  base_interval_ms = state.config.poll_interval_ms

  restored_strategy =
    HttpStrategy.set_poll_interval(state.http_strategy, base_interval_ms)

  Logger.debug("HTTP polling restored to normal",
    chain: state.chain,
    instance_id: state.instance_id,
    poll_interval_ms: base_interval_ms
  )

  %{state | http_reduced: false, http_strategy: restored_strategy}
end

defp handle_ws_reconnected(%{mode: nil} = state), do: state

defp handle_ws_reconnected(state) do
Expand Down Expand Up @@ -452,7 +498,7 @@ defmodule Lasso.BlockSync.Worker do
schedule_ws_reconnect(state.ws_retry_count)
end

state
restore_http_polling(state)
end

defp handle_manager_restarted(state) do
Expand Down
2 changes: 1 addition & 1 deletion lib/lasso/core/providers/providers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ defmodule Lasso.Providers do

%{
id: pp.provider_id,
name: get_in(instance_config, [:canonical_config, :name]) || pp.provider_id,
name: pp[:name] || pp.provider_id,
status: health.status,
availability: InstanceState.status_to_availability(health.status),
has_http: is_binary(url),
Expand Down
4 changes: 1 addition & 3 deletions lib/lasso/core/request/request_options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule Lasso.RPC.RequestOptions do

@enforce_keys [:timeout_ms]
defstruct profile: "default",
account_id: nil,
strategy: :load_balanced,
provider_override: nil,
transport: nil,
Expand All @@ -26,7 +25,6 @@ defmodule Lasso.RPC.RequestOptions do

@type t :: %__MODULE__{
profile: String.t(),
account_id: String.t() | nil,
strategy: strategy,
provider_override: String.t() | nil,
transport: transport,
Expand Down Expand Up @@ -56,7 +54,7 @@ defmodule Lasso.RPC.RequestOptions do
end

defp validate_strategy(strategy)
when strategy in [:fastest, :priority, :load_balanced, :round_robin, :latency_weighted],
when strategy in [:fastest, :priority, :load_balanced, :latency_weighted],
do: :ok

defp validate_strategy(strategy),
Expand Down
3 changes: 1 addition & 2 deletions lib/lasso/core/request/request_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,7 @@ defmodule Lasso.RPC.RequestPipeline do
transport: opts.transport || :http,
strategy: opts.strategy,
request_id: opts.request_id,
plug_start_time: opts.plug_start_time,
account_id: opts.account_id
plug_start_time: opts.plug_start_time
)
end

Expand Down
3 changes: 0 additions & 3 deletions lib/lasso/core/request/request_pipeline/observability.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ defmodule Lasso.RPC.RequestPipeline.Observability do

publish_routing_decision(
request_id: ctx.request_id,
account_id: ctx.account_id,
profile: profile,
chain: ctx.chain,
method: method,
Expand Down Expand Up @@ -114,7 +113,6 @@ defmodule Lasso.RPC.RequestPipeline.Observability do

publish_routing_decision(
request_id: ctx.request_id,
account_id: ctx.account_id,
profile: profile,
chain: ctx.chain,
method: method,
Expand Down Expand Up @@ -425,7 +423,6 @@ defmodule Lasso.RPC.RequestPipeline.Observability do
event =
RoutingDecision.new(
request_id: opts[:request_id],
account_id: opts[:account_id],
profile: opts[:profile],
chain: opts[:chain],
method: opts[:method],
Expand Down
Loading
Loading