diff options
Diffstat (limited to 'device/channels.go')
-rw-r--r-- | device/channels.go | 62 |
1 files changed, 61 insertions, 1 deletions
diff --git a/device/channels.go b/device/channels.go index 4471477..8cd6aee 100644 --- a/device/channels.go +++ b/device/channels.go @@ -5,7 +5,10 @@ package device -import "sync" +import ( + "runtime" + "sync" +) // An outboundQueue is a channel of QueueOutboundElements awaiting encryption. // An outboundQueue is ref-counted using its wg field. @@ -67,3 +70,60 @@ func newHandshakeQueue() *handshakeQueue { }() return q } + +// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd. +// It is useful in cases in which is it hard to manage the lifetime of the channel. +// The returned channel must not be closed. Senders should signal shutdown using +// some other means, such as sending a sentinel nil values. +func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement { + type autodrainingInboundQueue struct { + c chan *QueueInboundElement + } + q := &autodrainingInboundQueue{ + c: make(chan *QueueInboundElement, QueueInboundSize), + } + runtime.SetFinalizer(q, func(q *autodrainingInboundQueue) { + for { + select { + case elem := <-q.c: + if elem == nil { + continue + } + device.PutMessageBuffer(elem.buffer) + device.PutInboundElement(elem) + default: + return + } + } + }) + return q.c +} + +// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd. +// It is useful in cases in which is it hard to manage the lifetime of the channel. +// The returned channel must not be closed. Senders should signal shutdown using +// some other means, such as sending a sentinel nil values. +// All sends to the channel must be best-effort, because there may be no receivers. +func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement { + type autodrainingOutboundQueue struct { + c chan *QueueOutboundElement + } + q := &autodrainingOutboundQueue{ + c: make(chan *QueueOutboundElement, QueueOutboundSize), + } + runtime.SetFinalizer(q, func(q *autodrainingOutboundQueue) { + for { + select { + case elem := <-q.c: + if elem == nil { + continue + } + device.PutMessageBuffer(elem.buffer) + device.PutOutboundElement(elem) + default: + return + } + } + }) + return q.c +} |