From eb75ff430d1f78e129bbfe49d612f241ca418df4 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Mon, 26 Jun 2017 22:07:29 +0200 Subject: Begin implementation of outbound work queue --- src/send.go | 124 ++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 78 insertions(+), 46 deletions(-) (limited to 'src/send.go') diff --git a/src/send.go b/src/send.go index 9790320..da5905d 100644 --- a/src/send.go +++ b/src/send.go @@ -1,9 +1,11 @@ package main import ( + "encoding/binary" + "golang.org/x/crypto/chacha20poly1305" "net" "sync" - "sync/atomic" + "time" ) /* Handles outbound flow @@ -70,85 +72,115 @@ func (device *Device) SendPacket(packet []byte) { * * TODO: avoid dynamic allocation of work queue elements */ -func (peer *Peer) ConsumeOutboundPackets() { +func (peer *Peer) RoutineOutboundNonceWorker() { + var packet []byte + var keyPair *KeyPair + var flushTimer time.Timer + for { - // wait for key pair - keyPair := func() *KeyPair { - peer.keyPairs.mutex.RLock() - defer peer.keyPairs.mutex.RUnlock() - return peer.keyPairs.current - }() - if keyPair == nil { - if len(peer.queueOutboundRouting) > 0 { - // TODO: start handshake - <-peer.keyPairs.newKeyPair - } - continue + + // wait for packet + + if packet == nil { + packet = <-peer.queueOutboundRouting } - // assign packets key pair - for { + // wait for key pair + + for keyPair == nil { + flushTimer.Reset(time.Second * 10) + // TODO: Handshake or NOP select { case <-peer.keyPairs.newKeyPair: - default: - case <-peer.keyPairs.newKeyPair: - case packet := <-peer.queueOutboundRouting: + keyPair = peer.keyPairs.Current() + continue + case <-flushTimer.C: + size := len(peer.queueOutboundRouting) + for i := 0; i < size; i += 1 { + <-peer.queueOutboundRouting + } + packet = nil + } + break + } + + // process current packet + + if packet != nil { - // create new work element + // create work element - work := new(OutboundWorkQueueElement) - work.wg.Add(1) - work.keyPair = keyPair - work.packet = packet - work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 + work := new(OutboundWorkQueueElement) + work.wg.Add(1) + work.keyPair = keyPair + work.packet = packet + work.nonce = keyPair.sendNonce - peer.queueOutbound <- work + packet = nil + peer.queueOutbound <- work + keyPair.sendNonce += 1 - // drop packets until there is room + // drop packets until there is space + func() { for { select { case peer.device.queueWorkOutbound <- work: - break + return default: drop := <-peer.device.queueWorkOutbound drop.packet = nil drop.wg.Done() } } - } + }() } } } +/* Go routine + * + * sequentially reads packets from queue and sends to endpoint + * + */ func (peer *Peer) RoutineSequential() { for work := range peer.queueOutbound { work.wg.Wait() + + // check if dropped ("ghost packet") + if work.packet == nil { continue } + + // + } } -func (device *Device) EncryptionWorker() { - for { - work := <-device.queueWorkOutbound - - func() { - defer work.wg.Done() +func (device *Device) RoutineEncryptionWorker() { + var nonce [chacha20poly1305.NonceSize]byte + for work := range device.queueWorkOutbound { + // pad packet - // pad packet - padding := device.mtu - len(work.packet) - if padding < 0 { - work.packet = nil - return - } - for n := 0; n < padding; n += 1 { - work.packet = append(work.packet, 0) // TODO: gotta be a faster way - } + padding := device.mtu - len(work.packet) + if padding < 0 { + work.packet = nil + work.wg.Done() + } + for n := 0; n < padding; n += 1 { + work.packet = append(work.packet, 0) + } - // + // encrypt - }() + binary.LittleEndian.PutUint64(nonce[4:], work.nonce) + work.packet = work.keyPair.send.Seal( + work.packet[:0], + nonce[:], + work.packet, + nil, + ) + work.wg.Done() } } -- cgit v1.2.3