summaryrefslogtreecommitdiff
path: root/rudp/send.go
diff options
context:
space:
mode:
authoranon5 <anon5clam@protonmail.com>2020-11-07 18:01:24 +0000
committeranon5 <anon5clam@protonmail.com>2020-11-07 18:01:24 +0000
commita865d2bce1aa097273fdb9d0d02d9cfa8460aefd (patch)
tree95ff9b35607b57b330ec579f4b96f84e3bf33349 /rudp/send.go
Initial public release
Diffstat (limited to 'rudp/send.go')
-rw-r--r--rudp/send.go248
1 files changed, 248 insertions, 0 deletions
diff --git a/rudp/send.go b/rudp/send.go
new file mode 100644
index 0000000..3cfcda4
--- /dev/null
+++ b/rudp/send.go
@@ -0,0 +1,248 @@
+package rudp
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "math"
+ "net"
+ "sync"
+ "time"
+)
+
+const (
+ // protoID + src PeerID + channel number
+ MtHdrSize = 4 + 2 + 1
+
+ // rawTypeOrig
+ OrigHdrSize = 1
+
+ // rawTypeSpilt + seqnum + chunk count + chunk number
+ SplitHdrSize = 1 + 2 + 2 + 2
+
+ // rawTypeRel + seqnum
+ RelHdrSize = 1 + 2
+)
+
+const (
+ MaxNetPktSize = 512
+
+ MaxUnrelRawPktSize = MaxNetPktSize - MtHdrSize
+ MaxRelRawPktSize = MaxUnrelRawPktSize - RelHdrSize
+
+ MaxRelPktSize = (MaxRelRawPktSize - SplitHdrSize) * math.MaxUint16
+ MaxUnrelPktSize = (MaxUnrelRawPktSize - SplitHdrSize) * math.MaxUint16
+)
+
+var ErrPktTooBig = errors.New("can't send pkt: too big")
+var ErrChNoTooBig = errors.New("can't send pkt: channel number >= ChannelCount")
+
+// Send sends a packet to the Peer.
+// It returns a channel that's closed when all chunks are acked or an error.
+// The ack channel is nil if pkt.Unrel is true.
+func (p *Peer) Send(pkt Pkt) (ack <-chan struct{}, err error) {
+ if pkt.ChNo >= ChannelCount {
+ return nil, ErrChNoTooBig
+ }
+
+ hdrsize := MtHdrSize
+ if !pkt.Unrel {
+ hdrsize += RelHdrSize
+ }
+
+ if hdrsize+OrigHdrSize+len(pkt.Data) > MaxNetPktSize {
+ c := &p.chans[pkt.ChNo]
+
+ c.outsplitmu.Lock()
+ sn := c.outsplitsn
+ c.outsplitsn++
+ c.outsplitmu.Unlock()
+
+ chunks := split(pkt.Data, MaxNetPktSize-(hdrsize+SplitHdrSize))
+
+ if len(chunks) > math.MaxUint16 {
+ return nil, ErrPktTooBig
+ }
+
+ var wg sync.WaitGroup
+
+ for i, chunk := range chunks {
+ data := make([]byte, SplitHdrSize+len(chunk))
+ data[0] = uint8(rawTypeSplit)
+ binary.BigEndian.PutUint16(data[1:3], uint16(sn))
+ binary.BigEndian.PutUint16(data[3:5], uint16(len(chunks)))
+ binary.BigEndian.PutUint16(data[5:7], uint16(i))
+ copy(data[SplitHdrSize:], chunk)
+
+ wg.Add(1)
+ ack, err := p.sendRaw(rawPkt{
+ Data: data,
+ ChNo: pkt.ChNo,
+ Unrel: pkt.Unrel,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if !pkt.Unrel {
+ if ack == nil {
+ panic("ack is nil")
+ }
+ go func() {
+ <-ack
+ wg.Done()
+ }()
+ }
+ }
+
+ if pkt.Unrel {
+ return nil, nil
+ } else {
+ ack := make(chan struct{})
+
+ go func() {
+ wg.Wait()
+ close(ack)
+ }()
+
+ return ack, nil
+ }
+ }
+
+ return p.sendRaw(rawPkt{
+ Data: append([]byte{uint8(rawTypeOrig)}, pkt.Data...),
+ ChNo: pkt.ChNo,
+ Unrel: pkt.Unrel,
+ })
+}
+
+// sendRaw sends a raw packet to the Peer.
+func (p *Peer) sendRaw(pkt rawPkt) (ack <-chan struct{}, err error) {
+ if pkt.ChNo >= ChannelCount {
+ return nil, ErrChNoTooBig
+ }
+
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ select {
+ case <-p.Disco():
+ return nil, ErrClosed
+ default:
+ }
+
+ if !pkt.Unrel {
+ return p.sendRel(pkt)
+ }
+
+ data := make([]byte, MtHdrSize+len(pkt.Data))
+ binary.BigEndian.PutUint32(data[0:4], protoID)
+ binary.BigEndian.PutUint16(data[4:6], uint16(p.idOfPeer))
+ data[6] = pkt.ChNo
+ copy(data[MtHdrSize:], pkt.Data)
+
+ if len(data) > MaxNetPktSize {
+ return nil, ErrPktTooBig
+ }
+
+ _, err = p.Conn().WriteTo(data, p.Addr())
+ if errors.Is(err, net.ErrWriteToConnected) {
+ conn, ok := p.Conn().(net.Conn)
+ if !ok {
+ return nil, err
+ }
+ _, err = conn.Write(data)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ p.ping.Reset(PingTimeout)
+
+ return nil, nil
+}
+
+// sendRel sends a reliable raw packet to the Peer.
+func (p *Peer) sendRel(pkt rawPkt) (ack <-chan struct{}, err error) {
+ if pkt.Unrel {
+ panic("mt/rudp: sendRel: pkt.Unrel is true")
+ }
+
+ c := &p.chans[pkt.ChNo]
+
+ c.outrelmu.Lock()
+ defer c.outrelmu.Unlock()
+
+ sn := c.outrelsn
+ for ; sn-c.outrelwin >= 0x8000; c.outrelwin++ {
+ if ack, ok := c.ackchans.Load(c.outrelwin); ok {
+ <-ack.(chan struct{})
+ }
+ }
+ c.outrelsn++
+
+ rwack := make(chan struct{}) // close-only
+ c.ackchans.Store(sn, rwack)
+ ack = rwack
+
+ reldata := make([]byte, RelHdrSize+len(pkt.Data))
+ reldata[0] = uint8(rawTypeRel)
+ binary.BigEndian.PutUint16(reldata[1:3], uint16(sn))
+ copy(reldata[RelHdrSize:], pkt.Data)
+ relpkt := rawPkt{
+ Data: reldata,
+ ChNo: pkt.ChNo,
+ Unrel: true,
+ }
+
+ if _, err := p.sendRaw(relpkt); err != nil {
+ c.ackchans.Delete(sn)
+
+ return nil, err
+ }
+
+ go func() {
+ resend := time.NewTicker(500 * time.Millisecond)
+ defer resend.Stop()
+
+ for {
+ select {
+ case <-resend.C:
+ if _, err := p.sendRaw(relpkt); err != nil {
+ p.errs <- fmt.Errorf("failed to re-send timed out reliable seqnum: %d: %w", sn, err)
+ }
+ case <-ack:
+ return
+ case <-p.Disco():
+ return
+ }
+ }
+ }()
+
+ return ack, nil
+}
+
+// SendDisco sends a disconnect packet to the Peer but does not close it.
+// It returns a channel that's closed when it's acked or an error.
+// The ack channel is nil if unrel is true.
+func (p *Peer) SendDisco(chno uint8, unrel bool) (ack <-chan struct{}, err error) {
+ return p.sendRaw(rawPkt{
+ Data: []byte{uint8(rawTypeCtl), uint8(ctlDisco)},
+ ChNo: chno,
+ Unrel: unrel,
+ })
+}
+
+func split(data []byte, chunksize int) [][]byte {
+ chunks := make([][]byte, 0, (len(data)+chunksize-1)/chunksize)
+
+ for i := 0; i < len(data); i += chunksize {
+ end := i + chunksize
+ if end > len(data) {
+ end = len(data)
+ }
+
+ chunks = append(chunks, data[i:end])
+ }
+
+ return chunks
+}