diff options
-rw-r--r-- | Makefile | 14 | ||||
-rw-r--r-- | event.go | 48 | ||||
-rw-r--r-- | msg.go | 23 | ||||
-rw-r--r-- | netaddr.go | 17 | ||||
-rw-r--r-- | network.go | 155 | ||||
m--------- | salticidae | 0 | ||||
-rw-r--r-- | stream.go | 97 | ||||
-rw-r--r-- | test_msgnet/main.go | 12 | ||||
-rw-r--r-- | test_p2p_stress/main.go | 8 |
9 files changed, 320 insertions, 54 deletions
@@ -1,23 +1,23 @@ -.PHONY: all clean run_make +.PHONY: all clean all: build/test_msgnet build/test_p2p_stress -run_make: - make -C salticidae/ salticidae/libsalticidae.so: cd salticidae/; cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED=ON -DSALTICIDAE_DEBUG_LOG=OFF -DSALTICIDAE_CBINDINGS=ON -DBUILD_TEST=OFF ./ - make -C salticidae/ + make -C salticidae/ -j4 build: mkdir -p build -build/test_msgnet: salticidae/libsalticidae.so test_msgnet/main.go run_make +build/test_msgnet: salticidae/libsalticidae.so test_msgnet/main.go + make -C salticidae/ go build -o $@ github.com/Determinant/salticidae-go/test_msgnet -build/test_p2p_stress: salticidae/libsalticidae.so test_p2p_stress/main.go run_make +build/test_p2p_stress: salticidae/libsalticidae.so test_p2p_stress/main.go + make -C salticidae/ go build -o $@ github.com/Determinant/salticidae-go/test_p2p_stress clean: - rm -r build/ + rm -rf build/ cd salticidae/; make clean rm salticidae/CMakeCache.txt @@ -5,11 +5,13 @@ package salticidae import "C" import "runtime" +// The C pointer type for an EventContext handle. type CEventContext = *C.eventcontext_t type eventContext struct { inner CEventContext attached map[uintptr]interface{} } +// The handle for an event loop. type EventContext = *eventContext func NewEventContext() EventContext { @@ -24,15 +26,32 @@ func NewEventContext() EventContext { func (self EventContext) attach(ptr rawptr_t, x interface{}) { self.attached[uintptr(ptr)] = x } func (self EventContext) detach(ptr rawptr_t) { delete(self.attached, uintptr(ptr)) } func (self EventContext) free() { C.eventcontext_free(self.inner) } + +// Start the event loop. This is a blocking call that will hand over the +// control flow to the infinite loop which triggers callbacks upon new events. +// The function will return when Stop() is called. func (self EventContext) Dispatch() { C.eventcontext_dispatch(self.inner) } + +// Stop the event loop. This function is typically called in a callback. Notice +// that all methods of an EventContext should be invoked by the same thread +// which logically owns the loop. To schedule code executed in the event loop +// of a particular thread, see ThreadCall. func (self EventContext) Stop() { C.eventcontext_stop(self.inner) } +// The C pointer type for a ThreadCall handle. type CThreadCall = *C.threadcall_t type threadCall struct { inner CThreadCall } +// The handle for scheduling a function call executed by a particular +// EventContext event loop. type ThreadCall = *threadCall +// The C function pointer type which takes threadcall_handle_t* and void* +// (passing in the custom user data allocated by C.malloc) as parameters. type ThreadCallCallback = C.threadcall_callback_t +// Create a ThreadCall handle attached to the given EventContext. Each +// invokcation of AsyncCall() will schedule a function call executed in the +// given EventContext event loop. func NewThreadCall(ec EventContext) ThreadCall { res := &threadCall{ inner: C.threadcall_new(ec.inner) } runtime.SetFinalizer(res, func(self ThreadCall) { self.free() }) @@ -41,19 +60,27 @@ func NewThreadCall(ec EventContext) ThreadCall { func (self ThreadCall) free() { C.threadcall_free(self.inner) } +// Schedule a function to be executed in the target event loop. func (self ThreadCall) AsyncCall(callback ThreadCallCallback, userdata rawptr_t) { C.threadcall_async_call(self.inner, callback, userdata) } +// The C pointer type for TimerEvent handle. type CTimerEvent = *C.timerev_t type timerEvent struct { inner CTimerEvent ec EventContext } + +// The handle for a timed event. type TimerEvent = *timerEvent +// The C function pointer type which takes timerev_t* (the C pointer to +// TimerEvent) and void* (the unsafe pointer to any userdata) as parameter type TimerEventCallback = C.timerev_callback_t +// Create a TimerEvent handle attached to the given EventContext, with a +// registered callback. func NewTimerEvent(_ec EventContext, cb TimerEventCallback, userdata rawptr_t) TimerEvent { res := &timerEvent{ inner: C.timerev_new(_ec.inner, cb, userdata), @@ -65,26 +92,38 @@ func NewTimerEvent(_ec EventContext, cb TimerEventCallback, userdata rawptr_t) T } func (self TimerEvent) free() { C.timerev_free(self.inner) } + +// Change the callback. func (self TimerEvent) SetCallback(callback TimerEventCallback, userdata rawptr_t) { C.timerev_set_callback(self.inner, callback, userdata) } +// Schedule the timer to go off after t_sec seconds. func (self TimerEvent) Add(t_sec float64) { C.timerev_add(self.inner, C.double(t_sec)) } + + +// Unschedule the timer if it was scheduled. The timer could still be rescheduled +// by calling Add() afterwards. func (self TimerEvent) Del() { self.ec.detach(rawptr_t(self.inner)) C.timerev_del(self.inner) } +// Empty the timer. It will be unscheduled and deallocated and its methods +// should never be called again. func (self TimerEvent) Clear() { self.ec.detach(rawptr_t(self.inner)) C.timerev_clear(self.inner) } +// The C pointer type for a SigEvent. type CSigEvent = *C.sigev_t type sigEvent struct { inner CSigEvent ec EventContext } + +// The handle for a UNIX signal event. type SigEvent = *sigEvent type SigEventCallback = C.sigev_callback_t @@ -94,6 +133,8 @@ var ( SIGINT = C.SIGINT ) +// Create a SigEvent handle attached to the given EventContext, with a +// registered callback. func NewSigEvent(_ec EventContext, cb SigEventCallback, userdata rawptr_t) SigEvent { res := &sigEvent{ inner: C.sigev_new(_ec.inner, cb, userdata), @@ -105,12 +146,19 @@ func NewSigEvent(_ec EventContext, cb SigEventCallback, userdata rawptr_t) SigEv } func (self SigEvent) free() { C.sigev_free(self.inner) } + +// Register the handle to listen for UNIX signal sig. func (self SigEvent) Add(sig int) { C.sigev_add(self.inner, C.int(sig)) } + + +// Unregister the handle. The handle may be re-registered in the future. func (self SigEvent) Del() { self.ec.detach(rawptr_t(self.inner)) C.sigev_del(self.inner) } +// Unregister the handle. Any methods of the handle should no longer be used +// and the handle will be deallocated. func (self SigEvent) Clear() { self.ec.detach(rawptr_t(self.inner)) C.sigev_clear(self.inner) @@ -5,26 +5,41 @@ package salticidae import "C" import "runtime" +// The C pointer type for a Msg object type CMsg = *C.msg_t type msg struct { inner CMsg } +// Message sent by MsgNetwork type Msg = *msg +// Convert an existing C pointer into a go object. Notice that when the go +// object does *not* own the resource of the C pointer, so it is only valid to +// the extent in which the given C pointer is valid. The C memory will not be +// deallocated when the go object is finalized by GC. This applies to all other +// "FromC" functions. func MsgFromC(ptr *C.msg_t) Msg { return &msg{ inner: ptr } } -func NewMsgMovedFromByteArray(opcode Opcode, _moved_payload ByteArray) Msg { - res := &msg{ inner: C.msg_new_moved_from_bytearray(C._opcode_t(opcode), _moved_payload.inner) } +// Create a message by taking out all data from src. Notice this is a zero-copy +// operation that consumes and invalidates the data in src ("move" semantics) +// so that no more operation should be done to src after this function call. +func NewMsgMovedFromByteArray(opcode Opcode, src ByteArray) Msg { + res := &msg{ inner: C.msg_new_moved_from_bytearray(C._opcode_t(opcode), src.inner) } runtime.SetFinalizer(res, func(self Msg) { self.free() }) return res } func (self Msg) free() { C.msg_free(self.inner) } -func (self Msg) ConsumePayload() DataStream { - res := &dataStream{ inner: C.msg_consume_payload(self.inner) } +// Get the message payload by taking out all data. Notice this is a zero-copy +// operation that consumes and invalidates the data in the payload ("move" +// semantics) so that no more operation should be done to the payload after +// this function call. +func (self Msg) GetPayloadByMove() DataStream { + res := DataStreamFromC(C.msg_consume_payload(self.inner)) runtime.SetFinalizer(res, func(self DataStream) { self.free() }) return res } +// Get the opcode. func (self Msg) GetOpcode() Opcode { return Opcode(C.msg_get_opcode(self.inner)) } @@ -5,10 +5,17 @@ package salticidae import "C" import "runtime" +// The C pointer type for a NetAddr object type CNetAddr = *C.netaddr_t type netAddr struct { inner CNetAddr } +// Network address object. type NetAddr = *netAddr +// Convert an existing C pointer into a go object. Notice that when the go +// object does *not* own the resource of the C pointer, so it is only valid to +// the extent in which the given C pointer is valid. The C memory will not be +// deallocated when the go object is finalized by GC. This applies to all other +// "FromC" functions. func NetAddrFromC(ptr CNetAddr) NetAddr { return &netAddr{ inner: ptr } } @@ -17,8 +24,10 @@ type netAddrArray struct { inner *C.netaddr_array_t } +// An array of network address. type NetAddrArray = *netAddrArray +// Create NetAddr from a TCP socket format string (e.g. 127.0.0.1:8888). func NewAddrFromIPPortString(addr string) (res NetAddr) { c_str := C.CString(addr) res = &netAddr{ inner: C.netaddr_new_from_sipport(c_str) } @@ -27,6 +36,7 @@ func NewAddrFromIPPortString(addr string) (res NetAddr) { return } +// Convert a Go slice of net addresses to NetAddrArray. func NewAddrArrayFromAddrs(arr []NetAddr) (res NetAddrArray) { size := len(arr) if size > 0 { @@ -43,14 +53,21 @@ func NewAddrArrayFromAddrs(arr []NetAddr) (res NetAddrArray) { func (self NetAddr) free() { C.netaddr_free(self.inner) } +// Check if two addresses are the same. func (self NetAddr) IsEq(other NetAddr) bool { return bool(C.netaddr_is_eq(self.inner, other.inner)) } func (self NetAddr) IsNull() bool { return bool(C.netaddr_is_null(self.inner)) } +// Get the 32-bit IP representation. func (self NetAddr) GetIP() uint32 { return uint32(C.netaddr_get_ip(self.inner)) } +// Get the 16-bit port number (in UNIX network byte order, so need to apply +// ntohs(), for example, to convert the returned integer to the local endianness). func (self NetAddr) GetPort() uint16 { return uint16(C.netaddr_get_port(self.inner)) } +// Make a copy of the object. This is required if you want to keep the NetAddr +// returned (or passed as a callback parameter) by other salticidae methods +// (such like MsgNetwork/PeerNetwork), unless those method return a moved object. func (self NetAddr) Copy() NetAddr { res := &netAddr{ inner: C.netaddr_copy(self.inner) } runtime.SetFinalizer(res, func(self NetAddr) { self.free() }) @@ -4,52 +4,82 @@ package salticidae import "C" import "runtime" +// The C pointer type for a MsgNetwork handle. type CMsgNetwork = *C.msgnetwork_t type msgNetwork struct { inner CMsgNetwork } +// The handle for a message network. type MsgNetwork = *msgNetwork +// Convert an existing C pointer into a go object. Notice that when the go +// object does *not* own the resource of the C pointer, so it is only valid to +// the extent in which the given C pointer is valid. The C memory will not be +// deallocated when the go object is finalized by GC. This applies to all other +// "FromC" functions. func MsgNetworkFromC(ptr CMsgNetwork) MsgNetwork { return &msgNetwork{ inner: ptr } } +// The C pointer type for a MsgNetworkConn handle. type CMsgNetworkConn = *C.msgnetwork_conn_t type msgNetworkConn struct { inner CMsgNetworkConn } +// The handle for a message network connection. type MsgNetworkConn = *msgNetworkConn func MsgNetworkConnFromC(ptr CMsgNetworkConn) MsgNetworkConn { return &msgNetworkConn{ inner: ptr } } +var ( + CONN_MODE_ACTIVE = MsgNetworkConnMode(C.CONN_MODE_ACTIVE) + CONN_MODE_PASSIVE = MsgNetworkConnMode(C.CONN_MODE_PASSIVE) + CONN_MODE_DEAD = MsgNetworkConnMode(C.CONN_MODE_DEAD) +) + +// The connection mode. CONN_MODE_ACTIVE: a connection established from the +// local side. CONN_MODE_PASSIVE: a connection established from the remote +// side. CONN_MODE_DEAD: a connection that is already closed. type MsgNetworkConnMode = C.msgnetwork_conn_mode_t func (self MsgNetworkConn) free() { C.msgnetwork_conn_free(self.inner) } +// Get the corresponding MsgNetwork handle that manages this connection. The +// returned handle is only valid during the lifetime of this connection. func (self MsgNetworkConn) GetNet() MsgNetwork { return MsgNetworkFromC(C.msgnetwork_conn_get_net(self.inner)) } -var ( - CONN_MODE_ACTIVE = MsgNetworkConnMode(C.CONN_MODE_ACTIVE) - CONN_MODE_PASSIVE = MsgNetworkConnMode(C.CONN_MODE_PASSIVE) - CONN_MODE_DEAD = MsgNetworkConnMode(C.CONN_MODE_DEAD) -) - func (self MsgNetworkConn) GetMode() MsgNetworkConnMode { return C.msgnetwork_conn_get_mode(self.inner) } +// Get the address of the remote end of this connection. Use Copy() to make a +// copy of the address if you want to use the address object beyond the +// lifetime of the connection. func (self MsgNetworkConn) GetAddr() NetAddr { return NetAddrFromC(C.msgnetwork_conn_get_addr(self.inner)) } +// Make a copy of the object. This is required if you want to keep the +// connection passed as a callback parameter by other salticidae methods (such +// like MsgNetwork/PeerNetwork). +func (self MsgNetworkConn) Copy() MsgNetworkConn { + res := MsgNetworkConnFromC(C.msgnetwork_conn_copy(self.inner)) + runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) + return res +} + + +// The C pointer type for a MsgNetworkConfig object. type CMsgNetworkConfig = *C.msgnetwork_config_t type msgNetworkConfig struct { inner CMsgNetworkConfig } +// Configuration for MsgNetwork. type MsgNetworkConfig = *msgNetworkConfig func MsgNetworkConfigFromC(ptr CMsgNetworkConfig) MsgNetworkConfig { return &msgNetworkConfig{ inner: ptr } } +// Create the configuration object with default settings. func NewMsgNetworkConfig() MsgNetworkConfig { res := MsgNetworkConfigFromC(C.msgnetwork_config_new()) runtime.SetFinalizer(res, func(self MsgNetworkConfig) { self.free() }) @@ -58,30 +88,39 @@ func NewMsgNetworkConfig() MsgNetworkConfig { func (self MsgNetworkConfig) free() { C.msgnetwork_config_free(self.inner) } +// Set the number of consecutive read attempts in the message delivery queue. +// Usually the default value is good enough. This is used to make the tradeoff +// between the event loop fairness and the amortization of syscall cost. func (self MsgNetworkConfig) BurstSize(size int) { C.msgnetwork_config_burst_size(self.inner, C.size_t(size)) } +// Maximum backlogs (see POSIX TCP backlog). func (self MsgNetworkConfig) MaxListenBacklog(backlog int) { C.msgnetwork_config_max_listen_backlog(self.inner, C.int(backlog)) } +// The timeout for connecting to the remote (in seconds). func (self MsgNetworkConfig) ConnServerTimeout(timeout float64) { C.msgnetwork_config_conn_server_timeout(self.inner, C.double(timeout)) } +// The size for an inbound data chunk (per read() syscall). func (self MsgNetworkConfig) SegBuffSize(size int) { C.msgnetwork_config_seg_buff_size(self.inner, C.size_t(size)) } +// The number of worker threads. func (self MsgNetworkConfig) NWorker(nworker int) { C.msgnetwork_config_nworker(self.inner, C.size_t(nworker)) } +// The capacity of the send buffer. func (self MsgNetworkConfig) QueueCapacity(capacity int) { C.msgnetwork_config_queue_capacity(self.inner, C.size_t(capacity)) } +// Create a message network handle which is attached to given event loop. func NewMsgNetwork(ec EventContext, config MsgNetworkConfig) MsgNetwork { res := MsgNetworkFromC(C.msgnetwork_new(ec.inner, config.inner)) ec.attach(rawptr_t(res.inner), res) @@ -90,81 +129,110 @@ func NewMsgNetwork(ec EventContext, config MsgNetworkConfig) MsgNetwork { } func (self MsgNetwork) free() { C.msgnetwork_free(self.inner) } -func (self MsgNetwork) Listen(addr NetAddr, err *Error) { C.msgnetwork_listen(self.inner, addr.inner, err) } + +// Start the message network (by spawning worker threads). This should be +// called before using any other methods. func (self MsgNetwork) Start() { C.msgnetwork_start(self.inner) } + +// Listen to the specified network address. +func (self MsgNetwork) Listen(addr NetAddr, err *Error) { C.msgnetwork_listen(self.inner, addr.inner, err) } + +// Stop the message network. No other methods should be called after this. func (self MsgNetwork) Stop() { C.msgnetwork_stop(self.inner) } +// Send a message through the given connection. func (self MsgNetwork) SendMsg(msg Msg, conn MsgNetworkConn) { C.msgnetwork_send_msg(self.inner, msg.inner, conn.inner) } +// Send a message through the given connection, using a worker thread to +// seralize and put data to the send buffer. The payload contained in the given +// msg will be moved and sent. Thus, no methods of msg involving the payload +// should be called afterwards. func (self MsgNetwork) SendMsgDeferredByMove(msg Msg, conn MsgNetworkConn) { C.msgnetwork_send_msg_deferred_by_move(self.inner, msg.inner, conn.inner) } +// Try to connect to a remote address. The connection handle is returned. The +// returned connection handle could be kept in your program. func (self MsgNetwork) Connect(addr NetAddr, err *Error) MsgNetworkConn { res := MsgNetworkConnFromC(C.msgnetwork_connect(self.inner, addr.inner, err)) runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) return res } + +// Terminate the given connection. func (self MsgNetwork) Terminate(conn MsgNetworkConn) { C.msgnetwork_terminate(self.inner, conn.inner) } +// The C function pointer type which takes msg_t*, msgnetwork_conn_t* and void* +// (passing in the custom user data allocated by C.malloc) as parameters. +type MsgNetworkMsgCallback = C.msgnetwork_msg_callback_t +// The C function pointer type which takes msgnetwork_conn_t*, bool (true for +// the connection is established, false for the connection is terminated) and +// void* as parameters. +type MsgNetworkConnCallback = C.msgnetwork_conn_callback_t +// The C function Pointer type which takes SalticidaeCError* and void* as parameters. +type MsgNetworkErrorCallback = C.msgnetwork_error_callback_t +// Register a message handler for the type of message identified by opcode. The +// callback function will be invoked upon the delivery of each message with the +// given opcode, by the thread of the event loop the MsgNetwork is attached to. func (self MsgNetwork) RegHandler(opcode Opcode, callback MsgNetworkMsgCallback, userdata rawptr_t) { C.msgnetwork_reg_handler(self.inner, C._opcode_t(opcode), callback, userdata) } +// Register a connection handler invoked when the connection state is changed. func (self MsgNetwork) RegConnHandler(callback MsgNetworkConnCallback, userdata rawptr_t) { C.msgnetwork_reg_conn_handler(self.inner, callback, userdata) } +// Register an error handler invoked when there is recoverable errors during any +// asynchronous call/execution inside the MsgNetwork. func (self MsgNetwork) RegErrorHandler(callback MsgNetworkErrorCallback, userdata rawptr_t) { C.msgnetwork_reg_error_handler(self.inner, callback, userdata) } -func (self MsgNetworkConn) Copy() MsgNetworkConn { - res := MsgNetworkConnFromC(C.msgnetwork_conn_copy(self.inner)) - runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) - return res -} -func (self MsgNetworkConn) Free() { C.msgnetwork_conn_free(self.inner) } - -type MsgNetworkMsgCallback = C.msgnetwork_msg_callback_t -type MsgNetworkConnCallback = C.msgnetwork_conn_callback_t -type MsgNetworkErrorCallback = C.msgnetwork_error_callback_t - +// The C pointer type for a PeerNetwork handle. type CPeerNetwork = *C.peernetwork_t type peerNetwork struct { inner CPeerNetwork } +// The handle for a peer-to-peer network. type PeerNetwork = *peerNetwork func PeerNetworkFromC(ptr CPeerNetwork) PeerNetwork { return &peerNetwork{ inner: ptr } } - +// The C pointer type for a PeerNetworkConn handle. type CPeerNetworkConn = *C.peernetwork_conn_t type peerNetworkConn struct { inner CPeerNetworkConn } +// The handle for a PeerNetwork connection. type PeerNetworkConn = *peerNetworkConn func PeerNetworkConnFromC(ptr CPeerNetworkConn) PeerNetworkConn { return &peerNetworkConn{ inner: ptr } } -type PeerNetworkIdMode = C.peernetwork_id_mode_t - var ( ID_MODE_IP_BASED = PeerNetworkIdMode(C.ID_MODE_IP_BASED) ID_MODE_IP_PORT_BASED = PeerNetworkIdMode(C.ID_MODE_IP_PORT_BASED) ) +// The identity mode. ID_MODE_IP_BASED: a remote peer is identified by the IP +// only. ID_MODE_IP_PORT_BASED: a remote peer is identified by IP + port +// number. +type PeerNetworkIdMode = C.peernetwork_id_mode_t + +// The C pointer type for a PeerNetworkConfig handle. type CPeerNetworkConfig = *C.peernetwork_config_t type peerNetworkConfig struct { inner CPeerNetworkConfig } +// Configuration for PeerNetwork. type PeerNetworkConfig = *peerNetworkConfig func PeerNetworkConfigFromC(ptr CPeerNetworkConfig) PeerNetworkConfig { return &peerNetworkConfig{ inner: ptr } } +// Create the configuration object with default settings. func NewPeerNetworkConfig() PeerNetworkConfig { res := PeerNetworkConfigFromC(C.peernetwork_config_new()) runtime.SetFinalizer(res, func(self PeerNetworkConfig) { self.free() }) @@ -173,26 +241,34 @@ func NewPeerNetworkConfig() PeerNetworkConfig { func (self PeerNetworkConfig) free() { C.peernetwork_config_free(self.inner) } +// Set the connection retry delay (in seconds). func (self PeerNetworkConfig) RetryConnDelay(t_sec float64) { C.peernetwork_config_retry_conn_delay(self.inner, C.double(t_sec)) } +// Set the period for sending ping messsages (in seconds). func (self PeerNetworkConfig) PingPeriod(t_sec float64) { C.peernetwork_config_ping_period(self.inner, C.double(t_sec)) } +// Set the time it takes after sending a ping message before a connection is +// considered as broken. func (self PeerNetworkConfig) ConnTimeout(t_sec float64) { C.peernetwork_config_conn_timeout(self.inner, C.double(t_sec)) } +// Set the identity mode. func (self PeerNetworkConfig) IdMode(mode PeerNetworkIdMode) { C.peernetwork_config_id_mode(self.inner, mode) } +// Use the PeerNetworkConfig object as a MsgNetworkConfig object (to invoke the +// methods inherited from MsgNetworkConfig, such as NWorker). func (self PeerNetworkConfig) AsMsgNetworkConfig() MsgNetworkConfig { return MsgNetworkConfigFromC(C.peernetwork_config_as_msgnetwork_config(self.inner)) } +// Create a peer-to-peer message network handle. func NewPeerNetwork(ec EventContext, config PeerNetworkConfig) PeerNetwork { res := PeerNetworkFromC(C.peernetwork_new(ec.inner, config.inner)) ec.attach(rawptr_t(res.inner), res) @@ -202,30 +278,50 @@ func NewPeerNetwork(ec EventContext, config PeerNetworkConfig) PeerNetwork { func (self PeerNetwork) free() { C.peernetwork_free(self.inner) } +// Add a peer to the list of known peers. The P2P network will try to keep +// bi-direction connections to all known peers in the list (through +// reconnection and connection deduplication). func (self PeerNetwork) AddPeer(paddr NetAddr) { C.peernetwork_add_peer(self.inner, paddr.inner) } +// Test whether a peer is already in the list. func (self PeerNetwork) HasPeer(paddr NetAddr) bool { return bool(C.peernetwork_has_peer(self.inner, paddr.inner)) } +// Get the connection of the known peer. The connection handle is returned. The +// returned connection handle could be kept in your program. func (self PeerNetwork) GetPeerConn(paddr NetAddr, err *Error) PeerNetworkConn { res := PeerNetworkConnFromC(C.peernetwork_get_peer_conn(self.inner, paddr.inner, err)) runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() }) return res } +// Use the PeerNetwork handle as a MsgNetwork handle (to invoke the methods +// inherited from MsgNetwork, such as RegHandler). func (self PeerNetwork) AsMsgNetwork() MsgNetwork { return MsgNetworkFromC(C.peernetwork_as_msgnetwork(self.inner)) } +// Use the MsgNetwork handle as a PeerNetwork handle (forcing the conversion). func (self MsgNetwork) AsPeerNetworkUnsafe() PeerNetwork { return PeerNetworkFromC(C.msgnetwork_as_peernetwork_unsafe(self.inner)) } +// Create a MsgNetworkConn handle from a PeerNetworkConn (representing the same +// connection). func NewMsgNetworkConnFromPeerNetWorkConn(conn PeerNetworkConn) MsgNetworkConn { res := MsgNetworkConnFromC(C.msgnetwork_conn_new_from_peernetwork_conn(conn.inner)) runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) return res } +// Create a PeerNetworkConn handle from a MsgNetworkConn (representing the same +// connection and forcing the conversion). +func NewPeerNetworkConnFromMsgNetWorkConnUnsafe(conn MsgNetworkConn) PeerNetworkConn { + res := PeerNetworkConnFromC(C.peernetwork_conn_new_from_msgnetwork_conn_unsafe(conn.inner)) + runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() }) + return res +} + +// Make a copy of the connection handle. func (self PeerNetworkConn) Copy() PeerNetworkConn { res := PeerNetworkConnFromC(C.peernetwork_conn_copy(self.inner)) runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() }) @@ -234,25 +330,42 @@ func (self PeerNetworkConn) Copy() PeerNetworkConn { func (self PeerNetworkConn) free() { C.peernetwork_conn_free(self.inner) } +// Listen to the specified network address. Notice that this method overrides +// Listen() in MsgNetwork, so you should always call this one instead of +// AsMsgNetwork().Listen(). func (self PeerNetwork) Listen(listenAddr NetAddr, err *Error) { C.peernetwork_listen(self.inner, listenAddr.inner, err) } +// Send a message to the given peer. func (self PeerNetwork) SendMsg(msg Msg, addr NetAddr) { C.peernetwork_send_msg(self.inner, msg.inner, addr.inner) } +// Send a message to the given peer, using a worker thread to seralize and put +// data to the send buffer. The payload contained in the given msg will be +// moved and sent. Thus, no methods of msg involving the payload should be +// called afterwards. func (self PeerNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) { C.peernetwork_send_msg_deferred_by_move(self.inner, msg.inner, addr.inner) } +// Send a message to the given list of peers. The payload contained in the +// given msg will be moved and sent. Thus, no methods of msg involving the +// payload should be called afterwards. func (self PeerNetwork) MulticastMsgByMove(msg Msg, paddrs []NetAddr) { na := NewAddrArrayFromAddrs(paddrs) C.peernetwork_multicast_msg_by_move(self.inner, msg.inner, na.inner) } +// The C function pointer type which takes netaddr_t* and void* (passing in the +// custom user data allocated by C.malloc) as parameters. type MsgNetworkUnknownPeerCallback = C.msgnetwork_unknown_peer_callback_t +// Register a connection handler invoked when a remote peer that is not in the +// list of known peers attempted to connect. By default configuration, the +// connection was rejected, and you can call AddPeer() to enroll this peer if +// you hope to establish the connection soon. func (self PeerNetwork) RegUnknownPeerHandler(callback MsgNetworkUnknownPeerCallback, userdata rawptr_t) { C.peernetwork_reg_unknown_peer_handler(self.inner, callback, userdata) } diff --git a/salticidae b/salticidae -Subproject 823585c4db2ef6752d12f489c83edab577b8609 +Subproject ff904b23147e72db4f7f77f8269242d9a5a0859 @@ -8,8 +8,10 @@ import "runtime" type byteArray struct { inner *C.bytearray_t } +// Array of binary data. type ByteArray = *byteArray +// Create an empty byte array (with zero contained bytes). func NewByteArray() ByteArray { res := &byteArray{ inner: C.bytearray_new() } runtime.SetFinalizer(res, func(self ByteArray) { self.free() }) @@ -18,31 +20,48 @@ func NewByteArray() ByteArray { func (self ByteArray) free() { C.bytearray_free(self.inner) } +// Create a byte array by taking out all data from src. Notice this is a +// zero-copy operation that consumes and invalidates the data in src ("move" +// semantics) so that no more operation should be done to src after this +// function call. func NewByteArrayMovedFromDataStream(src DataStream) ByteArray { res := &byteArray{ inner: C.bytearray_new_moved_from_datastream(src.inner) } runtime.SetFinalizer(res, func(self ByteArray) { self.free() }) return res } +// The C pointer to a DataStream object. +type CDataStream = *C.datastream_t type dataStream struct { - inner *C.datastream_t + inner CDataStream + attached map[uintptr]interface{} } +// Stream of binary data. type DataStream = *dataStream +func DataStreamFromC(ptr CDataStream) DataStream { + return &dataStream{ + inner: ptr, + attached: make(map[uintptr]interface{}), + } +} + +// Create an empty DataStream. func NewDataStream() DataStream { - res := &dataStream{ inner: C.datastream_new() } + res := DataStreamFromC(C.datastream_new()) runtime.SetFinalizer(res, func(self DataStream) { self.free() }) return res } +// Create a DataStream with data copied from bytes. func NewDataStreamFromBytes(bytes []byte) (res DataStream) { size := len(bytes) if size > 0 { base := (*C.uint8_t)(&bytes[0]) - res = &dataStream{ inner: C.datastream_new_from_bytes(base, C.size_t(size)) } + res = DataStreamFromC(C.datastream_new_from_bytes(base, C.size_t(size))) } else { - res = &dataStream{ inner: C.datastream_new() } + res = DataStreamFromC(C.datastream_new()) } runtime.SetFinalizer(res, func(self DataStream) { self.free() }) return @@ -50,28 +69,42 @@ func NewDataStreamFromBytes(bytes []byte) (res DataStream) { func (self DataStream) free() { C.datastream_free(self.inner) } -func (self DataStream) Copy() { - res := &dataStream{ inner: C.datastream_copy(self.inner, src.inner) } +func (self DataStream) attach(ptr rawptr_t, obj interface{}) { self.attached[uintptr(ptr)] = obj } +func (self DataStream) detach(ptr rawptr_t) { delete(self.attached, uintptr(ptr)) } + +// Make a copy of the object. +func (self DataStream) Copy() DataStream { + res := DataStreamFromC(C.datastream_copy(self.inner)) runtime.SetFinalizer(res, func(self DataStream) { self.free() }) return res } // TODO: datastream_data +// Empty the DataStream. func (self DataStream) Clear() { C.datastream_clear(self.inner) } func (self DataStream) Size() int { return int(C.datastream_size(self.inner)) } +// Write a uint8 integer to the stream (no byte order conversion). func (self DataStream) PutU8(v uint8) bool { return bool(C.datastream_put_u8(self.inner, C.uint8_t(v))) } +// Write a uint16 integer to the stream (no byte order conversion). func (self DataStream) PutU16(v uint16) bool { return bool(C.datastream_put_u16(self.inner, C.uint16_t(v))) } +// Write a uint32 integer to the stream (no byte order conversion). func (self DataStream) PutU32(v uint32) bool { return bool(C.datastream_put_u32(self.inner, C.uint32_t(v))) } -func (self DataStream) PutU64(v uint32) bool { return bool(C.datastream_put_u64(self.inner, C.uint64_t(v))) } +// Write a uint64 integer to the stream (no byte order conversion). +func (self DataStream) PutU64(v uint64) bool { return bool(C.datastream_put_u64(self.inner, C.uint64_t(v))) } +// Write an int8 integer to the stream (no byte order conversion). func (self DataStream) PutI8(v int8) bool { return bool(C.datastream_put_i8(self.inner, C.int8_t(v))) } +// Write an int16 integer to the stream (no byte order conversion). func (self DataStream) PutI16(v int16) bool { return bool(C.datastream_put_i16(self.inner, C.int16_t(v))) } +// Write an int32 integer to the stream (no byte order conversion). func (self DataStream) PutI32(v int32) bool { return bool(C.datastream_put_i32(self.inner, C.int32_t(v))) } +// Write an int64 integer to the stream (no byte order conversion). func (self DataStream) PutI64(v int32) bool { return bool(C.datastream_put_i64(self.inner, C.int64_t(v))) } +// Write arbitrary bytes to the stream. func (self DataStream) PutData(bytes []byte) bool { size := len(bytes) if size > 0 { @@ -80,28 +113,57 @@ func (self DataStream) PutData(bytes []byte) bool { } else { return true } } +// Parse a uint8 integer by consuming the stream (no byte order conversion). func (self DataStream) GetU8(succ *bool) uint8 { return uint8(C.datastream_get_u8(self.inner, (*C.bool)(succ))) } +// Parse a uint16 integer by consuming the stream (no byte order conversion). func (self DataStream) GetU16(succ *bool) uint16 { return uint16(C.datastream_get_u16(self.inner, (*C.bool)(succ))) } +// Parse a uint32 integer by consuming the stream (no byte order conversion). func (self DataStream) GetU32(succ *bool) uint32 { return uint32(C.datastream_get_u32(self.inner, (*C.bool)(succ))) } +// Parse a uint64 integer by consuming the stream (no byte order conversion). func (self DataStream) GetU64(succ *bool) uint64 { return uint64(C.datastream_get_u64(self.inner, (*C.bool)(succ))) } +// Parse an int8 integer by consuming the stream (no byte order conversion). func (self DataStream) GetI8(succ *bool) int8 { return int8(C.datastream_get_i8(self.inner, (*C.bool)(succ))) } +// Parse an int16 integer by consuming the stream (no byte order conversion). func (self DataStream) GetI16(succ *bool) int16 { return int16(C.datastream_get_i16(self.inner, (*C.bool)(succ))) } +// Parse an int32 integer by consuming the stream (no byte order conversion). func (self DataStream) GetI32(succ *bool) int32 { return int32(C.datastream_get_i32(self.inner, (*C.bool)(succ))) } +// Parse an int64 integer by consuming the stream (no byte order conversion). func (self DataStream) GetI64(succ *bool) int64 { return int64(C.datastream_get_i64(self.inner, (*C.bool)(succ))) } +// The handle returned by GetDataInPlace. The Go slice returned by Get() is +// valid only during the lifetime of the handle. +type dataStreamBytes struct { + bytes []byte + ds DataStream +} + +type DataStreamBytes = *dataStreamBytes -func (self DataStream) GetDataInPlace(length int) []byte { +func (self DataStreamBytes) Get() []byte { return self.bytes } +func (self DataStreamBytes) Release() { self.ds.detach(rawptr_t(self)) } + +// Get the given length of preceeding bytes from the stream as a byte slice by +// consuming the stream. Notice this function does not copy the bytes, so the +// slice is only valid during the lifetime of DataStreamBytes handle. +func (self DataStream) GetDataInPlace(length int) DataStreamBytes { base := C.datastream_get_data_inplace(self.inner, C.size_t(length)) - return C.GoBytes(rawptr_t(base), C.int(length)) + res := &dataStreamBytes{ + bytes: C.GoBytes(rawptr_t(base), C.int(length)), + ds: self, + } + self.attach(rawptr_t(res), res) + return res } type uint256 struct { inner *C.uint256_t } +// 256-bit integer. type UInt256 = *uint256 +// Create a 256-bit integer. func NewUInt256() UInt256 { res := &uint256{ inner: C.uint256_new() } runtime.SetFinalizer(res, func(self UInt256) { self.free() }) @@ -109,18 +171,28 @@ func NewUInt256() UInt256 { } func (self UInt256) free() { C.uint256_free(self.inner) } -func (self UInt256) UInt256IsNull() bool { return bool(C.uint256_is_null(self.inner)) } -func (self UInt256) UInt256IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self.inner, other.inner)) } + +func (self UInt256) IsNull() bool { return bool(C.uint256_is_null(self.inner)) } + +// Check if two 256-bit integers are equal. +func (self UInt256) IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self.inner, other.inner)) } + +// Write the integer to the given DataStream. func (self UInt256) Serialize(s DataStream) { C.uint256_serialize(self.inner, s.inner) } + +// Parse the integer from the given DataStream. func (self UInt256) Unserialize(s DataStream) { C.uint256_unserialize(self.inner, s.inner) } -func (self UInt256) IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self.inner, other.inner)) } +// Get the Sha256 hash of the given DataStream content (without consuming the +// stream). func (self DataStream) GetHash() UInt256 { res := &uint256{ inner: C.datastream_get_hash(self.inner) } runtime.SetFinalizer(res, func(self UInt256) { self.free() }) return res } +// Get hexadicemal string representation of the given DataStream content +// (without consuming the stream). func (self DataStream) GetHex() string { tmp := C.datastream_get_hex(self.inner) res := C.GoString(tmp) @@ -128,6 +200,7 @@ func (self DataStream) GetHex() string { return res } +// Get hexadicemal string representation of the 256-bit integer. func (self UInt256) GetHex() string { s := NewDataStream() self.Serialize(s) diff --git a/test_msgnet/main.go b/test_msgnet/main.go index 4e52982..66d7287 100644 --- a/test_msgnet/main.go +++ b/test_msgnet/main.go @@ -34,10 +34,10 @@ func msgHelloSerialize(name string, text string) salticidae.Msg { } func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) { - p := msg.ConsumePayload() - length := binary.LittleEndian.Uint32(p.GetDataInPlace(4)) - name = string(p.GetDataInPlace(int(length))) - text = string(p.GetDataInPlace(p.Size())) + p := msg.GetPayloadByMove() + t := p.GetDataInPlace(4); length := binary.LittleEndian.Uint32(t.Get()); t.Release() + t = p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release() + t = p.GetDataInPlace(p.Size()); text = string(t.Get()); t.Release() return } @@ -79,7 +79,7 @@ func onReceiveHello(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ u name, text := msgHelloUnserialize(msg) fmt.Printf("[%s] %s says %s\n", myName, name, text) ack := msgAckSerialize() - net.SendMsgByMove(ack, conn) + net.SendMsg(ack, conn) } //export onReceiveAck @@ -106,7 +106,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, _ unsafe.P if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { fmt.Printf("[%s] Connected, sending hello.\n", name) hello := msgHelloSerialize(name, "Hello there!") - n.net.SendMsgByMove(hello, conn) + n.net.SendMsg(hello, conn) } else { fmt.Printf("[%s] Accepted, waiting for greetings.\n", name) } diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index 251f2e1..9e0757f 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -54,7 +54,7 @@ func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) { } func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream { - return msg.ConsumePayload() + return msg.GetPayloadByMove() } func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg { @@ -66,7 +66,7 @@ func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg { } func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 { - p := msg.ConsumePayload() + p := msg.GetPayloadByMove() hash := salticidae.NewUInt256() hash.Unserialize(p) return hash @@ -126,7 +126,7 @@ func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { msg, hash := msgRandSerialize(size) tc := app.getTC(addr2id(conn.GetAddr())) tc.hash = hash - app.net.AsMsgNetwork().SendMsgByMove(msg, conn) + app.net.AsMsgNetwork().SendMsg(msg, conn) } var apps []AppContext @@ -157,7 +157,7 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) net := conn.GetNet() ack := msgAckSerialize(msgRandUnserialize(msg).GetHash()) - net.SendMsgByMove(ack, conn) + net.SendMsg(ack, conn) } //export onReceiveAck |