diff options
Diffstat (limited to 'test_p2p_stress')
-rw-r--r-- | test_p2p_stress/main.go | 43 |
1 files changed, 25 insertions, 18 deletions
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index 1cc6edf..79b28b0 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -23,6 +23,7 @@ import ( "fmt" "sync" "unsafe" + "strconv" ) var ec salticidae.EventContext @@ -34,7 +35,7 @@ const ( func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) { buffer := make([]byte, size) _, err := rand.Read(buffer) - if err == nil { + if err != nil { panic("rand source failed") } serialized := salticidae.NewDataStreamFromBytes(buffer) @@ -87,10 +88,20 @@ func addr2id(addr salticidae.NetAddr) uint64 { return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32) } +func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) { + if tc, ok := self.tc[addr_id]; ok { + _tc = tc + } else { + _tc = new(TestContext) + self.tc[addr_id] = _tc + } + return +} + func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { msg, hash := msgRandSerialize(size) addr := conn.GetAddr() - app.tc[addr2id(addr)].hash = hash + app.getTC(addr2id(addr)).hash = hash app.net.AsMsgNetwork().SendMsg(msg, conn) } @@ -103,20 +114,18 @@ var seg_buff_size = 4096 func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { ctx := (*C.struct_timeout_callback_context_t)(userdata) app := &apps[ctx.app_id] - tc := app.tc[uint64(ctx.addr_id)] + tc := app.getTC(uint64(ctx.addr_id)) tc.ncompleted++ app.net.AsMsgNetwork().Terminate(salticidae.MsgNetworkConn(ctx.conn)) var s string for addr_id, v := range app.tc { s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted) } - C.free(unsafe.Pointer(ctx)) fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s) } //export onReceiveRand func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { - C.free(userdata) msg := salticidae.Msg(_msg) hash := msgRandUnserialize(msg).GetHash() conn := salticidae.MsgNetworkConn(_conn) @@ -126,17 +135,16 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user //export onReceiveAck func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { - msg := salticidae.Msg(_msg) - hash := msgAckUnserialize(msg) + hash := msgAckUnserialize(salticidae.Msg(_msg)) id := *(* int)(userdata) app := &apps[id] conn := salticidae.MsgNetworkConn(_conn) _addr := conn.GetAddr() addr := addr2id(_addr) - tc := app.tc[addr] - C.free(userdata) + tc := app.getTC(addr) - if hash.IsEq(tc.hash) { + if !hash.IsEq(tc.hash) { + //fmt.Printf("%s %s\n", hash.GetHex(), tc.hash.GetHex()) panic("corrupted I/O!") } @@ -147,7 +155,7 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd ctx.app_id = C.int(id) ctx.addr_id = C.uint64_t(addr) ctx.conn = (*C.struct_msgnetwork_conn_t)(conn) - tc.timer = salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx)) + tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx)) t := rand.Float64() * 10 tc.timer.Add(t) fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t) @@ -163,12 +171,11 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) { conn := salticidae.MsgNetworkConn(_conn) id := *(*int)(userdata) - C.free(userdata) app := &apps[id] if connected { if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { addr := conn.GetAddr() - tc := app.tc[addr2id(addr)] + tc := app.getTC(addr2id(addr)) tc.state = 1 fmt.Printf("INFO: increasing phase\n") sendRand(tc.state, app, conn) @@ -190,6 +197,7 @@ func onTerm(_ C.int) { salticidae.ThreadCallCallback(C.onStopLoop), unsafe.Pointer(a.ec)) } + threads.Wait() ec.Stop() } @@ -198,9 +206,8 @@ func main() { var addrs []salticidae.NetAddr for i := 0; i < 4; i++ { - fmt.Println("%s", "127.0.0.1:" + string(12345 + i)) addrs = append(addrs, - salticidae.NewAddrFromIPPortString("127.0.0.1:" + string(12345 + i))) + salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i))) } netconfig := salticidae.NewPeerNetworkConfig() apps = make([]AppContext, len(addrs)) @@ -210,7 +217,7 @@ func main() { addr: addr, ec: ec, net: salticidae.NewPeerNetwork(ec, netconfig), - tcall: salticidae.ThreadCall(ec), + tcall: salticidae.NewThreadCall(ec), tc: make(map[uint64] *TestContext), } _i := (C.malloc(C.sizeof_int)) @@ -224,8 +231,9 @@ func main() { threads.Add(len(apps)) for i, _ := range apps { + app_id := i go func() { - a := &apps[i] + a := &apps[app_id] a.net.Listen(a.addr) for _, addr := range addrs { if !addr.IsEq(a.addr) { @@ -236,7 +244,6 @@ func main() { threads.Done() }() } - threads.Wait() ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm)) ev_int.Add(salticidae.SIGINT) |