diff options
author | Himbeer <himbeer@disroot.org> | 2024-07-30 20:27:51 +0200 |
---|---|---|
committer | Himbeer <himbeer@disroot.org> | 2024-07-30 20:28:49 +0200 |
commit | 6c9cad1f566f6a8716fa94fb84668bf9d6c62a2a (patch) | |
tree | fdb99def34905af988efe615b8d8b62983e69159 /src/lib/channel.zig | |
parent | 48d6fa3e80193a3cc735f5e0b0390a18a7bf5a83 (diff) |
channel: Implement and fully switch to joined channel receiving
Unjoined receiving is no longer possible due to complications with whether the unjoined copy should be popped when a joined process receives it.
Closes #56.
Diffstat (limited to 'src/lib/channel.zig')
-rw-r--r-- | src/lib/channel.zig | 49 |
1 files changed, 30 insertions, 19 deletions
diff --git a/src/lib/channel.zig b/src/lib/channel.zig index 30d8a5a..7687afa 100644 --- a/src/lib/channel.zig +++ b/src/lib/channel.zig @@ -6,18 +6,19 @@ const std = @import("std"); const Allocator = std.mem.Allocator; pub const Error = error{ + NotJoined, WouldBlock, }; pub const Message = struct { bytes: []const u8, - refcount: usize = 0, + refcount: usize = 1, - fn clone(self: *Message) !void { - self.refcount = try std.math.add(self.refcount, 1); + fn addReference(self: *Message) !void { + self.refcount = try std.math.add(usize, self.refcount, 1); } - fn deinit(self: *Message) void { + fn dropReference(self: *Message) void { self.refcount -= 1; if (self.refcount == 0) { defer alloc.free(self.bytes); @@ -30,7 +31,6 @@ pub const Messages = std.TailQueue(*Message); var alloc: Allocator = undefined; const Queues = std.AutoArrayHashMap(usize, Messages); -var unjoined_queues: Queues = undefined; const Processes = std.AutoArrayHashMap(usize, Queues); var joined: Processes = undefined; @@ -55,24 +55,30 @@ pub fn leave(pid: usize, id: usize) void { } // The channel takes ownership of `bytes`. -pub fn pass(channel: usize, bytes: []const u8) !void { - const entry = try unjoined_queues.getOrPut(channel); - if (!entry.found_existing) { - initQueue(entry.value_ptr); - } +pub fn pass(id: usize, bytes: []const u8) !void { + const message = try alloc.create(Message); + defer message.dropReference(); - const node = try alloc.create(Messages.Node); - node.data = try alloc.create(Message); - node.data.* = .{ .bytes = bytes }; - entry.value_ptr.append(node); + message.* = .{ .bytes = bytes }; + + var it = joined.iterator(); + while (it.next()) |queues| { + if (queues.value_ptr.getPtr(id)) |messages| { + try message.addReference(); + errdefer message.dropReference(); + + try enqueue(messages, message); + } + } } -pub fn receive(channel: usize, buffer: []u8) !usize { - const messages = unjoined_queues.getPtr(channel) orelse return Error.WouldBlock; +pub fn receive(pid: usize, id: usize, buffer: []u8) !usize { + const queues = joined.getPtr(pid) orelse return Error.NotJoined; + const messages = queues.getPtr(id) orelse return Error.NotJoined; const message = messages.popFirst() orelse return Error.WouldBlock; defer alloc.destroy(message); - defer message.data.deinit(); + defer message.data.dropReference(); const len = @min(buffer.len, message.data.bytes.len); @memcpy(buffer[0..len], message.data.bytes[0..len]); @@ -97,12 +103,17 @@ fn freeQueues(queues: *Queues) void { fn freeMessages(messages: *Messages) void { while (messages.popFirst()) |message| { - message.data.deinit(); + message.data.dropReference(); } } +fn enqueue(messages: *Messages, message: *Message) !void { + const node = try alloc.create(Messages.Node); + node.data = message; + messages.append(node); +} + pub fn init(with_allocator: Allocator) void { - unjoined_queues = Queues.init(with_allocator); joined = Processes.init(with_allocator); alloc = with_allocator; } |