From 6d1be18d58a2e6874cddc59e0f01a14e153d1938 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 28 Jun 2019 17:10:49 -0400 Subject: add MPSCQueue and more --- event.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- msg.go | 10 ++++++++++ salticidae | 2 +- 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/event.go b/event.go index 893d5e0..3923dc8 100644 --- a/event.go +++ b/event.go @@ -76,7 +76,7 @@ type timerEvent struct { type TimerEvent = *timerEvent // The C function pointer type which takes timerev_t* (the C pointer to -// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter +// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter. type TimerEventCallback = C.timerev_callback_t // Create a TimerEvent handle attached to the given EventContext, with a @@ -163,3 +163,60 @@ func (self SigEvent) Clear() { self.ec.detach(rawptr_t(self.inner)) C.sigev_clear(self.inner) } + +// The C pointer type for a MPSCQueue object. +type CMPSCQueue = *C.mpscqueue_t +type mpscQueue struct { + inner CMPSCQueue + ec EventContext +} + +// The object for a Multi-Producer, Single-Consumer queue. +type MPSCQueue = *mpscQueue + +// The C function pointer type which takes mpscqueue_t* (the C pointer to +// MPSCQueue) and void* (the unsafe pointer to any userdata) as parameter, and +// returns either true (partial read from the queue, so it should be scheduled +// again), or false (the queue is drained, e.g. TryDequeue returns false). +type MPSCQueueCallback = C.mpscqueue_callback_t + +// Create a MPSCQueue object. +func NewMPSCQueue() MPSCQueue { + res := &mpscQueue{ inner: C.mpscqueue_new(), ec: nil } + runtime.SetFinalizer(res, func(self MPSCQueue) { self.free() }) + return res +} + +func (self MPSCQueue) free() { C.mpscqueue_free(self.inner) } + +// Register the callback invoked when there are new elements in the MPSC queue. +func (self MPSCQueue) RegHandler(_ec EventContext, callback MPSCQueueCallback, userdata rawptr_t) { + C.mpscqueue_reg_handler(self.inner, _ec.inner, callback, userdata) + self.ec = _ec + _ec.attach(rawptr_t(self.inner), self) +} + +// Unregister the callback. +func (self MPSCQueue) UnregHandler() { + self.ec.detach(rawptr_t(self.inner)) + C.mpscqueue_unreg_handler(self.inner) +} + +// Enqueue an element (thread-safe). It returns true if successful. If +// unbounded is true the queue is expanded when full (and this function will +// return true). +func (self MPSCQueue) Enqueue(elem rawptr_t, unbounded bool) bool { + return bool(C.mpscqueue_enqueue(self.inner, elem, C.bool(unbounded))) +} + +// Try to dequeue an element from the queue (should only be called from one +// thread). It returns true if successful. +func (self MPSCQueue) TryDequeue(elem *rawptr_t) bool { + return bool(C.mpscqueue_try_dequeue(self.inner, elem)) +} + +// Set the initial capacity of the queue. This should only be called before the +// first dequeue/enqueue operation. +func (self MPSCQueue) SetCapacity(capacity int) { + C.mpscqueue_set_capacity(self.inner, C.size_t(capacity)) +} diff --git a/msg.go b/msg.go index 0e3434e..d1ee1bb 100644 --- a/msg.go +++ b/msg.go @@ -43,3 +43,13 @@ func (self Msg) GetPayloadByMove() DataStream { func (self Msg) GetOpcode() Opcode { return Opcode(C.msg_get_opcode(self.inner)) } + +// Get the magic number. +func (self Msg) GetMagic() uint32 { + return uint32(C.msg_get_magic(self.inner)) +} + +// Set the magic number (the default value is 0x0). +func (self Msg) SetMagic(magic uint32) { + C.msg_set_magic(self.inner, C.uint32_t(magic)) +} diff --git a/salticidae b/salticidae index 06b5a84..fedfeac 160000 --- a/salticidae +++ b/salticidae @@ -1 +1 @@ -Subproject commit 06b5a84602dc6a5dd76d82c9f88359cdb8cf52da +Subproject commit fedfeaced48150e27907628244e95ca54dbe7c02 -- cgit v1.2.3