author | anon5 <anon5clam@protonmail.com> | 2020-11-07 18:01:24 +0000
---|---|---
committer | anon5 <anon5clam@protonmail.com> | 2020-11-07 18:01:24 +0000
commit | a865d2bce1aa097273fdb9d0d02d9cfa8460aefd (patch) |
tree | 95ff9b35607b57b330ec579f4b96f84e3bf33349 | /rudp/send.go
Initial public release
Diffstat (limited to 'rudp/send.go')
-rw-r--r-- | rudp/send.go | 248 |
1 file changed, 248 insertions, 0 deletions
diff --git a/rudp/send.go b/rudp/send.go
new file mode 100644
index 0000000..3cfcda4
--- /dev/null
+++ b/rudp/send.go
@@ -0,0 +1,248 @@
+package rudp
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+	"net"
+	"sync"
+	"time"
+)
+
+const (
+	// protoID + src PeerID + channel number
+	MtHdrSize = 4 + 2 + 1
+
+	// rawTypeOrig
+	OrigHdrSize = 1
+
+	// rawTypeSplit + seqnum + chunk count + chunk number
+	SplitHdrSize = 1 + 2 + 2 + 2
+
+	// rawTypeRel + seqnum
+	RelHdrSize = 1 + 2
+)
+
+const (
+	MaxNetPktSize = 512
+
+	MaxUnrelRawPktSize = MaxNetPktSize - MtHdrSize
+	MaxRelRawPktSize   = MaxUnrelRawPktSize - RelHdrSize
+
+	MaxRelPktSize   = (MaxRelRawPktSize - SplitHdrSize) * math.MaxUint16
+	MaxUnrelPktSize = (MaxUnrelRawPktSize - SplitHdrSize) * math.MaxUint16
+)
+
+var ErrPktTooBig = errors.New("can't send pkt: too big")
+var ErrChNoTooBig = errors.New("can't send pkt: channel number >= ChannelCount")
+
+// Send sends a packet to the Peer.
+// It returns a channel that's closed when all chunks are acked or an error.
+// The ack channel is nil if pkt.Unrel is true.
+func (p *Peer) Send(pkt Pkt) (ack <-chan struct{}, err error) {
+	if pkt.ChNo >= ChannelCount {
+		return nil, ErrChNoTooBig
+	}
+
+	hdrsize := MtHdrSize
+	if !pkt.Unrel {
+		hdrsize += RelHdrSize
+	}
+
+	if hdrsize+OrigHdrSize+len(pkt.Data) > MaxNetPktSize {
+		c := &p.chans[pkt.ChNo]
+
+		c.outsplitmu.Lock()
+		sn := c.outsplitsn
+		c.outsplitsn++
+		c.outsplitmu.Unlock()
+
+		chunks := split(pkt.Data, MaxNetPktSize-(hdrsize+SplitHdrSize))
+
+		if len(chunks) > math.MaxUint16 {
+			return nil, ErrPktTooBig
+		}
+
+		var wg sync.WaitGroup
+
+		for i, chunk := range chunks {
+			data := make([]byte, SplitHdrSize+len(chunk))
+			data[0] = uint8(rawTypeSplit)
+			binary.BigEndian.PutUint16(data[1:3], uint16(sn))
+			binary.BigEndian.PutUint16(data[3:5], uint16(len(chunks)))
+			binary.BigEndian.PutUint16(data[5:7], uint16(i))
+			copy(data[SplitHdrSize:], chunk)
+
+			wg.Add(1)
+			ack, err := p.sendRaw(rawPkt{
+				Data:  data,
+				ChNo:  pkt.ChNo,
+				Unrel: pkt.Unrel,
+			})
+			if err != nil {
+				return nil, err
+			}
+			if !pkt.Unrel {
+				if ack == nil {
+					panic("ack is nil")
+				}
+				go func() {
+					<-ack
+					wg.Done()
+				}()
+			}
+		}
+
+		if pkt.Unrel {
+			return nil, nil
+		} else {
+			ack := make(chan struct{})
+
+			go func() {
+				wg.Wait()
+				close(ack)
+			}()
+
+			return ack, nil
+		}
+	}
+
+	return p.sendRaw(rawPkt{
+		Data:  append([]byte{uint8(rawTypeOrig)}, pkt.Data...),
+		ChNo:  pkt.ChNo,
+		Unrel: pkt.Unrel,
+	})
+}
+
+// sendRaw sends a raw packet to the Peer.
+func (p *Peer) sendRaw(pkt rawPkt) (ack <-chan struct{}, err error) {
+	if pkt.ChNo >= ChannelCount {
+		return nil, ErrChNoTooBig
+	}
+
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	select {
+	case <-p.Disco():
+		return nil, ErrClosed
+	default:
+	}
+
+	if !pkt.Unrel {
+		return p.sendRel(pkt)
+	}
+
+	data := make([]byte, MtHdrSize+len(pkt.Data))
+	binary.BigEndian.PutUint32(data[0:4], protoID)
+	binary.BigEndian.PutUint16(data[4:6], uint16(p.idOfPeer))
+	data[6] = pkt.ChNo
+	copy(data[MtHdrSize:], pkt.Data)
+
+	if len(data) > MaxNetPktSize {
+		return nil, ErrPktTooBig
+	}
+
+	_, err = p.Conn().WriteTo(data, p.Addr())
+	if errors.Is(err, net.ErrWriteToConnected) {
+		conn, ok := p.Conn().(net.Conn)
+		if !ok {
+			return nil, err
+		}
+		_, err = conn.Write(data)
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	p.ping.Reset(PingTimeout)
+
+	return nil, nil
+}
+
+// sendRel sends a reliable raw packet to the Peer.
+func (p *Peer) sendRel(pkt rawPkt) (ack <-chan struct{}, err error) {
+	if pkt.Unrel {
+		panic("mt/rudp: sendRel: pkt.Unrel is true")
+	}
+
+	c := &p.chans[pkt.ChNo]
+
+	c.outrelmu.Lock()
+	defer c.outrelmu.Unlock()
+
+	sn := c.outrelsn
+	for ; sn-c.outrelwin >= 0x8000; c.outrelwin++ {
+		if ack, ok := c.ackchans.Load(c.outrelwin); ok {
+			<-ack.(chan struct{})
+		}
+	}
+	c.outrelsn++
+
+	rwack := make(chan struct{}) // close-only
+	c.ackchans.Store(sn, rwack)
+	ack = rwack
+
+	reldata := make([]byte, RelHdrSize+len(pkt.Data))
+	reldata[0] = uint8(rawTypeRel)
+	binary.BigEndian.PutUint16(reldata[1:3], uint16(sn))
+	copy(reldata[RelHdrSize:], pkt.Data)
+	relpkt := rawPkt{
+		Data:  reldata,
+		ChNo:  pkt.ChNo,
+		Unrel: true,
+	}
+
+	if _, err := p.sendRaw(relpkt); err != nil {
+		c.ackchans.Delete(sn)
+
+		return nil, err
+	}
+
+	go func() {
+		resend := time.NewTicker(500 * time.Millisecond)
+		defer resend.Stop()
+
+		for {
+			select {
+			case <-resend.C:
+				if _, err := p.sendRaw(relpkt); err != nil {
+					p.errs <- fmt.Errorf("failed to re-send timed out reliable seqnum: %d: %w", sn, err)
+				}
+			case <-ack:
+				return
+			case <-p.Disco():
+				return
+			}
+		}
+	}()
+
+	return ack, nil
+}
+
+// SendDisco sends a disconnect packet to the Peer but does not close it.
+// It returns a channel that's closed when it's acked or an error.
+// The ack channel is nil if unrel is true.
+func (p *Peer) SendDisco(chno uint8, unrel bool) (ack <-chan struct{}, err error) {
+	return p.sendRaw(rawPkt{
+		Data:  []byte{uint8(rawTypeCtl), uint8(ctlDisco)},
+		ChNo:  chno,
+		Unrel: unrel,
+	})
+}
+
+func split(data []byte, chunksize int) [][]byte {
+	chunks := make([][]byte, 0, (len(data)+chunksize-1)/chunksize)
+
+	for i := 0; i < len(data); i += chunksize {
+		end := i + chunksize
+		if end > len(data) {
+			end = len(data)
+		}
+
+		chunks = append(chunks, data[i:end])
+	}
+
+	return chunks
+}
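
For orientation, a minimal usage sketch of the API added by this file. It is not part of the commit: the import path and the way a connected *rudp.Peer is obtained are assumptions, since connection setup lives outside rudp/send.go.

package example

import (
	"errors"
	"fmt"
	"time"

	"example.com/mt/rudp" // assumed import path; substitute the real module path
)

// sendReliable is a hypothetical helper: for a reliable packet (Unrel: false),
// Peer.Send returns a non-nil channel that is closed once every chunk is acked.
func sendReliable(p *rudp.Peer, data []byte) error {
	ack, err := p.Send(rudp.Pkt{
		Data:  data,
		ChNo:  0,     // must be < rudp.ChannelCount
		Unrel: false, // reliable
	})
	if err != nil {
		return fmt.Errorf("send: %w", err)
	}

	select {
	case <-ack:
		return nil // remote peer acked all chunks
	case <-time.After(10 * time.Second):
		return errors.New("timed out waiting for ack")
	}
}

For unreliable sends (Unrel: true) the returned ack channel is nil, as the doc comment on Send notes, so it must not be received from.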