From f73d2fb2d96bc3fbc8bc4cce452e3c19689de01e Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 26 Jan 2018 22:52:32 +0100 Subject: Added initial version of peer teardown There is a double lock issue with device.Close which has yet to be resolved. --- src/peer.go | 68 +++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 17 deletions(-) (limited to 'src/peer.go') diff --git a/src/peer.go b/src/peer.go index 3d82989..5ad4511 100644 --- a/src/peer.go +++ b/src/peer.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "errors" "fmt" + "github.com/sasha-s/go-deadlock" "sync" "time" ) @@ -14,7 +15,8 @@ const ( type Peer struct { id uint - mutex sync.RWMutex + isRunning AtomicBool + mutex deadlock.RWMutex persistentKeepaliveInterval uint64 keyPairs KeyPairs handshake Handshake @@ -26,7 +28,7 @@ type Peer struct { lastHandshakeNano int64 // nano seconds since epoch } time struct { - mutex sync.RWMutex + mutex deadlock.RWMutex lastSend time.Time // last send message lastHandshake time.Time // last completed handshake nextKeepalive time.Time @@ -58,7 +60,7 @@ type Peer struct { inbound chan *QueueInboundElement // sequential ordering of work } routines struct { - mutex sync.Mutex // held when stopping / starting routines + mutex deadlock.Mutex // held when stopping / starting routines starting sync.WaitGroup // routines pending start stopping sync.WaitGroup // routines pending stop stop Signal // size 0, stop all goroutines in peer @@ -67,6 +69,14 @@ type Peer struct { } func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { + + if device.isClosed.Get() { + return nil, errors.New("Device closed") + } + + device.mutex.Lock() + defer device.mutex.Unlock() + // create peer peer := new(Peer) @@ -75,17 +85,17 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { peer.mac.Init(pk) peer.device = device + peer.isRunning.Set(false) + peer.timer.zeroAllKeys = NewTimer() peer.timer.keepalivePersistent = NewTimer() peer.timer.keepalivePassive = NewTimer() - peer.timer.zeroAllKeys = NewTimer() peer.timer.handshakeNew = NewTimer() peer.timer.handshakeDeadline = NewTimer() peer.timer.handshakeTimeout = NewTimer() // assign id for debugging - device.mutex.Lock() peer.id = device.idCounter device.idCounter += 1 @@ -102,7 +112,6 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { return nil, errors.New("Adding existing peer") } device.peers[pk] = peer - device.mutex.Unlock() // precompute DH @@ -117,23 +126,20 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { peer.endpoint = nil - // prepare queuing - - peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize) - peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize) - peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize) - // prepare signaling & routines - peer.signal.newKeyPair = NewSignal() - peer.signal.handshakeBegin = NewSignal() - peer.signal.handshakeCompleted = NewSignal() - peer.signal.flushNonceQueue = NewSignal() - peer.routines.mutex.Lock() peer.routines.stop = NewSignal() peer.routines.mutex.Unlock() + // start peer + + peer.device.state.mutex.Lock() + if peer.device.isUp.Get() { + peer.Start() + } + peer.device.state.mutex.Unlock() + return peer, nil } @@ -148,6 +154,10 @@ func (peer *Peer) SendBuffer(buffer []byte) error { return errors.New("No known endpoint for peer") } + if peer.device.net.bind == nil { + return errors.New("No bind") + } + return peer.device.net.bind.Send(buffer, peer.endpoint) } @@ -174,12 +184,26 @@ func (peer *Peer) Start() { peer.routines.mutex.Lock() defer peer.routines.mutex.Lock() + peer.device.log.Debug.Println("Starting:", peer.String()) + // stop & wait for ungoing routines (if any) + peer.isRunning.Set(false) peer.routines.stop.Broadcast() peer.routines.starting.Wait() peer.routines.stopping.Wait() + // prepare queues + + peer.signal.newKeyPair = NewSignal() + peer.signal.handshakeBegin = NewSignal() + peer.signal.handshakeCompleted = NewSignal() + peer.signal.flushNonceQueue = NewSignal() + + peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize) + peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize) + peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize) + // reset signal and start (new) routines peer.routines.stop = NewSignal() @@ -192,6 +216,7 @@ func (peer *Peer) Start() { go peer.RoutineSequentialReceiver() peer.routines.starting.Wait() + peer.isRunning.Set(true) } func (peer *Peer) Stop() { @@ -199,13 +224,22 @@ func (peer *Peer) Stop() { peer.routines.mutex.Lock() defer peer.routines.mutex.Lock() + peer.device.log.Debug.Println("Stopping:", peer.String()) + // stop & wait for ungoing routines (if any) peer.routines.stop.Broadcast() peer.routines.starting.Wait() peer.routines.stopping.Wait() + // close queues + + close(peer.queue.nonce) + close(peer.queue.outbound) + close(peer.queue.inbound) + // reset signal (to handle repeated stopping) peer.routines.stop = NewSignal() + peer.isRunning.Set(false) } -- cgit v1.2.3