diff options
Diffstat (limited to 'src/send.go')
-rw-r--r-- | src/send.go | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/src/send.go b/src/send.go index d4f9342..7a10560 100644 --- a/src/send.go +++ b/src/send.go @@ -27,6 +27,7 @@ import ( * workers release lock when they have completed work on the packet. */ type QueueOutboundElement struct { + state uint32 mutex sync.Mutex packet []byte nonce uint64 @@ -59,6 +60,14 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) { } } +func (elem *QueueOutboundElement) Drop() { + atomic.StoreUint32(&elem.state, ElementStateDropped) +} + +func (elem *QueueOutboundElement) IsDropped() bool { + return atomic.LoadUint32(&elem.state) == ElementStateDropped +} + /* Reads packets from the TUN and inserts * into nonce queue for peer * @@ -162,6 +171,8 @@ func (peer *Peer) RoutineNonce() { } } + logger.Println("PACKET:", packet) + // wait for key pair for { @@ -176,6 +187,7 @@ func (peer *Peer) RoutineNonce() { break } } + logger.Println("Key pair:", keyPair) sendSignal(peer.signal.handshakeBegin) logger.Println("Waiting for key-pair, peer", peer.id) @@ -205,10 +217,12 @@ func (peer *Peer) RoutineNonce() { work := new(QueueOutboundElement) // TODO: profile, maybe use pool work.keyPair = keyPair work.packet = packet - work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) + work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1 work.peer = peer work.mutex.Lock() + logger.Println("WORK:", work) + packet = nil // drop packets until there is space @@ -219,9 +233,11 @@ func (peer *Peer) RoutineNonce() { case peer.device.queue.encryption <- work: return default: - drop := <-peer.device.queue.encryption - drop.packet = nil - drop.mutex.Unlock() + select { + case elem := <-peer.device.queue.encryption: + elem.Drop() + default: + } } } }() @@ -241,18 +257,22 @@ func (peer *Peer) RoutineNonce() { func (device *Device) RoutineEncryption() { var nonce [chacha20poly1305.NonceSize]byte for work := range device.queue.encryption { + if work.IsDropped() { + continue + } // pad packet padding := device.mtu - len(work.packet) if padding < 0 { - // drop - work.packet = nil - work.mutex.Unlock() + work.Drop() + continue } + for n := 0; n < padding; n += 1 { work.packet = append(work.packet, 0) } + device.log.Debug.Println(work.packet) // encrypt @@ -288,6 +308,9 @@ func (peer *Peer) RoutineSequentialSender() { logger.Println("Routine, sequential sender, stopped for peer", peer.id) return case work := <-peer.queue.outbound: + if work.IsDropped() { + continue + } work.mutex.Lock() func() { if work.packet == nil { @@ -310,10 +333,12 @@ func (peer *Peer) RoutineSequentialSender() { return } - logger.Println("Sending packet for peer", peer.id, work.packet) + logger.Println(work.packet) _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint) - logger.Println("SEND:", peer.endpoint, err) + if err != nil { + return + } atomic.AddUint64(&peer.tx_bytes, uint64(len(work.packet))) // shift keep-alive timer @@ -323,7 +348,6 @@ func (peer *Peer) RoutineSequentialSender() { peer.timer.sendKeepalive.Reset(interval) } }() - work.mutex.Unlock() } } } |