aboutsummaryrefslogtreecommitdiff
path: root/test_p2p_stress
diff options
context:
space:
mode:
Diffstat (limited to 'test_p2p_stress')
-rw-r--r--test_p2p_stress/main.go68
1 files changed, 59 insertions, 9 deletions
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go
index d80699d..baa2a58 100644
--- a/test_p2p_stress/main.go
+++ b/test_p2p_stress/main.go
@@ -46,25 +46,25 @@ func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
}
serialized := salticidae.NewDataStreamFromBytes(buffer)
hash := serialized.GetHash()
- return salticidae.NewMsg(
+ return salticidae.NewMsgMovedFromByteArray(
MSG_OPCODE_RAND,
salticidae.NewByteArrayMovedFromDataStream(serialized)), hash
}
func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream {
- return msg.GetPayload()
+ return msg.ConsumePayload()
}
func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
serialized := salticidae.NewDataStream()
hash.Serialize(serialized)
- return salticidae.NewMsg(
+ return salticidae.NewMsgMovedFromByteArray(
MSG_OPCODE_ACK,
salticidae.NewByteArrayMovedFromDataStream(serialized))
}
func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
- p := msg.GetPayload()
+ p := msg.ConsumePayload()
hash := salticidae.NewUInt256()
hash.Unserialize(p)
p.Free()
@@ -73,11 +73,22 @@ func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
type TestContext struct {
timer salticidae.TimerEvent
+ timer_ctx *C.struct_timeout_callback_context_t
state int
hash salticidae.UInt256
ncompleted int
}
+func (self TestContext) Free() {
+ if self.timer != nil {
+ self.timer.Free()
+ C.free(unsafe.Pointer(self.timer_ctx))
+ }
+ if self.hash != nil {
+ self.hash.Free()
+ }
+}
+
type AppContext struct {
addr salticidae.NetAddr
ec salticidae.EventContext
@@ -86,6 +97,16 @@ type AppContext struct {
tc map[uint64] *TestContext
}
+func (self AppContext) Free() {
+ self.addr.Free()
+ self.net.Free()
+ self.tcall.Free()
+ for _, tc:= range self.tc {
+ tc.Free()
+ }
+ self.ec.Free()
+}
+
func NewTestContext() TestContext {
return TestContext { ncompleted: 0 }
}
@@ -107,8 +128,14 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
msg, hash := msgRandSerialize(size)
addr := conn.GetAddr()
- app.getTC(addr2id(addr)).hash = hash
+ tc := app.getTC(addr2id(addr))
+ addr.Free()
+ if tc.hash != nil {
+ salticidae.UInt256(tc.hash).Free()
+ }
+ tc.hash = hash
app.net.AsMsgNetwork().SendMsg(msg, conn)
+ msg.Free()
}
var apps []AppContext
@@ -133,10 +160,15 @@ func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
//export onReceiveRand
func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
msg := salticidae.Msg(_msg)
- hash := msgRandUnserialize(msg).GetHash()
+ bytes := msgRandUnserialize(msg)
+ hash := bytes.GetHash()
+ bytes.Free()
conn := salticidae.MsgNetworkConn(_conn)
net := conn.GetNet()
- net.SendMsg(msgAckSerialize(hash), conn)
+ ack := msgAckSerialize(hash)
+ net.SendMsg(ack, conn)
+ hash.Free()
+ ack.Free()
}
//export onReceiveAck
@@ -147,12 +179,14 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
conn := salticidae.MsgNetworkConn(_conn)
_addr := conn.GetAddr()
addr := addr2id(_addr)
+ _addr.Free()
tc := app.getTC(addr)
if !hash.IsEq(tc.hash) {
//fmt.Printf("%s %s\n", hash.GetHex(), tc.hash.GetHex())
panic("corrupted I/O!")
}
+ hash.Free()
if tc.state == seg_buff_size * 2 {
sendRand(tc.state, app, conn)
@@ -161,7 +195,12 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
ctx.app_id = C.int(id)
ctx.addr_id = C.uint64_t(addr)
ctx.conn = (*C.struct_msgnetwork_conn_t)(conn)
+ if tc.timer != nil {
+ tc.timer.Free()
+ C.free(unsafe.Pointer(tc.timer_ctx))
+ }
tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
+ tc.timer_ctx = ctx
t := rand.Float64() * 10
tc.timer.Add(t)
fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
@@ -182,6 +221,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata u
if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
addr := conn.GetAddr()
tc := app.getTC(addr2id(addr))
+ addr.Free()
tc.state = 1
fmt.Printf("INFO: increasing phase\n")
sendRand(tc.state, app, conn)
@@ -216,7 +256,13 @@ func main() {
salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i)))
}
netconfig := salticidae.NewPeerNetworkConfig()
+ nc := netconfig.AsMsgNetworkConfig()
+ nc.SegBuffSize(4096)
+ nc.NWorker(2)
+ netconfig.ConnTimeout(5)
+ netconfig.PingPeriod(2)
apps = make([]AppContext, len(addrs))
+ ids := make([](*C.int), len(addrs))
for i, addr := range addrs {
ec := salticidae.NewEventContext()
apps[i] = AppContext {
@@ -226,14 +272,16 @@ func main() {
tcall: salticidae.NewThreadCall(ec),
tc: make(map[uint64] *TestContext),
}
- _i := (C.malloc(C.sizeof_int))
- *(*C.int)(_i) = C.int(i)
+ ids[i] = (*C.int)(C.malloc(C.sizeof_int))
+ *ids[i] = C.int(i)
+ _i := unsafe.Pointer(ids[i])
net := apps[i].net.AsMsgNetwork()
net.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i)
net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i)
net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), _i)
net.Start()
}
+ netconfig.Free()
threads.Add(len(apps))
for i, _ := range apps {
@@ -247,6 +295,8 @@ func main() {
}
}
a.ec.Dispatch()
+ a.Free()
+ C.free(unsafe.Pointer(ids[app_id]))
threads.Done()
}()
}