diff options
author | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-16 21:50:58 +0200 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2018-09-16 21:50:58 +0200 |
commit | 39d6e4f2f18265c9cee1a2b1b456f6468950b932 (patch) | |
tree | 2d6c604dd6b5c16ca3a1cbf96d94351bc9c7c864 /send.go | |
parent | 1c025570139f614f2083b935e2c58d5dbf199c2f (diff) | |
download | wireguard-go-39d6e4f2f18265c9cee1a2b1b456f6468950b932.tar.gz wireguard-go-39d6e4f2f18265c9cee1a2b1b456f6468950b932.zip |
Change queueing drop order and fix memory leaks
If the queues are full, we drop the present packet, which is better for
network traffic flow. Also, we try to fix up the memory leaks with not
putting buffers from our shared pool.
Diffstat (limited to 'send.go')
-rw-r--r-- | send.go | 43 |
1 files changed, 21 insertions, 22 deletions
@@ -66,10 +66,7 @@ func (elem *QueueOutboundElement) IsDropped() bool { return atomic.LoadInt32(&elem.dropped) == AtomicTrue } -func addToOutboundQueue( - queue chan *QueueOutboundElement, - element *QueueOutboundElement, -) { +func addToNonceQueue(queue chan *QueueOutboundElement, element *QueueOutboundElement, device *Device) { for { select { case queue <- element: @@ -78,32 +75,30 @@ func addToOutboundQueue( select { case old := <-queue: old.Drop() + device.PutMessageBuffer(element.buffer) default: } } } } -func addToEncryptionQueue( - queue chan *QueueOutboundElement, - element *QueueOutboundElement, -) { - for { +func addToOutboundAndEncryptionQueues(outboundQueue chan *QueueOutboundElement, encryptionQueue chan *QueueOutboundElement, element *QueueOutboundElement) { + select { + case outboundQueue <- element: select { - case queue <- element: + case encryptionQueue <- element: return default: - select { - case old := <-queue: - // drop & release to potential consumer - old.Drop() - old.mutex.Unlock() - default: - } + element.Drop() + element.peer.device.PutMessageBuffer(element.buffer) + element.mutex.Unlock() } + default: + element.peer.device.PutMessageBuffer(element.buffer) } } + /* Queues a keepalive if no packets are queued for peer */ func (peer *Peer) SendKeepalive() bool { @@ -117,6 +112,7 @@ func (peer *Peer) SendKeepalive() bool { peer.device.log.Debug.Println(peer, "- Sending keepalive packet") return true default: + peer.device.PutMessageBuffer(elem.buffer) return false } } @@ -267,6 +263,7 @@ func (device *Device) RoutineReadFromTUN() { logError.Println("Failed to read packet from TUN device:", err) device.Close() } + device.PutMessageBuffer(elem.buffer) return } @@ -308,7 +305,7 @@ func (device *Device) RoutineReadFromTUN() { if peer.queue.packetInNonceQueueIsAwaitingKey.Get() { peer.SendHandshakeInitiation(false) } - addToOutboundQueue(peer.queue.nonce, elem) + addToNonceQueue(peer.queue.nonce, elem, device) elem = device.NewOutboundElement() } } @@ -342,7 +339,8 @@ func (peer *Peer) RoutineNonce() { flush := func() { for { select { - case <-peer.queue.nonce: + case elem := <-peer.queue.nonce: + device.PutMessageBuffer(elem.buffer) default: return } @@ -402,10 +400,12 @@ func (peer *Peer) RoutineNonce() { logDebug.Println(peer, "- Obtained awaited keypair") case <-peer.signals.flushNonceQueue: + device.PutMessageBuffer(elem.buffer) flush() goto NextPacket case <-peer.routines.stop: + device.PutMessageBuffer(elem.buffer) return } } @@ -420,6 +420,7 @@ func (peer *Peer) RoutineNonce() { if elem.nonce >= RejectAfterMessages { atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages) + device.PutMessageBuffer(elem.buffer) goto NextPacket } @@ -428,9 +429,7 @@ func (peer *Peer) RoutineNonce() { elem.mutex.Lock() // add to parallel and sequential queue - - addToEncryptionQueue(device.queue.encryption, elem) - addToOutboundQueue(peer.queue.outbound, elem) + addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption, elem) } } } |