diff options
author | Determinant <[email protected]> | 2019-06-28 17:10:49 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2019-06-28 17:10:49 -0400 |
commit | 6d1be18d58a2e6874cddc59e0f01a14e153d1938 (patch) | |
tree | 40cc75352695cd5019538eabd1e818c0b7245244 | |
parent | 1e27eb1b25c2998ede63fc7a138477e1ae4b9508 (diff) |
add MPSCQueue and more
-rw-r--r-- | event.go | 59 | ||||
-rw-r--r-- | msg.go | 10 | ||||
m--------- | salticidae | 0 |
3 files changed, 68 insertions, 1 deletions
@@ -76,7 +76,7 @@ type timerEvent struct { type TimerEvent = *timerEvent // The C function pointer type which takes timerev_t* (the C pointer to -// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter +// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter. type TimerEventCallback = C.timerev_callback_t // Create a TimerEvent handle attached to the given EventContext, with a @@ -163,3 +163,60 @@ func (self SigEvent) Clear() { self.ec.detach(rawptr_t(self.inner)) C.sigev_clear(self.inner) } + +// The C pointer type for a MPSCQueue object. +type CMPSCQueue = *C.mpscqueue_t +type mpscQueue struct { + inner CMPSCQueue + ec EventContext +} + +// The object for a Multi-Producer, Single-Consumer queue. +type MPSCQueue = *mpscQueue + +// The C function pointer type which takes mpscqueue_t* (the C pointer to +// MPSCQueue) and void* (the unsafe pointer to any userdata) as parameter, and +// returns either true (partial read from the queue, so it should be scheduled +// again), or false (the queue is drained, e.g. TryDequeue returns false). +type MPSCQueueCallback = C.mpscqueue_callback_t + +// Create a MPSCQueue object. +func NewMPSCQueue() MPSCQueue { + res := &mpscQueue{ inner: C.mpscqueue_new(), ec: nil } + runtime.SetFinalizer(res, func(self MPSCQueue) { self.free() }) + return res +} + +func (self MPSCQueue) free() { C.mpscqueue_free(self.inner) } + +// Register the callback invoked when there are new elements in the MPSC queue. +func (self MPSCQueue) RegHandler(_ec EventContext, callback MPSCQueueCallback, userdata rawptr_t) { + C.mpscqueue_reg_handler(self.inner, _ec.inner, callback, userdata) + self.ec = _ec + _ec.attach(rawptr_t(self.inner), self) +} + +// Unregister the callback. +func (self MPSCQueue) UnregHandler() { + self.ec.detach(rawptr_t(self.inner)) + C.mpscqueue_unreg_handler(self.inner) +} + +// Enqueue an element (thread-safe). It returns true if successful. If +// unbounded is true the queue is expanded when full (and this function will +// return true). +func (self MPSCQueue) Enqueue(elem rawptr_t, unbounded bool) bool { + return bool(C.mpscqueue_enqueue(self.inner, elem, C.bool(unbounded))) +} + +// Try to dequeue an element from the queue (should only be called from one +// thread). It returns true if successful. +func (self MPSCQueue) TryDequeue(elem *rawptr_t) bool { + return bool(C.mpscqueue_try_dequeue(self.inner, elem)) +} + +// Set the initial capacity of the queue. This should only be called before the +// first dequeue/enqueue operation. +func (self MPSCQueue) SetCapacity(capacity int) { + C.mpscqueue_set_capacity(self.inner, C.size_t(capacity)) +} @@ -43,3 +43,13 @@ func (self Msg) GetPayloadByMove() DataStream { func (self Msg) GetOpcode() Opcode { return Opcode(C.msg_get_opcode(self.inner)) } + +// Get the magic number. +func (self Msg) GetMagic() uint32 { + return uint32(C.msg_get_magic(self.inner)) +} + +// Set the magic number (the default value is 0x0). +func (self Msg) SetMagic(magic uint32) { + C.msg_set_magic(self.inner, C.uint32_t(magic)) +} diff --git a/salticidae b/salticidae -Subproject 06b5a84602dc6a5dd76d82c9f88359cdb8cf52d +Subproject fedfeaced48150e27907628244e95ca54dbe7c0 |