Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekmach committed Aug 30, 2023
1 parent e9401b1 commit b1d2caa
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 123 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jobs:
MIX_ENV: test
# TODO: Remove on Req 1.0
REQ_NOWARN_OUTPUT: true
# TODO: Remove on next finch
FINCH_REF: 9609c409764571cbd033f951b3fbd82355dda676
strategy:
fail-fast: false
matrix:
Expand Down
252 changes: 130 additions & 122 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -680,144 +680,152 @@ defmodule Req.Steps do
run_finch(request, finch_request, finch_name, finch_options)
end

defp run_finch(request, finch_request, finch_name, finch_options) do
case request.options[:finch_request] do
defp run_finch(req, finch_req, finch_name, finch_options) do
case req.options[:finch_request] do
fun when is_function(fun, 4) ->
fun.(request, finch_request, finch_name, finch_options)
fun.(req, finch_req, finch_name, finch_options)

deprecated_fun when is_function(deprecated_fun, 1) ->
IO.warn(
"passing a :finch_request function accepting a single argument is deprecated. " <>
"See Req.Steps.run_finch/1 for more information."
)

{request, run_finch_request(deprecated_fun.(finch_request), finch_name, finch_options)}
{req, run_finch_request(deprecated_fun.(finch_req), finch_name, finch_options)}

nil ->
case request.into do
case req.into do
nil ->
{request, run_finch_request(finch_request, finch_name, finch_options)}
{req, run_finch_request(finch_req, finch_name, finch_options)}

fun when is_function(fun, 2) ->
response = Req.Response.new()

fun = fn
{:status, status}, {request, response} ->
{request, %{response | status: status}}

{:headers, fields}, {request, response} ->
fields = finch_fields_to_map(fields)
response = update_in(response.headers, &Map.merge(&1, fields))
{request, response}

{:data, data}, acc ->
case fun.({:data, data}, acc) do
{:cont, acc} ->
acc

{:halt, acc} ->
throw({:finch_halt, acc})

other ->
raise ArgumentError,
"expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:trailers, fields}, {request, response} ->
fields = finch_fields_to_map(fields)
response = update_in(response.trailers, &Map.merge(&1, fields))
{request, response}
end

try do
# TODO: use Finch.stream_while on next Finch release
case Finch.stream(
finch_request,
finch_name,
{request, response},
fun,
finch_options
) do
{:ok, acc} ->
acc

{:error, exception} ->
{request, exception}
end
catch
{:finch_halt, acc} ->
acc
end
finch_stream_into_fun(req, finch_req, finch_name, finch_options, fun)

collectable when collectable != :self ->
{acc, collector} = Collectable.into(collectable)
response = Req.Response.new()

fun = fn
{:status, status}, {acc, request, response} ->
{acc, request, %{response | status: status}}

{:headers, fields}, {acc, request, response} ->
fields = finch_fields_to_map(fields)
response = update_in(response.headers, &Map.merge(&1, fields))
{acc, request, response}

{:data, data}, {acc, request, response} ->
acc = collector.(acc, {:cont, data})
{acc, request, response}

{:trailers, fields}, {acc, request, response} ->
fields = finch_fields_to_map(fields)
response = update_in(response.trailers, &Map.merge(&1, fields))
{acc, request, response}
end

case Finch.stream(
finch_request,
finch_name,
{acc, request, response},
fun,
finch_options
) do
{:ok, {acc, request, response}} ->
acc = collector.(acc, :done)
{request, %{response | body: acc}}

{:error, exception} ->
{request, exception}
end

# TODO: WIP
:self ->
ref = Finch.async_request(finch_request, finch_name)

{:status, status} =
receive do
{^ref, message} ->
message
end

headers =
receive do
{^ref, message} ->
# TODO: handle trailers
{:headers, headers} = message

Enum.reduce(headers, %{}, fn {name, value}, acc ->
Map.update(acc, name, [value], &(&1 ++ [value]))
end)
end

async = %Req.Async{
ref: ref,
stream_fun: &finch_parse_message/2,
cancel_fun: &finch_cancel/1
}

request = put_in(request.async, async)
response = Req.Response.new(status: status, headers: headers)
{request, response}
finch_stream_into_self(req, finch_req, finch_name, finch_options)

collectable when collectable != :self ->
finch_stream_into_collectable(req, finch_req, finch_name, finch_options, collectable)
end
end
end

defp finch_stream_into_fun(req, finch_req, finch_name, finch_options, fun) do
resp = Req.Response.new()

fun = fn
{:status, status}, {req, resp} ->
{:cont, {req, %{resp | status: status}}}

{:headers, fields}, {req, resp} ->
fields = finch_fields_to_map(fields)
resp = update_in(resp.headers, &Map.merge(&1, fields))
{:cont, {req, resp}}

{:data, data}, acc ->
fun.({:data, data}, acc)

{:trailers, fields}, {req, resp} ->
fields = finch_fields_to_map(fields)
resp = update_in(resp.trailers, &Map.merge(&1, fields))
{:cont, {req, resp}}
end

