diff options
Diffstat (limited to 'send.go')
-rw-r--r-- | send.go | 43 |
1 files changed, 21 insertions, 22 deletions
@@ -66,10 +66,7 @@ func (elem *QueueOutboundElement) IsDropped() bool { return atomic.LoadInt32(&elem.dropped) == AtomicTrue } -func addToOutboundQueue( - queue chan *QueueOutboundElement, - element *QueueOutboundElement, -) { +func addToNonceQueue(queue chan *QueueOutboundElement, element *QueueOutboundElement, device *Device) { for { select { case queue <- element: @@ -78,32 +75,30 @@ func addToOutboundQueue( select { case old := <-queue: old.Drop() + device.PutMessageBuffer(element.buffer) default: } } } } -func addToEncryptionQueue( - queue chan *QueueOutboundElement, - element *QueueOutboundElement, -) { - for { +func addToOutboundAndEncryptionQueues(outboundQueue chan *QueueOutboundElement, encryptionQueue chan *QueueOutboundElement, element *QueueOutboundElement) { + select { + case outboundQueue <- element: select { - case queue <- element: + case encryptionQueue <- element: return default: - select { - case old := <-queue: - // drop & release to potential consumer - old.Drop() - old.mutex.Unlock() - default: - } + element.Drop() + element.peer.device.PutMessageBuffer(element.buffer) + element.mutex.Unlock() } + default: + element.peer.device.PutMessageBuffer(element.buffer) } } + /* Queues a keepalive if no packets are queued for peer */ func (peer *Peer) SendKeepalive() bool { @@ -117,6 +112,7 @@ func (peer *Peer) SendKeepalive() bool { peer.device.log.Debug.Println(peer, "- Sending keepalive packet") return true default: + peer.device.PutMessageBuffer(elem.buffer) return false } } @@ -267,6 +263,7 @@ func (device *Device) RoutineReadFromTUN() { logError.Println("Failed to read packet from TUN device:", err) device.Close() } + device.PutMessageBuffer(elem.buffer) return } @@ -308,7 +305,7 @@ func (device *Device) RoutineReadFromTUN() { if peer.queue.packetInNonceQueueIsAwaitingKey.Get() { peer.SendHandshakeInitiation(false) } - addToOutboundQueue(peer.queue.nonce, elem) + addToNonceQueue(peer.queue.nonce, elem, device) elem = device.NewOutboundElement() } } @@ -342,7 +339,8 @@ func (peer *Peer) RoutineNonce() { flush := func() { for { select { - case <-peer.queue.nonce: + case elem := <-peer.queue.nonce: + device.PutMessageBuffer(elem.buffer) default: return } @@ -402,10 +400,12 @@ func (peer *Peer) RoutineNonce() { logDebug.Println(peer, "- Obtained awaited keypair") case <-peer.signals.flushNonceQueue: + device.PutMessageBuffer(elem.buffer) flush() goto NextPacket case <-peer.routines.stop: + device.PutMessageBuffer(elem.buffer) return } } @@ -420,6 +420,7 @@ func (peer *Peer) RoutineNonce() { if elem.nonce >= RejectAfterMessages { atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages) + device.PutMessageBuffer(elem.buffer) goto NextPacket } @@ -428,9 +429,7 @@ func (peer *Peer) RoutineNonce() { elem.mutex.Lock() // add to parallel and sequential queue - - addToEncryptionQueue(device.queue.encryption, elem) - addToOutboundQueue(peer.queue.outbound, elem) + addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption, elem) } } } |