diff options
author | Himbeer <himbeer@disroot.org> | 2024-12-07 11:21:06 +0100 |
---|---|---|
committer | Himbeer <himbeer@disroot.org> | 2024-12-07 11:24:27 +0100 |
commit | aee7c77cb835299f7e0a96bc971a182619f91296 (patch) | |
tree | be362af900bf5a3954f1684c4b65233716b61685 | |
parent | 162faad9f636263cc27413358af18de725db66d9 (diff) |
Implement modchannel plugin API between proxy and servers (per-client)
-rw-r--r-- | connect.go | 2 | ||||
-rw-r--r-- | plugin_modchan.go | 115 | ||||
-rw-r--r-- | process.go | 37 | ||||
-rw-r--r-- | server_conn.go | 5 |
4 files changed, 159 insertions, 0 deletions
@@ -38,6 +38,8 @@ func connect(conn net.Conn, name string, cc *ClientConn) *ServerConn { sounds: make(map[mt.SoundID]struct{}), huds: make(map[mt.HUDID]mt.HUDType), playerList: make(map[string]struct{}), + modChanJoinChs: make(map[string]map[chan bool]struct{}), + modChanLeaveChs: make(map[string]map[chan bool]struct{}), } sc.Log("->", "connect") diff --git a/plugin_modchan.go b/plugin_modchan.go index 446a37f..1b661cd 100644 --- a/plugin_modchan.go +++ b/plugin_modchan.go @@ -18,6 +18,12 @@ var ( ) // SendModChanMsg sends a message to all subscribed clients on a modchannel. +var ( + onSrvModChanMsg []func(*ClientConn, string, string, string) bool + onSrvModChanMsgMu sync.RWMutex + onSrvModChanMsgOnce sync.Once +) + func SendModChanMsg(channel, msg string) { modChanSubscriberMu.RLock() defer modChanSubscriberMu.RUnlock() @@ -31,6 +37,90 @@ func SendModChanMsg(channel, msg string) { } } +// JoinModChan attempts to subscribe to a modchannel, returning a channel +// that yields a boolean indicating success. +// The proxy will try to rejoin the channel after switching servers, +// but this is not guaranteed to succeed and errors will not be reported +// to the caller. +// This condition can be checked against using the IsModChanJoined method. +// This method may block indefinitely if the client switches servers +// before a response is received. If this cannot be controlled, +// using a select statement with a timeout is recommended. +func (cc *ClientConn) JoinModChan(channel string) <-chan bool { + failCh := make(chan bool) + failCh <- false + + sc := cc.server() + if sc == nil { + return failCh + } + + successCh := make(chan bool) + + sc.modChanJoinChMu.Lock() + defer sc.modChanJoinChMu.Unlock() + + if sc.modChanJoinChs[channel] == nil { + sc.modChanJoinChs[channel] = make(map[chan bool]struct{}) + } + sc.modChanJoinChs[channel][successCh] = struct{}{} + + sc.SendCmd(&mt.ToSrvJoinModChan{Channel: channel}) + return successCh +} + +// LeaveModChan attempts to unscribe from a modchannel, returning a channel +// yielding a boolean indicating success. +func (cc *ClientConn) LeaveModChan(channel string) <-chan bool { + failCh := make(chan bool) + failCh <- false + + sc := cc.server() + if sc == nil { + return failCh + } + + successCh := make(chan bool) + + sc.modChanLeaveChMu.Lock() + defer sc.modChanLeaveChMu.Unlock() + + if sc.modChanLeaveChs[channel] == nil { + sc.modChanLeaveChs[channel] = make(map[chan bool]struct{}) + } + sc.modChanLeaveChs[channel][successCh] = struct{}{} + + sc.SendCmd(&mt.ToSrvLeaveModChan{Channel: channel}) + return successCh +} + +// IsModChanJoined returns whether this client is currently subscribed +// to the specified modchannel. This is mainly useful for tracking success +// across server hops. +func (cc *ClientConn) IsModChanJoined(channel string) bool { + cc.modChsMu.RLock() + defer cc.modChsMu.RUnlock() + + _, ok := cc.modChs[channel] + return ok +} + +// SendModChanMsg sends a message to the current upstream server +// and all subscribed clients connected to it on a modchannel. +func (cc *ClientConn) SendModChanMsg(channel, msg string) bool { + sc := cc.server() + if sc == nil { + return false + } + + sc.SendCmd(&mt.ToSrvMsgModChan{ + Channel: channel, + Msg: msg, + }) + + return true +} + // RegisterOnCltModChanMsg registers a handler that is called // when a client sends a message on a modchannel. // If any handler returns true, the message is not forwarded @@ -42,6 +132,17 @@ func RegisterOnCltModChanMsg(handler func(string, *ClientConn, string) bool) { onCltModChanMsg = append(onCltModChanMsg, handler) } +// RegisterOnSrvModChanMsg registers a handler that is called +// when another client of the current upstream server or a server mod +// sends a message on a modchannel. +// If any handler returns true, the message is not forwarded to the client. +func RegisterOnSrvModChanMsg(handler func(*ClientConn, string, string, string) bool) { + onSrvModChanMsgMu.Lock() + defer onSrvModChanMsgMu.Unlock() + + onSrvModChanMsg = append(onSrvModChanMsg, handler) +} + func cltLeaveModChan(cc *ClientConn, channel string) { modChanSubscriberMu.Lock() defer modChanSubscriberMu.Unlock() @@ -91,6 +192,20 @@ func handleCltModChanMsg(cc *ClientConn, cmd *mt.ToSrvMsgModChan) bool { return drop } +func handleSrvModChanMsg(cc *ClientConn, cmd *mt.ToCltModChanMsg) bool { + onSrvModChanMsgMu.RLock() + defer onSrvModChanMsgMu.RUnlock() + + drop := false + for _, handler := range onSrvModChanMsg { + if handler(cc, cmd.Channel, cmd.Sender, cmd.Msg) { + drop = true + } + } + + return drop +} + func init() { modChanSubscribers = make(map[string][]*ClientConn) } @@ -902,17 +902,54 @@ func (sc *ServerConn) process(pkt mt.Pkt) { } sc.prependInv(cmd.Changed[k].Inv) } + case *mt.ToCltModChanMsg: + if handleSrvModChanMsg(clt, cmd) { + return + } case *mt.ToCltModChanSig: + reportStatus := func(ch chan bool, status bool) { + ch <- status + delete(sc.modChanJoinChs[cmd.Channel], ch) + } + switch cmd.Signal { case mt.JoinOK: + sc.modChanJoinChMu.Lock() + defer sc.modChanJoinChMu.Unlock() + + for ch := range sc.modChanJoinChs[cmd.Channel] { + go reportStatus(ch, true) + } + if _, ok := clt.modChs[cmd.Channel]; ok { return } clt.modChs[cmd.Channel] = struct{}{} case mt.JoinFail: + sc.modChanJoinChMu.Lock() + defer sc.modChanJoinChMu.Unlock() + + for ch := range sc.modChanJoinChs[cmd.Channel] { + go reportStatus(ch, false) + } + fallthrough case mt.LeaveOK: + sc.modChanLeaveChMu.Lock() + defer sc.modChanLeaveChMu.Unlock() + + for ch := range sc.modChanLeaveChs[cmd.Channel] { + go reportStatus(ch, true) + } + delete(clt.modChs, cmd.Channel) + case mt.LeaveFail: + sc.modChanLeaveChMu.Lock() + defer sc.modChanLeaveChMu.Unlock() + + for ch := range sc.modChanLeaveChs[cmd.Channel] { + go reportStatus(ch, false) + } } } diff --git a/server_conn.go b/server_conn.go index f451cd5..34ab129 100644 --- a/server_conn.go +++ b/server_conn.go @@ -46,6 +46,11 @@ type ServerConn struct { huds map[mt.HUDID]mt.HUDType playerList map[string]struct{} + + modChanJoinChs map[string]map[chan bool]struct{} + modChanJoinChMu sync.Mutex + modChanLeaveChs map[string]map[chan bool]struct{} + modChanLeaveChMu sync.Mutex } func (sc *ServerConn) client() *ClientConn { |