aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHimbeer <himbeer@disroot.org>2024-12-07 11:21:06 +0100
committerHimbeer <himbeer@disroot.org>2024-12-07 11:24:27 +0100
commitaee7c77cb835299f7e0a96bc971a182619f91296 (patch)
treebe362af900bf5a3954f1684c4b65233716b61685
parent162faad9f636263cc27413358af18de725db66d9 (diff)
Implement modchannel plugin API between proxy and servers (per-client)
-rw-r--r--connect.go2
-rw-r--r--plugin_modchan.go115
-rw-r--r--process.go37
-rw-r--r--server_conn.go5
4 files changed, 159 insertions, 0 deletions
diff --git a/connect.go b/connect.go
index 632d203..c772077 100644
--- a/connect.go
+++ b/connect.go
@@ -38,6 +38,8 @@ func connect(conn net.Conn, name string, cc *ClientConn) *ServerConn {
sounds: make(map[mt.SoundID]struct{}),
huds: make(map[mt.HUDID]mt.HUDType),
playerList: make(map[string]struct{}),
+ modChanJoinChs: make(map[string]map[chan bool]struct{}),
+ modChanLeaveChs: make(map[string]map[chan bool]struct{}),
}
sc.Log("->", "connect")
diff --git a/plugin_modchan.go b/plugin_modchan.go
index 446a37f..1b661cd 100644
--- a/plugin_modchan.go
+++ b/plugin_modchan.go
@@ -18,6 +18,12 @@ var (
)
// SendModChanMsg sends a message to all subscribed clients on a modchannel.
+var (
+ onSrvModChanMsg []func(*ClientConn, string, string, string) bool
+ onSrvModChanMsgMu sync.RWMutex
+ onSrvModChanMsgOnce sync.Once
+)
+
func SendModChanMsg(channel, msg string) {
modChanSubscriberMu.RLock()
defer modChanSubscriberMu.RUnlock()
@@ -31,6 +37,90 @@ func SendModChanMsg(channel, msg string) {
}
}
+// JoinModChan attempts to subscribe to a modchannel, returning a channel
+// that yields a boolean indicating success.
+// The proxy will try to rejoin the channel after switching servers,
+// but this is not guaranteed to succeed and errors will not be reported
+// to the caller.
+// This condition can be checked against using the IsModChanJoined method.
+// This method may block indefinitely if the client switches servers
+// before a response is received. If this cannot be controlled,
+// using a select statement with a timeout is recommended.
+func (cc *ClientConn) JoinModChan(channel string) <-chan bool {
+ failCh := make(chan bool)
+ failCh <- false
+
+ sc := cc.server()
+ if sc == nil {
+ return failCh
+ }
+
+ successCh := make(chan bool)
+
+ sc.modChanJoinChMu.Lock()
+ defer sc.modChanJoinChMu.Unlock()
+
+ if sc.modChanJoinChs[channel] == nil {
+ sc.modChanJoinChs[channel] = make(map[chan bool]struct{})
+ }
+ sc.modChanJoinChs[channel][successCh] = struct{}{}
+
+ sc.SendCmd(&mt.ToSrvJoinModChan{Channel: channel})
+ return successCh
+}
+
+// LeaveModChan attempts to unscribe from a modchannel, returning a channel
+// yielding a boolean indicating success.
+func (cc *ClientConn) LeaveModChan(channel string) <-chan bool {
+ failCh := make(chan bool)
+ failCh <- false
+
+ sc := cc.server()
+ if sc == nil {
+ return failCh
+ }
+
+ successCh := make(chan bool)
+
+ sc.modChanLeaveChMu.Lock()
+ defer sc.modChanLeaveChMu.Unlock()
+
+ if sc.modChanLeaveChs[channel] == nil {
+ sc.modChanLeaveChs[channel] = make(map[chan bool]struct{})
+ }
+ sc.modChanLeaveChs[channel][successCh] = struct{}{}
+
+ sc.SendCmd(&mt.ToSrvLeaveModChan{Channel: channel})
+ return successCh
+}
+
+// IsModChanJoined returns whether this client is currently subscribed
+// to the specified modchannel. This is mainly useful for tracking success
+// across server hops.
+func (cc *ClientConn) IsModChanJoined(channel string) bool {
+ cc.modChsMu.RLock()
+ defer cc.modChsMu.RUnlock()
+
+ _, ok := cc.modChs[channel]
+ return ok
+}
+
+// SendModChanMsg sends a message to the current upstream server
+// and all subscribed clients connected to it on a modchannel.
+func (cc *ClientConn) SendModChanMsg(channel, msg string) bool {
+ sc := cc.server()
+ if sc == nil {
+ return false
+ }
+
+ sc.SendCmd(&mt.ToSrvMsgModChan{
+ Channel: channel,
+ Msg: msg,
+ })
+
+ return true
+}
+
// RegisterOnCltModChanMsg registers a handler that is called
// when a client sends a message on a modchannel.
// If any handler returns true, the message is not forwarded
@@ -42,6 +132,17 @@ func RegisterOnCltModChanMsg(handler func(string, *ClientConn, string) bool) {
onCltModChanMsg = append(onCltModChanMsg, handler)
}
+// RegisterOnSrvModChanMsg registers a handler that is called
+// when another client of the current upstream server or a server mod
+// sends a message on a modchannel.
+// If any handler returns true, the message is not forwarded to the client.
+func RegisterOnSrvModChanMsg(handler func(*ClientConn, string, string, string) bool) {
+ onSrvModChanMsgMu.Lock()
+ defer onSrvModChanMsgMu.Unlock()
+
+ onSrvModChanMsg = append(onSrvModChanMsg, handler)
+}
+
func cltLeaveModChan(cc *ClientConn, channel string) {
modChanSubscriberMu.Lock()
defer modChanSubscriberMu.Unlock()
@@ -91,6 +192,20 @@ func handleCltModChanMsg(cc *ClientConn, cmd *mt.ToSrvMsgModChan) bool {
return drop
}
+func handleSrvModChanMsg(cc *ClientConn, cmd *mt.ToCltModChanMsg) bool {
+ onSrvModChanMsgMu.RLock()
+ defer onSrvModChanMsgMu.RUnlock()
+
+ drop := false
+ for _, handler := range onSrvModChanMsg {
+ if handler(cc, cmd.Channel, cmd.Sender, cmd.Msg) {
+ drop = true
+ }
+ }
+
+ return drop
+}
+
func init() {
modChanSubscribers = make(map[string][]*ClientConn)
}
diff --git a/process.go b/process.go
index 05b16fe..1d8263f 100644
--- a/process.go
+++ b/process.go
@@ -902,17 +902,54 @@ func (sc *ServerConn) process(pkt mt.Pkt) {
}
sc.prependInv(cmd.Changed[k].Inv)
}
+ case *mt.ToCltModChanMsg:
+ if handleSrvModChanMsg(clt, cmd) {
+ return
+ }
case *mt.ToCltModChanSig:
+ reportStatus := func(ch chan bool, status bool) {
+ ch <- status
+ delete(sc.modChanJoinChs[cmd.Channel], ch)
+ }
+
switch cmd.Signal {
case mt.JoinOK:
+ sc.modChanJoinChMu.Lock()
+ defer sc.modChanJoinChMu.Unlock()
+
+ for ch := range sc.modChanJoinChs[cmd.Channel] {
+ go reportStatus(ch, true)
+ }
+
if _, ok := clt.modChs[cmd.Channel]; ok {
return
}
clt.modChs[cmd.Channel] = struct{}{}
case mt.JoinFail:
+ sc.modChanJoinChMu.Lock()
+ defer sc.modChanJoinChMu.Unlock()
+
+ for ch := range sc.modChanJoinChs[cmd.Channel] {
+ go reportStatus(ch, false)
+ }
+
fallthrough
case mt.LeaveOK:
+ sc.modChanLeaveChMu.Lock()
+ defer sc.modChanLeaveChMu.Unlock()
+
+ for ch := range sc.modChanLeaveChs[cmd.Channel] {
+ go reportStatus(ch, true)
+ }
+
delete(clt.modChs, cmd.Channel)
+ case mt.LeaveFail:
+ sc.modChanLeaveChMu.Lock()
+ defer sc.modChanLeaveChMu.Unlock()
+
+ for ch := range sc.modChanLeaveChs[cmd.Channel] {
+ go reportStatus(ch, false)
+ }
}
}
diff --git a/server_conn.go b/server_conn.go
index f451cd5..34ab129 100644
--- a/server_conn.go
+++ b/server_conn.go
@@ -46,6 +46,11 @@ type ServerConn struct {
huds map[mt.HUDID]mt.HUDType
playerList map[string]struct{}
+
+ modChanJoinChs map[string]map[chan bool]struct{}
+ modChanJoinChMu sync.Mutex
+ modChanLeaveChs map[string]map[chan bool]struct{}
+ modChanLeaveChMu sync.Mutex
}
func (sc *ServerConn) client() *ClientConn {