diff --git a/lib/lasso/chain_supervisor.ex b/lib/lasso/chain_supervisor.ex index 5249cb2..cfb8792 100644 --- a/lib/lasso/chain_supervisor.ex +++ b/lib/lasso/chain_supervisor.ex @@ -61,7 +61,7 @@ defmodule Lasso.RPC.ChainSupervisor do %{ id: pp.provider_id, - name: get_in(instance_config, [:canonical_config, :name]) || pp.provider_id, + name: pp[:name] || pp.provider_id, config: instance_config, status: health.status, http_status: health.http_status, diff --git a/lib/lasso/config/profile_validator.ex b/lib/lasso/config/profile_validator.ex index 645a90d..24778f6 100644 --- a/lib/lasso/config/profile_validator.ex +++ b/lib/lasso/config/profile_validator.ex @@ -189,13 +189,13 @@ defmodule Lasso.Config.ProfileValidator do 400 iex> ProfileValidator.error_to_http_status(:profile_not_found) - 503 + 404 """ @spec error_to_http_status(validation_error()) :: integer() def error_to_http_status(:profile_nil), do: 400 def error_to_http_status(:profile_invalid_type), do: 400 def error_to_http_status(:profile_empty), do: 400 - def error_to_http_status(:profile_not_found), do: 503 + def error_to_http_status(:profile_not_found), do: 404 @doc """ Converts validation error types to JSON-RPC error codes. diff --git a/lib/lasso/core/block_sync/strategies/http_strategy.ex b/lib/lasso/core/block_sync/strategies/http_strategy.ex index b419bba..1a41ef5 100644 --- a/lib/lasso/core/block_sync/strategies/http_strategy.ex +++ b/lib/lasso/core/block_sync/strategies/http_strategy.ex @@ -5,6 +5,13 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do Polls `eth_blockNumber` at a configurable interval and reports block heights to the parent Worker. Uses a reference profile and Channel for auth injection. + ## Health Writes + + Each successful poll writes `http_status: :healthy` to `{:health, instance_id}` in + ETS and signals circuit breaker recovery. Failed polls write `http_status` as + `:degraded` (2+) or `:unhealthy` (5+). Top-level `status`, `consecutive_failures`, + and `last_error` are left to ProbeCoordinator and Observability. + ## Circuit Breaker Integration Goes through the shared HTTP circuit breaker keyed by `{instance_id, :http}`. @@ -22,6 +29,8 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do @default_poll_interval_ms 15_000 @default_timeout_ms 3_000 @max_consecutive_failures 3 + @degraded_threshold 2 + @unhealthy_threshold 5 defstruct [ :instance_id, @@ -109,6 +118,12 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do execute_poll(state) end + @spec set_poll_interval(t(), pos_integer()) :: t() + def set_poll_interval(%__MODULE__{} = state, interval_ms) + when is_integer(interval_ms) and interval_ms > 0 do + %{state | poll_interval_ms: interval_ms} + end + ## Private Functions defp schedule_poll(state, delay_ms) do @@ -125,6 +140,8 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do case result do {:ok, height} -> send(state.parent, {:block_height, state.instance_id, height, %{latency_ms: latency_ms}}) + write_health_success(state.instance_id) + CircuitBreaker.signal_recovery_cast({state.instance_id, :http}) new_state = %{ state @@ -144,6 +161,7 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do {:error, reason} -> failures = state.consecutive_failures + 1 + write_health_failure(state.instance_id, failures, reason) if failures == @max_consecutive_failures do Logger.warning("HTTP polling degraded", @@ -164,6 +182,50 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do end end + defp write_health_success(instance_id) do + existing = read_existing_health(instance_id) + + merged = + Map.merge(existing, %{ + http_status: :healthy, + last_health_check: System.system_time(:millisecond) + }) + + :ets.insert(:lasso_instance_state, {{:health, instance_id}, merged}) + rescue + ArgumentError -> :ok + end + + defp write_health_failure(instance_id, consecutive_failures, _reason) do + existing = read_existing_health(instance_id) + + http_status = + cond do + consecutive_failures >= @unhealthy_threshold -> :unhealthy + consecutive_failures >= @degraded_threshold -> :degraded + true -> Map.get(existing, :http_status, :healthy) + end + + merged = + Map.merge(existing, %{ + http_status: http_status, + last_health_check: System.system_time(:millisecond) + }) + + :ets.insert(:lasso_instance_state, {{:health, instance_id}, merged}) + rescue + ArgumentError -> :ok + end + + defp read_existing_health(instance_id) do + case :ets.lookup(:lasso_instance_state, {:health, instance_id}) do + [{_, data}] -> data + [] -> %{} + end + rescue + ArgumentError -> %{} + end + defp do_poll(instance_id, chain) do cb_id = {instance_id, :http} @@ -191,7 +253,6 @@ defmodule Lasso.BlockSync.Strategies.HttpStrategy do end end - # Use a reference profile + Channel for auth header injection defp do_poll_request(instance_id, chain) do case resolve_channel(instance_id, chain) do {:ok, channel} -> diff --git a/lib/lasso/core/block_sync/worker.ex b/lib/lasso/core/block_sync/worker.ex index 56a8b88..99b4305 100644 --- a/lib/lasso/core/block_sync/worker.ex +++ b/lib/lasso/core/block_sync/worker.ex @@ -13,6 +13,12 @@ defmodule Lasso.BlockSync.Worker do The Worker operates in one of two modes: - `:http_only` - HTTP polling only (WS unavailable or disabled) - `:http_with_ws` - HTTP polling + WS subscription (real-time layer) + + ## WS-Aware Polling + + When WS subscription is active, HTTP polling interval is reduced (3x normal) + to conserve RPC usage while maintaining connection warmth and WS liveness + detection. Normal interval is restored when WS degrades or disconnects. """ use GenServer @@ -24,11 +30,13 @@ defmodule Lasso.BlockSync.Worker do alias Lasso.Providers.Catalog @reconnect_delay_ms 5_000 + @ws_active_poll_multiplier 3 @type mode :: :http_only | :http_with_ws @type config :: %{ subscribe_new_heads: boolean(), poll_interval_ms: pos_integer(), + ws_active_poll_interval_ms: pos_integer(), staleness_threshold_ms: pos_integer() } @@ -39,7 +47,8 @@ defmodule Lasso.BlockSync.Worker do ws_strategy: pid() | nil, http_strategy: pid() | nil, config: config(), - ws_retry_count: non_neg_integer() + ws_retry_count: non_neg_integer(), + http_reduced: boolean() } defstruct [ @@ -49,7 +58,8 @@ defmodule Lasso.BlockSync.Worker do :ws_strategy, :http_strategy, :config, - :ws_retry_count + :ws_retry_count, + http_reduced: false ] ## Client API @@ -75,10 +85,7 @@ defmodule Lasso.BlockSync.Worker do @impl true def init({chain, instance_id}) do - # Instance-scoped WS connection events Phoenix.PubSub.subscribe(Lasso.PubSub, "ws:conn:instance:#{instance_id}") - - # Chain-scoped manager restart topic Phoenix.PubSub.subscribe(Lasso.PubSub, "instance_sub_manager:restarted:#{chain}") config = load_config(instance_id, chain) @@ -90,7 +97,8 @@ defmodule Lasso.BlockSync.Worker do ws_strategy: nil, http_strategy: nil, config: config, - ws_retry_count: 0 + ws_retry_count: 0, + http_reduced: false } Process.send_after(self(), :start_strategies, 2_000) @@ -111,7 +119,8 @@ defmodule Lasso.BlockSync.Worker do ws_status: if(state.ws_strategy, do: WsStrategy.get_status(state.ws_strategy), else: nil), http_status: if(state.http_strategy, do: HttpStrategy.get_status(state.http_strategy), else: nil), - config: state.config + config: state.config, + http_reduced: state.http_reduced } {:reply, {:ok, status}, state} @@ -262,6 +271,7 @@ defmodule Lasso.BlockSync.Worker do default_config = %{ subscribe_new_heads: has_ws, poll_interval_ms: 15_000, + ws_active_poll_interval_ms: 15_000 * @ws_active_poll_multiplier, staleness_threshold_ms: 35_000 } @@ -271,9 +281,12 @@ defmodule Lasso.BlockSync.Worker do resolve_subscribe_new_heads(chain_config, ref_profile, chain, instance_id) or Enum.any?(refs, &check_profile_subscribe_new_heads(&1, chain, instance_id)) + poll_interval_ms = chain_config.monitoring.probe_interval_ms + %{ subscribe_new_heads: subscribe_new_heads and has_ws, - poll_interval_ms: chain_config.monitoring.probe_interval_ms, + poll_interval_ms: poll_interval_ms, + ws_active_poll_interval_ms: poll_interval_ms * @ws_active_poll_multiplier, staleness_threshold_ms: chain_config.websocket.new_heads_timeout_ms } else @@ -374,7 +387,8 @@ defmodule Lasso.BlockSync.Worker do end defp handle_strategy_status(state, :ws, :active) do - %{state | mode: :http_with_ws, ws_retry_count: 0} + state = %{state | mode: :http_with_ws, ws_retry_count: 0} + reduce_http_polling(state) end defp handle_strategy_status(state, :ws, :failed) do @@ -383,16 +397,16 @@ defmodule Lasso.BlockSync.Worker do instance_id: state.instance_id ) - state + restore_http_polling(state) end defp handle_strategy_status(state, :ws, status) when status in [:stale, :degraded] do - Logger.info("WS subscription #{status}, HTTP polling continues", + Logger.info("WS subscription #{status}, restoring normal HTTP polling", chain: state.chain, instance_id: state.instance_id ) - state + restore_http_polling(state) end defp handle_strategy_status(state, :http, :degraded) do @@ -417,6 +431,38 @@ defmodule Lasso.BlockSync.Worker do state end + defp reduce_http_polling(%{http_strategy: nil} = state), do: state + defp reduce_http_polling(%{http_reduced: true} = state), do: state + + defp reduce_http_polling(state) do + reduced_interval = state.config.ws_active_poll_interval_ms + new_http = HttpStrategy.set_poll_interval(state.http_strategy, reduced_interval) + + Logger.debug("HTTP polling reduced (WS active)", + chain: state.chain, + instance_id: state.instance_id, + poll_interval_ms: reduced_interval + ) + + %{state | http_strategy: new_http, http_reduced: true} + end + + defp restore_http_polling(%{http_strategy: nil} = state), do: state + defp restore_http_polling(%{http_reduced: false} = state), do: state + + defp restore_http_polling(state) do + normal_interval = state.config.poll_interval_ms + new_http = HttpStrategy.set_poll_interval(state.http_strategy, normal_interval) + + Logger.debug("HTTP polling restored to normal", + chain: state.chain, + instance_id: state.instance_id, + poll_interval_ms: normal_interval + ) + + %{state | http_strategy: new_http, http_reduced: false} + end + defp handle_ws_reconnected(%{mode: nil} = state), do: state defp handle_ws_reconnected(state) do @@ -452,7 +498,7 @@ defmodule Lasso.BlockSync.Worker do schedule_ws_reconnect(state.ws_retry_count) end - state + restore_http_polling(state) end defp handle_manager_restarted(state) do diff --git a/lib/lasso/core/providers/providers.ex b/lib/lasso/core/providers/providers.ex index 6eee1ef..15aca82 100644 --- a/lib/lasso/core/providers/providers.ex +++ b/lib/lasso/core/providers/providers.ex @@ -151,7 +151,7 @@ defmodule Lasso.Providers do %{ id: pp.provider_id, - name: get_in(instance_config, [:canonical_config, :name]) || pp.provider_id, + name: pp[:name] || pp.provider_id, status: health.status, availability: InstanceState.status_to_availability(health.status), has_http: is_binary(url), diff --git a/lib/lasso/core/request/request_options.ex b/lib/lasso/core/request/request_options.ex index 80ee2f3..99ecb30 100644 --- a/lib/lasso/core/request/request_options.ex +++ b/lib/lasso/core/request/request_options.ex @@ -13,7 +13,6 @@ defmodule Lasso.RPC.RequestOptions do @enforce_keys [:timeout_ms] defstruct profile: "default", - account_id: nil, strategy: :load_balanced, provider_override: nil, transport: nil, @@ -26,7 +25,6 @@ defmodule Lasso.RPC.RequestOptions do @type t :: %__MODULE__{ profile: String.t(), - account_id: String.t() | nil, strategy: strategy, provider_override: String.t() | nil, transport: transport, @@ -56,7 +54,7 @@ defmodule Lasso.RPC.RequestOptions do end defp validate_strategy(strategy) - when strategy in [:fastest, :priority, :load_balanced, :round_robin, :latency_weighted], + when strategy in [:fastest, :priority, :load_balanced, :latency_weighted], do: :ok defp validate_strategy(strategy), diff --git a/lib/lasso/core/request/request_pipeline.ex b/lib/lasso/core/request/request_pipeline.ex index a750354..3e8887d 100644 --- a/lib/lasso/core/request/request_pipeline.ex +++ b/lib/lasso/core/request/request_pipeline.ex @@ -475,8 +475,7 @@ defmodule Lasso.RPC.RequestPipeline do transport: opts.transport || :http, strategy: opts.strategy, request_id: opts.request_id, - plug_start_time: opts.plug_start_time, - account_id: opts.account_id + plug_start_time: opts.plug_start_time ) end diff --git a/lib/lasso/core/request/request_pipeline/observability.ex b/lib/lasso/core/request/request_pipeline/observability.ex index dcb5036..11ac4f2 100644 --- a/lib/lasso/core/request/request_pipeline/observability.ex +++ b/lib/lasso/core/request/request_pipeline/observability.ex @@ -55,7 +55,6 @@ defmodule Lasso.RPC.RequestPipeline.Observability do publish_routing_decision( request_id: ctx.request_id, - account_id: ctx.account_id, profile: profile, chain: ctx.chain, method: method, @@ -114,7 +113,6 @@ defmodule Lasso.RPC.RequestPipeline.Observability do publish_routing_decision( request_id: ctx.request_id, - account_id: ctx.account_id, profile: profile, chain: ctx.chain, method: method, @@ -425,7 +423,6 @@ defmodule Lasso.RPC.RequestPipeline.Observability do event = RoutingDecision.new( request_id: opts[:request_id], - account_id: opts[:account_id], profile: opts[:profile], chain: opts[:chain], method: opts[:method], diff --git a/lib/lasso/core/selection/strategies/round_robin.ex b/lib/lasso/core/selection/strategies/round_robin.ex deleted file mode 100644 index 710d9ba..0000000 --- a/lib/lasso/core/selection/strategies/round_robin.ex +++ /dev/null @@ -1,30 +0,0 @@ -defmodule Lasso.RPC.Strategies.RoundRobin do - @moduledoc "Round-robin selection based on a rolling request counter from pool context." - - @behaviour Lasso.RPC.Strategy - - alias Lasso.RPC.ProviderPool - - @impl true - def prepare_context(profile, chain, _method, timeout) do - base_ctx = Lasso.RPC.StrategyContext.new(chain, timeout) - - total_requests = - case ProviderPool.get_status(profile, chain) do - {:ok, %{total_requests: tr}} when is_integer(tr) -> tr - {:ok, status} when is_map(status) -> Map.get(status, :total_requests, 0) - _ -> base_ctx.total_requests || 0 - end - - %{base_ctx | total_requests: total_requests} - end - - @doc """ - Strategy-provided channel ranking: random shuffle per call (legacy behavior). - """ - @impl true - def rank_channels(channels, _method, ctx, _profile, _chain) do - _ = ctx - Enum.shuffle(channels) - end -end diff --git a/lib/lasso/core/strategies/strategy_registry.ex b/lib/lasso/core/strategies/strategy_registry.ex index 31ea038..f9f1391 100644 --- a/lib/lasso/core/strategies/strategy_registry.ex +++ b/lib/lasso/core/strategies/strategy_registry.ex @@ -6,7 +6,7 @@ defmodule Lasso.RPC.Strategies.Registry do scattering case statements across the codebase. """ - @type strategy :: :fastest | :load_balanced | :round_robin | :latency_weighted + @type strategy :: :fastest | :load_balanced | :latency_weighted @doc """ Resolve a strategy atom to its implementation module. @@ -45,7 +45,6 @@ defmodule Lasso.RPC.Strategies.Registry do def default_registry do %{ load_balanced: Lasso.RPC.Strategies.LoadBalanced, - round_robin: Lasso.RPC.Strategies.LoadBalanced, fastest: Lasso.RPC.Strategies.Fastest, latency_weighted: Lasso.RPC.Strategies.LatencyWeighted } diff --git a/lib/lasso/events/routing_decision.ex b/lib/lasso/events/routing_decision.ex index 3c89475..90b4328 100644 --- a/lib/lasso/events/routing_decision.ex +++ b/lib/lasso/events/routing_decision.ex @@ -15,7 +15,6 @@ defmodule Lasso.Events.RoutingDecision do @type t :: %__MODULE__{ ts: pos_integer(), request_id: String.t(), - account_id: String.t() | nil, profile: String.t(), source_node: node(), source_node_id: String.t(), @@ -33,7 +32,6 @@ defmodule Lasso.Events.RoutingDecision do defstruct [ :ts, :request_id, - :account_id, :profile, :source_node, :source_node_id, @@ -56,7 +54,6 @@ defmodule Lasso.Events.RoutingDecision do %__MODULE__{ ts: attrs[:ts] || System.system_time(:millisecond), request_id: attrs[:request_id], - account_id: attrs[:account_id], profile: attrs[:profile] || "default", source_node: node(), source_node_id: get_source_node_id(), diff --git a/lib/lasso/providers/candidate_listing.ex b/lib/lasso/providers/candidate_listing.ex index e741828..0c2a336 100644 --- a/lib/lasso/providers/candidate_listing.ex +++ b/lib/lasso/providers/candidate_listing.ex @@ -100,7 +100,7 @@ defmodule Lasso.Providers.CandidateListing do priority: profile_provider.priority, capabilities: profile_provider.capabilities, archival: profile_provider.archival, - name: get_in(instance_config, [:canonical_config, :name]) || profile_provider.provider_id + name: profile_provider[:name] || profile_provider.provider_id } http_cb = InstanceState.read_circuit(instance_id, :http) diff --git a/lib/lasso/providers/catalog.ex b/lib/lasso/providers/catalog.ex index f7aa3f0..e9a0023 100644 --- a/lib/lasso/providers/catalog.ex +++ b/lib/lasso/providers/catalog.ex @@ -221,6 +221,7 @@ defmodule Lasso.Providers.Catalog do %{ instance_id: instance_id, provider_id: provider.id, + name: provider.name, priority: provider.priority, capabilities: provider.capabilities, archival: provider.archival diff --git a/lib/lasso/providers/probe_coordinator.ex b/lib/lasso/providers/probe_coordinator.ex index 2499d50..51882ff 100644 --- a/lib/lasso/providers/probe_coordinator.ex +++ b/lib/lasso/providers/probe_coordinator.ex @@ -8,7 +8,7 @@ defmodule Lasso.Providers.ProbeCoordinator do ## Probe Cycle With a 200ms tick interval and N unique instances for a chain: - - Each instance is probed every `200ms × N` on average + - Each instance is probed at most every 10s (minimum probe interval floor) - Exponential backoff per-instance on failure (2s base, 30s max, ±20% jitter) ## ETS Write Partitioning @@ -32,6 +32,7 @@ defmodule Lasso.Providers.ProbeCoordinator do @base_backoff_ms 2_000 @max_backoff_ms 30_000 @jitter_percent 0.2 + @min_probe_interval_ms 10_000 @type instance_state :: %{ instance_id: String.t(), @@ -161,7 +162,7 @@ defmodule Lasso.Providers.ProbeCoordinator do defp should_probe?(inst, now) do case inst.last_probe_monotonic do nil -> true - last -> now - last >= inst.current_backoff_ms + last -> now - last >= max(inst.current_backoff_ms, @min_probe_interval_ms) end end diff --git a/lib/lasso_web/components/core_components.ex b/lib/lasso_web/components/core_components.ex index 3d87e02..b290d7c 100644 --- a/lib/lasso_web/components/core_components.ex +++ b/lib/lasso_web/components/core_components.ex @@ -121,21 +121,24 @@ defmodule LassoWeb.CoreComponents do phx-click={JS.push("lv:clear-flash", value: %{key: @kind}) |> hide("##{@id}")} role="alert" class={[ - "fixed top-2 right-2 z-50 mr-2 w-80 rounded-lg p-3 ring-1 sm:w-96", - @kind == :info && "bg-emerald-50 fill-cyan-900 text-emerald-800 ring-emerald-500", - @kind == :error && "bg-rose-50 fill-rose-900 text-rose-900 shadow-md ring-rose-500" + "fixed bottom-4 right-4 z-50 flex items-center gap-2 rounded-md px-3 py-2 text-xs shadow-lg cursor-pointer", + @kind == :info && "bg-gray-900/95 text-gray-200 border border-gray-700", + @kind == :error && "bg-gray-900/95 text-gray-200 border border-red-900/50" ]} {@rest} > -
- <.icon :if={@kind == :info} name="hero-information-circle-mini" class="h-4 w-4" /> - <.icon :if={@kind == :error} name="hero-exclamation-circle-mini" class="h-4 w-4" /> - {@title} -
-{msg}
- + <.icon + :if={@kind == :info} + name="hero-check-circle-mini" + class="h-4 w-4 text-green-500 shrink-0" + /> + <.icon + :if={@kind == :error} + name="hero-exclamation-circle-mini" + class="h-4 w-4 text-red-400 shrink-0" + /> + {msg} + <.icon name="hero-x-mark-mini" class="h-3 w-3 text-gray-500 hover:text-gray-300 shrink-0" /> """ end @@ -161,7 +164,7 @@ defmodule LassoWeb.CoreComponents do title="We can't find the internet" phx-disconnected={show(".phx-client-error #client-error")} phx-connected={hide("#client-error")} - hidden + style="display:none" > Attempting to reconnect <.icon name="hero-arrow-path" class="ml-1 h-3 w-3 animate-spin" /> @@ -172,7 +175,7 @@ defmodule LassoWeb.CoreComponents do title="Something went wrong!" phx-disconnected={show(".phx-server-error #server-error")} phx-connected={hide("#server-error")} - hidden + style="display:none" > Hang in there while we get back on track <.icon name="hero-arrow-path" class="ml-1 h-3 w-3 animate-spin" /> diff --git a/lib/lasso_web/components/network_status_legend.ex b/lib/lasso_web/components/network_status_legend.ex index 85e6844..e9b97b0 100644 --- a/lib/lasso_web/components/network_status_legend.ex +++ b/lib/lasso_web/components/network_status_legend.ex @@ -15,75 +15,75 @@ defmodule LassoWeb.Components.NetworkStatusLegend do def legend(assigns) do ~H""" -