aboutsummaryrefslogtreecommitdiffstats
path: root/lua
diff options
context:
space:
mode:
Diffstat (limited to 'lua')
-rw-r--r--lua/mason-core/async/control.lua62
1 files changed, 60 insertions, 2 deletions
diff --git a/lua/mason-core/async/control.lua b/lua/mason-core/async/control.lua
index c9479540..57aa88db 100644
--- a/lua/mason-core/async/control.lua
+++ b/lua/mason-core/async/control.lua
@@ -15,9 +15,14 @@ function Condvar:wait()
end)
end
+function Condvar:notify()
+ local handle = table.remove(self.handles)
+ pcall(handle)
+end
+
function Condvar:notify_all()
- for _, handle in ipairs(self.handles) do
- pcall(handle)
+ while #self.handles > 0 do
+ self:notify()
end
self.handles = {}
end
@@ -97,8 +102,61 @@ function OneShotChannel:receive()
return unpack(self.value)
end
+---@class Channel
+---@field private condvar Condvar
+---@field private buffer any?
+---@field is_closed boolean
+local Channel = {}
+Channel.__index = Channel
+function Channel.new()
+ return setmetatable({
+ condvar = Condvar.new(),
+ buffer = nil,
+ is_closed = false,
+ }, Channel)
+end
+
+function Channel:close()
+ self.is_closed = true
+end
+
+---@async
+function Channel:send(value)
+ assert(not self.is_closed, "Channel is closed.")
+ while self.buffer ~= nil do
+ self.condvar:wait()
+ end
+ self.buffer = value
+ self.condvar:notify()
+ while self.buffer ~= nil do
+ self.condvar:wait()
+ end
+end
+
+---@async
+function Channel:receive()
+ assert(not self.is_closed, "Channel is closed.")
+ while self.buffer == nil do
+ self.condvar:wait()
+ end
+ local value = self.buffer
+ self.buffer = nil
+ self.condvar:notify()
+ return value
+end
+
+---@async
+function Channel:iter()
+ return function()
+ while not self.is_closed do
+ return self:receive()
+ end
+ end
+end
+
return {
Condvar = Condvar,
Semaphore = Semaphore,
OneShotChannel = OneShotChannel,
+ Channel = Channel,
}