diff options
Diffstat (limited to 'receive.go')
-rw-r--r-- | receive.go | 79 |
1 files changed, 26 insertions, 53 deletions
@@ -43,59 +43,28 @@ func (elem *QueueInboundElement) IsDropped() bool { return atomic.LoadInt32(&elem.dropped) == AtomicTrue } -func (device *Device) addToInboundQueue( - queue chan *QueueInboundElement, - element *QueueInboundElement, -) { - for { +func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueInboundElement, decryptionQueue chan *QueueInboundElement, element *QueueInboundElement) bool { + select { + case inboundQueue <- element: select { - case queue <- element: - return + case decryptionQueue <- element: + return true default: - select { - case old := <-queue: - old.Drop() - default: - } + element.Drop() + element.mutex.Unlock() + return false } + default: + return false } } -func (device *Device) addToDecryptionQueue( - queue chan *QueueInboundElement, - element *QueueInboundElement, -) { - for { - select { - case queue <- element: - return - default: - select { - case old := <-queue: - // drop & release to potential consumer - old.Drop() - old.mutex.Unlock() - default: - } - } - } -} - -func (device *Device) addToHandshakeQueue( - queue chan QueueHandshakeElement, - element QueueHandshakeElement, -) { - for { - select { - case queue <- element: - return - default: - select { - case elem := <-queue: - device.PutMessageBuffer(elem.buffer) - default: - } - } +func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, element QueueHandshakeElement) bool { + select { + case queue <- element: + return true + default: + return false } } @@ -154,6 +123,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { } if err != nil { + device.PutMessageBuffer(buffer) return } @@ -212,9 +182,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { // add to decryption queues if peer.isRunning.Get() { - device.addToDecryptionQueue(device.queue.decryption, elem) - device.addToInboundQueue(peer.queue.inbound, elem) - buffer = device.GetMessageBuffer() + if device.addToInboundAndDecryptionQueues(peer.queue.inbound, device.queue.decryption, elem) { + buffer = device.GetMessageBuffer() + } } continue @@ -235,7 +205,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { } if okay { - device.addToHandshakeQueue( + if (device.addToHandshakeQueue( device.queue.handshake, QueueHandshakeElement{ msgType: msgType, @@ -243,8 +213,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { packet: packet, endpoint: endpoint, }, - ) - buffer = device.GetMessageBuffer() + )) { + buffer = device.GetMessageBuffer() + } } } } @@ -307,6 +278,8 @@ func (device *Device) RoutineDecryption() { ) if err != nil { elem.Drop() + device.PutMessageBuffer(elem.buffer) + elem.mutex.Unlock() } elem.mutex.Unlock() } |