case finch_stream_while(finch_req, finch_name, {req, resp}, fun, finch_options) do
{:ok, acc} ->
acc

{:error, exception} ->
{req, exception}
end
end

defp finch_stream_into_collectable(req, finch_req, finch_name, finch_options, collectable) do
{acc, collector} = Collectable.into(collectable)
resp = Req.Response.new()

fun = fn
{:status, status}, {acc, req, resp} ->
{acc, req, %{resp | status: status}}

{:headers, fields}, {acc, req, resp} ->
fields = finch_fields_to_map(fields)
resp = update_in(resp.headers, &Map.merge(&1, fields))
{acc, req, resp}

{:data, data}, {acc, req, resp} ->
acc = collector.(acc, {:cont, data})
{acc, req, resp}

{:trailers, fields}, {acc, req, resp} ->
fields = finch_fields_to_map(fields)
resp = update_in(resp.trailers, &Map.merge(&1, fields))
{acc, req, resp}
end

case Finch.stream(finch_req, finch_name, {acc, req, resp}, fun, finch_options) do
{:ok, {acc, req, resp}} ->
acc = collector.(acc, :done)
{req, %{resp | body: acc}}

{:error, exception} ->
{req, exception}
end
end

# TODO: WIP
defp finch_stream_into_self(req, finch_req, finch_name, finch_options) do
ref = Finch.async_request(finch_req, finch_name, finch_options)

{:status, status} =
receive do
{^ref, message} ->
message
end

headers =
receive do
{^ref, message} ->
# TODO: handle trailers
{:headers, headers} = message

Enum.reduce(headers, %{}, fn {name, value}, acc ->
Map.update(acc, name, [value], &(&1 ++ [value]))
end)
end

async = %Req.Async{
ref: ref,
stream_fun: &finch_parse_message/2,
cancel_fun: &finch_cancel/1
}

req = put_in(req.async, async)
resp = Req.Response.new(status: status, headers: headers)
{req, resp}
end

# TODO: Remove when we require Finch 0.17+
if Code.ensure_loaded?(Finch) and function_exported?(Finch, :stream_while, 5) do
defp finch_stream_while(finch_req, finch_name, acc, fun, finch_options) do
Finch.stream_while(finch_req, finch_name, acc, fun, finch_options)
end
else
defp finch_stream_while(finch_req, finch_name, acc, fun, finch_options) do
fun = fn item, acc ->
case fun.(item, acc) do
{:cont, acc} ->
acc

{:halt, _acc} ->
raise ArgumentError, "returning {:halt, _acc} requires Finch 0.17+"

other ->
raise ArgumentError, "expected {:cont, _acc}, got: #{inspect(other)}"
end
end

Finch.stream(finch_req, finch_name, acc, fun, finch_options)
end
end

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.33", "3c3fd9673bb5dcc9edc28dd90f50c87ce506d1f71b70e3de69aa8154bc695d44", [:mix], [], "hexpm", "2d526833729b59b9fdb85785078697c72ac5e5066350663e5be6a1182da61b8f"},
"ex_doc": {:hex, :ex_doc, "0.30.5", "aa6da96a5c23389d7dc7c381eba862710e108cee9cfdc629b7ec021313900e9e", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "88a1e115dcb91cefeef7e22df4a6ebbe4634fbf98b38adcbc25c9607d6d9d8e6"},
"ezstd": {:hex, :ezstd, "1.0.8", "13584a08f2711b7b20173a41062f9c9454e255face6a30802fb676330db43495", [:rebar3], [], "hexpm", "a450e855d207cfee18263e78837964e59fe0cbed0dd99cb5e3755ebff7823171"},
"finch": {:hex, :finch, "0.16.0", "40733f02c89f94a112518071c0a91fe86069560f5dbdb39f9150042f44dcfb1a", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f660174c4d519e5fec629016054d60edd822cdfe2b7270836739ac2f97735ec5"},
"finch": {:git, "https://github.com/sneako/finch.git", "b08b594da825d50fdf5d5c8907c8107aca68a683", [ref: "b08b594"]},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
Expand Down
4 changes: 4 additions & 0 deletions test/req/steps_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,10 @@ defmodule Req.StepsTest do
refute_receive _
end

finch_stream_while? =
Code.ensure_loaded?(Finch) and function_exported?(Finch, :stream_while, 5)

@tag skip: not finch_stream_while?
test "into: fun with halt", %{bypass: bypass, url: url} do

Check failure on line 1644 in test/req/steps_test.exs

View workflow job for this annotation

GitHub Actions / test (1.13, 24.3.4.10)

test run_finch into: fun with halt (Req.StepsTest)

Check failure on line 1644 in test/req/steps_test.exs

View workflow job for this annotation

GitHub Actions / test (1.15, 26.0, lint)

test run_finch into: fun with halt (Req.StepsTest)
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
Expand Down

0 comments on commit b1d2caa

Please sign in to comment.