diff options
Diffstat (limited to 'rudp/peer.go')
-rw-r--r-- | rudp/peer.go | 82 |
1 files changed, 46 insertions, 36 deletions
diff --git a/rudp/peer.go b/rudp/peer.go index 4d8df47..791249c 100644 --- a/rudp/peer.go +++ b/rudp/peer.go @@ -20,8 +20,9 @@ const ( // A Peer is a connection to a client or server. type Peer struct { - conn net.PacketConn + pc net.PacketConn addr net.Addr + conn net.Conn disco chan struct{} // close-only @@ -29,7 +30,7 @@ type Peer struct { pkts chan Pkt errs chan error // don't close - timedout chan struct{} // close-only + timedOut chan struct{} // close-only chans [ChannelCount]pktchan // read/write @@ -39,24 +40,8 @@ type Peer struct { ping *time.Ticker } -type pktchan struct { - // Only accessed by Peer.processRawPkt. - insplit map[seqnum][][]byte - inrel map[seqnum][]byte - inrelsn seqnum - - ackchans sync.Map // map[seqnum]chan struct{} - - outsplitmu sync.Mutex - outsplitsn seqnum - - outrelmu sync.Mutex - outrelsn seqnum - outrelwin seqnum -} - // Conn returns the net.PacketConn used to communicate with the Peer. -func (p *Peer) Conn() net.PacketConn { return p.conn } +func (p *Peer) Conn() net.PacketConn { return p.pc } // Addr returns the address of the Peer. func (p *Peer) Addr() net.Addr { return p.addr } @@ -75,13 +60,34 @@ func (p *Peer) IsSrv() bool { // TimedOut reports whether the Peer has timed out. func (p *Peer) TimedOut() bool { select { - case <-p.timedout: + case <-p.timedOut: return true default: return false } } +type inSplit struct { + chunks [][]byte + size, got int +} + +type pktchan struct { + // Only accessed by Peer.processRawPkt. + inSplit *[65536]*inSplit + inRel *[65536][]byte + inRelSN seqnum + + ackChans sync.Map // map[seqnum]chan struct{} + + outSplitMu sync.Mutex + outSplitSN seqnum + + outRelMu sync.Mutex + outRelSN seqnum + outRelWin seqnum +} + // Recv recieves a packet from the Peer. // You should keep calling this until it returns net.ErrClosed // so it doesn't leak a goroutine. @@ -123,9 +129,9 @@ func (p *Peer) Close() error { return nil } -func newPeer(conn net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer { +func newPeer(pc net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer { p := &Peer{ - conn: conn, + pc: pc, addr: addr, id: id, idOfPeer: idOfPeer, @@ -135,21 +141,25 @@ func newPeer(conn net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer { errs: make(chan error), } + if conn, ok := pc.(net.Conn); ok && conn.RemoteAddr() != nil { + p.conn = conn + } + for i := range p.chans { p.chans[i] = pktchan{ - insplit: make(map[seqnum][][]byte), - inrel: make(map[seqnum][]byte), - inrelsn: seqnumInit, + inSplit: new([65536]*inSplit), + inRel: new([65536][]byte), + inRelSN: seqnumInit, - outsplitsn: seqnumInit, - outrelsn: seqnumInit, - outrelwin: seqnumInit, + outSplitSN: seqnumInit, + outRelSN: seqnumInit, + outRelWin: seqnumInit, } } - p.timedout = make(chan struct{}) + p.timedOut = make(chan struct{}) p.timeout = time.AfterFunc(ConnTimeout, func() { - close(p.timedout) + close(p.timedOut) p.SendDisco(0, true) p.Close() @@ -179,18 +189,18 @@ func (p *Peer) sendPings(ping <-chan time.Time) { } } -// Connect connects to the server on conn -// and closes conn when the returned *Peer disconnects. -func Connect(conn net.PacketConn, addr net.Addr) *Peer { - srv := newPeer(conn, addr, PeerIDSrv, PeerIDNil) +// Connect connects to addr using pc +// and closes pc when the returned *Peer disconnects. +func Connect(pc net.PacketConn, addr net.Addr) *Peer { + srv := newPeer(pc, addr, PeerIDSrv, PeerIDNil) pkts := make(chan netPkt) - go readNetPkts(conn, pkts, srv.errs) + go readNetPkts(pc, pkts, srv.errs) go srv.processNetPkts(pkts) go func() { <-srv.Disco() - conn.Close() + pc.Close() }() return srv |