aboutsummaryrefslogtreecommitdiff
path: root/test_p2p_stress/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'test_p2p_stress/main.go')
-rw-r--r--test_p2p_stress/main.go251
1 files changed, 251 insertions, 0 deletions
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go
new file mode 100644
index 0000000..1cc6edf
--- /dev/null
+++ b/test_p2p_stress/main.go
@@ -0,0 +1,251 @@
+package main
+
+// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/
+// #include <stdlib.h>
+// #include <arpa/inet.h>
+// #include "salticidae/network.h"
+// void onTerm(int sig);
+// void onReceiveRand(msg_t *, msgnetwork_conn_t *, void *);
+// void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *);
+// void onStopLoop(threadcall_handle_t *, void *);
+// void connHandler(msgnetwork_conn_t *, bool, void *);
+// void onTimeout(timerev_t *, void *);
+// typedef struct timeout_callback_context_t {
+// int app_id;
+// uint64_t addr_id;
+// msgnetwork_conn_t *conn;
+// } timerout_callback_context_t;
+import "C"
+
+import (
+ "salticidae-go"
+ "math/rand"
+ "fmt"
+ "sync"
+ "unsafe"
+)
+
+var ec salticidae.EventContext
+const (
+ MSG_OPCODE_RAND salticidae.Opcode = iota
+ MSG_OPCODE_ACK
+)
+
+func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
+ buffer := make([]byte, size)
+ _, err := rand.Read(buffer)
+ if err == nil {
+ panic("rand source failed")
+ }
+ serialized := salticidae.NewDataStreamFromBytes(buffer)
+ hash := serialized.GetHash()
+ return salticidae.NewMsg(
+ MSG_OPCODE_RAND,
+ salticidae.NewByteArrayMovedFromDataStream(serialized)), hash
+}
+
+func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream {
+ return msg.GetPayload()
+}
+
+func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
+ serialized := salticidae.NewDataStream()
+ hash.Serialize(serialized)
+ return salticidae.NewMsg(
+ MSG_OPCODE_ACK,
+ salticidae.NewByteArrayMovedFromDataStream(serialized))
+}
+
+func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
+ p := msg.GetPayload()
+ hash := salticidae.NewUInt256()
+ hash.Unserialize(p)
+ p.Free()
+ return hash
+}
+
+type TestContext struct {
+ timer salticidae.TimerEvent
+ state int
+ hash salticidae.UInt256
+ ncompleted int
+}
+
+type AppContext struct {
+ addr salticidae.NetAddr
+ ec salticidae.EventContext
+ net salticidae.PeerNetwork
+ tcall salticidae.ThreadCall
+ tc map[uint64] *TestContext
+}
+
+func NewTestContext() TestContext {
+ return TestContext { ncompleted: 0 }
+}
+
+func addr2id(addr salticidae.NetAddr) uint64 {
+ return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32)
+}
+
+func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
+ msg, hash := msgRandSerialize(size)
+ addr := conn.GetAddr()
+ app.tc[addr2id(addr)].hash = hash
+ app.net.AsMsgNetwork().SendMsg(msg, conn)
+}
+
+var apps []AppContext
+var threads sync.WaitGroup
+
+var seg_buff_size = 4096
+
+//export onTimeout
+func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
+ ctx := (*C.struct_timeout_callback_context_t)(userdata)
+ app := &apps[ctx.app_id]
+ tc := app.tc[uint64(ctx.addr_id)]
+ tc.ncompleted++
+ app.net.AsMsgNetwork().Terminate(salticidae.MsgNetworkConn(ctx.conn))
+ var s string
+ for addr_id, v := range app.tc {
+ s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted)
+ }
+ C.free(unsafe.Pointer(ctx))
+ fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s)
+}
+
+//export onReceiveRand
+func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
+ C.free(userdata)
+ msg := salticidae.Msg(_msg)
+ hash := msgRandUnserialize(msg).GetHash()
+ conn := salticidae.MsgNetworkConn(_conn)
+ net := conn.GetNet()
+ net.SendMsg(msgAckSerialize(hash), conn)
+}
+
+//export onReceiveAck
+func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
+ msg := salticidae.Msg(_msg)
+ hash := msgAckUnserialize(msg)
+ id := *(* int)(userdata)
+ app := &apps[id]
+ conn := salticidae.MsgNetworkConn(_conn)
+ _addr := conn.GetAddr()
+ addr := addr2id(_addr)
+ tc := app.tc[addr]
+ C.free(userdata)
+
+ if hash.IsEq(tc.hash) {
+ panic("corrupted I/O!")
+ }
+
+ if tc.state == seg_buff_size * 2 {
+ sendRand(tc.state, app, conn)
+ tc.state = -1
+ ctx := (*C.struct_timeout_callback_context_t)(C.malloc(C.sizeof_struct_timeout_callback_context_t))
+ ctx.app_id = C.int(id)
+ ctx.addr_id = C.uint64_t(addr)
+ ctx.conn = (*C.struct_msgnetwork_conn_t)(conn)
+ tc.timer = salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
+ t := rand.Float64() * 10
+ tc.timer.Add(t)
+ fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
+ } else if tc.state == -1 {
+ sendRand(rand.Int() % (seg_buff_size * 10), app, conn)
+ } else {
+ tc.state++
+ sendRand(tc.state, app, conn)
+ }
+}
+
+//export connHandler
+func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) {
+ conn := salticidae.MsgNetworkConn(_conn)
+ id := *(*int)(userdata)
+ C.free(userdata)
+ app := &apps[id]
+ if connected {
+ if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
+ addr := conn.GetAddr()
+ tc := app.tc[addr2id(addr)]
+ tc.state = 1
+ fmt.Printf("INFO: increasing phase\n")
+ sendRand(tc.state, app, conn)
+ }
+ }
+}
+
+//export onStopLoop
+func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
+ ec := salticidae.EventContext(userdata)
+ ec.Stop()
+}
+
+//export onTerm
+func onTerm(_ C.int) {
+ for i, _ := range apps {
+ a := &apps[i]
+ a.tcall.AsyncCall(
+ salticidae.ThreadCallCallback(C.onStopLoop),
+ unsafe.Pointer(a.ec))
+ }
+ ec.Stop()
+}
+
+func main() {
+ ec = salticidae.NewEventContext()
+
+ var addrs []salticidae.NetAddr
+ for i := 0; i < 4; i++ {
+ fmt.Println("%s", "127.0.0.1:" + string(12345 + i))
+ addrs = append(addrs,
+ salticidae.NewAddrFromIPPortString("127.0.0.1:" + string(12345 + i)))
+ }
+ netconfig := salticidae.NewPeerNetworkConfig()
+ apps = make([]AppContext, len(addrs))
+ for i, addr := range addrs {
+ ec := salticidae.NewEventContext()
+ apps[i] = AppContext {
+ addr: addr,
+ ec: ec,
+ net: salticidae.NewPeerNetwork(ec, netconfig),
+ tcall: salticidae.ThreadCall(ec),
+ tc: make(map[uint64] *TestContext),
+ }
+ _i := (C.malloc(C.sizeof_int))
+ *(*C.int)(_i) = C.int(i)
+ net := apps[i].net.AsMsgNetwork()
+ net.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i)
+ net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i)
+ net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), _i)
+ net.Start()
+ }
+
+ threads.Add(len(apps))
+ for i, _ := range apps {
+ go func() {
+ a := &apps[i]
+ a.net.Listen(a.addr)
+ for _, addr := range addrs {
+ if !addr.IsEq(a.addr) {
+ a.net.AddPeer(addr)
+ }
+ }
+ a.ec.Dispatch()
+ threads.Done()
+ }()
+ }
+ threads.Wait()
+
+ ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm))
+ ev_int.Add(salticidae.SIGINT)
+ ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm))
+ ev_term.Add(salticidae.SIGTERM)
+
+ ec.Dispatch()
+
+ ev_int.Free()
+ ev_term.Free()
+ ec.Free()
+}