aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile1
-rw-r--r--msg.go8
-rw-r--r--network.go59
m---------salticidae0
-rw-r--r--stream.go1
-rw-r--r--test_msgnet/main.go6
-rw-r--r--test_p2p_stress/main.go68
7 files changed, 124 insertions, 19 deletions
diff --git a/Makefile b/Makefile
index be70995..fd1399b 100644
--- a/Makefile
+++ b/Makefile
@@ -11,3 +11,4 @@ build:
build/test_msgnet: salticidae/libsalticidae.so
go build -o $@ salticidae-go/test_msgnet
+ go build -o $@ salticidae-go/test_p2p_stress
diff --git a/msg.go b/msg.go
index d846399..c3c0811 100644
--- a/msg.go
+++ b/msg.go
@@ -6,14 +6,14 @@ import "C"
type Msg = *C.struct_msg_t
-func NewMsg(opcode Opcode, _moved_payload ByteArray) Msg {
- return C.msg_new(C._opcode_t(opcode), _moved_payload)
+func NewMsgMovedFromByteArray(opcode Opcode, _moved_payload ByteArray) Msg {
+ return C.msg_new_moved_from_bytearray(C._opcode_t(opcode), _moved_payload)
}
func (self Msg) Free() { C.msg_free(self) }
-func (self Msg) GetPayload() DataStream {
- return C.msg_get_payload(self)
+func (self Msg) ConsumePayload() DataStream {
+ return C.msg_consume_payload(self)
}
func (self Msg) GetOpcode() Opcode {
diff --git a/network.go b/network.go
index 262e460..c8fae40 100644
--- a/network.go
+++ b/network.go
@@ -34,6 +34,30 @@ func NewMsgNetworkConfig() MsgNetworkConfig { return C.msgnetwork_config_new() }
func (self MsgNetworkConfig) Free() { C.msgnetwork_config_free(self) }
+func (self MsgNetworkConfig) BurstSize(size int) {
+ C.msgnetwork_config_burst_size(self, C.size_t(size))
+}
+
+func (self MsgNetworkConfig) MaxListenBacklog(backlog int) {
+ C.msgnetwork_config_max_listen_backlog(self, C.int(backlog))
+}
+
+func (self MsgNetworkConfig) ConnServerTimeout(timeout float64) {
+ C.msgnetwork_config_conn_server_timeout(self, C.double(timeout))
+}
+
+func (self MsgNetworkConfig) SegBuffSize(size int) {
+ C.msgnetwork_config_seg_buff_size(self, C.size_t(size))
+}
+
+func (self MsgNetworkConfig) NWorker(nworker int) {
+ C.msgnetwork_config_nworker(self, C.size_t(nworker))
+}
+
+func (self MsgNetworkConfig) QueueCapacity(capacity int) {
+ C.msgnetwork_config_queue_capacity(self, C.size_t(capacity))
+}
+
func NewMsgNetwork(ec EventContext, config MsgNetworkConfig) MsgNetwork {
return C.msgnetwork_new(ec, config)
}
@@ -61,10 +85,39 @@ type PeerNetwork = *C.struct_peernetwork_t
type PeerNetworkConn = *C.struct_peernetwork_conn_t
+type PeerNetworkIdMode = C.peernetwork_id_mode_t
+
+var (
+ ID_MODE_IP_BASED = PeerNetworkIdMode(C.ID_MODE_IP_BASED)
+ ID_MODE_IP_PORT_BASED = PeerNetworkIdMode(C.ID_MODE_IP_PORT_BASED)
+)
+
type PeerNetworkConfig = *C.struct_peernetwork_config_t
func NewPeerNetworkConfig() PeerNetworkConfig { return C.peernetwork_config_new() }
+func (self PeerNetworkConfig) Free() { C.peernetwork_config_free(self) }
+
+func (self PeerNetworkConfig) RetryConnDelay(t_sec float64) {
+ C.peernetwork_config_retry_conn_delay(self, C.double(t_sec))
+}
+
+func (self PeerNetworkConfig) PingPeriod(t_sec float64) {
+ C.peernetwork_config_ping_period(self, C.double(t_sec))
+}
+
+func (self PeerNetworkConfig) ConnTimeout(t_sec float64) {
+ C.peernetwork_config_conn_timeout(self, C.double(t_sec))
+}
+
+func (self PeerNetworkConfig) IdMode(mode PeerNetworkIdMode) {
+ C.peernetwork_config_id_mode(self, mode)
+}
+
+func (self PeerNetworkConfig) AsMsgNetworkConfig() MsgNetworkConfig {
+ return C.peernetwork_config_as_msgnetwork_config(self)
+}
+
func NewPeerNetwork(ec EventContext, config PeerNetworkConfig) PeerNetwork {
return C.peernetwork_new(ec, config)
}
@@ -81,11 +134,11 @@ func (self PeerNetwork) AsMsgNetwork() MsgNetwork { return C.peernetwork_as_msgn
func NewMsgNetworkConnFromPeerNetWorkConn(conn PeerNetworkConn) MsgNetworkConn { return C.msgnetwork_conn_new_from_peernetwork_conn(conn) }
-func (self PeerNetwork) SendMsg(_moved_msg Msg, paddr NetAddr) { C.peernetwork_send_msg(self, _moved_msg, paddr) }
+func (self PeerNetwork) SendMsgByMove(_moved_msg Msg, paddr NetAddr) { C.peernetwork_send_msg_by_move(self, _moved_msg, paddr) }
-func (self PeerNetwork) MulticastMsg(_moved_msg Msg, paddrs []NetAddr) {
+func (self PeerNetwork) MulticastMsgByMove(_moved_msg Msg, paddrs []NetAddr) {
na := NewAddrArrayFromAddrs(paddrs)
- C.peernetwork_multicast_msg(self, _moved_msg, na)
+ C.peernetwork_multicast_msg_by_move(self, _moved_msg, na)
}
func (self PeerNetwork) Listen(listenAddr NetAddr) { C.peernetwork_listen(self, listenAddr) }
diff --git a/salticidae b/salticidae
-Subproject ef377f5d85503451a16d50fbc535a7537a28b57
+Subproject 047791d61340f8b727be06d37b613b3914dec95
diff --git a/stream.go b/stream.go
index 6a5d549..94b9281 100644
--- a/stream.go
+++ b/stream.go
@@ -79,6 +79,7 @@ func (self DataStream) GetDataInPlace(length int) []byte {
type UInt256 = *C.uint256_t
func NewUInt256() UInt256 { return C.uint256_new() }
+func (self UInt256) Free() { C.uint256_free(self) }
func (self UInt256) UInt256IsNull() bool { return bool(C.uint256_is_null(self)) }
func (self UInt256) UInt256IsEq(other UInt256) bool { return bool(C.uint256_is_eq(self, other)) }
func (self UInt256) Serialize(s DataStream) { C.uint256_serialize(self, s) }
diff --git a/test_msgnet/main.go b/test_msgnet/main.go
index 230bd83..1ebdc74 100644
--- a/test_msgnet/main.go
+++ b/test_msgnet/main.go
@@ -38,13 +38,13 @@ func msgHelloSerialize(name string, text string) salticidae.Msg {
serialized.PutData(t)
serialized.PutData([]byte(name))
serialized.PutData([]byte(text))
- return salticidae.NewMsg(
+ return salticidae.NewMsgMovedFromByteArray(
MSG_OPCODE_HELLO,
salticidae.NewByteArrayMovedFromDataStream(serialized))
}
func msgHelloUnserialize(msg salticidae.Msg) MsgHello {
- p := msg.GetPayload()
+ p := msg.ConsumePayload()
length := binary.LittleEndian.Uint32(p.GetDataInPlace(4))
name := string(p.GetDataInPlace(int(length)))
text := string(p.GetDataInPlace(p.Size()))
@@ -53,7 +53,7 @@ func msgHelloUnserialize(msg salticidae.Msg) MsgHello {
}
func msgAckSerialize() salticidae.Msg {
- return salticidae.NewMsg(MSG_OPCODE_ACK, salticidae.NewByteArray())
+ return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, salticidae.NewByteArray())
}
type MyNet struct {
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go
index d80699d..baa2a58 100644
--- a/test_p2p_stress/main.go
+++ b/test_p2p_stress/main.go
@@ -46,25 +46,25 @@ func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
}
serialized := salticidae.NewDataStreamFromBytes(buffer)
hash := serialized.GetHash()
- return salticidae.NewMsg(
+ return salticidae.NewMsgMovedFromByteArray(
MSG_OPCODE_RAND,
salticidae.NewByteArrayMovedFromDataStream(serialized)), hash
}
func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream {
- return msg.GetPayload()
+ return msg.ConsumePayload()
}
func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
serialized := salticidae.NewDataStream()
hash.Serialize(serialized)
- return salticidae.NewMsg(
+ return salticidae.NewMsgMovedFromByteArray(
MSG_OPCODE_ACK,
salticidae.NewByteArrayMovedFromDataStream(serialized))
}
func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
- p := msg.GetPayload()
+ p := msg.ConsumePayload()
hash := salticidae.NewUInt256()
hash.Unserialize(p)
p.Free()
@@ -73,11 +73,22 @@ func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
type TestContext struct {
timer salticidae.TimerEvent
+ timer_ctx *C.struct_timeout_callback_context_t
state int
hash salticidae.UInt256
ncompleted int
}
+func (self TestContext) Free() {
+ if self.timer != nil {
+ self.timer.Free()
+ C.free(unsafe.Pointer(self.timer_ctx))
+ }
+ if self.hash != nil {
+ self.hash.Free()
+ }
+}
+
type AppContext struct {
addr salticidae.NetAddr
ec salticidae.EventContext
@@ -86,6 +97,16 @@ type AppContext struct {
tc map[uint64] *TestContext
}
+func (self AppContext) Free() {
+ self.addr.Free()
+ self.net.Free()
+ self.tcall.Free()
+ for _, tc:= range self.tc {
+ tc.Free()
+ }
+ self.ec.Free()
+}
+
func NewTestContext() TestContext {
return TestContext { ncompleted: 0 }
}
@@ -107,8 +128,14 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
msg, hash := msgRandSerialize(size)
addr := conn.GetAddr()
- app.getTC(addr2id(addr)).hash = hash
+ tc := app.getTC(addr2id(addr))
+ addr.Free()
+ if tc.hash != nil {
+ salticidae.UInt256(tc.hash).Free()
+ }
+ tc.hash = hash
app.net.AsMsgNetwork().SendMsg(msg, conn)
+ msg.Free()
}
var apps []AppContext
@@ -133,10 +160,15 @@ func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
//export onReceiveRand
func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
msg := salticidae.Msg(_msg)
- hash := msgRandUnserialize(msg).GetHash()
+ bytes := msgRandUnserialize(msg)
+ hash := bytes.GetHash()
+ bytes.Free()
conn := salticidae.MsgNetworkConn(_conn)
net := conn.GetNet()
- net.SendMsg(msgAckSerialize(hash), conn)
+ ack := msgAckSerialize(hash)
+ net.SendMsg(ack, conn)
+ hash.Free()
+ ack.Free()
}
//export onReceiveAck
@@ -147,12 +179,14 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
conn := salticidae.MsgNetworkConn(_conn)
_addr := conn.GetAddr()
addr := addr2id(_addr)
+ _addr.Free()
tc := app.getTC(addr)
if !hash.IsEq(tc.hash) {
//fmt.Printf("%s %s\n", hash.GetHex(), tc.hash.GetHex())
panic("corrupted I/O!")
}
+ hash.Free()
if tc.state == seg_buff_size * 2 {
sendRand(tc.state, app, conn)
@@ -161,7 +195,12 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
ctx.app_id = C.int(id)
ctx.addr_id = C.uint64_t(addr)
ctx.conn = (*C.struct_msgnetwork_conn_t)(conn)
+ if tc.timer != nil {
+ tc.timer.Free()
+ C.free(unsafe.Pointer(tc.timer_ctx))
+ }
tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
+ tc.timer_ctx = ctx
t := rand.Float64() * 10
tc.timer.Add(t)
fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
@@ -182,6 +221,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata u
if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
addr := conn.GetAddr()
tc := app.getTC(addr2id(addr))
+ addr.Free()
tc.state = 1
fmt.Printf("INFO: increasing phase\n")
sendRand(tc.state, app, conn)
@@ -216,7 +256,13 @@ func main() {
salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i)))
}
netconfig := salticidae.NewPeerNetworkConfig()
+ nc := netconfig.AsMsgNetworkConfig()
+ nc.SegBuffSize(4096)
+ nc.NWorker(2)
+ netconfig.ConnTimeout(5)
+ netconfig.PingPeriod(2)
apps = make([]AppContext, len(addrs))
+ ids := make([](*C.int), len(addrs))
for i, addr := range addrs {
ec := salticidae.NewEventContext()
apps[i] = AppContext {
@@ -226,14 +272,16 @@ func main() {
tcall: salticidae.NewThreadCall(ec),
tc: make(map[uint64] *TestContext),
}
- _i := (C.malloc(C.sizeof_int))
- *(*C.int)(_i) = C.int(i)
+ ids[i] = (*C.int)(C.malloc(C.sizeof_int))
+ *ids[i] = C.int(i)
+ _i := unsafe.Pointer(ids[i])
net := apps[i].net.AsMsgNetwork()
net.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i)
net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i)
net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), _i)
net.Start()
}
+ netconfig.Free()
threads.Add(len(apps))
for i, _ := range apps {
@@ -247,6 +295,8 @@ func main() {
}
}
a.ec.Dispatch()
+ a.Free()
+ C.free(unsafe.Pointer(ids[app_id]))
threads.Done()
}()
}