From ea27f0de65f79d7ee56e964f6c966e7c43c66f86 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 28 Jun 2019 15:41:55 -0400 Subject: ... --- test_p2p_stress/main.go | 43 ++++++++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 13 deletions(-) (limited to 'test_p2p_stress/main.go') diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index 69f9d6c..06a1407 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -3,6 +3,7 @@ package main // #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/ // #cgo LDFLAGS: ${SRCDIR}/../salticidae/libsalticidae.so -Wl,-rpath=${SRCDIR}/salticidae/ // #include +// #include // #include // #include "salticidae/network.h" // void onTerm(int sig, void *); @@ -40,36 +41,45 @@ const ( MSG_OPCODE_ACK ) -func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) { +func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) { + serialized := salticidae.NewDataStream() + serialized.PutU32(salticidae.ToLittleEndianU32(view)) buffer := make([]byte, size) _, err := rand.Read(buffer) if err != nil { panic("rand source failed") } - serialized := salticidae.NewDataStreamFromBytes(buffer) - hash := serialized.GetHash() + serialized.PutData(buffer) + hash := salticidae.NewByteArrayFromBytes(buffer).GetHash() return salticidae.NewMsgMovedFromByteArray( MSG_OPCODE_RAND, salticidae.NewByteArrayMovedFromDataStream(serialized)), hash } -func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream { - return msg.GetPayloadByMove() +func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { + d := msg.GetPayloadByMove() + succ := true + view = salticidae.FromLittleEndianU32(d.GetU32(&succ)) + hash = salticidae.NewByteArrayCopiedFromDataStream(d).GetHash() + return } -func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg { +func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg { serialized := salticidae.NewDataStream() + serialized.PutU32(salticidae.ToLittleEndianU32(view)) hash.Serialize(serialized) return salticidae.NewMsgMovedFromByteArray( MSG_OPCODE_ACK, salticidae.NewByteArrayMovedFromDataStream(serialized)) } -func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 { +func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { p := msg.GetPayloadByMove() - hash := salticidae.NewUInt256() + hash = salticidae.NewUInt256() + succ := true + view = salticidae.FromLittleEndianU32(p.GetU32(&succ)) hash.Unserialize(p) - return hash + return } func checkError(err *salticidae.Error) { @@ -83,6 +93,7 @@ type TestContext struct { timer salticidae.TimerEvent timer_ctx *C.struct_timeout_callback_context_t state int + view uint32 hash salticidae.UInt256 ncompleted int } @@ -123,8 +134,8 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) { } func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { - msg, hash := msgRandSerialize(size) tc := app.getTC(addr2id(conn.GetAddr())) + msg, hash := msgRandSerialize(tc.view, size) tc.hash = hash app.net.AsMsgNetwork().SendMsg(msg, conn) } @@ -156,19 +167,25 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user msg := salticidae.MsgFromC(salticidae.CMsg(_msg)) conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) net := conn.GetNet() - ack := msgAckSerialize(msgRandUnserialize(msg).GetHash()) + view, hash := msgRandUnserialize(msg) + ack := msgAckSerialize(view, hash) net.SendMsg(ack, conn) } //export onReceiveAck func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { - hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg))) + view, hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg))) id := *(* int)(userdata) app := &apps[id] conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) addr := addr2id(conn.GetAddr()) tc := app.getTC(addr) + if view != tc.view { + fmt.Printf("dropping stale MsgAck") + return + } + if !hash.IsEq(tc.hash) { panic("corrupted I/O!") } @@ -245,7 +262,7 @@ func main() { var addrs []salticidae.NetAddr for i := 0; i < 5; i++ { addrs = append(addrs, - salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i))) + salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), &err)) } netconfig := salticidae.NewPeerNetworkConfig() nc := netconfig.AsMsgNetworkConfig() -- cgit v1.2.3-70-g09d2