diff options
author | Jason A. Donenfeld <Jason@zx2c4.com> | 2019-07-01 09:39:08 +0200 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2019-07-01 09:39:08 +0200 |
commit | 647d7b7157b6957b61ebe6be60f49828c025a4d7 (patch) | |
tree | d7ddeb99b49f8b33b15946f106e274252e446c43 /device/send.go | |
parent | 5e6eff81b6f7f18b3dd24bec03ea71f009a3e938 (diff) | |
download | wireguard-go-647d7b7157b6957b61ebe6be60f49828c025a4d7.tar.gz wireguard-go-647d7b7157b6957b61ebe6be60f49828c025a4d7.zip |
device: prepare for multiple send/receive
Diffstat (limited to '')
-rw-r--r-- | device/send.go | 87 |
1 files changed, 55 insertions, 32 deletions
diff --git a/device/send.go b/device/send.go index c4aa5b9..edc58c0 100644 --- a/device/send.go +++ b/device/send.go @@ -160,7 +160,7 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error { peer.timersAnyAuthenticatedPacketTraversal() peer.timersAnyAuthenticatedPacketSent() - err = peer.SendBuffer(packet) + err = peer.SendBuffer(packet, true) if err != nil { peer.device.log.Error.Println(peer, "- Failed to send handshake initiation", err) } @@ -198,7 +198,7 @@ func (peer *Peer) SendHandshakeResponse() error { peer.timersAnyAuthenticatedPacketTraversal() peer.timersAnyAuthenticatedPacketSent() - err = peer.SendBuffer(packet) + err = peer.SendBuffer(packet, true) if err != nil { peer.device.log.Error.Println(peer, "- Failed to send handshake response", err) } @@ -219,7 +219,7 @@ func (device *Device) SendHandshakeCookie(initiatingElem *QueueHandshakeElement) var buff [MessageCookieReplySize]byte writer := bytes.NewBuffer(buff[:0]) binary.Write(writer, binary.LittleEndian, reply) - device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint) + device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint, true) if err != nil { device.log.Error.Println("Failed to send cookie reply:", err) } @@ -541,6 +541,33 @@ func (device *Device) RoutineEncryption() { } } +func (peer *Peer) sendElementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueOutboundElement) { + if !*shouldFlush { + select { + case <-peer.routines.stop: + stop = true + return + case elem, elemOk = <-peer.queue.outbound: + return + } + } else { + select { + case <-peer.routines.stop: + stop = true + return + case elem, elemOk = <-peer.queue.outbound: + return + default: + *shouldFlush = false + err := peer.device.net.bind.Flush() + if err != nil { + peer.device.log.Error.Printf("Unable to flush send packets: %v", err) + } + return peer.sendElementStopOrFlush(shouldFlush) + } + } +} + /* Sequentially reads packets from queue and sends to endpoint * * Obs. Single instance per peer. @@ -577,41 +604,37 @@ func (peer *Peer) RoutineSequentialSender() { peer.routines.starting.Done() + shouldFlush := false for { - select { - - case <-peer.routines.stop: + stop, ok, elem := peer.sendElementStopOrFlush(&shouldFlush) + if stop || !ok { return + } - case elem, ok := <-peer.queue.outbound: - - if !ok { - return - } - - elem.Lock() - if elem.IsDropped() { - device.PutOutboundElement(elem) - continue - } - - peer.timersAnyAuthenticatedPacketTraversal() - peer.timersAnyAuthenticatedPacketSent() + elem.Lock() + if elem.IsDropped() { + device.PutOutboundElement(elem) + continue + } - // send message and return buffer to pool + peer.timersAnyAuthenticatedPacketTraversal() + peer.timersAnyAuthenticatedPacketSent() - err := peer.SendBuffer(elem.packet) - if len(elem.packet) != MessageKeepaliveSize { - peer.timersDataSent() - } - device.PutMessageBuffer(elem.buffer) - device.PutOutboundElement(elem) - if err != nil { - logError.Println(peer, "- Failed to send data packet", err) - continue - } + // send message and return buffer to pool - peer.keepKeyFreshSending() + err := peer.SendBuffer(elem.packet, false) + if len(elem.packet) != MessageKeepaliveSize { + peer.timersDataSent() } + device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) + if err != nil { + logError.Println(peer, "- Failed to send data packet", err) + continue + } else { + shouldFlush = true + } + + peer.keepKeyFreshSending() } } |