Skip to content

Commit

Permalink
Reduce the number of op codes executed during async reads. (#12)
Browse files Browse the repository at this point in the history
Fixes #11.
  • Loading branch information
electricshaman authored Apr 3, 2017
1 parent 5747a76 commit e342e16
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
10 changes: 4 additions & 6 deletions lib/tm_mercury/read_async_task.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ defmodule TM.Mercury.ReadAsyncTask do
require Logger
alias TM.Mercury.Reader

def start_link(reader_pid, callback, plan) do
def start_link(reader_pid, callback) do
loop(%{status: :running,
reader: reader_pid,
cb: callback,
plan: plan,
tag_count: 0})
cb: callback})
end

defp loop(state) do
Expand All @@ -24,9 +22,9 @@ defmodule TM.Mercury.ReadAsyncTask do
end
end

defp dispatch(%{status: :running, reader: rdr, cb: cb, plan: plan} = state) do
defp dispatch(%{status: :running, reader: rdr, cb: cb} = state) do
try do
case Reader.read_sync(rdr, plan) do
case Reader.read_sync_prepared(rdr) do
{:ok, tags} when length(tags) == 0 -> state
{:ok, tags} when length(tags) > 0 ->
send(cb, {:tm_mercury, :tags, tags})
Expand Down
20 changes: 19 additions & 1 deletion lib/tm_mercury/reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ defmodule TM.Mercury.Reader do
GenServer.call(pid, [:read_sync, rp])
end

@doc """
Read tags synchronously using the current reader configuration, skipping preparation steps in `prepare_read`.
Care should be taken that any necessary preparation is performed separately before calling this function.
"""
@spec read_sync(pid) :: {:ok, term} | {:error, term}
def read_sync_prepared(pid) do
GenServer.call(pid, :read_sync_prepared)
end

@doc """
Start reading tags asynchronously using the current reader configuration
Tags will be sent to the process provided as the callback until `stop_read_async` is called.
Expand Down Expand Up @@ -386,6 +395,15 @@ defmodule TM.Mercury.Reader do
handle_read_sync(rp, state)
end

def handle_call(:read_sync_prepared, _from, %{transport: ts, reader: rdr, read_timeout: timeout} = state) do
case execute_read_sync(ts, rdr, timeout) do
{:error, :no_tags_found} ->
{:reply, {:ok, []}, state}
other ->
{:reply, other, state}
end
end

def handle_call([:read_async_start, cb, %ReadPlan{} = rp], _from, state) do
# Pseudo-async until we implement true continuous reading
# Use the provided read plan
Expand Down Expand Up @@ -570,7 +588,7 @@ defmodule TM.Mercury.Reader do
case ReadPlan.validate(rp) do
[errors: []] ->
{:ok, new_reader} = prepare_read(ts, rdr, rp)
{:ok, task_pid} = Task.start_link(ReadAsyncTask, :start_link, [self(), callback, rp])
{:ok, task_pid} = Task.start_link(ReadAsyncTask, :start_link, [self(), callback])
# Return the reader in case any settings changed during prepare
{:ok, task_pid, new_reader}
[errors: errors] ->
Expand Down

0 comments on commit e342e16

Please sign in to comment.