aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile14
-rw-r--r--event.go48
-rw-r--r--msg.go23
-rw-r--r--netaddr.go17
-rw-r--r--network.go155
m---------salticidae0
-rw-r--r--stream.go97
-rw-r--r--test_msgnet/main.go12
-rw-r--r--test_p2p_stress/main.go8
9 files changed, 320 insertions, 54 deletions
diff --git a/Makefile b/Makefile
index a7a5d47..4e2b0cb 100644
--- a/Makefile
+++ b/Makefile
@@ -1,23 +1,23 @@
-.PHONY: all clean run_make
+.PHONY: all clean
all: build/test_msgnet build/test_p2p_stress
-run_make:
- make -C salticidae/
salticidae/libsalticidae.so:
cd salticidae/; cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED=ON -DSALTICIDAE_DEBUG_LOG=OFF -DSALTICIDAE_CBINDINGS=ON -DBUILD_TEST=OFF ./
- make -C salticidae/
+ make -C salticidae/ -j4
build:
mkdir -p build
-build/test_msgnet: salticidae/libsalticidae.so test_msgnet/main.go run_make
+build/test_msgnet: salticidae/libsalticidae.so test_msgnet/main.go
+ make -C salticidae/
go build -o $@ github.com/Determinant/salticidae-go/test_msgnet
-build/test_p2p_stress: salticidae/libsalticidae.so test_p2p_stress/main.go run_make
+build/test_p2p_stress: salticidae/libsalticidae.so test_p2p_stress/main.go
+ make -C salticidae/
go build -o $@ github.com/Determinant/salticidae-go/test_p2p_stress
clean:
- rm -r build/
+ rm -rf build/
cd salticidae/; make clean
rm salticidae/CMakeCache.txt
diff --git a/event.go b/event.go
index 7ebc643..893d5e0 100644
--- a/event.go
+++ b/event.go
@@ -5,11 +5,13 @@ package salticidae
import "C"
import "runtime"
+// The C pointer type for an EventContext handle.
type CEventContext = *C.eventcontext_t
type eventContext struct {
inner CEventContext
attached map[uintptr]interface{}
}
+// The handle for an event loop.
type EventContext = *eventContext
func NewEventContext() EventContext {
@@ -24,15 +26,32 @@ func NewEventContext() EventContext {
func (self EventContext) attach(ptr rawptr_t, x interface{}) { self.attached[uintptr(ptr)] = x }
func (self EventContext) detach(ptr rawptr_t) { delete(self.attached, uintptr(ptr)) }
func (self EventContext) free() { C.eventcontext_free(self.inner) }
+
+// Start the event loop. This is a blocking call that will hand over the
+// control flow to the infinite loop which triggers callbacks upon new events.
+// The function will return when Stop() is called.
func (self EventContext) Dispatch() { C.eventcontext_dispatch(self.inner) }
+
+// Stop the event loop. This function is typically called in a callback. Notice
+// that all methods of an EventContext should be invoked by the same thread
+// which logically owns the loop. To schedule code executed in the event loop
+// of a particular thread, see ThreadCall.
func (self EventContext) Stop() { C.eventcontext_stop(self.inner) }
+// The C pointer type for a ThreadCall handle.
type CThreadCall = *C.threadcall_t
type threadCall struct { inner CThreadCall }
+// The handle for scheduling a function call executed by a particular
+// EventContext event loop.
type ThreadCall = *threadCall
+// The C function pointer type which takes threadcall_handle_t* and void*
+// (passing in the custom user data allocated by C.malloc) as parameters.
type ThreadCallCallback = C.threadcall_callback_t
+// Create a ThreadCall handle attached to the given EventContext. Each
+// invokcation of AsyncCall() will schedule a function call executed in the
+// given EventContext event loop.
func NewThreadCall(ec EventContext) ThreadCall {
res := &threadCall{ inner: C.threadcall_new(ec.inner) }
runtime.SetFinalizer(res, func(self ThreadCall) { self.free() })
@@ -41,19 +60,27 @@ func NewThreadCall(ec EventContext) ThreadCall {
func (self ThreadCall) free() { C.threadcall_free(self.inner) }
+// Schedule a function to be executed in the target event loop.
func (self ThreadCall) AsyncCall(callback ThreadCallCallback, userdata rawptr_t) {
C.threadcall_async_call(self.inner, callback, userdata)
}
+// The C pointer type for TimerEvent handle.
type CTimerEvent = *C.timerev_t
type timerEvent struct {
inner CTimerEvent
ec EventContext
}
+
+// The handle for a timed event.
type TimerEvent = *timerEvent
+// The C function pointer type which takes timerev_t* (the C pointer to
+// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter
type TimerEventCallback = C.timerev_callback_t
+// Create a TimerEvent handle attached to the given EventContext, with a
+// registered callback.
func NewTimerEvent(_ec EventContext, cb TimerEventCallback, userdata rawptr_t) TimerEvent {
res := &timerEvent{
inner: C.timerev_new(_ec.inner, cb, userdata),
@@ -65,26 +92,38 @@ func NewTimerEvent(_ec EventContext, cb TimerEventCallback, userdata rawptr_t) T
}
func (self TimerEvent) free() { C.timerev_free(self.inner) }
+
+// Change the callback.
func (self TimerEvent) SetCallback(callback TimerEventCallback, userdata rawptr_t) {
C.timerev_set_callback(self.inner, callback, userdata)
}
+// Schedule the timer to go off after t_sec seconds.
func (self TimerEvent) Add(t_sec float64) { C.timerev_add(self.inner, C.double(t_sec)) }
+
+
+// Unschedule the timer if it was scheduled. The timer could still be rescheduled
+// by calling Add() afterwards.
func (self TimerEvent) Del() {
self.ec.detach(rawptr_t(self.inner))
C.timerev_del(self.inner)
}
+// Empty the timer. It will be unscheduled and deallocated and its methods
+// should never be called again.
func (self TimerEvent) Clear() {
self.ec.detach(rawptr_t(self.inner))
C.timerev_clear(self.inner)
}
+// The C pointer type for a SigEvent.
type CSigEvent = *C.sigev_t
type sigEvent struct {
inner CSigEvent
ec EventContext
}
+
+// The handle for a UNIX signal event.
type SigEvent = *sigEvent
type SigEventCallback = C.sigev_callback_t
@@ -94,6 +133,8 @@ var (
SIGINT = C.SIGINT
)
+// Create a SigEvent handle attached to the given EventContext, with a
+// registered callback.
func NewSigEvent(_ec EventContext, cb SigEventCallback, userdata rawptr_t) SigEvent {
res := &sigEvent{
inner: C.sigev_new(_ec.inner, cb, userdata),
@@ -105,12 +146,19 @@ func NewSigEvent(_ec EventContext, cb SigEventCallback, userdata rawptr_t) SigEv
}
func (self SigEvent) free() { C.sigev_free(self.inner) }
+
+// Register the handle to listen for UNIX signal sig.
func (self SigEvent) Add(sig int) { C.sigev_add(self.inner, C.int(sig)) }
+
+
+// Unregister the handle. The handle may be re-registered in the future.
func (self SigEvent) Del() {
self.ec.detach(rawptr_t(self.inner))
C.sigev_del(self.inner)
}
+// Unregister the handle. Any methods of the handle should no longer be used
+// and the handle will be deallocated.
func (self SigEvent) Clear() {
self.ec.detach(rawptr_t(self.inner))
C.sigev_clear(self.inner)
diff --git a/msg.go b/msg.go
index 0f0267d..0e3434e 100644
--- a/msg.go
+++ b/msg.go
@@ -5,26 +5,41 @@ package salticidae
import "C"
import "runtime"
+// The C pointer type for a Msg object
type CMsg = *C.msg_t
type msg struct { inner CMsg }
+// Message sent by MsgNetwork
type Msg = *msg
+// Convert an existing C pointer into a go object. Notice that when the go
+// object does *not* own the resource of the C pointer, so it is only valid to
+// the extent in which the given C pointer is valid. The C memory will not be
+// deallocated when the go object is finalized by GC. This applies to all other
+// "FromC" functions.
func MsgFromC(ptr *C.msg_t) Msg { return &msg{ inner: ptr } }
-func NewMsgMovedFromByteArray(opcode Opcode, _moved_payload ByteArray) Msg {
- res := &msg{ inner: C.msg_new_moved_from_bytearray(C._opcode_t(opcode), _moved_payload.inner) }
+// Create a message by taking out all data from src. Notice this is a zero-copy
+// operation that consumes and invalidates the data in src ("move" semantics)
+// so that no more operation should be done to src after this function call.
+func NewMsgMovedFromByteArray(opcode Opcode, src ByteArray) Msg {
+ res := &msg{ inner: C.msg_new_moved_from_bytearray(C._opcode_t(opcode), src.inner) }
runtime.SetFinalizer(res, func(self Msg) { self.free() })
return res
}
func (self Msg) free() { C.msg_free(self.inner) }
-func (self Msg) ConsumePayload() DataStream {
- res := &dataStream{ inner: C.msg_consume_payload(self.inner) }
+// Get the message payload by taking out all data. Notice this is a zero-copy
+// operation that consumes and invalidates the data in the payload ("move"
+// semantics) so that no more operation should be done to the payload after
+// this function call.
+func (self Msg) GetPayloadByMove() DataStream {
+ res := DataStreamFromC(C.msg_consume_payload(self.inner))
runtime.SetFinalizer(res, func(self DataStream) { self.free() })
return res
}
+// Get the opcode.
func (self Msg) GetOpcode() Opcode {
return Opcode(C.msg_get_opcode(self.inner))
}
diff --git a/netaddr.go b/netaddr.go
index 4bd7ebb..070bbe2 100644
--- a/netaddr.go
+++ b/netaddr.go
@@ -5,10 +5,17 @@ package salticidae
import "C"
import "runtime"
+// The C pointer type for a NetAddr object
type CNetAddr = *C.netaddr_t
type netAddr struct { inner CNetAddr }
+// Network address object.
type NetAddr = *netAddr
+// Convert an existing C pointer into a go object. Notice that when the go
+// object does *not* own the resource of the C pointer, so it is only valid to
+// the extent in which the given C pointer is valid. The C memory will not be
+// deallocated when the go object is finalized by GC. This applies to all other
+// "FromC" functions.
func NetAddrFromC(ptr CNetAddr) NetAddr {
return &netAddr{ inner: ptr }
}
@@ -17,8 +24,10 @@ type netAddrArray struct {
inner *C.netaddr_array_t
}
+// An array of network address.
type NetAddrArray = *netAddrArray
+// Create NetAddr from a TCP socket format string (e.g. 127.0.0.1:8888).
func NewAddrFromIPPortString(addr string) (res NetAddr) {
c_str := C.CString(addr)
res = &netAddr{ inner: C.netaddr_new_from_sipport(c_str) }
@@ -27,6 +36,7 @@ func NewAddrFromIPPortString(addr string) (res NetAddr) {
return
}
+// Convert a Go slice of net addresses to NetAddrArray.
func NewAddrArrayFromAddrs(arr []NetAddr) (res NetAddrArray) {
size := len(arr)
if size > 0 {
@@ -43,14 +53,21 @@ func NewAddrArrayFromAddrs(arr []NetAddr) (res NetAddrArray) {
func (self NetAddr) free() { C.netaddr_free(self.inner) }
+// Check if two addresses are the same.
func (self NetAddr) IsEq(other NetAddr) bool { return bool(C.netaddr_is_eq(self.inner, other.inner)) }
func (self NetAddr) IsNull() bool { return bool(C.netaddr_is_null(self.inner)) }
+// Get the 32-bit IP representation.
func (self NetAddr) GetIP() uint32 { return uint32(C.netaddr_get_ip(self.inner)) }
+// Get the 16-bit port number (in UNIX network byte order, so need to apply
+// ntohs(), for example, to convert the returned integer to the local endianness).
func (self NetAddr) GetPort() uint16 { return uint16(C.netaddr_get_port(self.inner)) }
+// Make a copy of the object. This is required if you want to keep the NetAddr
+// returned (or passed as a callback parameter) by other salticidae methods
+// (such like MsgNetwork/PeerNetwork), unless those method return a moved object.
func (self NetAddr) Copy() NetAddr {
res := &netAddr{ inner: C.netaddr_copy(self.inner) }
runtime.SetFinalizer(res, func(self NetAddr) { self.free() })
diff --git a/network.go b/network.go
index 04d653b..0440326 100644
--- a/network.go
+++ b/network.go
@@ -4,52 +4,82 @@ package salticidae
import "C"
import "runtime"
+// The C pointer type for a MsgNetwork handle.
type CMsgNetwork = *C.msgnetwork_t
type msgNetwork struct { inner CMsgNetwork }
+// The handle for a message network.
type MsgNetwork = *msgNetwork
+// Convert an existing C pointer into a go object. Notice that when the go
+// object does *not* own the resource of the C pointer, so it is only valid to
+// the extent in which the given C pointer is valid. The C memory will not be
+// deallocated when the go object is finalized by GC. This applies to all other
+// "FromC" functions.
func MsgNetworkFromC(ptr CMsgNetwork) MsgNetwork {
return &msgNetwork{ inner: ptr }
}
+// The C pointer type for a MsgNetworkConn handle.
type CMsgNetworkConn = *C.msgnetwork_conn_t
type msgNetworkConn struct { inner CMsgNetworkConn }
+// The handle for a message network connection.
type MsgNetworkConn = *msgNetworkConn
func MsgNetworkConnFromC(ptr CMsgNetworkConn) MsgNetworkConn {
return &msgNetworkConn{ inner: ptr }
}
+var (
+ CONN_MODE_ACTIVE = MsgNetworkConnMode(C.CONN_MODE_ACTIVE)
+ CONN_MODE_PASSIVE = MsgNetworkConnMode(C.CONN_MODE_PASSIVE)
+ CONN_MODE_DEAD = MsgNetworkConnMode(C.CONN_MODE_DEAD)
+)
+
+// The connection mode. CONN_MODE_ACTIVE: a connection established from the
+// local side. CONN_MODE_PASSIVE: a connection established from the remote
+// side. CONN_MODE_DEAD: a connection that is already closed.
type MsgNetworkConnMode = C.msgnetwork_conn_mode_t
func (self MsgNetworkConn) free() { C.msgnetwork_conn_free(self.inner) }
+// Get the corresponding MsgNetwork handle that manages this connection. The
+// returned handle is only valid during the lifetime of this connection.
func (self MsgNetworkConn) GetNet() MsgNetwork {
return MsgNetworkFromC(C.msgnetwork_conn_get_net(self.inner))
}
-var (
- CONN_MODE_ACTIVE = MsgNetworkConnMode(C.CONN_MODE_ACTIVE)
- CONN_MODE_PASSIVE = MsgNetworkConnMode(C.CONN_MODE_PASSIVE)
- CONN_MODE_DEAD = MsgNetworkConnMode(C.CONN_MODE_DEAD)
-)
-
func (self MsgNetworkConn) GetMode() MsgNetworkConnMode {
return C.msgnetwork_conn_get_mode(self.inner)
}
+// Get the address of the remote end of this connection. Use Copy() to make a
+// copy of the address if you want to use the address object beyond the
+// lifetime of the connection.
func (self MsgNetworkConn) GetAddr() NetAddr {
return NetAddrFromC(C.msgnetwork_conn_get_addr(self.inner))
}
+// Make a copy of the object. This is required if you want to keep the
+// connection passed as a callback parameter by other salticidae methods (such
+// like MsgNetwork/PeerNetwork).
+func (self MsgNetworkConn) Copy() MsgNetworkConn {
+ res := MsgNetworkConnFromC(C.msgnetwork_conn_copy(self.inner))
+ runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() })
+ return res
+}
+
+
+// The C pointer type for a MsgNetworkConfig object.
type CMsgNetworkConfig = *C.msgnetwork_config_t
type msgNetworkConfig struct { inner CMsgNetworkConfig }
+// Configuration for MsgNetwork.
type MsgNetworkConfig = *msgNetworkConfig
func MsgNetworkConfigFromC(ptr CMsgNetworkConfig) MsgNetworkConfig {
return &msgNetworkConfig{ inner: ptr }
}
+// Create the configuration object with default settings.
func NewMsgNetworkConfig() MsgNetworkConfig {
res := MsgNetworkConfigFromC(C.msgnetwork_config_new())
runtime.SetFinalizer(res, func(self MsgNetworkConfig) { self.free() })
@@ -58,30 +88,39 @@ func NewMsgNetworkConfig() MsgNetworkConfig {
func (self MsgNetworkConfig) free() { C.msgnetwork_config_free(self.inner) }
+// Set the number of consecutive read attempts in the message delivery queue.
+// Usually the default value is good enough. This is used to make the tradeoff
+// between the event loop fairness and the amortization of syscall cost.
func (self MsgNetworkConfig) BurstSize(size int) {
C.msgnetwork_config_burst_size(self.inner, C.size_t(size))
}
+// Maximum backlogs (see POSIX TCP backlog).
func (self MsgNetworkConfig) MaxListenBacklog(backlog int) {
C.msgnetwork_config_max_listen_backlog(self.inner, C.int(backlog))
}
+// The timeout for connecting to the remote (in seconds).
func (self MsgNetworkConfig) ConnServerTimeout(timeout float64) {
C.msgnetwork_config_conn_server_timeout(self.inner, C.double(timeout))
}
+// The size for an inbound data chunk (per read() syscall).
func (self MsgNetworkConfig) SegBuffSize(size int) {
C.msgnetwork_config_seg_buff_size(self.inner, C.size_t(size))
}
+// The number of worker threads.
func (self MsgNetworkConfig) NWorker(nworker int) {
C.msgnetwork_config_nworker(self.inner, C.size_t(nworker))
}
+// The capacity of the send buffer.
func (self MsgNetworkConfig) QueueCapacity(capacity int) {
C.msgnetwork_config_queue_capacity(self.inner, C.size_t(capacity))
}
+// Create a message network handle which is attached to given event loop.
func NewMsgNetwork(ec EventContext, config MsgNetworkConfig) MsgNetwork {
res := MsgNetworkFromC(C.msgnetwork_new(ec.inner, config.inner))
ec.attach(rawptr_t(res.inner), res)
@@ -90,81 +129,110 @@ func NewMsgNetwork(ec EventContext, config MsgNetworkConfig) MsgNetwork {
}
func (self MsgNetwork) free() { C.msgnetwork_free(self.inner) }
-func (self MsgNetwork) Listen(addr NetAddr, err *Error) { C.msgnetwork_listen(self.inner, addr.inner, err) }
+
+// Start the message network (by spawning worker threads). This should be
+// called before using any other methods.
func (self MsgNetwork) Start() { C.msgnetwork_start(self.inner) }
+
+// Listen to the specified network address.
+func (self MsgNetwork) Listen(addr NetAddr, err *Error) { C.msgnetwork_listen(self.inner, addr.inner, err) }
+
+// Stop the message network. No other methods should be called after this.
func (self MsgNetwork) Stop() { C.msgnetwork_stop(self.inner) }
+// Send a message through the given connection.
func (self MsgNetwork) SendMsg(msg Msg, conn MsgNetworkConn) {
C.msgnetwork_send_msg(self.inner, msg.inner, conn.inner)
}
+// Send a message through the given connection, using a worker thread to
+// seralize and put data to the send buffer. The payload contained in the given
+// msg will be moved and sent. Thus, no methods of msg involving the payload
+// should be called afterwards.
func (self MsgNetwork) SendMsgDeferredByMove(msg Msg, conn MsgNetworkConn) {
C.msgnetwork_send_msg_deferred_by_move(self.inner, msg.inner, conn.inner)
}
+// Try to connect to a remote address. The connection handle is returned. The
+// returned connection handle could be kept in your program.
func (self MsgNetwork) Connect(addr NetAddr, err *Error) MsgNetworkConn {
res := MsgNetworkConnFromC(C.msgnetwork_connect(self.inner, addr.inner, err))
runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() })
return res
}
+
+// Terminate the given connection.
func (self MsgNetwork) Terminate(conn MsgNetworkConn) { C.msgnetwork_terminate(self.inner, conn.inner) }
+// The C function pointer type which takes msg_t*, msgnetwork_conn_t* and void*
+// (passing in the custom user data allocated by C.malloc) as parameters.
+type MsgNetworkMsgCallback = C.msgnetwork_msg_callback_t
+// The C function pointer type which takes msgnetwork_conn_t*, bool (true for
+// the connection is established, false for the connection is terminated) and
+// void* as parameters.
+type MsgNetworkConnCallback = C.msgnetwork_conn_callback_t
+// The C function Pointer type which takes SalticidaeCError* and void* as parameters.
+type MsgNetworkErrorCallback = C.msgnetwork_error_callback_t
+// Register a message handler for the type of message identified by opcode. The
+// callback function will be invoked upon the delivery of each message with the
+// given opcode, by the thread of the event loop the MsgNetwork is attached to.
func (self MsgNetwork) RegHandler(opcode Opcode, callback MsgNetworkMsgCallback, userdata rawptr_t) {
C.msgnetwork_reg_handler(self.inner, C._opcode_t(opcode), callback, userdata)
}
+// Register a connection handler invoked when the connection state is changed.
func (self MsgNetwork) RegConnHandler(callback MsgNetworkConnCallback, userdata rawptr_t) {
C.msgnetwork_reg_conn_handler(self.inner, callback, userdata)
}
+// Register an error handler invoked when there is recoverable errors during any
+// asynchronous call/execution inside the MsgNetwork.
func (self MsgNetwork) RegErrorHandler(callback MsgNetworkErrorCallback, userdata rawptr_t) {
C.msgnetwork_reg_error_handler(self.inner, callback, userdata)
}
-func (self MsgNetworkConn) Copy() MsgNetworkConn {
- res := MsgNetworkConnFromC(C.msgnetwork_conn_copy(self.inner))
- runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() })
- return res
-}
-func (self MsgNetworkConn) Free() { C.msgnetwork_conn_free(self.inner) }
-
-type MsgNetworkMsgCallback = C.msgnetwork_msg_callback_t
-type MsgNetworkConnCallback = C.msgnetwork_conn_callback_t
-type MsgNetworkErrorCallback = C.msgnetwork_error_callback_t
-
+// The C pointer type for a PeerNetwork handle.
type CPeerNetwork = *C.peernetwork_t
type peerNetwork struct { inner CPeerNetwork }
+// The handle for a peer-to-peer network.
type PeerNetwork = *peerNetwork
func PeerNetworkFromC(ptr CPeerNetwork) PeerNetwork {
return &peerNetwork{ inner: ptr }
}
-
+// The C pointer type for a PeerNetworkConn handle.
type CPeerNetworkConn = *C.peernetwork_conn_t
type peerNetworkConn struct { inner CPeerNetworkConn }
+// The handle for a PeerNetwork connection.
type PeerNetworkConn = *peerNetworkConn
func PeerNetworkConnFromC(ptr CPeerNetworkConn) PeerNetworkConn {
return &peerNetworkConn{ inner: ptr }
}
-type PeerNetworkIdMode = C.peernetwork_id_mode_t
-
var (
ID_MODE_IP_BASED = PeerNetworkIdMode(C.ID_MODE_IP_BASED)
ID_MODE_IP_PORT_BASED = PeerNetworkIdMode(C.ID_MODE_IP_PORT_BASED)
)
+// The identity mode. ID_MODE_IP_BASED: a remote peer is identified by the IP
+// only. ID_MODE_IP_PORT_BASED: a remote peer is identified by IP + port
+// number.
+type PeerNetworkIdMode = C.peernetwork_id_mode_t
+
+// The C pointer type for a PeerNetworkConfig handle.
type CPeerNetworkConfig = *C.peernetwork_config_t
type peerNetworkConfig struct { inner CPeerNetworkConfig }
+// Configuration for PeerNetwork.
type PeerNetworkConfig = *peerNetworkConfig
func PeerNetworkConfigFromC(ptr CPeerNetworkConfig) PeerNetworkConfig {
return &peerNetworkConfig{ inner: ptr }
}
+// Create the configuration object with default settings.
func NewPeerNetworkConfig() PeerNetworkConfig {
res := PeerNetworkConfigFromC(C.peernetwork_config_new())
runtime.SetFinalizer(res, func(self PeerNetworkConfig) { self.free() })
@@ -173,26 +241,34 @@ func NewPeerNetworkConfig() PeerNetworkConfig {
func (self PeerNetworkConfig) free() { C.peernetwork_config_free(self.inner) }
+// Set the connection retry delay (in seconds).
func (self PeerNetworkConfig) RetryConnDelay(t_sec float64) {
C.peernetwork_config_retry_conn_delay(self.inner, C.double(t_sec))
}
+// Set the period for sending ping messsages (in seconds).
func (self PeerNetworkConfig) PingPeriod(t_sec float64) {
C.peernetwork_config_ping_period(self.inner, C.double(t_sec))
}
+// Set the time it takes after sending a ping message before a connection is
+// considered as broken.
func (self PeerNetworkConfig) ConnTimeout(t_sec float64) {
C.peernetwork_config_conn_timeout(self.inner, C.double(t_sec))
}
+// Set the identity mode.
func (self PeerNetworkConfig) IdMode(mode PeerNetworkIdMode) {
C.peernetwork_config_id_mode(self.inner, mode)
}
+// Use the PeerNetworkConfig object as a MsgNetworkConfig object (to invoke the
+// methods inherited from MsgNetworkConfig, such as NWorker).
func (self PeerNetworkConfig) AsMsgNetworkConfig() MsgNetworkConfig {
return MsgNetworkConfigFromC(C.peernetwork_config_as_msgnetwork_config(self.inner))
}
+// Create a peer-to-peer message network handle.
func NewPeerNetwork(ec EventContext, config PeerNetworkConfig) PeerNetwork {
res := PeerNetworkFromC(C.peernetwork_new(ec.inner, config.inner))
ec.attach(rawptr_t(res.inner), res)
@@ -202,30 +278,50 @@ func NewPeerNetwork(ec EventContext, config PeerNetworkConfig) PeerNetwork {
func (self PeerNetwork) free() { C.peernetwork_free(self.inner) }
+// Add a peer to the list of known peers. The P2P network will try to keep
+// bi-direction connections to all known peers in the list (through
+// reconnection and connection deduplication).
func (self PeerNetwork) AddPeer(paddr NetAddr) { C.peernetwork_add_peer(self.inner, paddr.inner) }
+// Test whether a peer is already in the list.
func (self PeerNetwork) HasPeer(paddr NetAddr) bool { return bool(C.peernetwork_has_peer(self.inner, paddr.inner)) }
+// Get the connection of the known peer. The connection handle is returned. The
+// returned connection handle could be kept in your program.
func (self PeerNetwork) GetPeerConn(paddr NetAddr, err *Error) PeerNetworkConn {
res := PeerNetworkConnFromC(C.peernetwork_get_peer_conn(self.inner, paddr.inner, err))
runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() })
return res
}
+// Use the PeerNetwork handle as a MsgNetwork handle (to invoke the methods
+// inherited from MsgNetwork, such as RegHandler).
func (self PeerNetwork) AsMsgNetwork() MsgNetwork {
return MsgNetworkFromC(C.peernetwork_as_msgnetwork(self.inner))
}
+// Use the MsgNetwork handle as a PeerNetwork handle (forcing the conversion).
func (self MsgNetwork) AsPeerNetworkUnsafe() PeerNetwork {
return PeerNetworkFromC(C.msgnetwork_as_peernetwork_unsafe(self.inner))
}
+// Create a MsgNetworkConn handle from a PeerNetworkConn (representing the same
+// connection).
func NewMsgNetworkConnFromPeerNetWorkConn(conn PeerNetworkConn) MsgNetworkConn {
res := MsgNetworkConnFromC(C.msgnetwork_conn_new_from_peernetwork_conn(conn.inner))
runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() })
return res
}
+// Create a PeerNetworkConn handle from a MsgNetworkConn (representing the same
+// connection and forcing the conversion).
+func NewPeerNetworkConnFromMsgNetWorkConnUnsafe(conn MsgNetworkConn) PeerNetworkConn {
+ res := PeerNetworkConnFromC(C.peernetwork_conn_new_from_msgnetwork_conn_unsafe(conn.inner))
+ runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() })
+ return res
+}
+
+// Make a copy of the connection handle.
func (self PeerNetworkConn) Copy() PeerNetworkConn {
res := PeerNetworkConnFromC(C.peernetwork_conn_copy(self.inner))
runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() })
@@ -234,25 +330,42 @@ func (self PeerNetworkConn) Copy() PeerNetworkConn {
func (self PeerNetworkConn) free() { C.peernetwork_conn_free(self.inner) }
+// Listen to the specified network address. Notice that this method overrides
+// Listen() in MsgNetwork, so you should always call this one instead of
+// AsMsgNetwork().Listen().
func (self PeerNetwork) Listen(listenAddr NetAddr, err *Error) {
C.peernetwork_listen(self.inner, listenAddr.inner, err)
}
+// Send a message to the given peer.
func (self PeerNetwork) SendMsg(msg Msg, addr NetAddr) {
C.peernetwork_send_msg(self.inner, msg.inner, addr.inner)
}
+// Send a message to the given peer, using a worker thread to seralize and put
+// data to the send buffer. The payload contained in the given msg will be
+// moved and sent. Thus, no methods of msg involving the payload should be
+// called afterwards.
func (self PeerNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) {
C.peernetwork_send_msg_deferred_by_move(self.inner, msg.inner, addr.inner)
}
+// Send a message to the given list of peers. The payload contained in the
+// given msg will be moved and sent. Thus, no methods of msg involving the
+// payload should be called afterwards.
func (self PeerNetwork) MulticastMsgByMove(msg Msg, paddrs []NetAddr) {
na := NewAddrArrayFromAddrs(paddrs)
C.peernetwork_multicast_msg_by_move(self.inner, msg.inner, na.inner)
}
+// The C function pointer type which takes netaddr_t* and void* (passing in the
+// custom user data allocated by C.malloc) as parameters.
type MsgNetworkUnknownPeerCallback = C.msgnetwork_unknown_peer_callback_t
+// Register a connection handler invoked when a remote peer that is not in the
+// list of known peers attempted to connect. By default configuration, the
+// connection was rejected, and you can call AddPeer() to enroll this peer if
+// you hope to establish the connection soon.
func (self PeerNetwork) RegUnknownPeerHandler(callback MsgNetworkUnknownPeerCallback, userdata rawptr_t) {
C.peernetwork_reg_unknown_peer_handler(self.inner, callback, userdata)
}
diff --git a/salticidae b/salticidae
-Subproject 823585c4db2ef6752d12f489c83edab577b8609
+Subproject ff904b23147e72db4f7f77f8269242d9a5a0859
diff --git a/stream.go b/stream.go
index 39bfd5c..c35933e 100644
--- a/stream.go
+++ b/stream.go
@@ -8,8 +8,10 @@ import "runtime"
type byteArray struct {
inner *C.bytearray_t
}
+// Array of binary data.
type ByteArray = *byteArray
+// Create an empty byte array (with zero contained bytes).
func NewByteArray() ByteArray {
res := &byteArray{ inner: C.bytearray_new() }
runtime.SetFinalizer(res, func(self ByteArray) { self.free() })
@@ -18,31 +20,48 @@ func NewByteArray() ByteArray {
func (self ByteArray) free() { C.bytearray_free(self.inner) }
+// Create a byte array by taking out all data from src. Notice this is a
+// zero-copy operation that consumes and invalidates the data in src ("move"
+// semantics) so that no more operation should be done to src after this
+// function call.
func NewByteArrayMovedFromDataStream(src DataStream) ByteArray {
res := &byteArray{ inner: C.bytearray_new_moved_from_datastream(src.inner) }
runtime.SetFinalizer(res, func(self ByteArray) { self.free() })
return res
}
+// The C pointer to a DataStream object.
+type CDataStream = *C.datastream_t
type dataStream struct {
- inner *C.datastream_t
+ inner CDataStream
+ attached map[uintptr]interface{}
}
+// Stream of binary data.
type DataStream = *dataStream
+func DataStreamFromC(ptr CDataStream) DataStream {
+ return &dataStream{
+ inner: ptr,
+ attached: make(map[uintptr]interface{}),
+ }
+}
+
+// Create an empty DataStream.
func NewDataStream() DataStream {
- res := &dataStream{ inner: C.datastream_new() }
+ res := DataStreamFromC(C.datastream_new())
runtime.SetFinalizer(res, func(self DataStream) { self.free() })
return res
}
+// Create a DataStream with data copied from bytes.
func NewDataStreamFromBytes(bytes []byte) (res DataStream) {
size := len(bytes)
if size > 0 {
base := (*C.uint8_t)(&bytes[0])
- res = &dataStream{ inner: C.datastream_new_from_bytes(base, C.size_t(size)) }
+ res = DataStreamFromC(C.datastream_new_from_bytes(base, C.size_t(size)))
} else {
- res = &dataStream{ inner: C.datastream_new() }
+ res = DataStreamFromC(C.datastream_new())
}
runtime.SetFinalizer(res, func(self DataStream) { self.free() })
return
@@ -50,28 +69,42 @@ func NewDataStreamFromBytes(bytes []byte) (res DataStream) {
func (self DataStream) free() { C.datastream_free(self.inner) }
-func (self DataStream) Copy() {
- res := &dataStream{ inner: C.datastream_copy(self.inner, src.inner) }
+func (self DataStream) attach(ptr rawptr_t, obj interface{}) { self.attached[uintptr(ptr)] = obj }
+func (self DataStream) detach(ptr rawptr_t) { delete(self.attached, uintptr(ptr)) }
+
+// Make a copy of the object.
+func (self DataStream) Copy() DataStream {
+ res := DataStreamFromC(C.datastream_copy(self.inner))
runtime.SetFinalizer(res, func(self DataStream) { self.free() })
return res
}
// TODO: datastream_data
+// Empty the DataStream.
func (self DataStream) Clear() { C.datastream_clear(self.inner) }
func (self DataStream) Size() int { return int(C.datastream_size(self.inner)) }
+// Write a uint8 integer to the stream (no byte order conversion).
func (self DataStream) PutU8(v uint8) bool { return bool(C.datastream_put_u8(self.inner, C.uint8_t(v))) }
+// Write a uint16 integer to the stream (no byte order conversion).
func (self DataStream) PutU16(v uint16) bool { return bool(C.datastream_put_u16(self.inner, C.uint16_t(v))) }
+// Write a uint32 integer to the stream (no byte order conversion).
func (self DataStream) PutU32(v uint32) bool { return bool(C.datastream_put_u32(self.inner, C.uint32_t(v))) }
-func (self DataStream) PutU64(v uint32) bool { return bool(C.datastream_put_u64(self.inner, C.uint64_t(v))) }
+// Write a uint64 integer to the stream (no byte order conversion).
+func (self DataStream) PutU64(v uint64) bool { return bool(C.datastream_put_u64(self.inner, C.uint64_t(v))) }
+// Write an int8 integer to the stream (no byte order conversion).
func (self DataStream) PutI8(v int8) bool { return bool(C.datastream_put_i8(self.inner, C.int8_t(v))) }
+// Write an int16 integer to the stream (no byte order conversion).
func (self DataStream) PutI16(v int16) bool { return bool(C.datastream_put_i16(self.inner, C.int16_t(v))) }
+// Write an int32 integer to the stream (no byte order conversion).
func (self DataStream) PutI32(v int32) bool { return bool(C.datastream_put_i32(self.inner, C.int32_t(v))) }
+// Write an int64 integer to the stream (no byte order conversion).
func (self DataStream) PutI64(v int32) bool { return bool(C.datastream_put_i64(self.inner, C.int64_t(v))) }
+// Write arbitrary bytes to the stream.
func (self DataStream) PutData(bytes []byte) bool {
size := len(bytes)
if size > 0 {
@@ -80,28 +113,57 @@ func (self DataStream) PutData(bytes []byte) bool {
} else { return true }
}
+// Parse a uint8 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetU8(succ *bool) uint8 { return uint8(C.datastream_get_u8(self.inner, (*C.bool)(succ))) }
+// Parse a uint16 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetU16(succ *bool) uint16 { return uint16(C.datastream_get_u16(self.inner, (*C.bool)(succ))) }
+// Parse a uint32 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetU32(succ *bool) uint32 { return uint32(C.datastream_get_u32(self.inner, (*C.bool)(succ))) }
+// Parse a uint64 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetU64(succ *bool) uint64 { return uint64(C.datastream_get_u64(self.inner, (*C.bool)(succ))) }
+// Parse an int8 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetI8(succ *bool) int8 { return int8(C.datastream_get_i8(self.inner, (*C.bool)(succ))) }
+// Parse an int16 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetI16(succ *bool) int16 { return int16(C.datastream_get_i16(self.inner, (*C.bool)(succ))) }
+// Parse an int32 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetI32(succ *bool) int32 { return int32(C.datastream_get_i32(self.inner, (*C.bool)(succ))) }
+// Parse an int64 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetI64(succ *bool) int64 { return int64(C.datastream_get_i64(self.inner, (*C.bool)(succ))) }
+// The handle returned by GetDataInPlace. The Go slice returned by Get() is
+// valid only during the lifetime of the handle.
+type dataStreamBytes struct {
+ bytes []byte
+ ds DataStream
+}
+
+type DataStreamBytes = *dataStreamBytes
-func (self DataStream) GetDataInPlace(length int) []byte {
+func (self DataStreamBytes) Get() []byte { return self.bytes }
+func (self DataStreamBytes) Release() { self.ds.detach(rawptr_t(self)) }
+
+// Get the given length of preceeding bytes from the stream as a byte slice by
+// consuming the stream. Notice this function does not copy the bytes, so the
+// slice is only valid during the lifetime of DataStreamBytes handle.
+func (self DataStream) GetDataInPlace(length int) DataStreamBytes {
base := C.datastream_get_data_inplace(self.inner, C.size_t(length))
- return C.GoBytes(rawptr_t(base), C.int(length))
+ res := &dataStreamBytes{
+ bytes: C.GoBytes(rawptr_t(base), C.int(length)),
+ ds: self,
+ }
+ self.attach(rawptr_t(res), res)
+ return res
}
type uint256 struct {
inner *C.uint256_t
}
+// 256-bit integer.
type UInt256 = *uint256
+// Create a 256-bit integer.
func NewUInt256() UInt256 {
res := &uint256{ inner: C.uint256_new() }
runtime.SetFinalizer(res, func(self UInt256) { self.free() })
@@ -109,18 +171,28 @@ func NewUInt256() UInt256 {
}
func (self UInt256) free() { C.uint256_free(self.inner) }
-func (self UInt256) UInt256IsNull() bool { return bool(C.uint256_is_null(self.inner)) }
-func (self UInt256) UInt256IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self.inner, other.inner)) }
+
+func (self UInt256) IsNull() bool { return bool(C.uint256_is_null(self.inner)) }
+
+// Check if two 256-bit integers are equal.
+func (self UInt256) IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self.inner, other.inner)) }
+
+// Write the integer to the given DataStream.
func (self UInt256) Serialize(s DataStream) { C.uint256_serialize(self.inner, s.inner) }
+
+// Parse the integer from the given DataStream.
func (self UInt256) Unserialize(s DataStream) { C.uint256_unserialize(self.inner, s.inner) }
-func (self UInt256) IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self.inner, other.inner)) }
+// Get the Sha256 hash of the given DataStream content (without consuming the
+// stream).
func (self DataStream) GetHash() UInt256 {
res := &uint256{ inner: C.datastream_get_hash(self.inner) }
runtime.SetFinalizer(res, func(self UInt256) { self.free() })
return res
}
+// Get hexadicemal string representation of the given DataStream content
+// (without consuming the stream).
func (self DataStream) GetHex() string {
tmp := C.datastream_get_hex(self.inner)
res := C.GoString(tmp)
@@ -128,6 +200,7 @@ func (self DataStream) GetHex() string {
return res
}
+// Get hexadicemal string representation of the 256-bit integer.
func (self UInt256) GetHex() string {
s := NewDataStream()
self.Serialize(s)
diff --git a/test_msgnet/main.go b/test_msgnet/main.go
index 4e52982..66d7287 100644
--- a/test_msgnet/main.go
+++ b/test_msgnet/main.go
@@ -34,10 +34,10 @@ func msgHelloSerialize(name string, text string) salticidae.Msg {
}
func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) {
- p := msg.ConsumePayload()
- length := binary.LittleEndian.Uint32(p.GetDataInPlace(4))
- name = string(p.GetDataInPlace(int(length)))
- text = string(p.GetDataInPlace(p.Size()))
+ p := msg.GetPayloadByMove()
+ t := p.GetDataInPlace(4); length := binary.LittleEndian.Uint32(t.Get()); t.Release()
+ t = p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release()
+ t = p.GetDataInPlace(p.Size()); text = string(t.Get()); t.Release()
return
}
@@ -79,7 +79,7 @@ func onReceiveHello(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ u
name, text := msgHelloUnserialize(msg)
fmt.Printf("[%s] %s says %s\n", myName, name, text)
ack := msgAckSerialize()
- net.SendMsgByMove(ack, conn)
+ net.SendMsg(ack, conn)
}
//export onReceiveAck
@@ -106,7 +106,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, _ unsafe.P
if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
fmt.Printf("[%s] Connected, sending hello.\n", name)
hello := msgHelloSerialize(name, "Hello there!")
- n.net.SendMsgByMove(hello, conn)
+ n.net.SendMsg(hello, conn)
} else {
fmt.Printf("[%s] Accepted, waiting for greetings.\n", name)
}
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go
index 251f2e1..9e0757f 100644
--- a/test_p2p_stress/main.go
+++ b/test_p2p_stress/main.go
@@ -54,7 +54,7 @@ func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
}
func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream {
- return msg.ConsumePayload()
+ return msg.GetPayloadByMove()
}
func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
@@ -66,7 +66,7 @@ func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
}
func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
- p := msg.ConsumePayload()
+ p := msg.GetPayloadByMove()
hash := salticidae.NewUInt256()
hash.Unserialize(p)
return hash
@@ -126,7 +126,7 @@ func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
msg, hash := msgRandSerialize(size)
tc := app.getTC(addr2id(conn.GetAddr()))
tc.hash = hash
- app.net.AsMsgNetwork().SendMsgByMove(msg, conn)
+ app.net.AsMsgNetwork().SendMsg(msg, conn)
}
var apps []AppContext
@@ -157,7 +157,7 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
net := conn.GetNet()
ack := msgAckSerialize(msgRandUnserialize(msg).GetHash())
- net.SendMsgByMove(ack, conn)
+ net.SendMsg(ack, conn)
}
//export onReceiveAck