Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add response streaming #220

Merged
merged 1 commit into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Req is a batteries-included HTTP client for Elixir.

* Range requests (via [`put_range`]) step.)

* Response streaming

* Follows redirects (via [`follow_redirects`] step.)

* Retries on errors (via [`retry`] step.)
Expand Down Expand Up @@ -183,11 +185,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

[`Req.request/1`]: https://hexdocs.pm/req/Req.html#request/1
[`Req.new/1`]: https://hexdocs.pm/req/Req.html#new/1
[`Req.get!/2`]: https://hexdocs.pm/req/Req.html#get!/2
[`Req.post!/2`]: https://hexdocs.pm/req/Req.html#post!/2
[`Req.Request`]: https://hexdocs.pm/req/Req.Request.html
[`Req.request/1`]: https://hexdocs.pm/req/Req.html#request/1
[`Req.new/1`]: https://hexdocs.pm/req/Req.html#new/1
[`Req.get!/2`]: https://hexdocs.pm/req/Req.html#get!/2
[`Req.post!/2`]: https://hexdocs.pm/req/Req.html#post!/2
[`Req.async_request/2`]: https://hexdocs.pm/req/Req.html#async_request/2
[`Req.Request`]: https://hexdocs.pm/req/Req.Request.html

[`auth`]: https://hexdocs.pm/req/Req.Steps.html#auth/1
[`cache`]: https://hexdocs.pm/req/Req.Steps.html#cache/1
Expand Down
99 changes: 91 additions & 8 deletions lib/req.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Req do
@moduledoc """
@moduledoc ~S"""
The high-level API.

Req is composed of three main pieces:
Expand Down Expand Up @@ -35,8 +35,38 @@ defmodule Req do
iex> stream = Stream.duplicate("foo", 3)
iex> Req.post!("https://httpbin.org/post", body: {:stream, stream}).body["data"]
"foofoofoo"

Response streaming using callback:

iex> resp =
...> Req.get!("http://httpbin.org/stream/2", stream: fn {:data, data}, acc ->
...> IO.puts("got chunk with #{byte_size(data)} bytes")
...> {:cont, acc}
...> end)
# Outputs: got chunk with 249 bytes
# Outputs: got chunk with 249 bytes
iex> resp.status
200
iex> resp.body
""
"""

# TODO: Add when new version of Finch is out.
# Response streaming to caller:
#
# iex> {req, resp} = Req.async_request!("http://httpbin.org/stream/2")
# iex> resp.status
# 200
# iex> resp.body
# ""
# iex> Req.parse_message(req, receive do message -> message end)
# [{:data, "{\"url\": \"http://httpbin.org/stream/2\"" <> ...}]
# iex> Req.parse_message(req, receive do message -> message end)
# [{:data, "{\"url\": \"http://httpbin.org/stream/2\"" <> ...}]
# iex> Req.parse_message(req, receive do message -> message end)
# [:done]
# ""

@type url() :: URI.t() | String.t()

@doc """
Expand Down Expand Up @@ -71,9 +101,9 @@ defmodule Req do

Can be one of:

* `iodata`
* `iodata` - send request body eagerly

* `{:stream, enumerable}`
* `{:stream, enumerable}` - stream `enumerable` as request body

Additional URL options:

Expand Down Expand Up @@ -146,9 +176,20 @@ defmodule Req do

* `:max_redirects` - the maximum number of redirects, defaults to `10`.

Response streaming:

* `:stream` - a 2-arity function used to stream response. The first argument is a "Stream Command" tuple
described below. The second argument is a `{request, response}` tuple.

The "Stream Command" is one of:

* `{:data, data}` - a chunk of the response body.

See module documentation for an example of streaming responses.

Retry options ([`retry`](`Req.Steps.retry/1`) step):

* `:retry`: can be set to: `:safe` (default) to only retry GET/HEAD requests on HTTP 408/5xx
* `:retry` - can be set to: `:safe` (default) to only retry GET/HEAD requests on HTTP 408/5xx
responses or exceptions, `false` to never retry, and `fun` - a 1-arity function that accepts
either a `Req.Response` or an exception struct and returns boolean whether to retry

Expand Down Expand Up @@ -212,13 +253,13 @@ defmodule Req do
iex> URI.to_string(req.url)
"https://elixir-lang.org"

With mock adapter:
Fake adapter:

iex> mock = fn request ->
iex> fake = fn request ->
...> {request, Req.Response.new(status: 200, body: "it works!")}
...> end
iex>
iex> req = Req.new(adapter: mock)
iex> req = Req.new(adapter: fake)
iex> Req.get!(req).body
"it works!"

