
Commit

Merge pull request #78 from mmmries/object-store
Add support for using the ObjectStore pattern
mmmries authored Jul 6, 2023
2 parents a890140 + 820493f commit 30ddfb5
Showing 7 changed files with 597 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
@@ -18,7 +18,7 @@ jobs:
pair:
- otp: "24.3.4"
elixir: "1.12"
nats: "2.2.0"
nats: "2.3.0"

- otp: "24.3.4"
elixir: "main"
3 changes: 3 additions & 0 deletions .gitignore
@@ -24,3 +24,6 @@ jetstream-*.tar

# ASDF tool versions
.tool-versions

+# tmp directory for test files
+tmp
316 changes: 316 additions & 0 deletions lib/jetstream/api/object.ex
@@ -0,0 +1,316 @@
defmodule Jetstream.API.Object do
@moduledoc """
API for interacting with the JetStream Object Store.

Learn more about the Object Store: https://docs.nats.io/nats-concepts/jetstream/obj_store
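
## Example

A minimal usage sketch, assuming a running `Gnat` connection registered as
`:gnat` (the connection name and file path are illustrative):

    {:ok, _bucket_info} = Jetstream.API.Object.create_bucket(:gnat, "photos")
    {:ok, io} = File.open("example.jpg", [:read])
    {:ok, meta} = Jetstream.API.Object.put(:gnat, "photos", "example.jpg", io)
    :ok = Jetstream.API.Object.get(:gnat, "photos", "example.jpg", &IO.write/1)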
"""
alias Jetstream.API.{Consumer, Stream, Util}
alias Jetstream.API.Object.Meta

@stream_prefix "OBJ_"
@subject_prefix "$O."
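
# Subjects used by the object store (see the private helpers further down
# in this module):
#   chunk data: $O.<bucket>.C.<nuid>
#   metadata:   $O.<bucket>.M.<Base.url_encode64(object_name)>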

@type bucket_opt ::
{:description, String.t()}
| {:max_bucket_size, integer()}
| {:max_chunk_size, integer()}
| {:placement, Stream.placement()}
| {:replicas, non_neg_integer()}
| {:storage, :file | :memory}
| {:ttl, non_neg_integer()}
@spec create_bucket(Gnat.t(), String.t(), list(bucket_opt)) ::
{:ok, Stream.info()} | {:error, any()}
def create_bucket(conn, bucket_name, params \\ []) do
with :ok <- validate_bucket_name(bucket_name) do
stream = %Stream{
name: stream_name(bucket_name),
subjects: stream_subjects(bucket_name),
description: Keyword.get(params, :description),
discard: :new,
allow_rollup_hdrs: true,
max_age: Keyword.get(params, :ttl, 0),
max_bytes: Keyword.get(params, :max_bucket_size, -1),
max_msg_size: Keyword.get(params, :max_chunk_size, -1),
num_replicas: Keyword.get(params, :replicas, 1),
storage: Keyword.get(params, :storage, :file),
placement: Keyword.get(params, :placement),
duplicate_window: adjust_duplicate_window(Keyword.get(params, :ttl, 0))
}

Stream.create(conn, stream)
end
end

@spec delete_bucket(Gnat.t(), String.t()) :: :ok | {:error, any}
def delete_bucket(conn, bucket_name) do
Stream.delete(conn, stream_name(bucket_name))
end
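
# Deleting an object publishes a tombstone meta record (`deleted: true`)
# with a `Nats-Rollup: sub` header, which collapses prior meta revisions on
# that subject, and then purges the object's chunk messages from the stream.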

@spec delete(Gnat.t(), String.t(), String.t()) :: :ok | {:error, any}
def delete(conn, bucket_name, object_name) do
with {:ok, meta} <- info(conn, bucket_name, object_name),
meta <- %Meta{meta | deleted: true},
topic <- meta_stream_topic(bucket_name, object_name),
{:ok, body} <- Jason.encode(meta),
{:ok, _msg} <- Gnat.request(conn, topic, body, headers: [{"Nats-Rollup", "sub"}]) do
filter = chunk_stream_topic(meta)
Stream.purge(conn, stream_name(bucket_name), %{filter: filter})
end
end

@spec get(Gnat.t(), String.t(), String.t(), (binary -> any())) :: :ok | {:error, any}
def get(conn, bucket_name, object_name, chunk_fun) do
with {:ok, %{config: _stream}} <- Stream.info(conn, stream_name(bucket_name)),
{:ok, meta} <- info(conn, bucket_name, object_name) do
receive_chunks(conn, meta, chunk_fun)
end
end

@spec info(Gnat.t(), String.t(), String.t()) :: {:ok, Meta.t()} | {:error, any}
def info(conn, bucket_name, object_name) do
with {:ok, _stream_info} <- Stream.info(conn, stream_name(bucket_name)) do
Stream.get_message(conn, stream_name(bucket_name), %{
last_by_subj: meta_stream_topic(bucket_name, object_name)
})
|> case do
{:ok, message} ->
meta = json_to_meta(message.data)
{:ok, meta}

error ->
error
end
end
end

@type list_option :: {:show_deleted, boolean()}
@spec list(Gnat.t(), String.t(), list(list_option())) :: {:error, any} | {:ok, list(Meta.t())}
def list(conn, bucket_name, options \\ []) do
with {:ok, %{config: stream}} <- Stream.info(conn, stream_name(bucket_name)),
topic <- Util.reply_inbox(),
{:ok, sub} <- Gnat.sub(conn, self(), topic),
{:ok, consumer} <-
Consumer.create(conn, %Consumer{
stream_name: stream.name,
deliver_subject: topic,
deliver_policy: :last_per_subject,
filter_subject: meta_stream_subject(bucket_name),
ack_policy: :none,
max_ack_pending: nil,
replay_policy: :instant,
max_deliver: 1
}),
{:ok, messages} <- receive_all_metas(sub, consumer.num_pending) do
:ok = Gnat.unsub(conn, sub)
:ok = Consumer.delete(conn, stream.name, consumer.name)

show_deleted = Keyword.get(options, :show_deleted, false)

if show_deleted do
{:ok, messages}
else
{:ok, Enum.reject(messages, &(&1.deleted == true))}
end
end
end
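
# `put/4` reads the IO device in 128KiB chunks (see `@chunk_size` below),
# publishes each chunk to the object's chunk subject while accumulating a
# SHA-256 digest, then writes the meta record with a `Nats-Rollup: sub` header.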

@spec put(Gnat.t(), String.t(), String.t(), File.io_device()) ::
{:ok, Meta.t()} | {:error, any()}
def put(conn, bucket_name, object_name, io) do
nuid = Util.nuid()
chunk_topic = chunk_stream_topic(bucket_name, nuid)

with {:ok, %{config: _}} <- Stream.info(conn, stream_name(bucket_name)),
:ok <- purge_prior_chunks(conn, bucket_name, object_name),
{:ok, chunks, size, digest} <- send_chunks(conn, io, chunk_topic) do
object_meta = %Meta{
name: object_name,
bucket: bucket_name,
nuid: nuid,
size: size,
chunks: chunks,
digest: "SHA-256=#{Base.url_encode64(digest)}"
}

topic = meta_stream_topic(bucket_name, object_name)
body = Jason.encode!(object_meta)

case Gnat.request(conn, topic, body, headers: [{"Nats-Rollup", "sub"}]) do
{:ok, _} ->
{:ok, object_meta}

error ->
error
end
end
end

defp stream_name(bucket_name) do
"#{@stream_prefix}#{bucket_name}"
end

defp stream_subjects(bucket_name) do
[
chunk_stream_subject(bucket_name),
meta_stream_subject(bucket_name)
]
end

defp chunk_stream_subject(bucket_name) do
"#{@subject_prefix}#{bucket_name}.C.>"
end

defp chunk_stream_topic(bucket_name, nuid) do
"#{@subject_prefix}#{bucket_name}.C.#{nuid}"
end

defp chunk_stream_topic(%Meta{bucket: bucket, nuid: nuid}) do
"#{@subject_prefix}#{bucket}.C.#{nuid}"
end

defp meta_stream_subject(bucket_name) do
"#{@subject_prefix}#{bucket_name}.M.>"
end

defp meta_stream_topic(bucket_name, object_name) do
key = Base.url_encode64(object_name)
"#{@subject_prefix}#{bucket_name}.M.#{key}"
end

@two_minutes_in_nanoseconds 120_000_000_000
# The `duplicate_window` can't be greater than the `max_age`. The default `duplicate_window`
# is 2 minutes. We'll keep the 2 minute window UNLESS the ttl is less than 2 minutes
defp adjust_duplicate_window(ttl) when ttl > 0 and ttl < @two_minutes_in_nanoseconds, do: ttl
defp adjust_duplicate_window(_ttl), do: @two_minutes_in_nanoseconds
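
# A couple of concrete cases (values in nanoseconds):
#   adjust_duplicate_window(30_000_000_000) #=> 30_000_000_000  (30s TTL caps the window)
#   adjust_duplicate_window(0)              #=> 120_000_000_000 (no TTL, keep the default)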

defp json_to_meta(json) do
raw = Jason.decode!(json)

%{
"bucket" => bucket,
"chunks" => chunks,
"digest" => digest,
"name" => name,
"nuid" => nuid,
"size" => size
} = raw

%Meta{
bucket: bucket,
chunks: chunks,
digest: digest,
deleted: Map.get(raw, "deleted", false),
name: name,
nuid: nuid,
size: size
}
end

defp purge_prior_chunks(conn, bucket, name) do
case info(conn, bucket, name) do
{:ok, meta} ->
Stream.purge(conn, stream_name(bucket), %{filter: chunk_stream_topic(meta)})

{:error, %{"code" => 404}} ->
:ok

{:error, other} ->
{:error, other}
end
end

defp receive_all_metas(sid, num_pending, messages \\ [])

defp receive_all_metas(_sid, 0, messages) do
{:ok, messages}
end

defp receive_all_metas(sid, remaining, messages) do
receive do
{:msg, %{sid: ^sid, body: body}} ->
meta = json_to_meta(body)
receive_all_metas(sid, remaining - 1, [meta | messages])
after
10_000 ->
{:error, :timeout_waiting_for_messages}
end
end

defp receive_chunks(conn, %Meta{} = meta, chunk_fun) do
topic = chunk_stream_topic(meta)
stream = stream_name(meta.bucket)
inbox = Util.reply_inbox()
{:ok, sub} = Gnat.sub(conn, self(), inbox)

{:ok, consumer} =
Consumer.create(conn, %Consumer{
stream_name: stream,
deliver_subject: inbox,
deliver_policy: :all,
filter_subject: topic,
ack_policy: :none,
max_ack_pending: nil,
replay_policy: :instant,
max_deliver: 1
})

:ok = receive_chunks(sub, meta.chunks, chunk_fun)

:ok = Gnat.unsub(conn, sub)
:ok = Consumer.delete(conn, stream, consumer.name)
end

defp receive_chunks(_sub, 0, _chunk_fun) do
:ok
end

defp receive_chunks(sub, remaining, chunk_fun) do
receive do
{:msg, %{sid: ^sub, body: body}} ->
chunk_fun.(body)
receive_chunks(sub, remaining - 1, chunk_fun)
after
10_000 ->
{:error, :timeout_waiting_for_messages}
end
end

@chunk_size 128 * 1024
defp send_chunks(conn, io, topic) do
sha = :crypto.hash_init(:sha256)
size = 0
chunks = 0
send_chunks(conn, io, topic, sha, size, chunks)
end

defp send_chunks(conn, io, topic, sha, size, chunks) do
case IO.binread(io, @chunk_size) do
:eof ->
sha = :crypto.hash_final(sha)
{:ok, chunks, size, sha}

{:error, err} ->
{:error, err}

bytes ->
sha = :crypto.hash_update(sha, bytes)
size = size + byte_size(bytes)
chunks = chunks + 1

case Gnat.request(conn, topic, bytes) do
{:ok, _} ->
send_chunks(conn, io, topic, sha, size, chunks)

error ->
error
end
end
end

defp validate_bucket_name(name) do
case Regex.match?(~r/^[a-zA-Z0-9_-]+$/, name) do
true -> :ok
false -> {:error, "invalid bucket name"}
end
end
end
34 changes: 34 additions & 0 deletions lib/jetstream/api/object/meta.ex
@@ -0,0 +1,34 @@
defmodule Jetstream.API.Object.Meta do
@enforce_keys [:bucket, :chunks, :digest, :name, :nuid, :size]
defstruct bucket: nil,
chunks: nil,
deleted: false,
digest: nil,
name: nil,
nuid: nil,
size: nil

@type t :: %__MODULE__{
bucket: String.t(),
chunks: non_neg_integer(),
deleted: boolean(),
digest: String.t(),
name: String.t(),
nuid: String.t(),
size: non_neg_integer()
}
end
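
# A sketch of the wire format (hypothetical values); the Jason.Encoder
# implementation below omits the `deleted` field unless it is true:
#
#   %Meta{bucket: "photos", chunks: 2, digest: "SHA-256=abc", name: "a.jpg",
#         nuid: "abc123", size: 262_144}
#   |> Jason.encode!()
#   #=> {"bucket":"photos","chunks":2,"digest":"SHA-256=abc","name":"a.jpg","nuid":"abc123","size":262144}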

defimpl Jason.Encoder, for: Jetstream.API.Object.Meta do
alias Jetstream.API.Object.Meta

def encode(%Meta{deleted: true} = meta, opts) do
Map.take(meta, [:bucket, :chunks, :deleted, :digest, :name, :nuid, :size])
|> Jason.Encode.map(opts)
end

def encode(meta, opts) do
Map.take(meta, [:bucket, :chunks, :digest, :name, :nuid, :size])
|> Jason.Encode.map(opts)
end
end