aboutsummaryrefslogtreecommitdiff
path: root/src/channel.zig
blob: 6367463fb846ab18de1413c8841b555b74710970 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// SPDX-FileCopyrightText: 2024 Himbeer <himbeer@disroot.org>
//
// SPDX-License-Identifier: AGPL-3.0-or-later

const std = @import("std");
const Allocator = std.mem.Allocator;

pub const Error = error{
    NotJoined,
    WouldBlock,
};

pub const Message = struct {
    bytes: []const u8,
    refcount: usize = 1,
    process_filter: u16,

    fn addReference(self: *Message) !void {
        self.refcount = try std.math.add(usize, self.refcount, 1);
    }

    fn dropReference(self: *Message) void {
        self.refcount -= 1;
        if (self.refcount == 0) {
            defer alloc.free(self.bytes);
            defer alloc.destroy(self);
        }
    }
};

pub const Messages = std.TailQueue(*Message);
var alloc: Allocator = undefined;

const Queues = std.AutoArrayHashMap(usize, Messages);

const Processes = std.AutoArrayHashMap(usize, Queues);
var joined: Processes = undefined;

pub fn join(pid: usize, id: usize) !void {
    const queues = try joined.getOrPut(pid);
    if (!queues.found_existing) {
        initProcess(queues.value_ptr);
    }

    const messages = try queues.value_ptr.getOrPut(id);
    if (!messages.found_existing) {
        initQueue(messages.value_ptr);
    }
}

pub fn leave(pid: usize, id: usize) void {
    const queues = joined.getPtr(pid) orelse return;
    freeQueues(queues);
    queues.clearAndFree();
    _ = queues.swapRemove(id);
}

// The channel takes ownership of `bytes`.
pub fn pass(id: usize, receiver: u16, bytes: []const u8) !void {
    const message = try alloc.create(Message);
    defer message.dropReference();

    message.* = .{ .bytes = bytes, .process_filter = receiver };

    var it = joined.iterator();
    while (it.next()) |queues| {
        if (queues.value_ptr.getPtr(id)) |messages| {
            try message.addReference();
            errdefer message.dropReference();

            try enqueue(messages, message);
        }
    }
}

pub fn receive(pid: usize, id: usize, buffer: []u8) !usize {
    const queues = joined.getPtr(pid) orelse return Error.NotJoined;
    const messages = queues.getPtr(id) orelse return Error.NotJoined;
    const message = messages.popFirst() orelse return Error.WouldBlock;

    defer alloc.destroy(message);
    defer message.data.dropReference();

    const len = @min(buffer.len, message.data.bytes.len);
    @memcpy(buffer[0..len], message.data.bytes[0..len]);

    return len;
}

fn initQueue(messages: *Messages) void {
    messages.* = .{};
}

fn initProcess(queues: *Queues) void {
    queues.* = Queues.init(alloc);
}

fn freeQueues(queues: *Queues) void {
    var it = queues.iterator();
    while (it.next()) |messages| {
        freeMessages(messages.value_ptr);
    }
}

fn freeMessages(messages: *Messages) void {
    while (messages.popFirst()) |message| {
        message.data.dropReference();
    }
}

fn enqueue(messages: *Messages, message: *Message) !void {
    const node = try alloc.create(Messages.Node);
    node.data = message;
    messages.append(node);
}

pub fn init(with_allocator: Allocator) void {
    joined = Processes.init(with_allocator);
    alloc = with_allocator;
}

pub fn allocator() Allocator {
    return alloc;
}