From 168ef61a638e4875b260edbc51551bae0dc34ac3 Mon Sep 17 00:00:00 2001 From: "Jason A. Donenfeld" Date: Tue, 1 May 2018 16:59:13 +0200 Subject: Add missing locks and fix debug output, and try to flush queues Flushing queues on exit is sort of a partial solution, but this could be better. Really what we want is for no more packets to be enqueued after isUp is set to false. --- receive.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 10 deletions(-) (limited to 'receive.go') diff --git a/receive.go b/receive.go index ca20900..7d35497 100644 --- a/receive.go +++ b/receive.go @@ -7,6 +7,7 @@ import ( "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" "net" + "strconv" "sync" "sync/atomic" "time" @@ -101,7 +102,11 @@ func (device *Device) addToHandshakeQueue( func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { logDebug := device.log.Debug - logDebug.Println("Routine, receive incoming, IP version:", IP) + defer func() { + logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - stopped") + }() + + logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - starting") // receive datagrams until conn is closed @@ -224,15 +229,31 @@ func (device *Device) RoutineDecryption() { var nonce [chacha20poly1305.NonceSize]byte logDebug := device.log.Debug - logDebug.Println("Routine, decryption, started for device") + defer func() { + for { + select { + case elem, ok := <-device.queue.decryption: + if ok { + elem.Drop() + } + default: + break + } + } + logDebug.Println("Routine: decryption worker - stopped") + }() + logDebug.Println("Routine: decryption worker - started") for { select { case <-device.signal.stop.Wait(): - logDebug.Println("Routine, decryption worker, stopped") return - case elem := <-device.queue.decryption: + case elem, ok := <-device.queue.decryption: + + if !ok { + return + } // check if dropped @@ -282,18 +303,35 @@ func (device *Device) RoutineHandshake() { logInfo := device.log.Info logError := device.log.Error logDebug := device.log.Debug - logDebug.Println("Routine, handshake routine, started for device") + + defer func() { + for { + select { + case <-device.queue.handshake: + default: + return + } + } + logDebug.Println("Routine: handshake worker - stopped") + }() + + logDebug.Println("Routine: handshake worker - started") var temp [MessageHandshakeSize]byte var elem QueueHandshakeElement + var ok bool for { select { - case elem = <-device.queue.handshake: + case elem, ok = <-device.queue.handshake: case <-device.signal.stop.Wait(): return } + if !ok { + return + } + // handle cookie fields and ratelimiting switch elem.msgType { @@ -419,7 +457,7 @@ func (device *Device) RoutineHandshake() { peer.endpoint = elem.endpoint peer.mutex.Unlock() - logDebug.Println(peer, ": Received handshake initiation") + logDebug.Println(peer.String() + ": Received handshake initiation") // create response @@ -477,7 +515,7 @@ func (device *Device) RoutineHandshake() { peer.endpoint = elem.endpoint peer.mutex.Unlock() - logDebug.Println(peer, ": Received handshake response") + logDebug.Println(peer.String() + ": Received handshake response") peer.TimerEphemeralKeyCreated() @@ -504,10 +542,10 @@ func (peer *Peer) RoutineSequentialReceiver() { defer func() { peer.routines.stopping.Done() - logDebug.Println(peer.String(), ": Routine, Sequential Receiver, Stopped") + logDebug.Println(peer.String() + ": Routine: sequential receiver - stopped") }() - logDebug.Println(peer.String(), ": Routine, Sequential Receiver, Started") + logDebug.Println(peer.String() + ": Routine: sequential receiver - started") peer.routines.starting.Done() -- cgit v1.2.3