diff --git a/lib/credo/check.ex b/lib/credo/check.ex index 3e09f81e5..101fb3e4e 100644 --- a/lib/credo/check.ex +++ b/lib/credo/check.ex @@ -402,7 +402,8 @@ defmodule Credo.Check do source_files |> Task.async_stream(fn source -> run_on_source_file(exec, source, params) end, max_concurrency: exec.max_concurrent_check_runs, - timeout: :infinity + timeout: :infinity, + ordered: false ) |> Stream.run() diff --git a/lib/credo/check/consistency/collector.ex b/lib/credo/check/consistency/collector.ex index 8bd2e8736..6adb263af 100644 --- a/lib/credo/check/consistency/collector.ex +++ b/lib/credo/check/consistency/collector.ex @@ -155,8 +155,11 @@ defmodule Credo.Check.Consistency.Collector do ) do frequencies_per_source_file = source_files - |> Enum.map(&Task.async(fn -> {&1, collector.collect_matches(&1, params)} end)) - |> Enum.map(&Task.await(&1, :infinity)) + |> Task.async_stream(&{&1, collector.collect_matches(&1, params)}, + timeout: :infinity, + ordered: false + ) + |> Enum.map(fn {:ok, frequencies} -> frequencies end) frequencies = total_frequencies(frequencies_per_source_file) @@ -167,8 +170,11 @@ defmodule Credo.Check.Consistency.Collector do result = frequencies_per_source_file |> source_files_with_issues(most_frequent_match) - |> Enum.map(&Task.async(fn -> issue_formatter.(most_frequent_match, &1, params) end)) - |> Enum.flat_map(&Task.await(&1, :infinity)) + |> Task.async_stream(&issue_formatter.(most_frequent_match, &1, params), + timeout: :infinity, + ordered: false + ) + |> Enum.flat_map(fn {:ok, issue} -> issue end) result else diff --git a/lib/credo/check/design/duplicated_code.ex b/lib/credo/check/design/duplicated_code.ex index 476c9e92f..342522e76 100644 --- a/lib/credo/check/design/duplicated_code.ex +++ b/lib/credo/check/design/duplicated_code.ex @@ -54,18 +54,12 @@ defmodule Credo.Check.Design.DuplicatedCode do defp append_issues_via_issue_service(found_hashes, source_files, nodes_threshold, params, exec) when is_map(found_hashes) do found_hashes - |> Enum.map( - &Task.async(fn -> - do_append_issues_via_issue_service( - &1, - source_files, - nodes_threshold, - params, - exec - ) - end) + |> Task.async_stream( + &do_append_issues_via_issue_service(&1, source_files, nodes_threshold, params, exec), + timeout: :infinity, + ordered: false ) - |> Enum.map(&Task.await(&1, :infinity)) + |> Stream.run() end defp do_append_issues_via_issue_service( @@ -94,14 +88,14 @@ defmodule Credo.Check.Design.DuplicatedCode do end defp duplicate_nodes(source_files, mass_threshold) do - chunked_nodes = + nodes = source_files |> Enum.chunk_every(30) - |> Enum.map(&Task.async(fn -> calculate_hashes_for_chunk(&1, mass_threshold) end)) - |> Enum.map(&Task.await(&1, :infinity)) - - nodes = - Enum.reduce(chunked_nodes, %{}, fn current_hashes, existing_hashes -> + |> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold), + timeout: :infinity, + ordered: false + ) + |> Enum.reduce(%{}, fn {:ok, current_hashes}, existing_hashes -> Map.merge(existing_hashes, current_hashes, fn _hash, node_items1, node_items2 -> node_items1 ++ node_items2 end) @@ -203,10 +197,7 @@ defmodule Credo.Check.Design.DuplicatedCode do else hash = ast |> Credo.Code.remove_metadata() |> to_hash node_item = %{node: ast, filename: filename, mass: nil} - node_items = Map.get(existing_hashes, hash, []) - - updated_hashes = Map.put(existing_hashes, hash, node_items ++ [node_item]) - + updated_hashes = Map.update(existing_hashes, hash, [node_item], &[node_item | &1]) {ast, updated_hashes} end end diff --git a/lib/credo/check/runner.ex b/lib/credo/check/runner.ex index 6c309dd27..e44388d0b 100644 --- a/lib/credo/check/runner.ex +++ b/lib/credo/check/runner.ex @@ -20,12 +20,7 @@ defmodule Credo.Check.Runner do |> fix_deprecated_notation_for_checks_without_params() check_tuples - |> Task.async_stream( - fn check_tuple -> - run_check(exec, check_tuple) - end, - timeout: :infinity - ) + |> Task.async_stream(&run_check(exec, &1), timeout: :infinity, ordered: false) |> Stream.run() :ok diff --git a/lib/credo/cli/output/summary.ex b/lib/credo/cli/output/summary.ex index dcce0f2cc..a48c51f2d 100644 --- a/lib/credo/cli/output/summary.ex +++ b/lib/credo/cli/output/summary.ex @@ -165,13 +165,10 @@ defmodule Credo.CLI.Output.Summary do Credo.Code.prewalk(source_file, &scope_count_traverse/2, 0) end - defp scope_count([]), do: 0 - defp scope_count(source_files) when is_list(source_files) do source_files - |> Enum.map(&Task.async(fn -> scope_count(&1) end)) - |> Enum.map(&Task.await/1) - |> Enum.reduce(&(&1 + &2)) + |> Task.async_stream(&scope_count/1, ordered: false) + |> Enum.reduce(0, fn {:ok, n}, sum -> n + sum end) end @def_ops [:defmodule, :def, :defp, :defmacro] diff --git a/lib/credo/sources.ex b/lib/credo/sources.ex index 860976ddc..10c0200af 100644 --- a/lib/credo/sources.ex +++ b/lib/credo/sources.ex @@ -170,24 +170,12 @@ defmodule Credo.Sources do end defp read_files(filenames, parse_timeout) do - tasks = Enum.map(filenames, &Task.async(fn -> to_source_file(&1) end)) - - task_dictionary = - tasks - |> Enum.zip(filenames) - |> Enum.into(%{}) - - tasks_with_results = Task.yield_many(tasks, parse_timeout) - - results = - Enum.map(tasks_with_results, fn {task, res} -> - # Shutdown the tasks that did not reply nor exit - {task, res || Task.shutdown(task, :brutal_kill)} - end) - - Enum.map(results, fn - {_task, {:ok, value}} -> value - {task, nil} -> SourceFile.timed_out(task_dictionary[task]) + filenames + |> Task.async_stream(&to_source_file/1, timeout: parse_timeout, on_timeout: :kill_task) + |> Stream.zip(filenames) + |> Enum.map(fn + {{:exit, :timeout}, filename} -> SourceFile.timed_out(filename) + {{:ok, value}, _} -> value end) end