diff options
Diffstat (limited to 'test_p2p_stress')
-rw-r--r-- | test_p2p_stress/main.go | 72 |
1 files changed, 37 insertions, 35 deletions
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index 29cfbfa..6f14371 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -11,7 +11,7 @@ package main // void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *); // void onStopLoop(threadcall_handle_t *, void *); // void peerHandler(peernetwork_conn_t *, bool, void *); -// void errorHandler(SalticidaeCError *, bool, void *); +// void errorHandler(SalticidaeCError *, bool, int32_t, void *); // void onTimeout(timerev_t *, void *); // typedef struct timeout_callback_context_t { // int app_id; @@ -42,7 +42,7 @@ const ( ) func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) { - serialized := salticidae.NewDataStream(true) + serialized := salticidae.NewDataStream(false); defer serialized.Free() serialized.PutU32(salticidae.ToLittleEndianU32(view)) buffer := make([]byte, size) _, err := rand.Read(buffer) @@ -50,27 +50,26 @@ func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256 panic("rand source failed") } serialized.PutData(buffer) - hash := salticidae.NewByteArrayFromBytes(buffer, true).GetHash(true) - return salticidae.NewMsgMovedFromByteArray( - MSG_OPCODE_RAND, - salticidae.NewByteArrayMovedFromDataStream(serialized, true), true), hash + ba := salticidae.NewByteArrayFromBytes(buffer, false); defer ba.Free() + payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free() + return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_RAND, payload, true), ba.GetHash(true) } func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { d := msg.GetPayloadByMove() succ := true view = salticidae.FromLittleEndianU32(d.GetU32(&succ)) - hash = salticidae.NewByteArrayCopiedFromDataStream(d, true).GetHash(true) + ba := salticidae.NewByteArrayCopiedFromDataStream(d, false); defer ba.Free() + hash = ba.GetHash(true) return } func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg { - serialized := salticidae.NewDataStream(true) + serialized := salticidae.NewDataStream(false); defer serialized.Free() serialized.PutU32(salticidae.ToLittleEndianU32(view)) hash.Serialize(serialized) - return salticidae.NewMsgMovedFromByteArray( - MSG_OPCODE_ACK, - salticidae.NewByteArrayMovedFromDataStream(serialized, true), true) + payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free() + return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, payload, true) } func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { @@ -133,8 +132,7 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) { return } -func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { - tc := app.getTC(addr2id(conn.GetAddr())) +func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn, tc *TestContext) { msg, hash := msgRandSerialize(tc.view, size) tc.hash = hash app.net.AsMsgNetwork().SendMsg(msg, conn) @@ -149,7 +147,7 @@ var ids []*C.int //export onTimeout func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { ctx := (*C.struct_timeout_callback_context_t)(userdata) - app := &apps[ctx.app_id] + app := &apps[int(ctx.app_id)] tc := app.getTC(uint64(ctx.addr_id)) tc.ncompleted++ app.net.AsMsgNetwork().Terminate( @@ -163,7 +161,7 @@ func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { } //export onReceiveRand -func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { +func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) { msg := salticidae.MsgFromC(salticidae.CMsg(_msg)) conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) net := conn.GetNet() @@ -175,14 +173,17 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user //export onReceiveAck func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { view, hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg))) - id := *(* int)(userdata) + id := int(*(*C.int)(userdata)) app := &apps[id] conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) - addr := addr2id(conn.GetAddr()) - tc := app.getTC(addr) + pconn := salticidae.NewPeerNetworkConnFromMsgNetworkConnUnsafe(conn, false); defer pconn.Free() + addr := pconn.GetPeerAddr(false); defer addr.Free() + if addr.IsNull() { return } + addrID := addr2id(addr) + tc := app.getTC(addrID) if view != tc.view { - fmt.Printf("dropping stale MsgAck") + fmt.Printf("dropping stale MsgAck\n") return } @@ -191,11 +192,11 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd } if tc.state == segBuffSize * 2 { - sendRand(tc.state, app, conn) + sendRand(tc.state, app, conn, tc) tc.state = -1 ctx := C.timeout_callback_context_new() ctx.app_id = C.int(id) - ctx.addr_id = C.uint64_t(addr) + ctx.addr_id = C.uint64_t(addrID) ctx.conn = C.msgnetwork_conn_copy(_conn) if tc.timer != nil { C.msgnetwork_conn_free(tc.timer_ctx.conn) @@ -207,39 +208,40 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd tc.timer.Add(t) fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t) } else if tc.state == -1 { - sendRand(rand.Int() % (segBuffSize * 10), app, conn) + sendRand(rand.Int() % (segBuffSize * 10), app, conn, tc) } else { tc.state++ - sendRand(tc.state, app, conn) + sendRand(tc.state, app, conn, tc) } } //export peerHandler func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, userdata unsafe.Pointer) { if connected { - conn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))) - id := *(*int)(userdata) + pconn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn)) + conn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(pconn, false); defer conn.Free() + id := int(*(*C.int)(userdata)) app := &apps[id] - if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { - tc := app.getTC(addr2id(conn.GetAddr())) - tc.state = 1 - fmt.Printf("INFO: increasing phase\n") - sendRand(tc.state, app, conn) - } + addr := pconn.GetPeerAddr(false); defer addr.Free() + tc := app.getTC(addr2id(addr)) + tc.state = 1 + tc.view++ + sendRand(tc.state, app, conn, tc) } } //export errorHandler -func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, _ unsafe.Pointer) { +func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, asyncID C.int32_t, _ unsafe.Pointer) { err := (*salticidae.Error)(unsafe.Pointer(_err)) s := "recoverable" if fatal { s = "fatal" } - fmt.Printf("Captured %s error during an async call: %s\n", s, salticidae.StrError(err.GetCode())) + fmt.Printf("Captured %s error during an async call %d: %s\n", s, asyncID, salticidae.StrError(err.GetCode())) } //export onStopLoop func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { - ec := apps[*(*int)(userdata)].ec + id := int(*(*C.int)(userdata)) + ec := apps[id].ec ec.Stop() } @@ -262,7 +264,7 @@ func main() { var addrs []salticidae.NetAddr for i := 0; i < 5; i++ { addrs = append(addrs, - salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), &err)) + salticidae.NewNetAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), true, &err)) } netconfig := salticidae.NewPeerNetworkConfig() nc := netconfig.AsMsgNetworkConfig() |