From b962e27a59c8fe49c95c6515577ce89898d9ae65 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 2 Jul 2019 01:06:59 -0400 Subject: update API --- bench_network/main.go | 10 ++-- msg.go | 6 +-- netaddr.go | 93 ++++++++++++++++++++++----------- network.go | 133 +++++++++++++++++++++++++++++++++--------------- salticidae | 2 +- test_msgnet/main.go | 9 ++-- test_msgnet_tls/main.go | 9 ++-- test_p2p_stress/main.go | 72 +++++++++++++------------- 8 files changed, 207 insertions(+), 127 deletions(-) diff --git a/bench_network/main.go b/bench_network/main.go index fc46ea0..d7d0925 100644 --- a/bench_network/main.go +++ b/bench_network/main.go @@ -98,15 +98,14 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata u if connected { if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { fmt.Printf("[%s] connected, sending hello.\n", mynet.name) - mynet.conn = conn.Copy() + mynet.conn = conn.Copy(true) mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata) } else { fmt.Printf("[%s] passively connected, waiting for greetings.\n", mynet.name) } } else { fmt.Printf("[%s] disconnected, retrying.\n", mynet.name) - err := salticidae.NewError() - mynet.net.Connect(conn.GetAddr(), false, &err) + mynet.net.Connect(conn.GetAddr()) } return true } @@ -150,8 +149,7 @@ func main() { ec = salticidae.NewEventContext() err := salticidae.NewError() - aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err) - //bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err) + aliceAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12345", true, &err) mynets = append(mynets, genMyNet(ec, "alice", 10, 0)) alice := &mynets[0] @@ -165,7 +163,7 @@ func main() { go func() { bob := &mynets[1] bob.net.Start() - bob.net.Connect(aliceAddr, false, &err); checkError(&err) + bob.net.Connect(aliceAddr) tec.Dispatch() bobThread <-struct{}{} }() diff --git a/msg.go b/msg.go index 23ddcc1..f7e6b1c 100644 --- a/msg.go +++ b/msg.go @@ -5,13 +5,13 @@ package salticidae import "C" import "runtime" -// The C pointer type for a Msg object +// The C pointer type for a Msg object. type CMsg = *C.msg_t type msg struct { inner CMsg autoFree bool } -// Message sent by MsgNetwork +// Message sent by MsgNetwork. type Msg = *msg // Convert an existing C pointer into a go object. Notice that when the go @@ -19,7 +19,7 @@ type Msg = *msg // the extent in which the given C pointer is valid. The C memory will not be // deallocated when the go object is finalized by GC. This applies to all other // "FromC" functions. -func MsgFromC(ptr *C.msg_t) Msg { return &msg{ inner: ptr } } +func MsgFromC(ptr CMsg) Msg { return &msg{ inner: ptr } } func msgSetFinalizer(res Msg, autoFree bool) { res.autoFree = autoFree diff --git a/netaddr.go b/netaddr.go index 7955334..cc69313 100644 --- a/netaddr.go +++ b/netaddr.go @@ -7,7 +7,10 @@ import "runtime" // The C pointer type for a NetAddr object type CNetAddr = *C.netaddr_t -type netAddr struct { inner CNetAddr } +type netAddr struct { + inner CNetAddr + autoFree bool +} // Network address object. type NetAddr = *netAddr @@ -20,44 +23,29 @@ func NetAddrFromC(ptr CNetAddr) NetAddr { return &netAddr{ inner: ptr } } -type netAddrArray struct { - inner *C.netaddr_array_t +func netAddrSetFinalizer(res NetAddr, autoFree bool) { + res.autoFree = autoFree + if res != nil && autoFree { + runtime.SetFinalizer(res, func(self NetAddr) { self.Free() }) + } } -// An array of network address. -type NetAddrArray = *netAddrArray - // Create NetAddr from a TCP socket format string (e.g. 127.0.0.1:8888). -func NewAddrFromIPPortString(addr string, err *Error) (res NetAddr) { +func NewNetAddrFromIPPortString(addr string, autoFree bool, err *Error) (res NetAddr) { c_str := C.CString(addr) res = &netAddr{ inner: C.netaddr_new_from_sipport(c_str, err) } C.free(rawptr_t(c_str)) - runtime.SetFinalizer(res, func(self NetAddr) { self.free() }) + netAddrSetFinalizer(res, autoFree) return } -// Convert a Go slice of net addresses to NetAddrArray. -func NewAddrArrayFromAddrs(arr []NetAddr) (res NetAddrArray) { - size := len(arr) - _arr := make([]CNetAddr, size) - for i, v := range arr { - _arr[i] = v.inner - } - if size > 0 { - // FIXME: here we assume struct of a single pointer has the same memory - // footprint the pointer - base := (**C.netaddr_t)(rawptr_t(&_arr[0])) - res = &netAddrArray{ inner: C.netaddr_array_new_from_addrs(base, C.size_t(size)) } - } else { - res = &netAddrArray{ inner: C.netaddr_array_new() } +func (self NetAddr) Free() { + C.netaddr_free(self.inner) + if self.autoFree { + runtime.SetFinalizer(self, nil) } - runtime.KeepAlive(_arr) - runtime.SetFinalizer(res, func(self NetAddrArray) { self.free() }) - return } -func (self NetAddr) free() { C.netaddr_free(self.inner) } - // Check if two addresses are the same. func (self NetAddr) IsEq(other NetAddr) bool { return bool(C.netaddr_is_eq(self.inner, other.inner)) } @@ -73,10 +61,55 @@ func (self NetAddr) GetPort() uint16 { return uint16(C.netaddr_get_port(self.inn // Make a copy of the object. This is required if you want to keep the NetAddr // returned (or passed as a callback parameter) by other salticidae methods // (such like MsgNetwork/PeerNetwork), unless those method return a moved object. -func (self NetAddr) Copy() NetAddr { +func (self NetAddr) Copy(autoFree bool) NetAddr { res := &netAddr{ inner: C.netaddr_copy(self.inner) } - runtime.SetFinalizer(res, func(self NetAddr) { self.free() }) + netAddrSetFinalizer(res, autoFree) return res } -func (self NetAddrArray) free() { C.netaddr_array_free(self.inner) } +// The C pointer type for a NetAddrArray object. +type CNetAddrArray = *C.netaddr_array_t +type netAddrArray struct { + inner CNetAddrArray + autoFree bool +} +// An array of network address. +type NetAddrArray = *netAddrArray + +func NetAddrArrayFromC(ptr CNetAddrArray) NetAddrArray { + return &netAddrArray{ inner: ptr } +} + +func netAddrArraySetFinalizer(res NetAddrArray, autoFree bool) { + res.autoFree = autoFree + if res != nil && autoFree { + runtime.SetFinalizer(res, func(self NetAddrArray) { self.Free() }) + } +} + +// Convert a Go slice of net addresses to NetAddrArray. +func NewNetAddrArrayFromAddrs(arr []NetAddr, autoFree bool) (res NetAddrArray) { + size := len(arr) + _arr := make([]CNetAddr, size) + for i, v := range arr { + _arr[i] = v.inner + } + if size > 0 { + // FIXME: here we assume struct of a single pointer has the same memory + // footprint the pointer + base := (**C.netaddr_t)(rawptr_t(&_arr[0])) + res = NetAddrArrayFromC(C.netaddr_array_new_from_addrs(base, C.size_t(size))) + } else { + res = NetAddrArrayFromC(C.netaddr_array_new()) + } + runtime.KeepAlive(_arr) + netAddrArraySetFinalizer(res, autoFree) + return +} + +func (self NetAddrArray) Free() { + C.netaddr_array_free(self.inner) + if self.autoFree { + runtime.SetFinalizer(self, nil) + } +} diff --git a/network.go b/network.go index 22e0c31..b123eca 100644 --- a/network.go +++ b/network.go @@ -22,7 +22,10 @@ func MsgNetworkFromC(ptr CMsgNetwork) MsgNetwork { // The C pointer type for a MsgNetworkConn handle. type CMsgNetworkConn = *C.msgnetwork_conn_t -type msgNetworkConn struct { inner CMsgNetworkConn } +type msgNetworkConn struct { + inner CMsgNetworkConn + autoFree bool +} // The handle for a message network connection. type MsgNetworkConn = *msgNetworkConn @@ -40,7 +43,19 @@ var ( // side. CONN_MODE_DEAD: a connection that is already closed. type MsgNetworkConnMode = C.msgnetwork_conn_mode_t -func (self MsgNetworkConn) free() { C.msgnetwork_conn_free(self.inner) } +func msgNetworkConnSetFinalizer(res MsgNetworkConn, autoFree bool) { + res.autoFree = autoFree + if res != nil && autoFree { + runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.Free() }) + } +} + +func (self MsgNetworkConn) Free() { + C.msgnetwork_conn_free(self.inner) + if self.autoFree { + runtime.SetFinalizer(self, nil) + } +} // Get the corresponding MsgNetwork handle that manages this connection. The // returned handle is only valid during the lifetime of this connection. @@ -74,9 +89,9 @@ func (self MsgNetworkConn) GetPeerCert() X509 { // Make a copy of the object. This is required if you want to keep the // connection passed as a callback parameter by other salticidae methods (such // like MsgNetwork/PeerNetwork). -func (self MsgNetworkConn) Copy() MsgNetworkConn { +func (self MsgNetworkConn) Copy(autoFree bool) MsgNetworkConn { res := MsgNetworkConnFromC(C.msgnetwork_conn_copy(self.inner)) - runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) + msgNetworkConnSetFinalizer(res, autoFree) return res } @@ -193,28 +208,32 @@ func (self MsgNetwork) Listen(addr NetAddr, err *Error) { C.msgnetwork_listen(se func (self MsgNetwork) Stop() { C.msgnetwork_stop(self.inner) } // Send a message through the given connection. -func (self MsgNetwork) SendMsg(msg Msg, conn MsgNetworkConn) { - C.msgnetwork_send_msg(self.inner, msg.inner, conn.inner) +func (self MsgNetwork) SendMsg(msg Msg, conn MsgNetworkConn) bool { + return bool(C.msgnetwork_send_msg(self.inner, msg.inner, conn.inner)) } // Send a message through the given connection, using a worker thread to // seralize and put data to the send buffer. The payload contained in the given // msg will be moved and sent. Thus, no methods of msg involving the payload // should be called afterwards. -func (self MsgNetwork) SendMsgDeferredByMove(msg Msg, conn MsgNetworkConn) { - C.msgnetwork_send_msg_deferred_by_move(self.inner, msg.inner, conn.inner) +func (self MsgNetwork) SendMsgDeferredByMove(msg Msg, conn MsgNetworkConn) int32 { + return int32(C.msgnetwork_send_msg_deferred_by_move(self.inner, msg.inner, conn.inner)) } // Try to connect to a remote address. The connection handle is returned. The // returned connection handle could be kept in your program. -func (self MsgNetwork) Connect(addr NetAddr, blocking bool, err *Error) MsgNetworkConn { - res := MsgNetworkConnFromC(C.msgnetwork_connect(self.inner, addr.inner, C.bool(blocking), err)) - if res != nil { - runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) - } +func (self MsgNetwork) ConnectSync(addr NetAddr, autoFree bool, err *Error) MsgNetworkConn { + res := MsgNetworkConnFromC(C.msgnetwork_connect_sync(self.inner, addr.inner, err)) + msgNetworkConnSetFinalizer(res, autoFree) return res } +// Try to connect to a remote address (async). It returns an id which could be +// used to identify the corresponding error in the error callback. +func (self MsgNetwork) Connect(addr NetAddr) int32 { + return int32(C.msgnetwork_connect(self.inner, addr.inner)) +} + // Terminate the given connection. func (self MsgNetwork) Terminate(conn MsgNetworkConn) { C.msgnetwork_terminate(self.inner, conn.inner) } @@ -258,7 +277,10 @@ func PeerNetworkFromC(ptr CPeerNetwork) PeerNetwork { // The C pointer type for a PeerNetworkConn handle. type CPeerNetworkConn = *C.peernetwork_conn_t -type peerNetworkConn struct { inner CPeerNetworkConn } +type peerNetworkConn struct { + inner CPeerNetworkConn + autoFree bool +} // The handle for a PeerNetwork connection. type PeerNetworkConn = *peerNetworkConn @@ -266,6 +288,13 @@ func PeerNetworkConnFromC(ptr CPeerNetworkConn) PeerNetworkConn { return &peerNetworkConn{ inner: ptr } } +func peerNetworkConnSetFinalizer(res PeerNetworkConn, autoFree bool) { + res.autoFree = autoFree + if res != nil && autoFree { + runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.Free() }) + } +} + var ( ADDR_BASED = PeerNetworkIdMode(C.ID_MODE_ADDR_BASED) CERT_BASED = PeerNetworkIdMode(C.ID_MODE_CERT_BASED) @@ -348,11 +377,9 @@ func (self PeerNetwork) HasPeer(addr NetAddr) bool { return bool(C.peernetwork_h // Get the connection of the known peer. The connection handle is returned. The // returned connection handle could be kept in your program. -func (self PeerNetwork) GetPeerConn(addr NetAddr, err *Error) PeerNetworkConn { +func (self PeerNetwork) GetPeerConn(addr NetAddr, autoFree bool, err *Error) PeerNetworkConn { res := PeerNetworkConnFromC(C.peernetwork_get_peer_conn(self.inner, addr.inner, err)) - if res != nil { - runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() }) - } + peerNetworkConnSetFinalizer(res, autoFree) return res } @@ -369,35 +396,40 @@ func (self MsgNetwork) AsPeerNetworkUnsafe() PeerNetwork { // Create a MsgNetworkConn handle from a PeerNetworkConn (representing the same // connection). -func NewMsgNetworkConnFromPeerNetworkConn(conn PeerNetworkConn) MsgNetworkConn { +func NewMsgNetworkConnFromPeerNetworkConn(conn PeerNetworkConn, autoFree bool) MsgNetworkConn { res := MsgNetworkConnFromC(C.msgnetwork_conn_new_from_peernetwork_conn(conn.inner)) - runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) + msgNetworkConnSetFinalizer(res, autoFree) return res } // Create a PeerNetworkConn handle from a MsgNetworkConn (representing the same // connection and forcing the conversion). -func NewPeerNetworkConnFromMsgNetworkConnUnsafe(conn MsgNetworkConn) PeerNetworkConn { +func NewPeerNetworkConnFromMsgNetworkConnUnsafe(conn MsgNetworkConn, autoFree bool) PeerNetworkConn { res := PeerNetworkConnFromC(C.peernetwork_conn_new_from_msgnetwork_conn_unsafe(conn.inner)) - runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() }) + peerNetworkConnSetFinalizer(res, autoFree) return res } // Make a copy of the connection handle. -func (self PeerNetworkConn) Copy() PeerNetworkConn { +func (self PeerNetworkConn) Copy(autoFree bool) PeerNetworkConn { res := PeerNetworkConnFromC(C.peernetwork_conn_copy(self.inner)) - runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() }) + peerNetworkConnSetFinalizer(res, autoFree) return res } // Get the listening address of the remote peer (no Copy() is needed). -func (self PeerNetworkConn) GetPeerAddr() NetAddr { +func (self PeerNetworkConn) GetPeerAddr(autoFree bool) NetAddr { res := NetAddrFromC(C.peernetwork_conn_get_peer_addr(self.inner)) - runtime.SetFinalizer(res, func(self NetAddr) { self.free() }) + netAddrSetFinalizer(res, autoFree) return res } -func (self PeerNetworkConn) free() { C.peernetwork_conn_free(self.inner) } +func (self PeerNetworkConn) Free() { + C.peernetwork_conn_free(self.inner) + if self.autoFree { + runtime.SetFinalizer(self, nil) + } +} // Listen to the specified network address. Notice that this method overrides // Listen() in MsgNetwork, so you should always call this one instead of @@ -415,16 +447,18 @@ func (self PeerNetwork) SendMsg(msg Msg, addr NetAddr) bool { // data to the send buffer. The payload contained in the given msg will be // moved and sent. Thus, no methods of msg involving the payload should be // called afterwards. -func (self PeerNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) { - C.peernetwork_send_msg_deferred_by_move(self.inner, msg.inner, addr.inner) +func (self PeerNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) int32 { + return int32(C.peernetwork_send_msg_deferred_by_move(self.inner, msg.inner, addr.inner)) } // Send a message to the given list of peers. The payload contained in the // given msg will be moved and sent. Thus, no methods of msg involving the // payload should be called afterwards. -func (self PeerNetwork) MulticastMsgByMove(msg Msg, addrs []NetAddr) { - na := NewAddrArrayFromAddrs(addrs) - C.peernetwork_multicast_msg_by_move(self.inner, msg.inner, na.inner) +func (self PeerNetwork) MulticastMsgByMove(msg Msg, addrs []NetAddr) (res int32) { + na := NewNetAddrArrayFromAddrs(addrs, false) + res = int32(C.peernetwork_multicast_msg_by_move(self.inner, msg.inner, na.inner)) + na.Free() + return res } // The C function pointer type which takes peernetwork_conn_t*, bool and void* @@ -460,7 +494,10 @@ func ClientNetworkFromC(ptr CClientNetwork) ClientNetwork { // The C pointer type for a ClientNetworkConn handle. type CClientNetworkConn = *C.clientnetwork_conn_t -type clientNetworkConn struct { inner CClientNetworkConn } +type clientNetworkConn struct { + inner CClientNetworkConn + autoFree bool +} // The handle for a ClientNetwork connection. type ClientNetworkConn = *clientNetworkConn @@ -468,6 +505,13 @@ func ClientNetworkConnFromC(ptr CClientNetworkConn) ClientNetworkConn { return &clientNetworkConn{ inner: ptr } } +func clientNetworkConnSetFinalizer(res ClientNetworkConn, autoFree bool) { + res.autoFree = autoFree + if res != nil && autoFree { + runtime.SetFinalizer(res, func(self ClientNetworkConn) { self.Free() }) + } +} + // Create a client-server message network handle. func NewClientNetwork(ec EventContext, config MsgNetworkConfig, err *Error) ClientNetwork { res := ClientNetworkFromC(C.clientnetwork_new(ec.inner, config.inner, err)) @@ -493,28 +537,33 @@ func (self MsgNetwork) AsClientNetworkUnsafe() ClientNetwork { // Create a MsgNetworkConn handle from a ClientNetworkConn (representing the same // connection). -func NewMsgNetworkConnFromClientNetworkConn(conn ClientNetworkConn) MsgNetworkConn { +func NewMsgNetworkConnFromClientNetworkConn(conn ClientNetworkConn, autoFree bool) MsgNetworkConn { res := MsgNetworkConnFromC(C.msgnetwork_conn_new_from_clientnetwork_conn(conn.inner)) - runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) + msgNetworkConnSetFinalizer(res, autoFree) return res } // Create a ClientNetworkConn handle from a MsgNetworkConn (representing the same // connection and forcing the conversion). -func NewClientNetworkConnFromMsgNetworkConnUnsafe(conn MsgNetworkConn) ClientNetworkConn { +func NewClientNetworkConnFromMsgNetworkConnUnsafe(conn MsgNetworkConn, autoFree bool) ClientNetworkConn { res := ClientNetworkConnFromC(C.clientnetwork_conn_new_from_msgnetwork_conn_unsafe(conn.inner)) - runtime.SetFinalizer(res, func(self ClientNetworkConn) { self.free() }) + clientNetworkConnSetFinalizer(res, autoFree) return res } // Make a copy of the connection handle. -func (self ClientNetworkConn) Copy() ClientNetworkConn { +func (self ClientNetworkConn) Copy(autoFree bool) ClientNetworkConn { res := ClientNetworkConnFromC(C.clientnetwork_conn_copy(self.inner)) - runtime.SetFinalizer(res, func(self ClientNetworkConn) { self.free() }) + clientNetworkConnSetFinalizer(res, autoFree) return res } -func (self ClientNetworkConn) free() { C.clientnetwork_conn_free(self.inner) } +func (self ClientNetworkConn) Free() { + C.clientnetwork_conn_free(self.inner) + if self.autoFree { + runtime.SetFinalizer(self, nil) + } +} // Send a message to the given client. func (self ClientNetwork) SendMsg(msg Msg, addr NetAddr) bool { @@ -525,6 +574,6 @@ func (self ClientNetwork) SendMsg(msg Msg, addr NetAddr) bool { // data to the send buffer. The payload contained in the given msg will be // moved and sent. Thus, no methods of msg involving the payload should be // called afterwards. -func (self ClientNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) { - C.clientnetwork_send_msg_deferred_by_move(self.inner, msg.inner, addr.inner) +func (self ClientNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) int32 { + return int32(C.clientnetwork_send_msg_deferred_by_move(self.inner, msg.inner, addr.inner)) } diff --git a/salticidae b/salticidae index 624550d..53f7769 160000 --- a/salticidae +++ b/salticidae @@ -1 +1 @@ -Subproject commit 624550d7e1c032a4d3896ba002aa3ecb895f447c +Subproject commit 53f776997d0e92650b9f3a16224cef1c0c76b716 diff --git a/test_msgnet/main.go b/test_msgnet/main.go index 5b31c2c..e35212f 100644 --- a/test_msgnet/main.go +++ b/test_msgnet/main.go @@ -102,8 +102,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata u } } else { fmt.Printf("[%s] disconnected, retrying.\n", myName) - err := salticidae.NewError() - net.Connect(conn.GetAddr(), false, &err) + net.Connect(conn.GetAddr()) } return true } @@ -131,7 +130,7 @@ func genMyNet(ec salticidae.EventContext, n.net.Start() n.net.Listen(myAddr, &err); checkError(&err) - n.net.Connect(otherAddr, false, &err); checkError(&err) + n.net.Connect(otherAddr) return n } @@ -139,8 +138,8 @@ func main() { ec = salticidae.NewEventContext() err := salticidae.NewError() - aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err) - bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err) + aliceAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12345", true, &err) + bobAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12346", true, &err) alice = genMyNet(ec, "alice", aliceAddr, bobAddr) bob = genMyNet(ec, "bob", bobAddr, aliceAddr) diff --git a/test_msgnet_tls/main.go b/test_msgnet_tls/main.go index 78601a2..1b434b2 100644 --- a/test_msgnet_tls/main.go +++ b/test_msgnet_tls/main.go @@ -110,8 +110,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata u } } else { fmt.Printf("[%s] disconnected, retrying.\n", myName) - err := salticidae.NewError() - net.Connect(conn.GetAddr(), false, &err) + net.Connect(conn.GetAddr()) } return C.bool(res) } @@ -143,7 +142,7 @@ func genMyNet(ec salticidae.EventContext, n.net.Start() n.net.Listen(myAddr, &err); checkError(&err) - n.net.Connect(otherAddr, false, &err); checkError(&err) + n.net.Connect(otherAddr) return n } @@ -151,8 +150,8 @@ func main() { ec = salticidae.NewEventContext() err := salticidae.NewError() - aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err) - bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err) + aliceAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12345", true, &err) + bobAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12346", true, &err) alice = genMyNet(ec, "alice", "ed5a9a8c7429dcb235a88244bc69d43d16b35008ce49736b27aaa3042a674043", aliceAddr, bobAddr) bob = genMyNet(ec, "bob", "ef3bea4e72f4d0e85da7643545312e2ff6dded5e176560bdffb1e53b1cef4896", bobAddr, aliceAddr) diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index 29cfbfa..6f14371 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -11,7 +11,7 @@ package main // void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *); // void onStopLoop(threadcall_handle_t *, void *); // void peerHandler(peernetwork_conn_t *, bool, void *); -// void errorHandler(SalticidaeCError *, bool, void *); +// void errorHandler(SalticidaeCError *, bool, int32_t, void *); // void onTimeout(timerev_t *, void *); // typedef struct timeout_callback_context_t { // int app_id; @@ -42,7 +42,7 @@ const ( ) func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) { - serialized := salticidae.NewDataStream(true) + serialized := salticidae.NewDataStream(false); defer serialized.Free() serialized.PutU32(salticidae.ToLittleEndianU32(view)) buffer := make([]byte, size) _, err := rand.Read(buffer) @@ -50,27 +50,26 @@ func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256 panic("rand source failed") } serialized.PutData(buffer) - hash := salticidae.NewByteArrayFromBytes(buffer, true).GetHash(true) - return salticidae.NewMsgMovedFromByteArray( - MSG_OPCODE_RAND, - salticidae.NewByteArrayMovedFromDataStream(serialized, true), true), hash + ba := salticidae.NewByteArrayFromBytes(buffer, false); defer ba.Free() + payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free() + return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_RAND, payload, true), ba.GetHash(true) } func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { d := msg.GetPayloadByMove() succ := true view = salticidae.FromLittleEndianU32(d.GetU32(&succ)) - hash = salticidae.NewByteArrayCopiedFromDataStream(d, true).GetHash(true) + ba := salticidae.NewByteArrayCopiedFromDataStream(d, false); defer ba.Free() + hash = ba.GetHash(true) return } func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg { - serialized := salticidae.NewDataStream(true) + serialized := salticidae.NewDataStream(false); defer serialized.Free() serialized.PutU32(salticidae.ToLittleEndianU32(view)) hash.Serialize(serialized) - return salticidae.NewMsgMovedFromByteArray( - MSG_OPCODE_ACK, - salticidae.NewByteArrayMovedFromDataStream(serialized, true), true) + payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free() + return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, payload, true) } func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { @@ -133,8 +132,7 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) { return } -func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { - tc := app.getTC(addr2id(conn.GetAddr())) +func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn, tc *TestContext) { msg, hash := msgRandSerialize(tc.view, size) tc.hash = hash app.net.AsMsgNetwork().SendMsg(msg, conn) @@ -149,7 +147,7 @@ var ids []*C.int //export onTimeout func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { ctx := (*C.struct_timeout_callback_context_t)(userdata) - app := &apps[ctx.app_id] + app := &apps[int(ctx.app_id)] tc := app.getTC(uint64(ctx.addr_id)) tc.ncompleted++ app.net.AsMsgNetwork().Terminate( @@ -163,7 +161,7 @@ func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { } //export onReceiveRand -func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { +func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) { msg := salticidae.MsgFromC(salticidae.CMsg(_msg)) conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) net := conn.GetNet() @@ -175,14 +173,17 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user //export onReceiveAck func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { view, hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg))) - id := *(* int)(userdata) + id := int(*(*C.int)(userdata)) app := &apps[id] conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) - addr := addr2id(conn.GetAddr()) - tc := app.getTC(addr) + pconn := salticidae.NewPeerNetworkConnFromMsgNetworkConnUnsafe(conn, false); defer pconn.Free() + addr := pconn.GetPeerAddr(false); defer addr.Free() + if addr.IsNull() { return } + addrID := addr2id(addr) + tc := app.getTC(addrID) if view != tc.view { - fmt.Printf("dropping stale MsgAck") + fmt.Printf("dropping stale MsgAck\n") return } @@ -191,11 +192,11 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd } if tc.state == segBuffSize * 2 { - sendRand(tc.state, app, conn) + sendRand(tc.state, app, conn, tc) tc.state = -1 ctx := C.timeout_callback_context_new() ctx.app_id = C.int(id) - ctx.addr_id = C.uint64_t(addr) + ctx.addr_id = C.uint64_t(addrID) ctx.conn = C.msgnetwork_conn_copy(_conn) if tc.timer != nil { C.msgnetwork_conn_free(tc.timer_ctx.conn) @@ -207,39 +208,40 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd tc.timer.Add(t) fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t) } else if tc.state == -1 { - sendRand(rand.Int() % (segBuffSize * 10), app, conn) + sendRand(rand.Int() % (segBuffSize * 10), app, conn, tc) } else { tc.state++ - sendRand(tc.state, app, conn) + sendRand(tc.state, app, conn, tc) } } //export peerHandler func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, userdata unsafe.Pointer) { if connected { - conn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))) - id := *(*int)(userdata) + pconn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn)) + conn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(pconn, false); defer conn.Free() + id := int(*(*C.int)(userdata)) app := &apps[id] - if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { - tc := app.getTC(addr2id(conn.GetAddr())) - tc.state = 1 - fmt.Printf("INFO: increasing phase\n") - sendRand(tc.state, app, conn) - } + addr := pconn.GetPeerAddr(false); defer addr.Free() + tc := app.getTC(addr2id(addr)) + tc.state = 1 + tc.view++ + sendRand(tc.state, app, conn, tc) } } //export errorHandler -func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, _ unsafe.Pointer) { +func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, asyncID C.int32_t, _ unsafe.Pointer) { err := (*salticidae.Error)(unsafe.Pointer(_err)) s := "recoverable" if fatal { s = "fatal" } - fmt.Printf("Captured %s error during an async call: %s\n", s, salticidae.StrError(err.GetCode())) + fmt.Printf("Captured %s error during an async call %d: %s\n", s, asyncID, salticidae.StrError(err.GetCode())) } //export onStopLoop func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { - ec := apps[*(*int)(userdata)].ec + id := int(*(*C.int)(userdata)) + ec := apps[id].ec ec.Stop() } @@ -262,7 +264,7 @@ func main() { var addrs []salticidae.NetAddr for i := 0; i < 5; i++ { addrs = append(addrs, - salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), &err)) + salticidae.NewNetAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), true, &err)) } netconfig := salticidae.NewPeerNetworkConfig() nc := netconfig.AsMsgNetworkConfig() -- cgit v1.2.3