diff options
| author | William Boman <william@redwill.se> | 2022-07-08 18:34:38 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-07-08 18:34:38 +0200 |
| commit | 976aa4fbee8a070f362cab6f6ec84e9251a90cf9 (patch) | |
| tree | 5e8d9c9c59444a25c7801b8f39763c4ba6e1f76d /lua/mason-core/async | |
| parent | feat: add gotests, gomodifytags, impl (#28) (diff) | |
| download | mason-976aa4fbee8a070f362cab6f6ec84e9251a90cf9.tar mason-976aa4fbee8a070f362cab6f6ec84e9251a90cf9.tar.gz mason-976aa4fbee8a070f362cab6f6ec84e9251a90cf9.tar.bz2 mason-976aa4fbee8a070f362cab6f6ec84e9251a90cf9.tar.lz mason-976aa4fbee8a070f362cab6f6ec84e9251a90cf9.tar.xz mason-976aa4fbee8a070f362cab6f6ec84e9251a90cf9.tar.zst mason-976aa4fbee8a070f362cab6f6ec84e9251a90cf9.zip | |
refactor: add mason-schemas and mason-core modules (#29)
* refactor: add mason-schemas and move generated filetype map to mason-lspconfig
* refactor: add mason-core module
Diffstat (limited to 'lua/mason-core/async')
| -rw-r--r-- | lua/mason-core/async/control.lua | 75 | ||||
| -rw-r--r-- | lua/mason-core/async/init.lua | 245 | ||||
| -rw-r--r-- | lua/mason-core/async/uv.lua | 49 |
3 files changed, 369 insertions, 0 deletions
diff --git a/lua/mason-core/async/control.lua b/lua/mason-core/async/control.lua new file mode 100644 index 00000000..3252c070 --- /dev/null +++ b/lua/mason-core/async/control.lua @@ -0,0 +1,75 @@ +local a = require "mason-core.async" + +---@class Condvar +local Condvar = {} +Condvar.__index = Condvar + +function Condvar.new() + return setmetatable({ handles = {}, queue = {}, is_notifying = false }, Condvar) +end + +---@async +function Condvar:wait() + a.wait(function(resolve) + if self.is_notifying then + self.queue[resolve] = true + else + self.handles[resolve] = true + end + end) +end + +function Condvar:notify_all() + self.is_notifying = true + for handle in pairs(self.handles) do + handle() + end + self.handles = self.queue + self.queue = {} + self.is_notifying = false +end + +local Permit = {} +Permit.__index = Permit + +function Permit.new(semaphore) + return setmetatable({ semaphore = semaphore }, Permit) +end + +function Permit:forget() + local semaphore = self.semaphore + semaphore.permits = semaphore.permits + 1 + + if semaphore.permits > 0 and #semaphore.handles > 0 then + semaphore.permits = semaphore.permits - 1 + local release = table.remove(semaphore.handles, 1) + release(Permit.new(semaphore)) + end +end + +---@class Semaphore +local Semaphore = {} +Semaphore.__index = Semaphore + +---@param permits integer +function Semaphore.new(permits) + return setmetatable({ permits = permits, handles = {} }, Semaphore) +end + +---@async +function Semaphore:acquire() + if self.permits > 0 then + self.permits = self.permits - 1 + else + return a.wait(function(resolve) + table.insert(self.handles, resolve) + end) + end + + return Permit.new(self) +end + +return { + Condvar = Condvar, + Semaphore = Semaphore, +} diff --git a/lua/mason-core/async/init.lua b/lua/mason-core/async/init.lua new file mode 100644 index 00000000..c79c6e42 --- /dev/null +++ b/lua/mason-core/async/init.lua @@ -0,0 +1,245 @@ +local _ = require "mason-core.functional" +local co = coroutine + +local exports = {} + +local Promise = {} +Promise.__index = Promise + +function Promise.new(resolver) + return setmetatable({ resolver = resolver, has_resolved = false }, Promise) +end + +---@param success boolean +---@param cb fun(success: boolean, value: table) +function Promise:_wrap_resolver_cb(success, cb) + return function(...) + if self.has_resolved then + return + end + self.has_resolved = true + cb(success, { ... }) + end +end + +function Promise:__call(callback) + self.resolver(self:_wrap_resolver_cb(true, callback), self:_wrap_resolver_cb(false, callback)) +end + +local function await(resolver) + local ok, value = co.yield(Promise.new(resolver)) + if not ok then + error(value[1], 2) + end + return unpack(value) +end + +local function table_pack(...) + return { n = select("#", ...), ... } +end + +---@param async_fn fun(...) +---@param should_reject_err boolean|nil @Whether the provided async_fn takes a callback with the signature `fun(err, result)` +local function promisify(async_fn, should_reject_err) + return function(...) + local args = table_pack(...) + return await(function(resolve, reject) + if should_reject_err then + args[args.n + 1] = function(err, result) + if err then + reject(err) + else + resolve(result) + end + end + else + args[args.n + 1] = resolve + end + local ok, err = pcall(async_fn, unpack(args, 1, args.n + 1)) + if not ok then + reject(err) + end + end) + end +end + +local function new_execution_context(suspend_fn, callback, ...) + local thread = co.create(suspend_fn) + local cancelled = false + local step + step = function(...) + if cancelled then + return + end + local ok, promise_or_result = co.resume(thread, ...) + if ok then + if co.status(thread) == "suspended" then + if getmetatable(promise_or_result) == Promise then + promise_or_result(step) + else + -- yield to parent coroutine + step(coroutine.yield(promise_or_result)) + end + else + callback(true, promise_or_result) + thread = nil + end + else + callback(false, promise_or_result) + thread = nil + end + end + + step(...) + return function() + cancelled = true + thread = nil + end +end + +exports.run = function(suspend_fn, callback, ...) + return new_execution_context(suspend_fn, callback, ...) +end + +---@generic T +---@param suspend_fn T +---@return T +exports.scope = function(suspend_fn) + return function(...) + return new_execution_context(suspend_fn, function(success, err) + if not success then + error(err, 0) + end + end, ...) + end +end + +exports.run_blocking = function(suspend_fn, ...) + local resolved, ok, result + local cancel_coroutine = new_execution_context(suspend_fn, function(a, b) + resolved = true + ok = a + result = b + end, ...) + + if vim.wait(60000, function() + return resolved == true + end, 50) then + if not ok then + error(result, 2) + end + return result + else + cancel_coroutine() + error("async function failed to resolve in time.", 2) + end +end + +exports.wait = await +exports.promisify = promisify + +exports.sleep = function(ms) + await(function(resolve) + vim.defer_fn(resolve, ms) + end) +end + +exports.scheduler = function() + await(vim.schedule) +end + +---Creates a oneshot channel that can only send once. +local function oneshot_channel() + local has_sent = false + local sent_value + local saved_callback + + return { + is_closed = function() + return has_sent + end, + send = function(...) + assert(not has_sent, "Oneshot channel can only send once.") + has_sent = true + sent_value = { ... } + if saved_callback then + saved_callback(unpack(sent_value)) + end + end, + receive = function() + return await(function(resolve) + if has_sent then + resolve(unpack(sent_value)) + else + saved_callback = resolve + end + end) + end, + } +end + +---@async +---@param suspend_fns async fun()[] +---@param mode '"first"' | '"all"' +local function wait(suspend_fns, mode) + local channel = oneshot_channel() + + do + local results = {} + local thread_cancellations = {} + local count = #suspend_fns + local completed = 0 + + local function cancel() + for _, cancel_thread in ipairs(thread_cancellations) do + cancel_thread() + end + end + + for i, suspend_fn in ipairs(suspend_fns) do + thread_cancellations[i] = exports.run(suspend_fn, function(success, result) + completed = completed + 1 + if not success then + if not channel.is_closed() then + cancel() + channel.send(false, result) + results = nil + thread_cancellations = {} + end + else + results[i] = result + if mode == "first" or completed >= count then + cancel() + channel.send(true, mode == "first" and { result } or results) + results = nil + thread_cancellations = {} + end + end + end) + end + end + + local ok, results = channel.receive() + if not ok then + error(results, 2) + end + return unpack(results) +end + +---@async +---@param suspend_fns async fun()[] +function exports.wait_all(suspend_fns) + return wait(suspend_fns, "all") +end + +---@async +---@param suspend_fns async fun()[] +function exports.wait_first(suspend_fns) + return wait(suspend_fns, "first") +end + +function exports.blocking(suspend_fn) + return _.partial(exports.run_blocking, suspend_fn) +end + +return exports diff --git a/lua/mason-core/async/uv.lua b/lua/mason-core/async/uv.lua new file mode 100644 index 00000000..f3d25b04 --- /dev/null +++ b/lua/mason-core/async/uv.lua @@ -0,0 +1,49 @@ +local a = require "mason-core.async" + +---@type table<UvMethod, async fun(...)> +local M = setmetatable({}, { + __index = function(cache, method) + cache[method] = a.promisify(vim.loop[method], true) + return cache[method] + end, +}) + +return M + +---@alias UvMethod +---| '"fs_close"' +---| '"fs_open"' +---| '"fs_read"' +---| '"fs_unlink"' +---| '"fs_write"' +---| '"fs_mkdir"' +---| '"fs_mkdtemp"' +---| '"fs_mkstemp"' +---| '"fs_rmdir"' +---| '"fs_scandir"' +---| '"fs_stat"' +---| '"fs_fstat"' +---| '"fs_lstat"' +---| '"fs_rename"' +---| '"fs_fsync"' +---| '"fs_fdatasync"' +---| '"fs_ftruncate"' +---| '"fs_sendfile"' +---| '"fs_access"' +---| '"fs_chmod"' +---| '"fs_fchmod"' +---| '"fs_utime"' +---| '"fs_futime"' +---| '"fs_lutime"' +---| '"fs_link"' +---| '"fs_symlink"' +---| '"fs_readlink"' +---| '"fs_realpath"' +---| '"fs_chown"' +---| '"fs_fchown"' +---| '"fs_lchown"' +---| '"fs_copyfile"' +---| '"fs_opendir"' +---| '"fs_readdir"' +---| '"fs_closedir"' +---| '"fs_statfs"' |
