From ba3e486667987f16290ac85dc35b53cb9702d662 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 30 Jun 2017 14:41:08 +0200 Subject: Completed initial version of outbound flow --- src/send.go | 206 +++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 140 insertions(+), 66 deletions(-) (limited to 'src/send.go') diff --git a/src/send.go b/src/send.go index ab75750..d4f9342 100644 --- a/src/send.go +++ b/src/send.go @@ -5,6 +5,8 @@ import ( "golang.org/x/crypto/chacha20poly1305" "net" "sync" + "sync/atomic" + "time" ) /* Handles outbound flow @@ -29,6 +31,7 @@ type QueueOutboundElement struct { packet []byte nonce uint64 keyPair *KeyPair + peer *Peer } func (peer *Peer) FlushNonceQueue() { @@ -46,6 +49,7 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) { for { select { case peer.queue.outbound <- elem: + return default: select { case <-peer.queue.outbound: @@ -61,11 +65,15 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) { * Obs. Single instance per TUN device */ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { + if tun.MTU() == 0 { + // Dummy + return + } + device.log.Debug.Println("Routine, TUN Reader: started") for { // read packet - device.log.Debug.Println("Read") packet := make([]byte, 1<<16) // TODO: Fix & avoid dynamic allocation size, err := tun.Read(packet) if err != nil { @@ -94,13 +102,16 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { default: device.log.Debug.Println("Receieved packet with unknown IP version") - return } if peer == nil { device.log.Debug.Println("No peer configured for IP") continue } + if peer.endpoint == nil { + device.log.Debug.Println("No known endpoint for peer", peer.id) + continue + } // insert into nonce/pre-handshake queue @@ -131,69 +142,95 @@ func (peer *Peer) RoutineNonce() { var packet []byte var keyPair *KeyPair - for { + device := peer.device + logger := device.log.Debug - // wait for packet + logger.Println("Routine, nonce worker, started for peer", peer.id) - if packet == nil { - select { - case packet = <-peer.queue.nonce: - case <-peer.signal.stopSending: - close(peer.queue.outbound) - return + func() { + + for { + NextPacket: + + // wait for packet + + if packet == nil { + select { + case packet = <-peer.queue.nonce: + case <-peer.signal.stop: + return + } } - } - // wait for key pair + // wait for key pair + + for { + select { + case <-peer.signal.newKeyPair: + default: + } - for keyPair == nil { - peer.signal.newHandshake <- true - select { - case <-peer.keyPairs.newKeyPair: keyPair = peer.keyPairs.Current() - continue - case <-peer.signal.flushNonceQueue: - peer.FlushNonceQueue() - packet = nil - continue - case <-peer.signal.stopSending: - close(peer.queue.outbound) - return - } - } + if keyPair != nil && keyPair.sendNonce < RejectAfterMessages { + if time.Now().Sub(keyPair.created) < RejectAfterTime { + break + } + } - // process current packet + sendSignal(peer.signal.handshakeBegin) + logger.Println("Waiting for key-pair, peer", peer.id) - if packet != nil { + select { + case <-peer.signal.newKeyPair: + logger.Println("Key-pair negotiated for peer", peer.id) + goto NextPacket + + case <-peer.signal.flushNonceQueue: + logger.Println("Clearing queue for peer", peer.id) + peer.FlushNonceQueue() + packet = nil + goto NextPacket + + case <-peer.signal.stop: + return + } + } - // create work element + // process current packet - work := new(QueueOutboundElement) // TODO: profile, maybe use pool - work.keyPair = keyPair - work.packet = packet - work.nonce = keyPair.sendNonce - work.mutex.Lock() + if packet != nil { - packet = nil - keyPair.sendNonce += 1 + // create work element - // drop packets until there is space + work := new(QueueOutboundElement) // TODO: profile, maybe use pool + work.keyPair = keyPair + work.packet = packet + work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) + work.peer = peer + work.mutex.Lock() - func() { - for { - select { - case peer.device.queue.encryption <- work: - return - default: - drop := <-peer.device.queue.encryption - drop.packet = nil - drop.mutex.Unlock() + packet = nil + + // drop packets until there is space + + func() { + for { + select { + case peer.device.queue.encryption <- work: + return + default: + drop := <-peer.device.queue.encryption + drop.packet = nil + drop.mutex.Unlock() + } } - } - }() - peer.queue.outbound <- work + }() + peer.queue.outbound <- work + } } - } + }() + + logger.Println("Routine, nonce worker, stopped for peer", peer.id) } /* Encrypts the elements in the queue @@ -227,6 +264,10 @@ func (device *Device) RoutineEncryption() { nil, ) work.mutex.Unlock() + + // initiate new handshake + + work.peer.KeepKeyFreshSending() } } @@ -235,21 +276,54 @@ func (device *Device) RoutineEncryption() { * Obs. Single instance per peer. * The routine terminates then the outbound queue is closed. */ -func (peer *Peer) RoutineSequential() { - for work := range peer.queue.outbound { - work.mutex.Lock() - func() { - peer.mutex.RLock() - defer peer.mutex.RUnlock() - if work.packet == nil { - return - } - if peer.endpoint == nil { - return - } - peer.device.conn.WriteToUDP(work.packet, peer.endpoint) - peer.timer.sendKeepalive.Reset(peer.persistentKeepaliveInterval) - }() - work.mutex.Unlock() +func (peer *Peer) RoutineSequentialSender() { + logger := peer.device.log.Debug + logger.Println("Routine, sequential sender, started for peer", peer.id) + + device := peer.device + + for { + select { + case <-peer.signal.stop: + logger.Println("Routine, sequential sender, stopped for peer", peer.id) + return + case work := <-peer.queue.outbound: + work.mutex.Lock() + func() { + if work.packet == nil { + return + } + + peer.mutex.RLock() + defer peer.mutex.RUnlock() + + if peer.endpoint == nil { + logger.Println("No endpoint for peer:", peer.id) + return + } + + device.net.mutex.RLock() + defer device.net.mutex.RUnlock() + + if device.net.conn == nil { + logger.Println("No source for device") + return + } + + logger.Println("Sending packet for peer", peer.id, work.packet) + + _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint) + logger.Println("SEND:", peer.endpoint, err) + atomic.AddUint64(&peer.tx_bytes, uint64(len(work.packet))) + + // shift keep-alive timer + + if peer.persistentKeepaliveInterval != 0 { + interval := time.Duration(peer.persistentKeepaliveInterval) * time.Second + peer.timer.sendKeepalive.Reset(interval) + } + }() + work.mutex.Unlock() + } } } -- cgit v1.2.3