From 8c34c4cbb3780c433148966a004f5a51aace0f64 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 4 Aug 2017 16:15:53 +0200 Subject: First set of code review patches --- src/receive.go | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) (limited to 'src/receive.go') diff --git a/src/receive.go b/src/receive.go index 700b894..fb5c51f 100644 --- a/src/receive.go +++ b/src/receive.go @@ -73,6 +73,8 @@ func (device *Device) addToHandshakeQueue( } /* Routine determining the busy state of the interface + * + * TODO: Under load for some time */ func (device *Device) RoutineBusyMonitor() { samples := 0 @@ -131,6 +133,7 @@ func (device *Device) RoutineReceiveIncomming() { buffer = device.GetMessageBuffer() } + // TODO: Take writelock to sleep device.net.mutex.RLock() conn := device.net.conn device.net.mutex.RUnlock() @@ -139,6 +142,7 @@ func (device *Device) RoutineReceiveIncomming() { continue } + // TODO: Wait for new conn or message conn.SetReadDeadline(time.Now().Add(time.Second)) size, raddr, err := conn.ReadFromUDP(buffer[:]) @@ -156,6 +160,8 @@ func (device *Device) RoutineReceiveIncomming() { case MessageInitiationType, MessageResponseType: + // TODO: Check size early + // add to handshake queue device.addToHandshakeQueue( @@ -171,6 +177,8 @@ func (device *Device) RoutineReceiveIncomming() { case MessageCookieReplyType: + // TODO: Queue all the things + // verify and update peer cookie state if len(packet) != MessageCookieReplySize { @@ -250,7 +258,7 @@ func (device *Device) RoutineDecryption() { // check if dropped if elem.IsDropped() { - elem.mutex.Unlock() + elem.mutex.Unlock() // TODO: Make consistent with send continue } @@ -318,6 +326,7 @@ func (device *Device) RoutineHandshake() { logError.Println("Failed to create cookie reply:", err) return } + // TODO: Use temp writer := bytes.NewBuffer(elem.packet[:0]) binary.Write(writer, binary.LittleEndian, reply) elem.packet = writer.Bytes() @@ -330,6 +339,8 @@ func (device *Device) RoutineHandshake() { // ratelimit + // TODO: Only ratelimit when busy + if !device.ratelimiter.Allow(elem.source.IP) { return } @@ -364,9 +375,14 @@ func (device *Device) RoutineHandshake() { ) return } - peer.TimerPacketReceived() + + // update timers + + peer.TimerAnyAuthenticatedPacketTraversal() + peer.TimerAnyAuthenticatedPacketReceived() // update endpoint + // TODO: Add a race condition \s peer.mutex.Lock() peer.endpoint = elem.source @@ -381,6 +397,7 @@ func (device *Device) RoutineHandshake() { } peer.TimerEphemeralKeyCreated() + peer.NewKeyPair() logDebug.Println("Creating response message for", peer.String()) @@ -392,8 +409,7 @@ func (device *Device) RoutineHandshake() { // send response peer.SendBuffer(packet) - peer.TimerPacketSent() - peer.NewKeyPair() + peer.TimerAnyAuthenticatedPacketTraversal() case MessageResponseType: @@ -423,8 +439,14 @@ func (device *Device) RoutineHandshake() { return } - peer.TimerPacketReceived() + // update timers + + peer.TimerAnyAuthenticatedPacketTraversal() + peer.TimerAnyAuthenticatedPacketReceived() peer.TimerHandshakeComplete() + + // derive key-pair + peer.NewKeyPair() peer.SendKeepAlive() @@ -467,8 +489,8 @@ func (peer *Peer) RoutineSequentialReceiver() { return } - peer.TimerPacketReceived() - peer.TimerTransportReceived() + peer.TimerAnyAuthenticatedPacketTraversal() + peer.TimerAnyAuthenticatedPacketReceived() peer.KeepKeyFreshReceiving() // check if using new key-pair @@ -504,6 +526,7 @@ func (peer *Peer) RoutineSequentialReceiver() { field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2] length := binary.BigEndian.Uint16(field) + // TODO: check length of packet & NOT TOO SMALL either elem.packet = elem.packet[:length] // verify IPv4 source @@ -525,6 +548,7 @@ func (peer *Peer) RoutineSequentialReceiver() { field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2] length := binary.BigEndian.Uint16(field) length += ipv6.HeaderLen + // TODO: check length of packet elem.packet = elem.packet[:length] // verify IPv6 source @@ -542,11 +566,13 @@ func (peer *Peer) RoutineSequentialReceiver() { atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) device.addToInboundQueue(device.queue.inbound, elem) + + // TODO: move TUN write into per peer routine }() } } -func (device *Device) RoutineWriteToTUN(tun TUNDevice) { +func (device *Device) RoutineWriteToTUN() { logError := device.log.Error logDebug := device.log.Debug @@ -557,7 +583,7 @@ func (device *Device) RoutineWriteToTUN(tun TUNDevice) { case <-device.signal.stop: return case elem := <-device.queue.inbound: - _, err := tun.Write(elem.packet) + _, err := device.tun.Write(elem.packet) device.PutMessageBuffer(elem.buffer) if err != nil { logError.Println("Failed to write packet to TUN device:", err) -- cgit v1.2.3