Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ We have build some elixir implementation examples using `plug` based and `phoeni
1. [upcase-server](/priv/dev/upcase/README.md): `plug` based MCP server using streamable_http
2. [echo-elixir](/priv/dev/echo-elixir/README.md): `phoenix` based MCP server using sse
3. [ascii-server](/priv/dev/ascii/README.md): `phoenix_live_view` based MCP server using streamable_http and UI
4. [oauth-example](/priv/dev/oauth_example/README.md): `plug` based MCP server using streamable_http + Oauth authorization (mocked)

## License

Expand Down
189 changes: 189 additions & 0 deletions lib/hermes/server/authorization.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
defmodule Hermes.Server.Authorization do
@moduledoc false

import Peri

@type token :: String.t()
@type authorization_server :: String.t()

@type token_info :: %{
sub: String.t(),
aud: String.t() | list(String.t()) | nil,
scope: String.t() | nil,
exp: integer() | nil,
iat: integer() | nil,
client_id: String.t() | nil,
active: boolean()
}

@type config :: %{
authorization_servers: list(authorization_server()),
resource_metadata_url: String.t() | nil,
realm: String.t(),
scopes_supported: list(String.t()) | nil,
validator: module() | nil
}

@type www_authenticate_params :: %{
realm: String.t(),
resource_metadata: String.t() | nil,
authorization_servers: list(String.t()) | nil,
scope: String.t() | nil,
error: String.t() | nil,
error_description: String.t() | nil,
error_uri: String.t() | nil
}

defschema(:config_schema, %{
authorization_servers: {:required, {:list, :string}},
resource_metadata_url: {:string, {:default, nil}},
realm: {:string, {:default, "mcp-server"}},
scopes_supported: {{:list, :string}, {:default, []}},
validator: {:atom, {:default, nil}}
})

@doc """
Parses and validates authorization configuration.
"""
@spec parse_config!(keyword() | map()) :: config()
def parse_config!(opts), do: config_schema!(Map.new(opts))

@doc """
Builds a WWW-Authenticate header value for 401 responses per RFC 9728.

The header format follows:
WWW-Authenticate: Bearer realm="example",
resource_metadata="https://server.example.com/.well-known/oauth-protected-resource",
authorization_servers="https://as.example.com https://as2.example.com"
"""
@spec build_www_authenticate_header(config(), keyword()) :: String.t()
def build_www_authenticate_header(config, opts \\ []) do
params = build_www_authenticate_params(config, opts)

param_string =
params
|> Enum.map(&format_www_authenticate_param/1)
|> Enum.reject(&is_nil/1)
|> Enum.join(", ")

"Bearer " <> param_string
end

@doc """
Builds Protected Resource Metadata response per RFC 9728.

This should be served at /.well-known/oauth-protected-resource
"""
@spec build_resource_metadata(config()) :: map()
def build_resource_metadata(config) do
%{
"resource" => get_canonical_server_uri(),
"authorization_servers" => config.authorization_servers,
"scopes_supported" => config.scopes_supported,
"bearer_methods_supported" => ["header"],
"resource_documentation" => "https://modelcontextprotocol.io",
"resource_signing_alg_values_supported" => ["RS256", "ES256"]
}
|> Enum.reject(fn {_, v} -> is_nil(v) end)
|> Map.new()
end

@doc """
Validates that a token is intended for this resource server.

Checks the audience claim to ensure the token was issued for this specific
MCP server, preventing token confusion attacks.
"""
@spec validate_audience(token_info(), String.t()) :: :ok | {:error, :invalid_audience}
def validate_audience(%{aud: aud}, expected_audience) when is_binary(aud) do
if aud == expected_audience, do: :ok, else: {:error, :invalid_audience}
end

def validate_audience(%{aud: aud_list}, expected_audience) when is_list(aud_list) do
if expected_audience in aud_list, do: :ok, else: {:error, :invalid_audience}
end

def validate_audience(_, _), do: {:error, :invalid_audience}

@doc """
Checks if a token has expired based on the exp claim.
"""
@spec validate_expiry(token_info()) :: :ok | {:error, :expired_token}
def validate_expiry(%{exp: exp}) when is_integer(exp) do
now = System.system_time(:second)
if now < exp, do: :ok, else: {:error, :expired_token}
end

def validate_expiry(_), do: :ok

@doc """
Validates token has required scopes.
"""
@spec validate_scopes(token_info(), list(String.t())) :: :ok | {:error, :insufficient_scope}
def validate_scopes(%{scope: token_scope}, required_scopes) when is_binary(token_scope) do
token_scopes = String.split(token_scope, " ", trim: true)

if Enum.all?(required_scopes, &(&1 in token_scopes)) do
:ok
else
{:error, :insufficient_scope}
end
end

def validate_scopes(_, []), do: :ok
def validate_scopes(_, _), do: {:error, :insufficient_scope}

# Private functions

defp build_www_authenticate_params(config, opts) do
%{
realm: config.realm,
resource_metadata: config.resource_metadata_url,
authorization_servers: Enum.join(config.authorization_servers, " "),
scope: Keyword.get(opts, :scope),
error: Keyword.get(opts, :error),
error_description: Keyword.get(opts, :error_description),
error_uri: Keyword.get(opts, :error_uri)
}
end

defp format_www_authenticate_param({:realm, value}) when is_binary(value) do
~s(realm="#{escape_quotes(value)}")
end

defp format_www_authenticate_param({:resource_metadata, value}) when is_binary(value) do
~s(resource_metadata="#{escape_quotes(value)}")
end

defp format_www_authenticate_param({:authorization_servers, value}) when is_binary(value) and value != "" do
~s(authorization_servers="#{escape_quotes(value)}")
end

defp format_www_authenticate_param({:scope, value}) when is_binary(value) do
~s(scope="#{escape_quotes(value)}")
end

defp format_www_authenticate_param({:error, value}) when is_binary(value) do
~s(error="#{escape_quotes(value)}")
end

defp format_www_authenticate_param({:error_description, value}) when is_binary(value) do
~s(error_description="#{escape_quotes(value)}")
end

defp format_www_authenticate_param({:error_uri, value}) when is_binary(value) do
~s(error_uri="#{escape_quotes(value)}")
end

defp format_www_authenticate_param(_), do: nil

defp escape_quotes(string) do
String.replace(string, ~s("), ~s(\\"))
end

defp get_canonical_server_uri do
Copy link

Copilot AI Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function returns a placeholder URI that should be configurable. Consider making this part of the authorization configuration rather than relying on an environment variable.

Copilot uses AI. Check for mistakes.
# This should be configured based on the actual server URL
# For now, return a placeholder that should be overridden
System.get_env("MCP_SERVER_URI", "https://mcp.example.com")
end
end
140 changes: 140 additions & 0 deletions lib/hermes/server/authorization/introspection_validator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule Hermes.Server.Authorization.IntrospectionValidator do
@moduledoc """
OAuth 2.0 Token Introspection validator for authorization.

Validates tokens by calling an OAuth 2.0 Token Introspection endpoint (RFC 7662).
This is useful when tokens are opaque (not JWTs) or when you need server-side
token validation with real-time revocation checking.

## Configuration

auth_config = [
authorization_servers: ["https://auth.example.com"],
realm: "my-app-realm",
validator: Hermes.Server.Authorization.IntrospectionValidator,

# Required
token_introspection_endpoint: "https://auth.example.com/oauth/introspect",

# Optional - for authenticating to the introspection endpoint
introspection_client_id: "my-mcp-server",
introspection_client_secret: "client-secret"
]

## Features

- Real-time token validation
- Support for opaque tokens
- Immediate revocation detection
- Client authentication support (Basic Auth)

## Token Info

Returns token information based on the introspection response:

%{
sub: "user123", # Subject from introspection
aud: "https://api.example", # Audience claim
scope: "read write", # OAuth scopes
exp: 1234567890, # Expiration timestamp
iat: 1234567800, # Issued at timestamp
client_id: "app123", # OAuth client ID
active: true # From introspection response
}

## Security Note

The introspection endpoint should be protected and only accessible by
authorized resource servers. Use client credentials when required by
your authorization server.
"""

@behaviour Hermes.Server.Authorization.Validator

@impl true
def validate_token(token, config) do
with {:ok, response} <- introspect_token(token, config) do
parse_introspection_response(response)
end
end

@doc """
Introspects a token using the OAuth 2.0 Token Introspection endpoint.
"""
def introspect_token(token, %{token_introspection_endpoint: endpoint} = config) when is_binary(endpoint) do
body =
URI.encode_query(%{
"token" => token,
"token_type_hint" => "access_token"
})

headers = build_introspection_headers(config)

with {:ok, request} <- Hermes.HTTP.build(:post, endpoint, headers, body),
{:ok, %Finch.Response{status: 200, body: resp_body}} <-
Finch.request(request, Hermes.Finch),
{:ok, json} <- JSON.decode(resp_body) do
{:ok, json}
else
{:ok, %Finch.Response{status: status}} ->
{:error, {:introspection_failed, status}}

{:error, reason} ->
{:error, reason}
end
end

def introspect_token(_, _), do: {:error, :no_introspection_endpoint}

defp build_introspection_headers(config) do
base_headers = %{
"content-type" => "application/x-www-form-urlencoded",
"accept" => "application/json"
}

# Add client authentication if configured
case config do
%{introspection_client_id: client_id, introspection_client_secret: secret} ->
auth = Base.encode64("#{client_id}:#{secret}")
Map.put(base_headers, "authorization", "Basic #{auth}")

_ ->
base_headers
end
end

defp parse_introspection_response(%{"active" => false}) do
{:error, :invalid_token}
end

defp parse_introspection_response(%{"active" => true} = response) do
token_info = %{
sub: response["sub"],
aud: response["aud"],
scope: response["scope"],
exp: response["exp"],
iat: response["iat"],
client_id: response["client_id"],
active: true
}

# Validate expiration if present
case token_info do
%{exp: exp} when is_integer(exp) ->
now = System.system_time(:second)

if now < exp do
{:ok, token_info}
else
{:error, :expired_token}
end

_ ->
{:ok, token_info}
end
end

defp parse_introspection_response(_) do
{:error, :invalid_introspection_response}
end
end
Loading