Expand Down Expand Up @@ -353,7 +394,7 @@ defmodule Req do
"""
@spec update(Req.Request.t(), options :: keyword()) :: Req.Request.t()
def update(%Req.Request{} = request, options) when is_list(options) do
request_option_names = [:method, :url, :headers, :body, :adapter]
request_option_names = [:method, :url, :headers, :body, :adapter, :stream]

{request_options, options} = Keyword.split(options, request_option_names)

Expand Down Expand Up @@ -918,6 +959,48 @@ defmodule Req do
end
end

# TODO
@doc false
def async_request(request, options \\ []) do
Req.Request.run_request(%{new(request, options) | stream: :self})
end

# TODO
@doc false
def async_request!(request, options \\ []) do
case async_request(request, options) do
{request, %Req.Response{} = response} ->
{request, response}

{_request, exception} ->
raise exception
end
end

def parse_message(%Req.Request{} = request, message) do
request.async.stream_fun.(request.async.ref, message)
end

def cancel_async_request(%Req.Request{} = request) do
request.async.cancel_fun.(request.async.ref)
end

def run_request(request, options \\ []) do
request
|> Req.update(options)
|> Req.Request.run_request()
end

def run_request!(request, options \\ []) do
case run_request(request, options) do
{request, %Req.Response{} = response} ->
{request, response}

{_request, exception} ->
raise exception
end
end

@doc """
Returns default options.

Expand Down
5 changes: 5 additions & 0 deletions lib/req/async.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Req.Async do
# TODO
@moduledoc false
defstruct [:ref, :stream_fun, :cancel_fun]
end
4 changes: 3 additions & 1 deletion lib/req/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ defmodule Req.Request do
error_steps: [],
private: %{},
registered_options: MapSet.new(),
current_request_steps: []
current_request_steps: [],
stream: nil,
async: nil

@doc """
Returns a new request struct.
Expand Down
70 changes: 68 additions & 2 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ defmodule Req.Steps do

iex> Req.get!(url, connect_options: [transport_opts: [cacerts: :public_key.cacerts_get()]])

Stream response body:
Stream response body using `Finch.stream/5`:

fun = fn request, finch_request, finch_name, finch_options ->
fun = fn
Expand Down Expand Up @@ -682,7 +682,56 @@ defmodule Req.Steps do
{request, run_finch_request(deprecated_fun.(finch_request), finch_name, finch_options)}

nil ->
{request, run_finch_request(finch_request, finch_name, finch_options)}
case request.stream do
nil ->
{request, run_finch_request(finch_request, finch_name, finch_options)}

fun when is_function(fun, 2) ->
response = Req.Response.new()

fun = fn
{:status, status}, {request, response} ->
{request, %{response | status: status}}

{:headers, headers}, {request, response} ->
{request, %{response | headers: headers}}

{:data, data}, acc ->
{:cont, result} = fun.({:data, data}, acc)
# TODO: handle {:halt, result}
result
end

case Finch.stream(finch_request, finch_name, {request, response}, fun, finch_options) do
{:ok, acc} ->
acc
end

:self ->
ref = Finch.async_request(finch_request, finch_name)

{:status, status} =
receive do
{^ref, message} ->
message
end

{:headers, headers} =
receive do
{^ref, message} ->
message
end

async = %Req.Async{
ref: ref,
stream_fun: &finch_parse_message/2,
cancel_fun: &finch_cancel/1
}

request = put_in(request.async, async)
response = Req.Response.new(status: status, headers: headers)
{request, response}
end
end
end

Expand All @@ -693,6 +742,23 @@ defmodule Req.Steps do
end
end

defp finch_parse_message(ref, {ref, {:data, data}}) do
{:ok, [{:data, data}]}
end

defp finch_parse_message(ref, {ref, :done}) do
{:ok, [:done]}
end

# TODO: handle remaining possible Finch results
defp finch_parse_message(_ref, _other) do
:unknown
end

defp finch_cancel(ref) do
Finch.cancel_async_request(ref)
end

defp finch_name(request) do
if name = request.options[:finch] do
if request.options[:connect_options] do
Expand Down
16 changes: 13 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ defmodule Req.MixProject do
NimbleCSV.RFC4180,
Plug.Test,
:brotli,
:ezstd
:ezstd,
# TODO: Remove on next Finch release
Finch
]
]
]
Expand All @@ -35,7 +37,7 @@ defmodule Req.MixProject do
def application do
[
mod: {Req.Application, []},
extra_applications: [:logger]
extra_applications: [:logger, :inets]
]
end

Expand All @@ -52,7 +54,7 @@ defmodule Req.MixProject do

defp deps do
[
{:finch, "~> 0.9"},
{:finch, "~> 0.9", finch_opts()},
{:mime, "~> 1.6 or ~> 2.0"},
{:jason, "~> 1.0"},
{:nimble_csv, "~> 1.0", optional: true},
Expand All @@ -64,6 +66,14 @@ defmodule Req.MixProject do
]
end

defp finch_opts do
if path = System.get_env("FINCH_PATH") do
[path: path]
else
[]
end
end

defp docs do
[
main: "readme",
Expand Down
Loading
Loading