diff options
Diffstat (limited to 'device/peer.go')
-rw-r--r-- | device/peer.go | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/device/peer.go b/device/peer.go index 3e4f4ec..49b9acb 100644 --- a/device/peer.go +++ b/device/peer.go @@ -51,8 +51,11 @@ type Peer struct { sentLastMinuteHandshake AtomicBool } + state struct { + mu sync.Mutex // protects against concurrent Start/Stop + } + queue struct { - sync.RWMutex staged chan *QueueOutboundElement // staged packets before a handshake is available outbound chan *QueueOutboundElement // sequential ordering of udp transmission inbound chan *QueueInboundElement // sequential ordering of tun writing @@ -158,8 +161,8 @@ func (peer *Peer) Start() { } // prevent simultaneous start/stop operations - peer.queue.Lock() - defer peer.queue.Unlock() + peer.state.mu.Lock() + defer peer.state.mu.Unlock() if peer.isRunning.Get() { return @@ -177,8 +180,8 @@ func (peer *Peer) Start() { peer.handshake.mutex.Unlock() // prepare queues - peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize) - peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize) + peer.queue.outbound = newAutodrainingOutboundQueue(device) + peer.queue.inbound = newAutodrainingInboundQueue(device) if peer.queue.staged == nil { peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize) } @@ -239,8 +242,8 @@ func (peer *Peer) ExpireCurrentKeypairs() { } func (peer *Peer) Stop() { - peer.queue.Lock() - defer peer.queue.Unlock() + peer.state.mu.Lock() + defer peer.state.mu.Unlock() if !peer.isRunning.Swap(false) { return @@ -249,9 +252,9 @@ func (peer *Peer) Stop() { peer.device.log.Verbosef("%v - Stopping...", peer) peer.timersStop() - - close(peer.queue.inbound) - close(peer.queue.outbound) + // Signal that RoutineSequentialSender and RoutineSequentialReceiver should exit. + peer.queue.inbound <- nil + peer.queue.outbound <- nil peer.stopping.Wait() peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us |