summaryrefslogtreecommitdiff
path: root/rudp/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'rudp/peer.go')
-rw-r--r--rudp/peer.go82
1 files changed, 46 insertions, 36 deletions
diff --git a/rudp/peer.go b/rudp/peer.go
index 4d8df47..791249c 100644
--- a/rudp/peer.go
+++ b/rudp/peer.go
@@ -20,8 +20,9 @@ const (
// A Peer is a connection to a client or server.
type Peer struct {
- conn net.PacketConn
+ pc net.PacketConn
addr net.Addr
+ conn net.Conn
disco chan struct{} // close-only
@@ -29,7 +30,7 @@ type Peer struct {
pkts chan Pkt
errs chan error // don't close
- timedout chan struct{} // close-only
+ timedOut chan struct{} // close-only
chans [ChannelCount]pktchan // read/write
@@ -39,24 +40,8 @@ type Peer struct {
ping *time.Ticker
}
-type pktchan struct {
- // Only accessed by Peer.processRawPkt.
- insplit map[seqnum][][]byte
- inrel map[seqnum][]byte
- inrelsn seqnum
-
- ackchans sync.Map // map[seqnum]chan struct{}
-
- outsplitmu sync.Mutex
- outsplitsn seqnum
-
- outrelmu sync.Mutex
- outrelsn seqnum
- outrelwin seqnum
-}
-
// Conn returns the net.PacketConn used to communicate with the Peer.
-func (p *Peer) Conn() net.PacketConn { return p.conn }
+func (p *Peer) Conn() net.PacketConn { return p.pc }
// Addr returns the address of the Peer.
func (p *Peer) Addr() net.Addr { return p.addr }
@@ -75,13 +60,34 @@ func (p *Peer) IsSrv() bool {
// TimedOut reports whether the Peer has timed out.
func (p *Peer) TimedOut() bool {
select {
- case <-p.timedout:
+ case <-p.timedOut:
return true
default:
return false
}
}
+type inSplit struct {
+ chunks [][]byte
+ size, got int
+}
+
+type pktchan struct {
+ // Only accessed by Peer.processRawPkt.
+ inSplit *[65536]*inSplit
+ inRel *[65536][]byte
+ inRelSN seqnum
+
+ ackChans sync.Map // map[seqnum]chan struct{}
+
+ outSplitMu sync.Mutex
+ outSplitSN seqnum
+
+ outRelMu sync.Mutex
+ outRelSN seqnum
+ outRelWin seqnum
+}
+
// Recv recieves a packet from the Peer.
// You should keep calling this until it returns net.ErrClosed
// so it doesn't leak a goroutine.
@@ -123,9 +129,9 @@ func (p *Peer) Close() error {
return nil
}
-func newPeer(conn net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer {
+func newPeer(pc net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer {
p := &Peer{
- conn: conn,
+ pc: pc,
addr: addr,
id: id,
idOfPeer: idOfPeer,
@@ -135,21 +141,25 @@ func newPeer(conn net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer {
errs: make(chan error),
}
+ if conn, ok := pc.(net.Conn); ok && conn.RemoteAddr() != nil {
+ p.conn = conn
+ }
+
for i := range p.chans {
p.chans[i] = pktchan{
- insplit: make(map[seqnum][][]byte),
- inrel: make(map[seqnum][]byte),
- inrelsn: seqnumInit,
+ inSplit: new([65536]*inSplit),
+ inRel: new([65536][]byte),
+ inRelSN: seqnumInit,
- outsplitsn: seqnumInit,
- outrelsn: seqnumInit,
- outrelwin: seqnumInit,
+ outSplitSN: seqnumInit,
+ outRelSN: seqnumInit,
+ outRelWin: seqnumInit,
}
}
- p.timedout = make(chan struct{})
+ p.timedOut = make(chan struct{})
p.timeout = time.AfterFunc(ConnTimeout, func() {
- close(p.timedout)
+ close(p.timedOut)
p.SendDisco(0, true)
p.Close()
@@ -179,18 +189,18 @@ func (p *Peer) sendPings(ping <-chan time.Time) {
}
}
-// Connect connects to the server on conn
-// and closes conn when the returned *Peer disconnects.
-func Connect(conn net.PacketConn, addr net.Addr) *Peer {
- srv := newPeer(conn, addr, PeerIDSrv, PeerIDNil)
+// Connect connects to addr using pc
+// and closes pc when the returned *Peer disconnects.
+func Connect(pc net.PacketConn, addr net.Addr) *Peer {
+ srv := newPeer(pc, addr, PeerIDSrv, PeerIDNil)
pkts := make(chan netPkt)
- go readNetPkts(conn, pkts, srv.errs)
+ go readNetPkts(pc, pkts, srv.errs)
go srv.processNetPkts(pkts)
go func() {
<-srv.Disco()
- conn.Close()
+ pc.Close()
}()
return srv