From 6d1be18d58a2e6874cddc59e0f01a14e153d1938 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 28 Jun 2019 17:10:49 -0400 Subject: add MPSCQueue and more --- event.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) (limited to 'event.go') diff --git a/event.go b/event.go index 893d5e0..3923dc8 100644 --- a/event.go +++ b/event.go @@ -76,7 +76,7 @@ type timerEvent struct { type TimerEvent = *timerEvent // The C function pointer type which takes timerev_t* (the C pointer to -// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter +// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter. type TimerEventCallback = C.timerev_callback_t // Create a TimerEvent handle attached to the given EventContext, with a @@ -163,3 +163,60 @@ func (self SigEvent) Clear() { self.ec.detach(rawptr_t(self.inner)) C.sigev_clear(self.inner) } + +// The C pointer type for a MPSCQueue object. +type CMPSCQueue = *C.mpscqueue_t +type mpscQueue struct { + inner CMPSCQueue + ec EventContext +} + +// The object for a Multi-Producer, Single-Consumer queue. +type MPSCQueue = *mpscQueue + +// The C function pointer type which takes mpscqueue_t* (the C pointer to +// MPSCQueue) and void* (the unsafe pointer to any userdata) as parameter, and +// returns either true (partial read from the queue, so it should be scheduled +// again), or false (the queue is drained, e.g. TryDequeue returns false). +type MPSCQueueCallback = C.mpscqueue_callback_t + +// Create a MPSCQueue object. +func NewMPSCQueue() MPSCQueue { + res := &mpscQueue{ inner: C.mpscqueue_new(), ec: nil } + runtime.SetFinalizer(res, func(self MPSCQueue) { self.free() }) + return res +} + +func (self MPSCQueue) free() { C.mpscqueue_free(self.inner) } + +// Register the callback invoked when there are new elements in the MPSC queue. +func (self MPSCQueue) RegHandler(_ec EventContext, callback MPSCQueueCallback, userdata rawptr_t) { + C.mpscqueue_reg_handler(self.inner, _ec.inner, callback, userdata) + self.ec = _ec + _ec.attach(rawptr_t(self.inner), self) +} + +// Unregister the callback. +func (self MPSCQueue) UnregHandler() { + self.ec.detach(rawptr_t(self.inner)) + C.mpscqueue_unreg_handler(self.inner) +} + +// Enqueue an element (thread-safe). It returns true if successful. If +// unbounded is true the queue is expanded when full (and this function will +// return true). +func (self MPSCQueue) Enqueue(elem rawptr_t, unbounded bool) bool { + return bool(C.mpscqueue_enqueue(self.inner, elem, C.bool(unbounded))) +} + +// Try to dequeue an element from the queue (should only be called from one +// thread). It returns true if successful. +func (self MPSCQueue) TryDequeue(elem *rawptr_t) bool { + return bool(C.mpscqueue_try_dequeue(self.inner, elem)) +} + +// Set the initial capacity of the queue. This should only be called before the +// first dequeue/enqueue operation. +func (self MPSCQueue) SetCapacity(capacity int) { + C.mpscqueue_set_capacity(self.inner, C.size_t(capacity)) +} -- cgit v1.2.3