Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lua: structured concurrency, Promises, task pipelines #19624

Open
justinmk opened this issue Aug 2, 2022 · 77 comments
Open

Lua: structured concurrency, Promises, task pipelines #19624

justinmk opened this issue Aug 2, 2022 · 77 comments
Labels
architecture async futures/promises, async/await, concurrency, task pipelines enhancement feature request lua stdlib
Milestone

Comments

@justinmk
Copy link
Member

justinmk commented Aug 2, 2022

related: #11312

Problem

There's no builtin abstraction for:

  • Representing a task.
    • Example: vim.system() returns its own ad-hoc "task" that can be "awaited" via :wait().
  • Orchestrating "pipelines" (quasi monads?) of work ("tasks") and handling errors.
    • Example: shell-like task chains: vim.fs.files():filter(..):map(function(f) vim.fs.delete(f) end)
    • See also vim.iter.

Expected behavior

  • "Task" abstraction:
    • Maximally leveraging Lua coroutines + libuv. Only add concepts ("task", "promise") if absolutely needed.
    • Coroutines (or tasks that wrap coroutines) can be nested. (ref)
    • Util to create an awaitable task from "normal" functions (cf. "promisify"?).
      • Don't want to call await everywhere, that is a horrible consequence of the JS async/await model. Instead consider Go's go for opt-in concurrency.
      • Can e.g. vim.system() be "promisified" without its knowledge? Or could it handle differently when it detects that it's in a coroutine? So vim.system() would be synchronous normally, but vim.async(vim.system, ...):wait() would be asynchronous.
    • Document (or generalize) "coroutine to callback".
  • Structured concurrency:
    • await_all, await_any (pseudo-names). See JS Promise.all().
    • Tasks can be canceled.
    • Results (and failures) can be aggregated. (Can't do this with jobwait()!)
    • Failures/errors can be handled (possibly canceling the rest of the task tree).

Implementations

Related

notes from reddit discussion

Briefly speaking, without cancellability, a structured concurrency API can and should be based on fire-and-forget coroutine functions (fafcf). Anything else would likely either be reinventing the wheel or running into a non-composable mess that is currently Plenary’s async.

With pure fafcfs, you can have most of your requirements: ability to chain, start concurrent computations, wait for completion of arbitrary “fafcfs.” It’d be quite a expressive and elegant system.

As you notice, what’s missing is cancellability. That would require creating a special protocol that fafcfs need to conform to.

After briefly looking at async.nvim, it looks like it’s trying to build concurrency by stepping through chunks, which is kind of what Plenary’s async is trying to do.

Ideally, I’d like to try to achieve cancellability without resorting to reimplementing an event loop and a scheduler in Lua. Perhaps fafcfs with a special protocol for indicating that it shouldn’t proceed would be enough.

I believe that Neovim/Lua can take design hints from Python. In this regard, Lua fafcfs are Python coroutines. They can be nested freely. If you want to a cancellable computation, you need to use Tasks, which builds upon coroutines to provide a richer interface.

@justinmk justinmk added enhancement feature request lua stdlib labels Aug 2, 2022
@justinmk justinmk added this to the backlog milestone Aug 2, 2022
@clason
Copy link
Member

clason commented Aug 2, 2022

I believe the gold standard is https://github.com/nvim-lua/plenary.nvim#plenaryasync, but @lewis6991 rolled his own for GitSigns. Either could conceivably be upstreamed if deemed useful enough.

@lewis6991
Copy link
Member

lewis6991 commented Aug 2, 2022

For context here's the complete implementation Gitsigns uses:

local co = coroutine

local async_thread = {
   threads = {},
}

local function threadtostring(x)
   if jit then
      return string.format('%p', x)
   else
      return tostring(x):match('thread: (.*)')
   end
end

function async_thread.running()
   local thread = co.running()
   local id = threadtostring(thread)
   return async_thread.threads[id]
end

function async_thread.create(fn)
   local thread = co.create(fn)
   local id = threadtostring(thread)
   async_thread.threads[id] = true
   return thread
end

function async_thread.finished(x)
   if co.status(x) == 'dead' then
      local id = threadtostring(x)
      async_thread.threads[id] = nil
      return true
   end
   return false
end

local function execute(async_fn, ...)
   local thread = async_thread.create(async_fn)

   local function step(...)
      local ret = { co.resume(thread, ...) }
      local stat, err_or_fn, nargs = unpack(ret)

      if not stat then
         error(string.format("The coroutine failed with this message: %s\n%s",
         err_or_fn, debug.traceback(thread)))
      end

      if async_thread.finished(thread) then
         return
      end

      assert(type(err_or_fn) == "function", "type error :: expected func")

      local ret_fn = err_or_fn
      local args = { select(4, unpack(ret)) }
      args[nargs] = step
      ret_fn(unpack(args, 1, nargs))
   end

   step(...)
end

local M = {}

function M.wrap(func, argc)
   return function(...)
      if not async_thread.running() then
         return func(...)
      end
      return co.yield(func, argc, ...)
   end
end

function M.void(func)
   return function(...)
      if async_thread.running() then
         return func(...)
      end
      execute(func, ...)
   end
end

M.scheduler = M.wrap(vim.schedule, 1)

return M

I basically just copy and paste this for all my plugins now.

@justinmk
Copy link
Member Author

justinmk commented Aug 2, 2022

I basically just copy and paste this for all my plugins now.

That's a good sign. Does it support cancellation? How does error handling look? How does it look to aggregate results?

@lewis6991
Copy link
Member

lewis6991 commented Aug 2, 2022

That's a good sign. Does it support cancellation? How does error handling look? How does it look to aggregate results?

It doesn't support issuing out several coroutines with any kind of join, so there is nothing to cancel or aggregate, the 95% usecase is for use with sequential system commands (via vim.loop.spawn). We could implement that kind of stuff, I just haven't found a situation yet where I've really needed it.

How does error handling look?

Here's an example of injecting error("I'm an error") mid-way through setup() in Gitsigns:

Error executing vim.schedule lua callback: ...e/pack/packer/start/gitsigns.nvim/lua/gitsigns/async.lua:67: The coroutine failed with this message: ...im/site/p
ack/packer/start/gitsigns.nvim/lua/gitsigns.lua:429: I'm an error
stack traceback:
        [C]: in function 'error'
        ...im/site/pack/packer/start/gitsigns.nvim/lua/gitsigns.lua:429: in function <...im/site/pack/packer/start/gitsigns.nvim/lua/gitsigns.lua:393>
stack traceback:
        [C]: in function 'error'
        ...e/pack/packer/start/gitsigns.nvim/lua/gitsigns/async.lua:67: in function <...e/pack/packer/start/gitsigns.nvim/lua/gitsigns/async.lua:62>

It just issues out both stacktraces: one from the application code, and one from the async lib. I've found it useful enough that I don't feel the need to improve it.

@williamboman
Copy link
Contributor

I think this would be a great addition to the stdlib. I've been playing around with async implementations as well, and I'd like to bring forward some topics that I think are worth consideration:

  • How to handle situations where the callback that resumes execution is being invoked more than once.

  • Should callback signatures be standardized to allow easy translation between async <-> callback-style invocations? (e.g. fun(..., callback: fun (err: any?, result: any?))) For example:

    -- callback style
    vim.fs.find(".git", function (err, path)
      -- ...
    end)
    
    -- async style
    local a = vim.async
    local async_find = a.syncify(vim.fs.find) -- this could perhaps even be done automatically by vim.fs.find if it sees that no callback is passed as last arg
    local ok, path_or_err = pcall(async_find, ".git")
  • How to handle nested coroutines? Should the async coroutine forward yields to parent coroutines, on what criterion?

  • Should you be able to retrieve the result of an async function from a non-async context, e.g. through a callback?

  • How to handle multiple return values across async/sync boundaries (return val1, val2)?

  • Perhaps too early to even think about but I wanted to surface the idea anyway: Should user scripts perhaps be executed in a "global" async coroutine context? This would allow direct access to async primitives without users having to explicitly create async constructs.

@ii14
Copy link
Member

ii14 commented Aug 3, 2022

How to handle situations where the callback that resumes execution is being invoked more than once.

A closure could be created that can be called only once.

fun(..., callback: fun (err: any?, result: any?))
How to handle multiple return values across async/sync boundaries (return val1, val2)?

Maybe instead it could be ok: boolean, results: any....

How to handle nested coroutines? Should the async coroutine forward yields to parent coroutines, on what criterion?

Maybe I'm missing something, but it should always resume the caller?

edit: Okay, I get what you mean now, you're asking about suspending a nested coroutine. I think both behaviors should be supported, the standard way of using coroutine.yield directly, but also a way of suspending the entire call stack. I think this could be done by yielding a special constant, based on which the parent coroutine would decide whether to suspend or not, until it hits the main thread.

@mfussenegger
Copy link
Member

What are the current patterns for queuing promises in Lua, that are being used in the wild for Nvim plugins (and Nvim core itself)? Are these good enough? Do we need any sugar in the Nvim lua stdlib to encourage these patterns?

In some of my plugins I started using the pattern norcalli mentioned in #11312 (comment):

local function send_request_sync(method, params)
	local co = coroutine.running()
	send_request(method, params, function(...) coroutine.resume(co, ...) end)
	return coroutine.yield()
end

and found that it's both simple and effective in avoiding callback nesting.

I think if we were to add some async/await or promise abstraction it would help to formulate the problem first, to help ensure we're solving the right thing.
E.g. I can see how functionality to race two or more operations, cancellation or waiting for multiple operations can be useful and the LSP stuff in neovim could probably benefit for some of it. But if all it does is add some sugar over the coroutine pattern above then I'd say it's not worth it.

@muniter
Copy link
Member

muniter commented Aug 4, 2022

Perhaps ideas and implementation from https://github.com/kevinhwang91/promise-async can be useful.

@Chromosore
Copy link

I've read something similar about Javascript: A Study on Solving Callbacks with JavaScript Generators, hope it might help.

@hrsh7th
Copy link
Contributor

hrsh7th commented Aug 22, 2022

I am also implementing async-await for my plugin development. (I plan to use it primarily for sequencing code related to key mapping.)

Currently, I have not written the Promise.all equivalent, but will be able to add it easily.

https://github.com/hrsh7th/nvim-kit/blob/main/lua/___plugin_name___/Async/init.spec.lua#L27

@justinmk
Copy link
Member Author

I think if we were to add some async/await or promise abstraction it would help to formulate the problem first, to help ensure we're solving the right thing.
E.g. I can see how functionality to race two or more operations, cancellation or waiting for multiple operations can be useful and the LSP stuff in neovim could probably benefit for some of it. But if all it does is add some sugar over the coroutine pattern above then I'd say it's not worth it.

@mfussenegger Absolutely. If there are patterns we can document without adding sugar, that is definitely preferred. So to close this, we can document those patterns. Or both: maybe we only need sugar for "race two or more operations, cancellation or waiting for multiple operations".

@lewis6991
Copy link
Member

lewis6991 commented Aug 24, 2022

Using @mfussenegger suggestion in #19624 (comment) I've come up with a very simple and lean implementation with some necessary safety check boilerplate:

local M = {}

function M.wrap(func, argc)
  return function(...)
    local co = coroutine.running()
    if not co then
      error('wrapped functions must be called in a coroutine')
    end

    local args = {...}
    args[argc] = function(...)
      if co == coroutine.running() then
        error('callback called in same coroutine as calling function')
      end
      coroutine.resume(co, ...)
    end
    func(unpack(args, 1, argc))
    return coroutine.yield()
  end
end

function M.void(func)
  return function(...)
    if coroutine.running() then
      return func(...)
    else
      return coroutine.wrap(func)(...)
    end
  end
end

M.scheduler = M.wrap(vim.schedule, 1)

return M

Example usage:

local function foo(x, cb)
  vim.schedule(function()
    cb(x + 1)
  end)
end

local afoo = wrap(foo, 2)

local bar = void(function(x)
  local a = afoo(x)
  print('Hello', a)
end)

bar(3)

Outputs Hello 4

@wbthomason
Copy link

Just chiming in: packer has another async/await implementation based on https://github.com/ms-jpq/lua-async-await, but I consider it to be messy and would love to rip it out in favor of a stdlib alternative. It has limited support for interrupting a sequence of jobs and aggregating results, but could be improved in its API, robustness to errors, etc.

@justinmk justinmk changed the title Lua: async/await/await_all abstraction Lua: async/await/await_all abstraction, structured concurrency Oct 2, 2022
@oberblastmeister
Copy link
Contributor

I have a lot of experience with this because I implemented the async library in plenary. The first iteration was basically a direct copy of https://github.com/ms-jpq/lua-async-await. It was not the best design because there is no point for async await with thunks. It felt like the extra thunks were just there to mirror the syntax from javascript or rust. await was literally implemented like this:

function await(f)
  f(nil)
end

It was also much harder to pass async functions to higher order functions.

The new version did not need async or await, and you can call functions just like normal synchronous functions. Furthermore, higher order functions that expect synchronous functions just work like normal. There is also no extra syntax for for loops

-- now iterators just work like normal
for entry in readdir() do
end

-- instead of
local iter = await(readdir())
while true do
  local entry = await(iter())
  if entry == nil then break end
end

Back then I didn't realize that this was basically delimited continuations. Now, I think we should just implement algebraic effects using one-shot delimited continuations which is described in this paper and implemented in eff.lua. The issue with just using coroutines for converting callback based functions is that you don't get to use coroutines for anything else. With algebraic effects, you can have other effects in addition to async effects. Algebraic effects can implement Reader (local configuration), Exceptions, State (local state), Generators, and more. Also, algebraic effects separate the effects from interpretation. For example, imagine some program using the file system effect, log effect, and read effect:

local handle = handler {
  val = function(v) return v end,
  [Log] = function(k, arg)
    print(arg)
    return k()
  end,
  [ReadFile] = function(k, path)
    return k(fs.read_file(path))
  end,
  [Read] = function(k)
    return k(config)
  end
}
handle(my_program)

If you wanted to test your functions, you can change the interpretation to not use IO.

local vfs = {my_file = "asdfasdf", another_file = "adfadsfdf"}
local logs = {}

-- handle without IO!
local handle = handler {
  val = function(v) return v end,
  [Log] = function(k, arg)
    table.insert(logs, arg)
    return k()
  end,
  [ReadFile] = function(k, path)
    return k(vfs[path])
  end,
  [Read] = function(k)
    return k(config)
  end
}

Async could be implemented like this:

local handle = handler {
  val = callback,
  [Async] = function(k, fn, ...)
    fn(k, ...)
  end
}

Exceptions:

local handle = handler {
  val = function(v) return v end,
  [Exception] = function(k, e)
    return e
  end,
}

@hrsh7th
Copy link
Contributor

hrsh7th commented Dec 2, 2022

I implemented AsyncTask (which is almost like the JavaScript Promises interface, but always synchronous if possible).
(The always synchronous if possible is needed to support feedkeys handling.)

If neovim core supports async-await, I'm wondering if neovim should introduce promsie like async primitives or not.

@hrsh7th
Copy link
Contributor

hrsh7th commented Dec 3, 2022

I found the interesting PR.
luvit/luv#618

@lewis6991
Copy link
Member

lewis6991 commented Dec 3, 2022

@oberblastmeister I've taken a look at eff.lua, and it appears to be a completely generalized version of the current async library we're using. If you take eff.lua and the async implementation you mentioned and refactor enough, you get to the same async lib I implemented in gitsigns.

It's quite interesting, but I'm not sure if we need something as generalized as that. The main need is for an async lib to better utilize libuv and all of it's callback based functions.

The issue with just using coroutines for converting callback based functions is that you don't get to use coroutines for anything else.

eff.lua uses coroutines in the exact same way the current async lib does, so not sure this is true.

I implemented AsyncTask (which is almost like the JavaScript Promises interface, but always synchronous if possible). (The always synchronous if possible is needed to support feedkeys handling.)

If neovim core supports async-await, I'm wondering if neovim should introduce promsie like async primitives or not.

* async-await
  
  * [hrsh7th/nvim-kit@`main`/lua/___kit___/kit/Async/init.lua](https://github.com/hrsh7th/nvim-kit/blob/main/lua/___kit___/kit/Async/init.lua?rgh-link-date=2022-12-02T04%3A13%3A30Z)

* Keymap module
  
  * [hrsh7th/nvim-kit@`main`/lua/___kit___/kit/Vim/Keymap.spec.lua#L28](https://github.com/hrsh7th/nvim-kit/blob/main/lua/___kit___/kit/Vim/Keymap.spec.lua?rgh-link-date=2022-12-02T04%3A13%3A30Z#L28)

* IO module
  
  * [hrsh7th/nvim-kit@`main`/lua/___kit___/kit/IO/init.lua#L114](https://github.com/hrsh7th/nvim-kit/blob/main/lua/___kit___/kit/IO/init.lua?rgh-link-date=2022-12-02T04%3A13%3A30Z#L114)

* Convert callback style to Promise style
  
  * [hrsh7th/nvim-kit@`main`/lua/___kit___/kit/IO/init.lua#L54](https://github.com/hrsh7th/nvim-kit/blob/main/lua/___kit___/kit/IO/init.lua?rgh-link-date=2022-12-02T04%3A13%3A30Z#L54)

It looks like this implementation creates a new coroutine for every async function. This is a big drawback compared to the plenary/gitsigns implementations that uses a single coroutine for each context. It also requires await which isn't needed in the other implementations.

I've created https://github.com/lewis6991/async.nvim which I'm using as a reference for all the plugins I use async in. The library is small enough that (for now) it's ok to just vendor directly in plugins without needing it as a depedency.

@hrsh7th
Copy link
Contributor

hrsh7th commented Dec 3, 2022

Yes. My implementation creates a coroutine for each asynchronous operation.
I did not think this would affect performance. I will investigate.

I also wonder that "it is the drawback that requires the call to await".

Because I believe the difference is where and when to yield asynchronous primitives.

For example, look at the following example

-- Return just AsyncTask
local function timeout_async(ms)
  return AsyncTask.new(function(resolve))
    vim.defer_fn(resolve, ms)
  end)
end

-- Return AsyncTask if it is running in the main coroutine, otherwise yield.
local function timeout(ms)
  local task = timeout_async(ms)
  if vim.async.in_context() then
    return task:await()
  end
  return task
end

vim.async.run(function())
  local s = uv.now()
  timeout(200) -- does not need to `await` call.
   assert.is_true(math.abs(uv.now() - s - 200) < 10)
end)

In this example, timeout_async just returns an AsyncTask, but timeout delegates resolution to any async coroutine context.

I think it is a difficult question as to whether we should choose Promise, thunk, or another means as an asynchronous primitive.
However, it seems natural to me to introduce some kind of asynchronous primitive.

To me, the Promise style seem like a good choice, since there are many JavaScript users and they are easy to understand.
Also, it is easy to convert parallel waits instead of serial ones. e.g. await Promise.all(tasks)

Translated with www.DeepL.com/Translator (free version)

@justinmk
Copy link
Member Author

justinmk commented Dec 4, 2022

the Promise style seem like a good choice, since there are many JavaScript users and they are easy to understand.

Comforting javascript users is not a goal, and should not be factored into any of the designs here. As a typescript/javascript user myself, I'm very skeptical that async/await is a good interface. We should focus on what feels right for a Lua + Nvim.

The main need is for an async lib to better utilize libuv and all of it's callback based functions.

Since the introduce of jobstart(), long before libuv was exposed. Network tasks like LSP are another common use-case.

Sounds like https://github.com/lewis6991/async.nvim is informed by a lot of previous work, which is a good sign! Does it provide a way to cancel tasks, handle errors/cancellations, and aggregate results?

@rcarriga
Copy link

As a typescript/javascript user myself, I'm very skeptical that async/await is a good interface. We should focus on what feels right for a Lua + Nvim.

I've been using plenary's async module with neotest for a while now and have found the lack of an async/await syntax to be much more fitting with Lua's simplistic style of programming. IMO adding async/await would be clunky.

Sounds like https://github.com/lewis6991/async.nvim is informed by a lot of previous work, which is a good sign! Does it provide a way to cancel tasks, handle errors/cancellations, and aggregate results?

I wanted to introduce the async usage to nvim-dap-ui which is similar to LSP, where it's currently using a lot of callbacks. I worked off of @lewis6991's implementation and added the ability to cancel tasks and handle errors rather than them just being raised to the user. His implementation already covers aggregating results. It's very similar to Python's asyncio Task class just because I didn't want to have to design the API much. Also added some flow control primitives.

Implementation here https://github.com/rcarriga/nvim-dap-ui/blob/feat/new-api/lua/dapui/async/base.lua and some usage in tests https://github.com/rcarriga/nvim-dap-ui/blob/feat/new-api/tests/unit/async/base_spec.lua. Excuse the long namespaced names, it's just for generating documentation.

@rcarriga
Copy link

rcarriga commented Jan 3, 2023

Thought I'd update as I've been having a bit of fun with this. I've done a fair bit of refactoring and adding of features in the last few days.

The main features are currently:

  1. Handling results and errors of tasks along with cancelling.
  2. Helper functions for coordinating tasks, including gathering results from a list of tasks and getting the result of the first task to complete (and cancelling the others).
  3. Async versions of many libuv functions, pretty much the same way plenary does it with the added bonus of all of the functions being typed with emmylua annotations so users will have full type inference.
  4. Creating a fully documented and typed LSP client interface which is generated by a script based off of the LSP spec meta model and then implemented very simply with some metatables here.
  5. Async primitives (queues, events and semaphores) which are often useful in async code, again based off of Python's asyncio modules.

Not sure if it's the direction core would want to go with an async library, (e.g. the LSP client but I was doing something similar for the DAP specification and LSP was mentioned as a key reason for the async usage so thought I'd slap something together for it). I think it does demonstrate the advantages of a single library by implementing broadly useful features and providing a stable, tested solution for async that can be used by anybody.

Here is some sample usage of the LSP client. Open the script in the nvim-dap-ui branch to see the type checking/documentation with Lua LSP

local async = require("dapui").async

async
  .run(function()
    local client = async.lsp.client(2)

    -- Notifications usage
    client.notify.textDocument_willSave({
      reason = "3",
      textDocument = {
        uri = vim.uri_from_bufnr(0),
      },
    })

    -- Request usage
    local symbols = client.request.textDocument_documentSymbol(0, {
      textDocument = { uri = vim.uri_from_bufnr(0) },
    }, { timeout = 1000 })

    return symbols
  end)
  .add_callback(function(task)
    if task.error() then
      vim.schedule_wrap(error)(task.error())
    else
      print(vim.inspect(task.result()))
    end
  end)

@lewis6991
Copy link
Member

What is the purpose of add_callback. Why can't that code be appended to the end of the async function?

@rcarriga
Copy link

rcarriga commented Jan 4, 2023

The add_callback usage is very arbitrary here. It's just to show result/error handling. I'd imagine the most common use case would be just for error handling. add_callback would be preferred over just wrapping everything in pcall because pcall won't catch errors outside of the coroutine such as cancellation.

@svermeulen
Copy link
Contributor

Some work was started on structured concurrency in lua (similar to Python's trio) here in case that approach might also be considered here

@svermeulen
Copy link
Contributor

I was inspired enough by the article I linked to above that I created a plugin to provide structured concurrency in neovim lua: https://github.com/svermeulen/nvim-lusc

@hrsh7th
Copy link
Contributor

hrsh7th commented Oct 30, 2023

Should we support progress related functions in this area?

@justinmk justinmk mentioned this issue Nov 14, 2023
3 tasks
@s1n7ax

This comment was marked as off-topic.

@s1n7ax
Copy link

s1n7ax commented Nov 25, 2023

I have added Promise like error handling for https://github.com/ms-jpq/lua-async-await to handle LSP response type where the first value is the error. So no more assert checks on the response is needed. It's done in the wait() function too. So, if it's different, one can simply create a new wait function.

https://github.com/nvim-java/lua-async-await

local function lsp_request(callback)
  local timer = vim.loop.new_timer()

  assert(timer)

  timer:start(2000, 0, function()
    -- First parameter is the error
    callback('something went wrong', nil)
  end)
end

local M = require('sync')

local main = M.sync(function()
  local response = M.wait_handle_error(M.wrap(lsp_request)())
end)
  .catch(function(err)
    print('error occurred ', err)
  end)
  .run()

Result:

error occurred  test6.lua:105: something went wrong

@justinmk justinmk changed the title Lua: async/await/await_all abstraction, structured concurrency Lua: structured concurrency, task pipelines Jan 25, 2024
@neovim neovim deleted a comment from miversen33 Jan 25, 2024
@justinmk justinmk added the async futures/promises, async/await, concurrency, task pipelines label Jan 25, 2024
@justinmk justinmk changed the title Lua: structured concurrency, task pipelines Lua: structured concurrency, Promises, task pipelines Jan 29, 2024
@vurentjie
Copy link

This is what I am currently using.
https://gist.github.com/vurentjie/566a7158038ea6e044a4321c63cacde0

It is very similar to some of the samples posted above in this thread, possibly because I also derived it from /ms-jpq/lua-async-await by breaking down all the calls and simplifying.

Thought I would just share it here as another example. It's very lean and probably does not cover everything. Currently I handle errors outside of this, either via [p|xp]call() or error() depending on the use case.

@gregorias
Copy link
Contributor

Hi,

I would like to present my design for structured concurrency in Neovim: Coop.nvim. I believe it addresses all requirements you have listed out for such a framework + it’s as straightforward as such a framework could be (How it works doc). I believe it provides an elegant approach to the subject.

Here’s how Coop relates to the listed requirements:

Representing a task.
Orchestrating "pipelines" (quasi monads?) of work ("tasks") and handling errors.

See examples for examples of handling errors, running tasks in parallel, and complex awaiting conditions.

Maximally leveraging Lua coroutines + libuv. Only add concepts ("task", "promise") if absolutely needed.

Yes,

Coroutines (or tasks that wrap coroutines) can be nested. (ref)

True. In Coop you can freely nest task functions (a task function is a function that may call task.yield).

Util to create an awaitable task from "normal" functions (cf. "promisify"?).
Example: vim.system() returns its own ad-hoc "task" that can be "awaited" via :wait().
Can e.g. vim.system() be "promisified" without its knowledge?

Coop indeed “promisifies” uv.spawn. Coop’s implementation returns an awaitable future. In tests you have an example of how it turns the callback interface from Neovim’s documentation into a synchronous one:

local stdin = vim.uv.new_pipe()
local stdout = vim.uv.new_pipe()
local stderr = vim.uv.new_pipe()

local handle, pid, cat_future = uv.spawn("cat", {
	stdio = { stdin, stdout, stderr },
})

local read_future = coop.Future.new()
vim.uv.read_start(stdout, function(err, data)
	assert(not err, err)
	if data ~= nil and not read_future.done then
		read_future:complete(data)
	end
end)
vim.uv.write(stdin, "Hello World")

-- Here we wrap the APIs into a task to use the synchronous looking interface.
local task = coop.spawn(function()
	-- Wait for stdout to produce the data.
	local read_data = read_future:await()
	assert.are.same("Hello World", read_data)
	local err_stdin_shutdown = uv.shutdown(stdin)
	assert.is.Nil(err_stdin_shutdown)
	-- Wait for the cat process to finish.
	return cat_future:await()
end)

-- Wait for 200ms for the task finish.
local exit_code, exit_sign = task:await(200, 2)

Document (or generalize) "coroutine to callback".

Coop provides a generic converter callback to coroutine converter: cb_to_tf (docs.)

The generic conversion from a coroutine (a task) into a callback is done through an await overload that accepts a callback.

await_all, await_any (pseudo-names). See JS Promise.all().
Results (and failures) can be aggregated. (Can't do this with jobwait()!)

All possible. For example, Coop provides an implementation of as_completed, an iterator that returns task results as they finish: example.

Tentative implementations of await_all and await_any are in control.lua.

Tasks can be canceled.

True.

Failures/errors can be handled

True.

(possibly canceling the rest of the task tree).

Coop doesn’t natively provide the task tree abstraction to keep with the Lua’s spirit of coroutines being standalone threads. Task trees can be implemented on top of Coop, because the programmer can intercept the "cancelled" error and :cancel subtasks.

@justinmk
Copy link
Member Author

@gregorias Thanks for sharing, that looks very promising.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
architecture async futures/promises, async/await, concurrency, task pipelines enhancement feature request lua stdlib
Projects
None yet
Development

No branches or pull requests