Skip to content

Commit

Permalink
mix format
Browse files Browse the repository at this point in the history
  • Loading branch information
c0deaddict committed Oct 19, 2023
1 parent cb00bee commit aae30c0
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 16 deletions.
24 changes: 20 additions & 4 deletions lib/jetstream/api/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ defmodule Jetstream.API.Consumer do
"""
@spec create(conn :: Gnat.t(), consumer :: t()) :: {:ok, info()} | {:error, term()}
def create(conn, %__MODULE__{durable_name: name} = consumer) when not is_nil(name) do
create_topic = "#{js_api(consumer.domain)}.CONSUMER.DURABLE.CREATE.#{consumer.stream_name}.#{name}"
create_topic =
"#{js_api(consumer.domain)}.CONSUMER.DURABLE.CREATE.#{consumer.stream_name}.#{name}"

with :ok <- validate_durable(consumer),
{:ok, raw_response} <- request(conn, create_topic, create_payload(consumer)) do
Expand Down Expand Up @@ -259,7 +260,12 @@ defmodule Jetstream.API.Consumer do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Consumer.delete(:gnat, "wrong_stream", "consumer")
"""
@spec delete(conn :: Gnat.t(), stream_name :: binary(), consumer_name :: binary(), domain :: nil | binary()) ::
@spec delete(
conn :: Gnat.t(),
stream_name :: binary(),
consumer_name :: binary(),
domain :: nil | binary()
) ::
:ok | {:error, any()}
def delete(conn, stream_name, consumer_name, domain \\ nil) do
topic = "#{js_api(domain)}.CONSUMER.DELETE.#{stream_name}.#{consumer_name}"
Expand All @@ -281,7 +287,12 @@ defmodule Jetstream.API.Consumer do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Consumer.info(:gnat, "wrong_stream", "consumer")
"""
@spec info(conn :: Gnat.t(), stream_name :: binary(), consumer_name :: binary(), domain :: nil | binary()) ::
@spec info(
conn :: Gnat.t(),
stream_name :: binary(),
consumer_name :: binary(),
domain :: nil | binary()
) ::
{:ok, info()} | {:error, any()}
def info(conn, stream_name, consumer_name, domain \\ nil) do
topic = "#{js_api(domain)}.CONSUMER.INFO.#{stream_name}.#{consumer_name}"
Expand All @@ -302,10 +313,15 @@ defmodule Jetstream.API.Consumer do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Consumer.list(:gnat, "wrong_stream")
"""
@spec list(conn :: Gnat.t(), stream_name :: binary(), params :: [offset: non_neg_integer(), domain: nil | binary()]) ::
@spec list(
conn :: Gnat.t(),
stream_name :: binary(),
params :: [offset: non_neg_integer(), domain: nil | binary()]
) ::
{:ok, consumers()} | {:error, term()}
def list(conn, stream_name, params \\ []) do
domain = Keyword.get(params, :domain)

