aboutsummaryrefslogtreecommitdiff
path: root/src/channel.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel.zig')
-rw-r--r--src/channel.zig23
1 files changed, 17 insertions, 6 deletions
diff --git a/src/channel.zig b/src/channel.zig
index 6367463..b3084b8 100644
--- a/src/channel.zig
+++ b/src/channel.zig
@@ -14,6 +14,7 @@ pub const Message = struct {
bytes: []const u8,
refcount: usize = 1,
process_filter: u16,
+ sender: u16,
fn addReference(self: *Message) !void {
self.refcount = try std.math.add(usize, self.refcount, 1);
@@ -33,10 +34,10 @@ var alloc: Allocator = undefined;
const Queues = std.AutoArrayHashMap(usize, Messages);
-const Processes = std.AutoArrayHashMap(usize, Queues);
+const Processes = std.AutoArrayHashMap(u16, Queues);
var joined: Processes = undefined;
-pub fn join(pid: usize, id: usize) !void {
+pub fn join(pid: u16, id: usize) !void {
const queues = try joined.getOrPut(pid);
if (!queues.found_existing) {
initProcess(queues.value_ptr);
@@ -48,7 +49,7 @@ pub fn join(pid: usize, id: usize) !void {
}
}
-pub fn leave(pid: usize, id: usize) void {
+pub fn leave(pid: u16, id: usize) void {
const queues = joined.getPtr(pid) orelse return;
freeQueues(queues);
queues.clearAndFree();
@@ -56,11 +57,15 @@ pub fn leave(pid: usize, id: usize) void {
}
// The channel takes ownership of `bytes`.
-pub fn pass(id: usize, receiver: u16, bytes: []const u8) !void {
+pub fn pass(pid: u16, id: usize, receiver: u16, bytes: []const u8) !void {
const message = try alloc.create(Message);
defer message.dropReference();
- message.* = .{ .bytes = bytes, .process_filter = receiver };
+ message.* = .{
+ .bytes = bytes,
+ .process_filter = receiver,
+ .sender = pid,
+ };
var it = joined.iterator();
while (it.next()) |queues| {
@@ -73,7 +78,7 @@ pub fn pass(id: usize, receiver: u16, bytes: []const u8) !void {
}
}
-pub fn receive(pid: usize, id: usize, buffer: []u8) !usize {
+pub fn receive(pid: u16, id: usize, sender: ?*u16, buffer: []u8) !usize {
const queues = joined.getPtr(pid) orelse return Error.NotJoined;
const messages = queues.getPtr(id) orelse return Error.NotJoined;
const message = messages.popFirst() orelse return Error.WouldBlock;
@@ -81,6 +86,12 @@ pub fn receive(pid: usize, id: usize, buffer: []u8) !usize {
defer alloc.destroy(message);
defer message.data.dropReference();
+ if (message.data.process_filter != pid and message.data.process_filter != 0) {
+ return Error.WouldBlock;
+ }
+
+ if (sender) |sender_id| sender_id.* = message.data.sender;
+
const len = @min(buffer.len, message.data.bytes.len);
@memcpy(buffer[0..len], message.data.bytes[0..len]);