summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoranon5 <anon5clam@protonmail.com>2021-02-28 18:54:41 +0000
committeranon5 <anon5clam@protonmail.com>2021-02-28 18:54:41 +0000
commit7d69943c17614bcf6733ff0b3206374d272e274b (patch)
tree29301ea544edf7b37442f79e0a6085a28d0cf7e3
parent27c0776cb2f1084356df5c9f6080fecf20f8fddf (diff)
rudp: fix errors returned by Peer.Recv other than net.ErrClosed when the Peer is closed
-rw-r--r--rudp/listen.go6
-rw-r--r--rudp/peer.go6
-rw-r--r--rudp/process.go8
-rw-r--r--rudp/proxy/proxy.go5
-rw-r--r--rudp/send.go3
5 files changed, 21 insertions, 7 deletions
diff --git a/rudp/listen.go b/rudp/listen.go
index 2eda819..2d702c4 100644
--- a/rudp/listen.go
+++ b/rudp/listen.go
@@ -115,6 +115,9 @@ func (l *Listener) processNetPkt(pkt netPkt) error {
data[1] = uint8(ctlSetPeerID)
binary.BigEndian.PutUint16(data[2:4], uint16(clt.ID()))
if _, err := clt.sendRaw(rawPkt{Data: data}); err != nil {
+ if errors.Is(err, net.ErrClosed) {
+ return nil
+ }
return fmt.Errorf("can't set client peer id: %w", err)
}
@@ -145,7 +148,8 @@ func (l *Listener) processNetPkt(pkt netPkt) error {
select {
case clt.pkts <- pkt:
default:
- return fmt.Errorf("ignoring net pkt from %s because buf is full", addrstr)
+ // It's OK to drop packets if the buffer is full
+ // because MT RUDP can cope with packet loss.
}
}
diff --git a/rudp/peer.go b/rudp/peer.go
index 9dca93d..4d8df47 100644
--- a/rudp/peer.go
+++ b/rudp/peer.go
@@ -1,6 +1,7 @@
package rudp
import (
+ "errors"
"fmt"
"net"
"sync"
@@ -167,6 +168,9 @@ func (p *Peer) sendPings(ping <-chan time.Time) {
select {
case <-ping:
if _, err := p.sendRaw(pkt); err != nil {
+ if errors.Is(err, net.ErrClosed) {
+ return
+ }
p.errs <- fmt.Errorf("can't send ping: %w", err)
}
case <-p.Disco():
@@ -176,7 +180,7 @@ func (p *Peer) sendPings(ping <-chan time.Time) {
}
// Connect connects to the server on conn
-// and closes conn when the Peer disconnects.
+// and closes conn when the returned *Peer disconnects.
func Connect(conn net.PacketConn, addr net.Addr) *Peer {
srv := newPeer(conn, addr, PeerIDSrv, PeerIDNil)
diff --git a/rudp/process.go b/rudp/process.go
index 70c04a2..c85aba4 100644
--- a/rudp/process.go
+++ b/rudp/process.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
+ "net"
)
// A PktError is an error that occured while processing a packet.
@@ -140,9 +141,7 @@ func (p *Peer) processRawPkt(pkt rawPkt) (err error) {
case ctlDisco:
defer errWrap("disco: %w")
- if err := p.Close(); err != nil {
- return fmt.Errorf("can't close: %w", err)
- }
+ p.Close()
if len(pkt.Data) > 1+1 {
return TrailingDataError(pkt.Data[1+1:])
@@ -226,6 +225,9 @@ func (p *Peer) processRawPkt(pkt rawPkt) (err error) {
Unrel: true,
}
if _, err := p.sendRaw(ack); err != nil {
+ if errors.Is(err, net.ErrClosed) {
+ return nil
+ }
return fmt.Errorf("can't ack %d: %w", sn, err)
}
diff --git a/rudp/proxy/proxy.go b/rudp/proxy/proxy.go
index 833260b..a80b448 100644
--- a/rudp/proxy/proxy.go
+++ b/rudp/proxy/proxy.go
@@ -10,12 +10,13 @@ and listen:port is the address to listen on.
package main
import (
+ "errors"
"fmt"
"log"
"net"
"os"
- "mt/rudp"
+ "github.com/anon55555/mt/rudp"
)
func main() {
@@ -61,7 +62,7 @@ func proxy(src, dest *rudp.Peer) {
for {
pkt, err := src.Recv()
if err != nil {
- if err == net.ErrClosed {
+ if errors.Is(err, net.ErrClosed) {
msg := src.Addr().String() + " disconnected"
if src.TimedOut() {
msg += " (timed out)"
diff --git a/rudp/send.go b/rudp/send.go
index 2615c59..ce3f013 100644
--- a/rudp/send.go
+++ b/rudp/send.go
@@ -205,6 +205,9 @@ func (p *Peer) sendRel(pkt rawPkt) (ack <-chan struct{}, err error) {
select {
case <-time.After(500 * time.Millisecond):
if _, err := p.sendRaw(relpkt); err != nil {
+ if errors.Is(err, net.ErrClosed) {
+ return
+ }
p.errs <- fmt.Errorf("failed to re-send timed out reliable seqnum: %d: %w", sn, err)
}
case <-ack: