aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHimbeer <himbeer@disroot.org>2024-07-30 20:27:51 +0200
committerHimbeer <himbeer@disroot.org>2024-07-30 20:28:49 +0200
commit6c9cad1f566f6a8716fa94fb84668bf9d6c62a2a (patch)
treefdb99def34905af988efe615b8d8b62983e69159
parent48d6fa3e80193a3cc735f5e0b0390a18a7bf5a83 (diff)
channel: Implement and fully switch to joined channel receiving
Unjoined receiving is no longer possible due to complications with whether the unjoined copy should be popped when a joined process receives it. Closes #56.
-rw-r--r--src/lib/channel.zig49
-rw-r--r--src/lib/syscall.zig10
2 files changed, 35 insertions, 24 deletions
diff --git a/src/lib/channel.zig b/src/lib/channel.zig
index 30d8a5a..7687afa 100644
--- a/src/lib/channel.zig
+++ b/src/lib/channel.zig
@@ -6,18 +6,19 @@ const std = @import("std");
const Allocator = std.mem.Allocator;
pub const Error = error{
+ NotJoined,
WouldBlock,
};
pub const Message = struct {
bytes: []const u8,
- refcount: usize = 0,
+ refcount: usize = 1,
- fn clone(self: *Message) !void {
- self.refcount = try std.math.add(self.refcount, 1);
+ fn addReference(self: *Message) !void {
+ self.refcount = try std.math.add(usize, self.refcount, 1);
}
- fn deinit(self: *Message) void {
+ fn dropReference(self: *Message) void {
self.refcount -= 1;
if (self.refcount == 0) {
defer alloc.free(self.bytes);
@@ -30,7 +31,6 @@ pub const Messages = std.TailQueue(*Message);
var alloc: Allocator = undefined;
const Queues = std.AutoArrayHashMap(usize, Messages);
-var unjoined_queues: Queues = undefined;
const Processes = std.AutoArrayHashMap(usize, Queues);
var joined: Processes = undefined;
@@ -55,24 +55,30 @@ pub fn leave(pid: usize, id: usize) void {
}
// The channel takes ownership of `bytes`.
-pub fn pass(channel: usize, bytes: []const u8) !void {
- const entry = try unjoined_queues.getOrPut(channel);
- if (!entry.found_existing) {
- initQueue(entry.value_ptr);
- }
+pub fn pass(id: usize, bytes: []const u8) !void {
+ const message = try alloc.create(Message);
+ defer message.dropReference();
- const node = try alloc.create(Messages.Node);
- node.data = try alloc.create(Message);
- node.data.* = .{ .bytes = bytes };
- entry.value_ptr.append(node);
+ message.* = .{ .bytes = bytes };
+
+ var it = joined.iterator();
+ while (it.next()) |queues| {
+ if (queues.value_ptr.getPtr(id)) |messages| {
+ try message.addReference();
+ errdefer message.dropReference();
+
+ try enqueue(messages, message);
+ }
+ }
}
-pub fn receive(channel: usize, buffer: []u8) !usize {
- const messages = unjoined_queues.getPtr(channel) orelse return Error.WouldBlock;
+pub fn receive(pid: usize, id: usize, buffer: []u8) !usize {
+ const queues = joined.getPtr(pid) orelse return Error.NotJoined;
+ const messages = queues.getPtr(id) orelse return Error.NotJoined;
const message = messages.popFirst() orelse return Error.WouldBlock;
defer alloc.destroy(message);
- defer message.data.deinit();
+ defer message.data.dropReference();
const len = @min(buffer.len, message.data.bytes.len);
@memcpy(buffer[0..len], message.data.bytes[0..len]);
@@ -97,12 +103,17 @@ fn freeQueues(queues: *Queues) void {
fn freeMessages(messages: *Messages) void {
while (messages.popFirst()) |message| {
- message.data.deinit();
+ message.data.dropReference();
}
}
+fn enqueue(messages: *Messages, message: *Message) !void {
+ const node = try alloc.create(Messages.Node);
+ node.data = message;
+ messages.append(node);
+}
+
pub fn init(with_allocator: Allocator) void {
- unjoined_queues = Queues.init(with_allocator);
joined = Processes.init(with_allocator);
alloc = with_allocator;
}
diff --git a/src/lib/syscall.zig b/src/lib/syscall.zig
index 97dc811..43db5a5 100644
--- a/src/lib/syscall.zig
+++ b/src/lib/syscall.zig
@@ -35,7 +35,7 @@ pub fn handler(proc: *process.Info, trap_frame: *TrapFrame) !void {
100009 => trap_frame.setReturnValue(join(proc, trap_frame)),
100010 => trap_frame.setReturnValue(leave(proc, trap_frame)),
100011 => trap_frame.setReturnValue(pass(trap_frame)),
- 100012 => trap_frame.setReturnValue(receive(trap_frame)),
+ 100012 => trap_frame.setReturnValue(receive(proc, trap_frame)),
else => return HandleError.UnknownSyscall,
}
}
@@ -174,13 +174,13 @@ fn devicesByKind(trap_frame: *const TrapFrame) !usize {
}
// join(channel_id: usize) !void
-fn join(proc: *process.Info, trap_frame: *const TrapFrame) !void {
+fn join(proc: *const process.Info, trap_frame: *const TrapFrame) !void {
const id = trap_frame.general_purpose_registers[10];
return channel.join(proc.id, id);
}
// leave(channel_id: usize) void
-fn leave(proc: *process.Info, trap_frame: *const TrapFrame) void {
+fn leave(proc: *const process.Info, trap_frame: *const TrapFrame) void {
const id = trap_frame.general_purpose_registers[10];
channel.leave(proc.id, id);
}
@@ -198,11 +198,11 @@ fn pass(trap_frame: *const TrapFrame) !void {
}
// receive(channel_id: usize, buffer: [*]u8, len: usize) !usize
-fn receive(trap_frame: *const TrapFrame) !usize {
+fn receive(proc: *const process.Info, trap_frame: *const TrapFrame) !usize {
const id = trap_frame.general_purpose_registers[10];
const buffer_ptr: [*]u8 = @ptrFromInt(trap_frame.general_purpose_registers[11]);
const len = trap_frame.general_purpose_registers[12];
const buffer = buffer_ptr[0..len];
- return channel.receive(id, buffer);
+ return channel.receive(proc.id, id, buffer);
}