Skip to content

Commit

Permalink
Merge pull request #86 from c0deaddict/master
Browse files Browse the repository at this point in the history
Add Jetstream domain support
  • Loading branch information
mmmries authored Oct 19, 2023
2 parents ae8c14d + aae30c0 commit bd5ca30
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 53 deletions.
49 changes: 38 additions & 11 deletions lib/jetstream/api/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Jetstream.API.Consumer do
Consumer struct fields explanation:
* `:stream_name` - name of a stream the consumer is pointing at.
* `:domain` - JetStream domain the stream is on.
* `:ack_policy` - how the messages should be acknowledged. It has the following options:
- `:explicit` - the default policy. It means that each individual message must be acknowledged.
It is the only allowed option for pull consumers.
Expand Down Expand Up @@ -92,6 +93,7 @@ defmodule Jetstream.API.Consumer do
:deliver_group,
:deliver_subject,
:description,
:domain,
:durable_name,
:filter_subject,
:flow_control,
Expand All @@ -116,6 +118,7 @@ defmodule Jetstream.API.Consumer do

@type t :: %__MODULE__{
stream_name: binary(),
domain: nil | binary(),
ack_policy: :none | :all | :explicit,
ack_wait: nil | non_neg_integer(),
backoff: nil | [non_neg_integer()],
Expand Down Expand Up @@ -226,7 +229,8 @@ defmodule Jetstream.API.Consumer do
"""
@spec create(conn :: Gnat.t(), consumer :: t()) :: {:ok, info()} | {:error, term()}
def create(conn, %__MODULE__{durable_name: name} = consumer) when not is_nil(name) do
create_topic = "$JS.API.CONSUMER.DURABLE.CREATE.#{consumer.stream_name}.#{name}"
create_topic =
"#{js_api(consumer.domain)}.CONSUMER.DURABLE.CREATE.#{consumer.stream_name}.#{name}"

with :ok <- validate_durable(consumer),
{:ok, raw_response} <- request(conn, create_topic, create_payload(consumer)) do
Expand All @@ -235,7 +239,7 @@ defmodule Jetstream.API.Consumer do
end

def create(conn, %__MODULE__{} = consumer) do
create_topic = "$JS.API.CONSUMER.CREATE.#{consumer.stream_name}"
create_topic = "#{js_api(consumer.domain)}.CONSUMER.CREATE.#{consumer.stream_name}"

with :ok <- validate(consumer),
{:ok, raw_response} <- request(conn, create_topic, create_payload(consumer)) do
Expand All @@ -256,10 +260,15 @@ defmodule Jetstream.API.Consumer do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Consumer.delete(:gnat, "wrong_stream", "consumer")
"""
@spec delete(conn :: Gnat.t(), stream_name :: binary(), consumer_name :: binary()) ::
@spec delete(
conn :: Gnat.t(),
stream_name :: binary(),
consumer_name :: binary(),
domain :: nil | binary()
) ::
:ok | {:error, any()}
def delete(conn, stream_name, consumer_name) do
topic = "$JS.API.CONSUMER.DELETE.#{stream_name}.#{consumer_name}"
def delete(conn, stream_name, consumer_name, domain \\ nil) do
topic = "#{js_api(domain)}.CONSUMER.DELETE.#{stream_name}.#{consumer_name}"

with {:ok, _response} <- request(conn, topic, "") do
:ok
Expand All @@ -278,10 +287,15 @@ defmodule Jetstream.API.Consumer do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Consumer.info(:gnat, "wrong_stream", "consumer")
"""
@spec info(conn :: Gnat.t(), stream_name :: binary(), consumer_name :: binary()) ::
@spec info(
conn :: Gnat.t(),
stream_name :: binary(),
consumer_name :: binary(),
domain :: nil | binary()
) ::
{:ok, info()} | {:error, any()}
def info(conn, stream_name, consumer_name) do
topic = "$JS.API.CONSUMER.INFO.#{stream_name}.#{consumer_name}"
def info(conn, stream_name, consumer_name, domain \\ nil) do
topic = "#{js_api(domain)}.CONSUMER.INFO.#{stream_name}.#{consumer_name}"

with {:ok, raw} <- request(conn, topic, "") do
{:ok, to_info(raw)}
Expand All @@ -299,15 +313,21 @@ defmodule Jetstream.API.Consumer do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Consumer.list(:gnat, "wrong_stream")
"""
@spec list(conn :: Gnat.t(), stream_name :: binary(), params :: [offset: non_neg_integer()]) ::
@spec list(
conn :: Gnat.t(),
stream_name :: binary(),
params :: [offset: non_neg_integer(), domain: nil | binary()]
) ::
{:ok, consumers()} | {:error, term()}
def list(conn, stream_name, params \\ []) do
domain = Keyword.get(params, :domain)

payload =
Jason.encode!(%{
offset: Keyword.get(params, :offset, 0)
})

with {:ok, raw} <- request(conn, "$JS.API.CONSUMER.NAMES.#{stream_name}", payload) do
with {:ok, raw} <- request(conn, "#{js_api(domain)}.CONSUMER.NAMES.#{stream_name}", payload) do
response = %{
consumers: Map.get(raw, "consumers"),
limit: Map.get(raw, "limit"),
Expand Down Expand Up @@ -351,13 +371,15 @@ defmodule Jetstream.API.Consumer do
stream_name :: binary(),
consumer_name :: binary(),
reply_to :: String.t(),
domain :: nil | binary(),
opts :: keyword()
) :: :ok
def request_next_message(
conn,
stream_name,
consumer_name,
reply_to,
domain \\ nil,
opts \\ []
) do
default_payload = %{batch: 1}
Expand All @@ -379,12 +401,17 @@ defmodule Jetstream.API.Consumer do

Gnat.pub(
conn,
"$JS.API.CONSUMER.MSG.NEXT.#{stream_name}.#{consumer_name}",
"#{js_api(domain)}.CONSUMER.MSG.NEXT.#{stream_name}.#{consumer_name}",
payload,
reply_to: reply_to
)
end

# https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes
defp js_api(nil), do: "$JS.API"
defp js_api(""), do: "$JS.API"
defp js_api(domain), do: "$JS.#{domain}.API"

defp create_payload(%__MODULE__{} = cons) do
%{
config: %{
Expand Down
8 changes: 5 additions & 3 deletions lib/jetstream/api/kv.ex
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ defmodule Jetstream.API.KV do
iex>{:ok, %{"key1" => "value1"}} = Jetstream.API.KV.contents(:gnat, "my_bucket")
"""
@spec contents(conn :: Gnat.t(), bucket_name :: binary()) :: {:ok, map()} | {:error, binary()}
def contents(conn, bucket_name) do
@spec contents(conn :: Gnat.t(), bucket_name :: binary(), domain :: nil | binary()) ::
{:ok, map()} | {:error, binary()}
def contents(conn, bucket_name, domain \\ nil) do
stream = stream_name(bucket_name)
inbox = Util.reply_inbox()
consumer_name = "all_key_values_consumer_#{Util.nuid()}"
Expand All @@ -207,14 +208,15 @@ defmodule Jetstream.API.KV do
durable_name: consumer_name,
deliver_subject: inbox,
stream_name: stream,
domain: domain,
ack_policy: :none,
max_ack_pending: -1,
max_deliver: 1
}) do
keys = receive_keys(bucket_name)

:ok = Gnat.unsub(conn, sub)
:ok = Consumer.delete(conn, stream, consumer_name)
:ok = Consumer.delete(conn, stream, consumer_name, domain)

{:ok, keys}
end
Expand Down
5 changes: 3 additions & 2 deletions lib/jetstream/api/kv/watcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ defmodule Jetstream.API.KV.Watcher do
conn: opts[:conn],
bucket_name: opts[:bucket_name],
sub: sub,
consumer_name: consumer_name
consumer_name: consumer_name,
domain: Keyword.get(opts, :domain)
}}
end

def terminate(_reason, state) do
stream = KV.stream_name(state.bucket_name)
:ok = Gnat.unsub(state.conn, state.sub)
:ok = Consumer.delete(state.conn, stream, state.consumer_name)
:ok = Consumer.delete(state.conn, stream, state.consumer_name, state.domain)
end

# Received from NATS when headers are on the message (delete)
Expand Down
4 changes: 2 additions & 2 deletions lib/jetstream/api/object.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ defmodule Jetstream.API.Object do
{:ok, body} <- Jason.encode(meta),
{:ok, _msg} <- Gnat.request(conn, topic, body, headers: [{"Nats-Rollup", "sub"}]) do
filter = chunk_stream_topic(meta)
Stream.purge(conn, stream_name(bucket_name), %{filter: filter})
Stream.purge(conn, stream_name(bucket_name), nil, %{filter: filter})
end
end

Expand Down Expand Up @@ -209,7 +209,7 @@ defmodule Jetstream.API.Object do
defp purge_prior_chunks(conn, bucket, name) do
case info(conn, bucket, name) do
{:ok, meta} ->
Stream.purge(conn, stream_name(bucket), %{filter: chunk_stream_topic(meta)})
Stream.purge(conn, stream_name(bucket), nil, %{filter: chunk_stream_topic(meta)})

{:error, %{"code" => 404}} ->
:ok
Expand Down
70 changes: 50 additions & 20 deletions lib/jetstream/api/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule Jetstream.API.Stream do
* `:discard` - determines what happens when a Stream reaches its limits. It has the following options:
- `:old` - the default option. Old messages are deleted.
- `:new` - refuses new messages.
* `:domain` - JetStream domain, mainly used for leaf nodes.
See [JetStream on Leaf Nodes](https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes).
* `:duplicate_window` - the window within which to track duplicate messages, expressed in nanoseconds.
* `:max_age` - maximum age of any message in the Stream, expressed in nanoseconds.
* `:max_bytes` - how many bytes the Stream may contain. Adheres to `:discard`, removing oldest or
Expand Down Expand Up @@ -65,6 +67,7 @@ defmodule Jetstream.API.Stream do
:description,
:mirror,
:name,
:domain,
:no_ack,
:placement,
:sources,
Expand Down Expand Up @@ -100,6 +103,7 @@ defmodule Jetstream.API.Stream do
deny_purge: boolean(),
description: nil | binary(),
discard: :old | :new,
domain: nil | binary(),
duplicate_window: nil | nanoseconds(),
max_age: nanoseconds(),
max_bytes: integer(),
Expand Down Expand Up @@ -262,7 +266,11 @@ defmodule Jetstream.API.Stream do
def create(conn, %__MODULE__{} = stream) do
with :ok <- validate(stream),
{:ok, stream} <-
request(conn, "$JS.API.STREAM.CREATE.#{stream.name}", Jason.encode!(stream)) do
request(
conn,
"#{js_api(stream.domain)}.STREAM.CREATE.#{stream.name}",
Jason.encode!(stream)
) do
{:ok, to_info(stream)}
end
end
Expand All @@ -280,7 +288,11 @@ defmodule Jetstream.API.Stream do
def update(conn, %__MODULE__{} = stream) do
with :ok <- validate(stream),
{:ok, stream} <-
request(conn, "$JS.API.STREAM.UPDATE.#{stream.name}", Jason.encode!(stream)) do
request(
conn,
"#{js_api(stream.domain)}.STREAM.UPDATE.#{stream.name}",
Jason.encode!(stream)
) do
{:ok, to_info(stream)}
end
end
Expand All @@ -297,9 +309,10 @@ defmodule Jetstream.API.Stream do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.delete(:gnat, "wrong_stream")
"""
@spec delete(conn :: Gnat.t(), stream_name :: binary()) :: :ok | {:error, any()}
def delete(conn, stream_name) when is_binary(stream_name) do
with {:ok, _response} <- request(conn, "$JS.API.STREAM.DELETE.#{stream_name}", "") do
@spec delete(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
:ok | {:error, any()}
def delete(conn, stream_name, domain \\ nil) when is_binary(stream_name) do
with {:ok, _response} <- request(conn, "#{js_api(domain)}.STREAM.DELETE.#{stream_name}", "") do
:ok
end
end
Expand All @@ -316,9 +329,10 @@ defmodule Jetstream.API.Stream do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.purge(:gnat, "wrong_stream")
"""
@spec purge(conn :: Gnat.t(), stream_name :: binary()) :: :ok | {:error, any()}
def purge(conn, stream_name) when is_binary(stream_name) do
with {:ok, _response} <- request(conn, "$JS.API.STREAM.PURGE.#{stream_name}", "") do
@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
:ok | {:error, any()}
def purge(conn, stream_name, domain \\ nil) when is_binary(stream_name) do
with {:ok, _response} <- request(conn, "#{js_api(domain)}.STREAM.PURGE.#{stream_name}", "") do
:ok
end
end
Expand All @@ -329,16 +343,17 @@ defmodule Jetstream.API.Stream do
## Examples
iex> Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "stream", subjects: ["sub1", "sub2"]})
iex> Jetstream.API.Stream.purge(:gnat, "stream", %{filter: "sub1"})
iex> Jetstream.API.Stream.purge(:gnat, "stream", nil, %{filter: "sub1"})
:ok
"""
@type method :: %{filter: String.t()}
@spec purge(conn :: Gnat.t(), stream_name :: binary(), method) :: :ok | {:error, any()}
def purge(conn, stream_name, method) when is_binary(stream_name) do
@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary(), method) ::
:ok | {:error, any()}
def purge(conn, stream_name, domain, method) when is_binary(stream_name) do
with :ok <- validate_purge_method(method),
body <- Jason.encode!(method),
{:ok, _response} <- request(conn, "$JS.API.STREAM.PURGE.#{stream_name}", body) do
{:ok, _response} <- request(conn, "#{js_api(domain)}.STREAM.PURGE.#{stream_name}", body) do
:ok
end
end
Expand All @@ -354,10 +369,10 @@ defmodule Jetstream.API.Stream do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.info(:gnat, "wrong_stream")
"""
@spec info(conn :: Gnat.t(), stream_name :: binary()) ::
@spec info(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
{:ok, info()} | {:error, any()}
def info(conn, stream_name) when is_binary(stream_name) do
with {:ok, decoded} <- request(conn, "$JS.API.STREAM.INFO.#{stream_name}", "") do
def info(conn, stream_name, domain \\ nil) when is_binary(stream_name) do
with {:ok, decoded} <- request(conn, "#{js_api(domain)}.STREAM.INFO.#{stream_name}", "") do
{:ok, to_info(decoded)}
end
end
Expand All @@ -375,16 +390,21 @@ defmodule Jetstream.API.Stream do
iex> {:ok, %{total: _, offset: 0, limit: 1024, streams: _}} = Jetstream.API.Stream.list(:gnat)
"""
@spec list(conn :: Gnat.t(), params :: [{:offset, non_neg_integer()}, {:subject, binary()}]) ::
@spec list(
conn :: Gnat.t(),
params :: [offset: non_neg_integer(), subject: binary(), domain: nil | binary()]
) ::
{:ok, streams()} | {:error, term()}
def list(conn, params \\ []) do
domain = Keyword.get(params, :domain)

payload =
Jason.encode!(%{
offset: Keyword.get(params, :offset, 0),
subject: Keyword.get(params, :subject)
})

with {:ok, decoded} <- request(conn, "$JS.API.STREAM.NAMES", payload) do
with {:ok, decoded} <- request(conn, "#{js_api(domain)}.STREAM.NAMES", payload) do
result = %{
limit: Map.get(decoded, "limit"),
offset: Map.get(decoded, "offset"),
Expand All @@ -399,12 +419,17 @@ defmodule Jetstream.API.Stream do
@doc """
Get a message from the stream either by "stream sequence number" or the "last message for a given subject"
"""
@spec get_message(conn :: Gnat.t(), stream_name :: binary(), method :: message_access_method()) ::
@spec get_message(
conn :: Gnat.t(),
stream_name :: binary(),
method :: message_access_method(),
domain :: nil | binary()
) ::
{:ok, message_response()} | {:error, response_error()}
def get_message(conn, stream_name, method) when is_map(method) do
def get_message(conn, stream_name, method, domain \\ nil) when is_map(method) do
with :ok <- validate_message_access_method(method),
{:ok, %{"message" => message}} <-
request(conn, "$JS.API.STREAM.MSG.GET.#{stream_name}", Jason.encode!(method)) do
request(conn, "#{js_api(domain)}.STREAM.MSG.GET.#{stream_name}", Jason.encode!(method)) do
{:ok,
%{
data: decode_base64(message["data"]),
Expand All @@ -416,6 +441,11 @@ defmodule Jetstream.API.Stream do
end
end

# https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes
defp js_api(nil), do: "$JS.API"
defp js_api(""), do: "$JS.API"
defp js_api(domain), do: "$JS.#{domain}.API"

defp to_state(state) do
%{
bytes: Map.fetch!(state, "bytes"),
Expand Down
2 changes: 2 additions & 0 deletions lib/jetstream/pull_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ defmodule Jetstream.PullConsumer do
connection. When this value is exceeded, the pull consumer stops with the `:timeout` reason,
defaults to `10`
* `:inbox_prefix` - allows the default `_INBOX.` prefix to be customized. Should end with a dot.
* `:domain` - use a JetStream domain, this is mostly used on leaf nodes.
## Dynamic Connection Options
Expand Down Expand Up @@ -203,6 +204,7 @@ defmodule Jetstream.PullConsumer do
| {:consumer_name, String.t()}
| {:connection_retry_timeout, non_neg_integer()}
| {:connection_retries, non_neg_integer()}
| {:domain, String.t()}

@typedoc """
Connection options used to connect the consumer to NATS server.
Expand Down
Loading

0 comments on commit bd5ca30

Please sign in to comment.