payload =
Jason.encode!(%{
offset: Keyword.get(params, :offset, 0)
Expand Down
3 changes: 2 additions & 1 deletion lib/jetstream/api/kv.ex
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ defmodule Jetstream.API.KV do
iex>{:ok, %{"key1" => "value1"}} = Jetstream.API.KV.contents(:gnat, "my_bucket")
"""
@spec contents(conn :: Gnat.t(), bucket_name :: binary(), domain :: nil | binary()) :: {:ok, map()} | {:error, binary()}
@spec contents(conn :: Gnat.t(), bucket_name :: binary(), domain :: nil | binary()) ::
{:ok, map()} | {:error, binary()}
def contents(conn, bucket_name, domain \\ nil) do
stream = stream_name(bucket_name)
inbox = Util.reply_inbox()
Expand Down
34 changes: 27 additions & 7 deletions lib/jetstream/api/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ defmodule Jetstream.API.Stream do
def create(conn, %__MODULE__{} = stream) do
with :ok <- validate(stream),
{:ok, stream} <-
request(conn, "#{js_api(stream.domain)}.STREAM.CREATE.#{stream.name}", Jason.encode!(stream)) do
request(
conn,
"#{js_api(stream.domain)}.STREAM.CREATE.#{stream.name}",
Jason.encode!(stream)
) do
{:ok, to_info(stream)}
end
end
Expand All @@ -284,7 +288,11 @@ defmodule Jetstream.API.Stream do
def update(conn, %__MODULE__{} = stream) do
with :ok <- validate(stream),
{:ok, stream} <-
request(conn, "#{js_api(stream.domain)}.STREAM.UPDATE.#{stream.name}", Jason.encode!(stream)) do
request(
conn,
"#{js_api(stream.domain)}.STREAM.UPDATE.#{stream.name}",
Jason.encode!(stream)
) do
{:ok, to_info(stream)}
end
end
Expand All @@ -301,7 +309,8 @@ defmodule Jetstream.API.Stream do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.delete(:gnat, "wrong_stream")
"""
@spec delete(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) :: :ok | {:error, any()}
@spec delete(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
:ok | {:error, any()}
def delete(conn, stream_name, domain \\ nil) when is_binary(stream_name) do
with {:ok, _response} <- request(conn, "#{js_api(domain)}.STREAM.DELETE.#{stream_name}", "") do
:ok
Expand All @@ -320,7 +329,8 @@ defmodule Jetstream.API.Stream do
iex> {:error, %{"code" => 404, "description" => "stream not found"}} = Jetstream.API.Stream.purge(:gnat, "wrong_stream")
"""
@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) :: :ok | {:error, any()}
@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary()) ::
:ok | {:error, any()}
def purge(conn, stream_name, domain \\ nil) when is_binary(stream_name) do
with {:ok, _response} <- request(conn, "#{js_api(domain)}.STREAM.PURGE.#{stream_name}", "") do
:ok
Expand All @@ -338,7 +348,8 @@ defmodule Jetstream.API.Stream do
"""
@type method :: %{filter: String.t()}
@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary(), method) :: :ok | {:error, any()}
@spec purge(conn :: Gnat.t(), stream_name :: binary(), domain :: nil | binary(), method) ::
:ok | {:error, any()}
def purge(conn, stream_name, domain, method) when is_binary(stream_name) do
with :ok <- validate_purge_method(method),
body <- Jason.encode!(method),
Expand Down Expand Up @@ -379,10 +390,14 @@ defmodule Jetstream.API.Stream do
iex> {:ok, %{total: _, offset: 0, limit: 1024, streams: _}} = Jetstream.API.Stream.list(:gnat)
"""
@spec list(conn :: Gnat.t(), params :: [offset: non_neg_integer(), subject: binary(), domain: nil | binary()]) ::
@spec list(
conn :: Gnat.t(),
params :: [offset: non_neg_integer(), subject: binary(), domain: nil | binary()]
) ::
{:ok, streams()} | {:error, term()}
def list(conn, params \\ []) do
domain = Keyword.get(params, :domain)

payload =
Jason.encode!(%{
offset: Keyword.get(params, :offset, 0),
Expand All @@ -404,7 +419,12 @@ defmodule Jetstream.API.Stream do
@doc """
Get a message from the stream either by "stream sequence number" or the "last message for a given subject"
"""
@spec get_message(conn :: Gnat.t(), stream_name :: binary(), method :: message_access_method(), domain :: nil | binary()) ::
@spec get_message(
conn :: Gnat.t(),
stream_name :: binary(),
method :: message_access_method(),
domain :: nil | binary()
) ::
{:ok, message_response()} | {:error, response_error()}
def get_message(conn, stream_name, method, domain \\ nil) when is_map(method) do
with :ok <- validate_message_access_method(method),
Expand Down
2 changes: 1 addition & 1 deletion lib/jetstream/pull_consumer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Jetstream.PullConsumer.Server do
connection_name: connection_name,
connection_retry_timeout: connection_retry_timeout,
connection_retries: connection_retries,
domain: domain,
domain: domain
},
listening_topic: listening_topic,
module: module
Expand Down
4 changes: 2 additions & 2 deletions lib/off_broadway/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ with {:module, _} <- Code.ensure_compiled(Broadway) do
:connection_retry_timeout,
:connection_retries,
:inbox_prefix,
:domain,
:domain
])
|> ConnectionOptions.validate!()

Expand Down Expand Up @@ -208,7 +208,7 @@ with {:module, _} <- Code.ensure_compiled(Broadway) do
%ConnectionOptions{
connection_name: connection_name,
connection_retries: retries,
connection_retry_timeout: retry_timeout,
connection_retry_timeout: retry_timeout
} = conn_options,
listening_topic: listening_topic,
connection_retries_left: retries_left
Expand Down
4 changes: 3 additions & 1 deletion test/jetstream/api/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ defmodule Jetstream.API.ConsumerTest do
consumer_name: consumer_name,
reply_subject: reply_subject
} do
Consumer.request_next_message(:gnat, stream_name, consumer_name, reply_subject, nil, batch: 10)
Consumer.request_next_message(:gnat, stream_name, consumer_name, reply_subject, nil,
batch: 10
)

Gnat.pub(:gnat, subject, "message 1")

Expand Down

0 comments on commit aae30c0

Please sign in to comment.