diff options
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | msg.go | 8 | ||||
-rw-r--r-- | network.go | 59 | ||||
m--------- | salticidae | 0 | ||||
-rw-r--r-- | stream.go | 1 | ||||
-rw-r--r-- | test_msgnet/main.go | 6 | ||||
-rw-r--r-- | test_p2p_stress/main.go | 68 |
7 files changed, 124 insertions, 19 deletions
@@ -11,3 +11,4 @@ build: build/test_msgnet: salticidae/libsalticidae.so go build -o $@ salticidae-go/test_msgnet + go build -o $@ salticidae-go/test_p2p_stress @@ -6,14 +6,14 @@ import "C" type Msg = *C.struct_msg_t -func NewMsg(opcode Opcode, _moved_payload ByteArray) Msg { - return C.msg_new(C._opcode_t(opcode), _moved_payload) +func NewMsgMovedFromByteArray(opcode Opcode, _moved_payload ByteArray) Msg { + return C.msg_new_moved_from_bytearray(C._opcode_t(opcode), _moved_payload) } func (self Msg) Free() { C.msg_free(self) } -func (self Msg) GetPayload() DataStream { - return C.msg_get_payload(self) +func (self Msg) ConsumePayload() DataStream { + return C.msg_consume_payload(self) } func (self Msg) GetOpcode() Opcode { @@ -34,6 +34,30 @@ func NewMsgNetworkConfig() MsgNetworkConfig { return C.msgnetwork_config_new() } func (self MsgNetworkConfig) Free() { C.msgnetwork_config_free(self) } +func (self MsgNetworkConfig) BurstSize(size int) { + C.msgnetwork_config_burst_size(self, C.size_t(size)) +} + +func (self MsgNetworkConfig) MaxListenBacklog(backlog int) { + C.msgnetwork_config_max_listen_backlog(self, C.int(backlog)) +} + +func (self MsgNetworkConfig) ConnServerTimeout(timeout float64) { + C.msgnetwork_config_conn_server_timeout(self, C.double(timeout)) +} + +func (self MsgNetworkConfig) SegBuffSize(size int) { + C.msgnetwork_config_seg_buff_size(self, C.size_t(size)) +} + +func (self MsgNetworkConfig) NWorker(nworker int) { + C.msgnetwork_config_nworker(self, C.size_t(nworker)) +} + +func (self MsgNetworkConfig) QueueCapacity(capacity int) { + C.msgnetwork_config_queue_capacity(self, C.size_t(capacity)) +} + func NewMsgNetwork(ec EventContext, config MsgNetworkConfig) MsgNetwork { return C.msgnetwork_new(ec, config) } @@ -61,10 +85,39 @@ type PeerNetwork = *C.struct_peernetwork_t type PeerNetworkConn = *C.struct_peernetwork_conn_t +type PeerNetworkIdMode = C.peernetwork_id_mode_t + +var ( + ID_MODE_IP_BASED = PeerNetworkIdMode(C.ID_MODE_IP_BASED) + ID_MODE_IP_PORT_BASED = PeerNetworkIdMode(C.ID_MODE_IP_PORT_BASED) +) + type PeerNetworkConfig = *C.struct_peernetwork_config_t func NewPeerNetworkConfig() PeerNetworkConfig { return C.peernetwork_config_new() } +func (self PeerNetworkConfig) Free() { C.peernetwork_config_free(self) } + +func (self PeerNetworkConfig) RetryConnDelay(t_sec float64) { + C.peernetwork_config_retry_conn_delay(self, C.double(t_sec)) +} + +func (self PeerNetworkConfig) PingPeriod(t_sec float64) { + C.peernetwork_config_ping_period(self, C.double(t_sec)) +} + +func (self PeerNetworkConfig) ConnTimeout(t_sec float64) { + C.peernetwork_config_conn_timeout(self, C.double(t_sec)) +} + +func (self PeerNetworkConfig) IdMode(mode PeerNetworkIdMode) { + C.peernetwork_config_id_mode(self, mode) +} + +func (self PeerNetworkConfig) AsMsgNetworkConfig() MsgNetworkConfig { + return C.peernetwork_config_as_msgnetwork_config(self) +} + func NewPeerNetwork(ec EventContext, config PeerNetworkConfig) PeerNetwork { return C.peernetwork_new(ec, config) } @@ -81,11 +134,11 @@ func (self PeerNetwork) AsMsgNetwork() MsgNetwork { return C.peernetwork_as_msgn func NewMsgNetworkConnFromPeerNetWorkConn(conn PeerNetworkConn) MsgNetworkConn { return C.msgnetwork_conn_new_from_peernetwork_conn(conn) } -func (self PeerNetwork) SendMsg(_moved_msg Msg, paddr NetAddr) { C.peernetwork_send_msg(self, _moved_msg, paddr) } +func (self PeerNetwork) SendMsgByMove(_moved_msg Msg, paddr NetAddr) { C.peernetwork_send_msg_by_move(self, _moved_msg, paddr) } -func (self PeerNetwork) MulticastMsg(_moved_msg Msg, paddrs []NetAddr) { +func (self PeerNetwork) MulticastMsgByMove(_moved_msg Msg, paddrs []NetAddr) { na := NewAddrArrayFromAddrs(paddrs) - C.peernetwork_multicast_msg(self, _moved_msg, na) + C.peernetwork_multicast_msg_by_move(self, _moved_msg, na) } func (self PeerNetwork) Listen(listenAddr NetAddr) { C.peernetwork_listen(self, listenAddr) } diff --git a/salticidae b/salticidae -Subproject ef377f5d85503451a16d50fbc535a7537a28b57 +Subproject 047791d61340f8b727be06d37b613b3914dec95 @@ -79,6 +79,7 @@ func (self DataStream) GetDataInPlace(length int) []byte { type UInt256 = *C.uint256_t func NewUInt256() UInt256 { return C.uint256_new() } +func (self UInt256) Free() { C.uint256_free(self) } func (self UInt256) UInt256IsNull() bool { return bool(C.uint256_is_null(self)) } func (self UInt256) UInt256IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self, other)) } func (self UInt256) Serialize(s DataStream) { C.uint256_serialize(self, s) } diff --git a/test_msgnet/main.go b/test_msgnet/main.go index 230bd83..1ebdc74 100644 --- a/test_msgnet/main.go +++ b/test_msgnet/main.go @@ -38,13 +38,13 @@ func msgHelloSerialize(name string, text string) salticidae.Msg { serialized.PutData(t) serialized.PutData([]byte(name)) serialized.PutData([]byte(text)) - return salticidae.NewMsg( + return salticidae.NewMsgMovedFromByteArray( MSG_OPCODE_HELLO, salticidae.NewByteArrayMovedFromDataStream(serialized)) } func msgHelloUnserialize(msg salticidae.Msg) MsgHello { - p := msg.GetPayload() + p := msg.ConsumePayload() length := binary.LittleEndian.Uint32(p.GetDataInPlace(4)) name := string(p.GetDataInPlace(int(length))) text := string(p.GetDataInPlace(p.Size())) @@ -53,7 +53,7 @@ func msgHelloUnserialize(msg salticidae.Msg) MsgHello { } func msgAckSerialize() salticidae.Msg { - return salticidae.NewMsg(MSG_OPCODE_ACK, salticidae.NewByteArray()) + return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, salticidae.NewByteArray()) } type MyNet struct { diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index d80699d..baa2a58 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -46,25 +46,25 @@ func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) { } serialized := salticidae.NewDataStreamFromBytes(buffer) hash := serialized.GetHash() - return salticidae.NewMsg( + return salticidae.NewMsgMovedFromByteArray( MSG_OPCODE_RAND, salticidae.NewByteArrayMovedFromDataStream(serialized)), hash } func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream { - return msg.GetPayload() + return msg.ConsumePayload() } func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg { serialized := salticidae.NewDataStream() hash.Serialize(serialized) - return salticidae.NewMsg( + return salticidae.NewMsgMovedFromByteArray( MSG_OPCODE_ACK, salticidae.NewByteArrayMovedFromDataStream(serialized)) } func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 { - p := msg.GetPayload() + p := msg.ConsumePayload() hash := salticidae.NewUInt256() hash.Unserialize(p) p.Free() @@ -73,11 +73,22 @@ func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 { type TestContext struct { timer salticidae.TimerEvent + timer_ctx *C.struct_timeout_callback_context_t state int hash salticidae.UInt256 ncompleted int } +func (self TestContext) Free() { + if self.timer != nil { + self.timer.Free() + C.free(unsafe.Pointer(self.timer_ctx)) + } + if self.hash != nil { + self.hash.Free() + } +} + type AppContext struct { addr salticidae.NetAddr ec salticidae.EventContext @@ -86,6 +97,16 @@ type AppContext struct { tc map[uint64] *TestContext } +func (self AppContext) Free() { + self.addr.Free() + self.net.Free() + self.tcall.Free() + for _, tc:= range self.tc { + tc.Free() + } + self.ec.Free() +} + func NewTestContext() TestContext { return TestContext { ncompleted: 0 } } @@ -107,8 +128,14 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) { func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { msg, hash := msgRandSerialize(size) addr := conn.GetAddr() - app.getTC(addr2id(addr)).hash = hash + tc := app.getTC(addr2id(addr)) + addr.Free() + if tc.hash != nil { + salticidae.UInt256(tc.hash).Free() + } + tc.hash = hash app.net.AsMsgNetwork().SendMsg(msg, conn) + msg.Free() } var apps []AppContext @@ -133,10 +160,15 @@ func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { //export onReceiveRand func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { msg := salticidae.Msg(_msg) - hash := msgRandUnserialize(msg).GetHash() + bytes := msgRandUnserialize(msg) + hash := bytes.GetHash() + bytes.Free() conn := salticidae.MsgNetworkConn(_conn) net := conn.GetNet() - net.SendMsg(msgAckSerialize(hash), conn) + ack := msgAckSerialize(hash) + net.SendMsg(ack, conn) + hash.Free() + ack.Free() } //export onReceiveAck @@ -147,12 +179,14 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd conn := salticidae.MsgNetworkConn(_conn) _addr := conn.GetAddr() addr := addr2id(_addr) + _addr.Free() tc := app.getTC(addr) if !hash.IsEq(tc.hash) { //fmt.Printf("%s %s\n", hash.GetHex(), tc.hash.GetHex()) panic("corrupted I/O!") } + hash.Free() if tc.state == seg_buff_size * 2 { sendRand(tc.state, app, conn) @@ -161,7 +195,12 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd ctx.app_id = C.int(id) ctx.addr_id = C.uint64_t(addr) ctx.conn = (*C.struct_msgnetwork_conn_t)(conn) + if tc.timer != nil { + tc.timer.Free() + C.free(unsafe.Pointer(tc.timer_ctx)) + } tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx)) + tc.timer_ctx = ctx t := rand.Float64() * 10 tc.timer.Add(t) fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t) @@ -182,6 +221,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata u if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { addr := conn.GetAddr() tc := app.getTC(addr2id(addr)) + addr.Free() tc.state = 1 fmt.Printf("INFO: increasing phase\n") sendRand(tc.state, app, conn) @@ -216,7 +256,13 @@ func main() { salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i))) } netconfig := salticidae.NewPeerNetworkConfig() + nc := netconfig.AsMsgNetworkConfig() + nc.SegBuffSize(4096) + nc.NWorker(2) + netconfig.ConnTimeout(5) + netconfig.PingPeriod(2) apps = make([]AppContext, len(addrs)) + ids := make([](*C.int), len(addrs)) for i, addr := range addrs { ec := salticidae.NewEventContext() apps[i] = AppContext { @@ -226,14 +272,16 @@ func main() { tcall: salticidae.NewThreadCall(ec), tc: make(map[uint64] *TestContext), } - _i := (C.malloc(C.sizeof_int)) - *(*C.int)(_i) = C.int(i) + ids[i] = (*C.int)(C.malloc(C.sizeof_int)) + *ids[i] = C.int(i) + _i := unsafe.Pointer(ids[i]) net := apps[i].net.AsMsgNetwork() net.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i) net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i) net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), _i) net.Start() } + netconfig.Free() threads.Add(len(apps)) for i, _ := range apps { @@ -247,6 +295,8 @@ func main() { } } a.ec.Dispatch() + a.Free() + C.free(unsafe.Pointer(ids[app_id])) threads.Done() }() } |