aboutsummaryrefslogtreecommitdiff
path: root/test_p2p_stress
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-06-11 00:04:12 -0400
committerDeterminant <[email protected]>2019-06-11 00:04:12 -0400
commitdf87359ec575622687b5f6b8319fb30ba4340caf (patch)
tree450e415c7836c5ed99a5c7fd17a72c7fefc6e725 /test_p2p_stress
parent7636ae38cff0f09783b64f174f67f9e018762a04 (diff)
...
Diffstat (limited to 'test_p2p_stress')
-rw-r--r--test_p2p_stress/main.go43
1 files changed, 25 insertions, 18 deletions
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go
index 1cc6edf..79b28b0 100644
--- a/test_p2p_stress/main.go
+++ b/test_p2p_stress/main.go
@@ -23,6 +23,7 @@ import (
"fmt"
"sync"
"unsafe"
+ "strconv"
)
var ec salticidae.EventContext
@@ -34,7 +35,7 @@ const (
func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
buffer := make([]byte, size)
_, err := rand.Read(buffer)
- if err == nil {
+ if err != nil {
panic("rand source failed")
}
serialized := salticidae.NewDataStreamFromBytes(buffer)
@@ -87,10 +88,20 @@ func addr2id(addr salticidae.NetAddr) uint64 {
return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32)
}
+func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
+ if tc, ok := self.tc[addr_id]; ok {
+ _tc = tc
+ } else {
+ _tc = new(TestContext)
+ self.tc[addr_id] = _tc
+ }
+ return
+}
+
func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
msg, hash := msgRandSerialize(size)
addr := conn.GetAddr()
- app.tc[addr2id(addr)].hash = hash
+ app.getTC(addr2id(addr)).hash = hash
app.net.AsMsgNetwork().SendMsg(msg, conn)
}
@@ -103,20 +114,18 @@ var seg_buff_size = 4096
func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
ctx := (*C.struct_timeout_callback_context_t)(userdata)
app := &apps[ctx.app_id]
- tc := app.tc[uint64(ctx.addr_id)]
+ tc := app.getTC(uint64(ctx.addr_id))
tc.ncompleted++
app.net.AsMsgNetwork().Terminate(salticidae.MsgNetworkConn(ctx.conn))
var s string
for addr_id, v := range app.tc {
s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted)
}
- C.free(unsafe.Pointer(ctx))
fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s)
}
//export onReceiveRand
func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
- C.free(userdata)
msg := salticidae.Msg(_msg)
hash := msgRandUnserialize(msg).GetHash()
conn := salticidae.MsgNetworkConn(_conn)
@@ -126,17 +135,16 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user
//export onReceiveAck
func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
- msg := salticidae.Msg(_msg)
- hash := msgAckUnserialize(msg)
+ hash := msgAckUnserialize(salticidae.Msg(_msg))
id := *(* int)(userdata)
app := &apps[id]
conn := salticidae.MsgNetworkConn(_conn)
_addr := conn.GetAddr()
addr := addr2id(_addr)
- tc := app.tc[addr]
- C.free(userdata)
+ tc := app.getTC(addr)
- if hash.IsEq(tc.hash) {
+ if !hash.IsEq(tc.hash) {
+ //fmt.Printf("%s %s\n", hash.GetHex(), tc.hash.GetHex())
panic("corrupted I/O!")
}
@@ -147,7 +155,7 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
ctx.app_id = C.int(id)
ctx.addr_id = C.uint64_t(addr)
ctx.conn = (*C.struct_msgnetwork_conn_t)(conn)
- tc.timer = salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
+ tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
t := rand.Float64() * 10
tc.timer.Add(t)
fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
@@ -163,12 +171,11 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd
func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) {
conn := salticidae.MsgNetworkConn(_conn)
id := *(*int)(userdata)
- C.free(userdata)
app := &apps[id]
if connected {
if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
addr := conn.GetAddr()
- tc := app.tc[addr2id(addr)]
+ tc := app.getTC(addr2id(addr))
tc.state = 1
fmt.Printf("INFO: increasing phase\n")
sendRand(tc.state, app, conn)
@@ -190,6 +197,7 @@ func onTerm(_ C.int) {
salticidae.ThreadCallCallback(C.onStopLoop),
unsafe.Pointer(a.ec))
}
+ threads.Wait()
ec.Stop()
}
@@ -198,9 +206,8 @@ func main() {
var addrs []salticidae.NetAddr
for i := 0; i < 4; i++ {
- fmt.Println("%s", "127.0.0.1:" + string(12345 + i))
addrs = append(addrs,
- salticidae.NewAddrFromIPPortString("127.0.0.1:" + string(12345 + i)))
+ salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i)))
}
netconfig := salticidae.NewPeerNetworkConfig()
apps = make([]AppContext, len(addrs))
@@ -210,7 +217,7 @@ func main() {
addr: addr,
ec: ec,
net: salticidae.NewPeerNetwork(ec, netconfig),
- tcall: salticidae.ThreadCall(ec),
+ tcall: salticidae.NewThreadCall(ec),
tc: make(map[uint64] *TestContext),
}
_i := (C.malloc(C.sizeof_int))
@@ -224,8 +231,9 @@ func main() {
threads.Add(len(apps))
for i, _ := range apps {
+ app_id := i
go func() {
- a := &apps[i]
+ a := &apps[app_id]
a.net.Listen(a.addr)
for _, addr := range addrs {
if !addr.IsEq(a.addr) {
@@ -236,7 +244,6 @@ func main() {
threads.Done()
}()
}
- threads.Wait()
ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm))
ev_int.Add(salticidae.SIGINT)