diff options
author | Determinant <[email protected]> | 2019-06-10 21:33:00 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2019-06-10 21:33:00 -0400 |
commit | 35bc0201cbc4ac42fa4ef72af429607e7d5079a5 (patch) | |
tree | d98b5138c691f744fe1cce000de7925f63eb8e00 /test_p2p_stress | |
parent | 334fbbc9e4ed2131101062d8fbe1db4afae7c0aa (diff) |
...
Diffstat (limited to 'test_p2p_stress')
-rw-r--r-- | test_p2p_stress/main.go | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go new file mode 100644 index 0000000..1cc6edf --- /dev/null +++ b/test_p2p_stress/main.go @@ -0,0 +1,251 @@ +package main + +// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/ +// #include <stdlib.h> +// #include <arpa/inet.h> +// #include "salticidae/network.h" +// void onTerm(int sig); +// void onReceiveRand(msg_t *, msgnetwork_conn_t *, void *); +// void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *); +// void onStopLoop(threadcall_handle_t *, void *); +// void connHandler(msgnetwork_conn_t *, bool, void *); +// void onTimeout(timerev_t *, void *); +// typedef struct timeout_callback_context_t { +// int app_id; +// uint64_t addr_id; +// msgnetwork_conn_t *conn; +// } timerout_callback_context_t; +import "C" + +import ( + "salticidae-go" + "math/rand" + "fmt" + "sync" + "unsafe" +) + +var ec salticidae.EventContext +const ( + MSG_OPCODE_RAND salticidae.Opcode = iota + MSG_OPCODE_ACK +) + +func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) { + buffer := make([]byte, size) + _, err := rand.Read(buffer) + if err == nil { + panic("rand source failed") + } + serialized := salticidae.NewDataStreamFromBytes(buffer) + hash := serialized.GetHash() + return salticidae.NewMsg( + MSG_OPCODE_RAND, + salticidae.NewByteArrayMovedFromDataStream(serialized)), hash +} + +func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream { + return msg.GetPayload() +} + +func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg { + serialized := salticidae.NewDataStream() + hash.Serialize(serialized) + return salticidae.NewMsg( + MSG_OPCODE_ACK, + salticidae.NewByteArrayMovedFromDataStream(serialized)) +} + +func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 { + p := msg.GetPayload() + hash := salticidae.NewUInt256() + hash.Unserialize(p) + p.Free() + return hash +} + +type TestContext struct { + timer salticidae.TimerEvent + state int + hash salticidae.UInt256 + ncompleted int +} + +type AppContext struct { + addr salticidae.NetAddr + ec salticidae.EventContext + net salticidae.PeerNetwork + tcall salticidae.ThreadCall + tc map[uint64] *TestContext +} + +func NewTestContext() TestContext { + return TestContext { ncompleted: 0 } +} + +func addr2id(addr salticidae.NetAddr) uint64 { + return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32) +} + +func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { + msg, hash := msgRandSerialize(size) + addr := conn.GetAddr() + app.tc[addr2id(addr)].hash = hash + app.net.AsMsgNetwork().SendMsg(msg, conn) +} + +var apps []AppContext +var threads sync.WaitGroup + +var seg_buff_size = 4096 + +//export onTimeout +func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { + ctx := (*C.struct_timeout_callback_context_t)(userdata) + app := &apps[ctx.app_id] + tc := app.tc[uint64(ctx.addr_id)] + tc.ncompleted++ + app.net.AsMsgNetwork().Terminate(salticidae.MsgNetworkConn(ctx.conn)) + var s string + for addr_id, v := range app.tc { + s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted) + } + C.free(unsafe.Pointer(ctx)) + fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s) +} + +//export onReceiveRand +func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { + C.free(userdata) + msg := salticidae.Msg(_msg) + hash := msgRandUnserialize(msg).GetHash() + conn := salticidae.MsgNetworkConn(_conn) + net := conn.GetNet() + net.SendMsg(msgAckSerialize(hash), conn) +} + +//export onReceiveAck +func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { + msg := salticidae.Msg(_msg) + hash := msgAckUnserialize(msg) + id := *(* int)(userdata) + app := &apps[id] + conn := salticidae.MsgNetworkConn(_conn) + _addr := conn.GetAddr() + addr := addr2id(_addr) + tc := app.tc[addr] + C.free(userdata) + + if hash.IsEq(tc.hash) { + panic("corrupted I/O!") + } + + if tc.state == seg_buff_size * 2 { + sendRand(tc.state, app, conn) + tc.state = -1 + ctx := (*C.struct_timeout_callback_context_t)(C.malloc(C.sizeof_struct_timeout_callback_context_t)) + ctx.app_id = C.int(id) + ctx.addr_id = C.uint64_t(addr) + ctx.conn = (*C.struct_msgnetwork_conn_t)(conn) + tc.timer = salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx)) + t := rand.Float64() * 10 + tc.timer.Add(t) + fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t) + } else if tc.state == -1 { + sendRand(rand.Int() % (seg_buff_size * 10), app, conn) + } else { + tc.state++ + sendRand(tc.state, app, conn) + } +} + +//export connHandler +func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) { + conn := salticidae.MsgNetworkConn(_conn) + id := *(*int)(userdata) + C.free(userdata) + app := &apps[id] + if connected { + if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { + addr := conn.GetAddr() + tc := app.tc[addr2id(addr)] + tc.state = 1 + fmt.Printf("INFO: increasing phase\n") + sendRand(tc.state, app, conn) + } + } +} + +//export onStopLoop +func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { + ec := salticidae.EventContext(userdata) + ec.Stop() +} + +//export onTerm +func onTerm(_ C.int) { + for i, _ := range apps { + a := &apps[i] + a.tcall.AsyncCall( + salticidae.ThreadCallCallback(C.onStopLoop), + unsafe.Pointer(a.ec)) + } + ec.Stop() +} + +func main() { + ec = salticidae.NewEventContext() + + var addrs []salticidae.NetAddr + for i := 0; i < 4; i++ { + fmt.Println("%s", "127.0.0.1:" + string(12345 + i)) + addrs = append(addrs, + salticidae.NewAddrFromIPPortString("127.0.0.1:" + string(12345 + i))) + } + netconfig := salticidae.NewPeerNetworkConfig() + apps = make([]AppContext, len(addrs)) + for i, addr := range addrs { + ec := salticidae.NewEventContext() + apps[i] = AppContext { + addr: addr, + ec: ec, + net: salticidae.NewPeerNetwork(ec, netconfig), + tcall: salticidae.ThreadCall(ec), + tc: make(map[uint64] *TestContext), + } + _i := (C.malloc(C.sizeof_int)) + *(*C.int)(_i) = C.int(i) + net := apps[i].net.AsMsgNetwork() + net.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i) + net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i) + net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), _i) + net.Start() + } + + threads.Add(len(apps)) + for i, _ := range apps { + go func() { + a := &apps[i] + a.net.Listen(a.addr) + for _, addr := range addrs { + if !addr.IsEq(a.addr) { + a.net.AddPeer(addr) + } + } + a.ec.Dispatch() + threads.Done() + }() + } + threads.Wait() + + ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm)) + ev_int.Add(salticidae.SIGINT) + ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm)) + ev_term.Add(salticidae.SIGTERM) + + ec.Dispatch() + + ev_int.Free() + ev_term.Free() + ec.Free() +} |