aboutsummaryrefslogtreecommitdiff
path: root/test_p2p_stress
diff options
context:
space:
mode:
Diffstat (limited to 'test_p2p_stress')
-rw-r--r--test_p2p_stress/main.go72
1 files changed, 37 insertions, 35 deletions
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go
index 29cfbfa..6f14371 100644
--- a/test_p2p_stress/main.go
+++ b/test_p2p_stress/main.go
@@ -11,7 +11,7 @@ package main
// void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *);
// void onStopLoop(threadcall_handle_t *, void *);
// void peerHandler(peernetwork_conn_t *, bool, void *);
-// void errorHandler(SalticidaeCError *, bool, void *);
+// void errorHandler(SalticidaeCError *, bool, int32_t, void *);
// void onTimeout(timerev_t *, void *);
// typedef struct timeout_callback_context_t {
// int app_id;
@@ -42,7 +42,7 @@ const (
)
func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) {
- serialized := salticidae.NewDataStream(true)
+ serialized := salticidae.NewDataStream(false); defer serialized.Free()
serialized.PutU32(salticidae.ToLittleEndianU32(view))
buffer := make([]byte, size)
_, err := rand.Read(buffer)
@@ -50,27 +50,26 @@ func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256
panic("rand source failed")
}
serialized.PutData(buffer)
- hash := salticidae.NewByteArrayFromBytes(buffer, true).GetHash(true)
- return salticidae.NewMsgMovedFromByteArray(
- MSG_OPCODE_RAND,
- salticidae.NewByteArrayMovedFromDataStream(serialized, true), true), hash
+ ba := salticidae.NewByteArrayFromBytes(buffer, false); defer ba.Free()
+ payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free()
+ return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_RAND, payload, true), ba.GetHash(true)
}
func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
d := msg.GetPayloadByMove()
succ := true
view = salticidae.FromLittleEndianU32(d.GetU32(&succ))
- hash = salticidae.NewByteArrayCopiedFromDataStream(d, true).GetHash(true)
+ ba := salticidae.NewByteArrayCopiedFromDataStream(d, false); defer ba.Free()
+ hash = ba.GetHash(true)
return
}
func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg {
- serialized := salticidae.NewDataStream(true)
+ serialized := salticidae.NewDataStream(false); defer serialized.Free()
serialized.PutU32(salticidae.ToLittleEndianU32(view))
hash.Serialize(serialized)
- return salticidae.NewMsgMovedFromByteArray(
- MSG_OPCODE_ACK,
- salticidae.NewByteArrayMovedFromDataStream(serialized, true), true)
+ payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free()
+ return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, payload, true)
}
func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
@@ -133,8 +132,7 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
return
}
-func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
- tc := app.getTC(addr2id(conn.GetAddr()))
+func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn, tc *TestContext) {
msg, hash := msgRandSerialize(tc.view, size)
tc.hash = hash
app.net.AsMsgNetwork().SendMsg(msg, conn)
@@ -149,7 +147,7 @@ var ids []*C.int
//export onTimeout
func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
ctx := (*C.struct_timeout_callback_context_t)(userdata)
- app := &apps[ctx.app_id]
+ app := &apps[int(ctx.app_id)]
tc := app.getTC(uint64(ctx.addr_id))
tc.ncompleted++
app.net.AsMsgNetwork().Terminate(
@@ -163,7 +161,7 @@ func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
}
//export onReceiveRand
-func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
+func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
net := conn.GetNet()
@@ -175,14 +173,17 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user
//export onReceiveAck
func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
view, hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg)))
- id := *(* int)(userdata)
+ id := int(*(*C.int)(userdata))
app := &apps[id]
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
- addr := addr2id(conn.GetAddr())
- tc := app.getTC(addr)
+ pconn := salticidae.NewPeerNetworkConnFromMsgNetworkConnUnsafe(conn, false); defer pconn.Free()
+ addr := pconn.GetPeerAddr(false); defer addr.Free()
+ if addr.IsNull() { return }
+ addrID := addr2id(addr)
+ tc := app.getTC(addrID)
if view != tc.view {
- fmt.Printf("dropping stale MsgAck")
+ fmt.Printf("dropping stale MsgAck\n")
return
}
@@ -191,11 +192,11 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
}
if tc.state == segBuffSize * 2 {
- sendRand(tc.state, app, conn)
+ sendRand(tc.state, app, conn, tc)
tc.state = -1
ctx := C.timeout_callback_context_new()
ctx.app_id = C.int(id)
- ctx.addr_id = C.uint64_t(addr)
+ ctx.addr_id = C.uint64_t(addrID)
ctx.conn = C.msgnetwork_conn_copy(_conn)
if tc.timer != nil {
C.msgnetwork_conn_free(tc.timer_ctx.conn)
@@ -207,39 +208,40 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
tc.timer.Add(t)
fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
} else if tc.state == -1 {
- sendRand(rand.Int() % (segBuffSize * 10), app, conn)
+ sendRand(rand.Int() % (segBuffSize * 10), app, conn, tc)
} else {
tc.state++
- sendRand(tc.state, app, conn)
+ sendRand(tc.state, app, conn, tc)
}
}
//export peerHandler
func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, userdata unsafe.Pointer) {
if connected {
- conn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn)))
- id := *(*int)(userdata)
+ pconn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
+ conn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(pconn, false); defer conn.Free()
+ id := int(*(*C.int)(userdata))
app := &apps[id]
- if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
- tc := app.getTC(addr2id(conn.GetAddr()))
- tc.state = 1
- fmt.Printf("INFO: increasing phase\n")
- sendRand(tc.state, app, conn)
- }
+ addr := pconn.GetPeerAddr(false); defer addr.Free()
+ tc := app.getTC(addr2id(addr))
+ tc.state = 1
+ tc.view++
+ sendRand(tc.state, app, conn, tc)
}
}
//export errorHandler
-func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, _ unsafe.Pointer) {
+func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, asyncID C.int32_t, _ unsafe.Pointer) {
err := (*salticidae.Error)(unsafe.Pointer(_err))
s := "recoverable"
if fatal { s = "fatal" }
- fmt.Printf("Captured %s error during an async call: %s\n", s, salticidae.StrError(err.GetCode()))
+ fmt.Printf("Captured %s error during an async call %d: %s\n", s, asyncID, salticidae.StrError(err.GetCode()))
}
//export onStopLoop
func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
- ec := apps[*(*int)(userdata)].ec
+ id := int(*(*C.int)(userdata))
+ ec := apps[id].ec
ec.Stop()
}
@@ -262,7 +264,7 @@ func main() {
var addrs []salticidae.NetAddr
for i := 0; i < 5; i++ {
addrs = append(addrs,
- salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), &err))
+ salticidae.NewNetAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), true, &err))
}
netconfig := salticidae.NewPeerNetworkConfig()
nc := netconfig.AsMsgNetworkConfig()