package main // #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/ // #include // #include // #include "salticidae/network.h" // void onTerm(int sig); // void onReceiveRand(msg_t *, msgnetwork_conn_t *, void *); // void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *); // void onStopLoop(threadcall_handle_t *, void *); // void connHandler(msgnetwork_conn_t *, bool, void *); // void onTimeout(timerev_t *, void *); // typedef struct timeout_callback_context_t { // int app_id; // uint64_t addr_id; // msgnetwork_conn_t *conn; // } timerout_callback_context_t; import "C" import ( "salticidae-go" "math/rand" "fmt" "sync" "unsafe" ) var ec salticidae.EventContext const ( MSG_OPCODE_RAND salticidae.Opcode = iota MSG_OPCODE_ACK ) func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) { buffer := make([]byte, size) _, err := rand.Read(buffer) if err == nil { panic("rand source failed") } serialized := salticidae.NewDataStreamFromBytes(buffer) hash := serialized.GetHash() return salticidae.NewMsg( MSG_OPCODE_RAND, salticidae.NewByteArrayMovedFromDataStream(serialized)), hash } func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream { return msg.GetPayload() } func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg { serialized := salticidae.NewDataStream() hash.Serialize(serialized) return salticidae.NewMsg( MSG_OPCODE_ACK, salticidae.NewByteArrayMovedFromDataStream(serialized)) } func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 { p := msg.GetPayload() hash := salticidae.NewUInt256() hash.Unserialize(p) p.Free() return hash } type TestContext struct { timer salticidae.TimerEvent state int hash salticidae.UInt256 ncompleted int } type AppContext struct { addr salticidae.NetAddr ec salticidae.EventContext net salticidae.PeerNetwork tcall salticidae.ThreadCall tc map[uint64] *TestContext } func NewTestContext() TestContext { return TestContext { ncompleted: 0 } } func addr2id(addr salticidae.NetAddr) uint64 { return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32) } func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { msg, hash := msgRandSerialize(size) addr := conn.GetAddr() app.tc[addr2id(addr)].hash = hash app.net.AsMsgNetwork().SendMsg(msg, conn) } var apps []AppContext var threads sync.WaitGroup var seg_buff_size = 4096 //export onTimeout func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { ctx := (*C.struct_timeout_callback_context_t)(userdata) app := &apps[ctx.app_id] tc := app.tc[uint64(ctx.addr_id)] tc.ncompleted++ app.net.AsMsgNetwork().Terminate(salticidae.MsgNetworkConn(ctx.conn)) var s string for addr_id, v := range app.tc { s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted) } C.free(unsafe.Pointer(ctx)) fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s) } //export onReceiveRand func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { C.free(userdata) msg := salticidae.Msg(_msg) hash := msgRandUnserialize(msg).GetHash() conn := salticidae.MsgNetworkConn(_conn) net := conn.GetNet() net.SendMsg(msgAckSerialize(hash), conn) } //export onReceiveAck func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { msg := salticidae.Msg(_msg) hash := msgAckUnserialize(msg) id := *(* int)(userdata) app := &apps[id] conn := salticidae.MsgNetworkConn(_conn) _addr := conn.GetAddr() addr := addr2id(_addr) tc := app.tc[addr] C.free(userdata) if hash.IsEq(tc.hash) { panic("corrupted I/O!") } if tc.state == seg_buff_size * 2 { sendRand(tc.state, app, conn) tc.state = -1 ctx := (*C.struct_timeout_callback_context_t)(C.malloc(C.sizeof_struct_timeout_callback_context_t)) ctx.app_id = C.int(id) ctx.addr_id = C.uint64_t(addr) ctx.conn = (*C.struct_msgnetwork_conn_t)(conn) tc.timer = salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx)) t := rand.Float64() * 10 tc.timer.Add(t) fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t) } else if tc.state == -1 { sendRand(rand.Int() % (seg_buff_size * 10), app, conn) } else { tc.state++ sendRand(tc.state, app, conn) } } //export connHandler func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) { conn := salticidae.MsgNetworkConn(_conn) id := *(*int)(userdata) C.free(userdata) app := &apps[id] if connected { if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { addr := conn.GetAddr() tc := app.tc[addr2id(addr)] tc.state = 1 fmt.Printf("INFO: increasing phase\n") sendRand(tc.state, app, conn) } } } //export onStopLoop func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { ec := salticidae.EventContext(userdata) ec.Stop() } //export onTerm func onTerm(_ C.int) { for i, _ := range apps { a := &apps[i] a.tcall.AsyncCall( salticidae.ThreadCallCallback(C.onStopLoop), unsafe.Pointer(a.ec)) } ec.Stop() } func main() { ec = salticidae.NewEventContext() var addrs []salticidae.NetAddr for i := 0; i < 4; i++ { fmt.Println("%s", "127.0.0.1:" + string(12345 + i)) addrs = append(addrs, salticidae.NewAddrFromIPPortString("127.0.0.1:" + string(12345 + i))) } netconfig := salticidae.NewPeerNetworkConfig() apps = make([]AppContext, len(addrs)) for i, addr := range addrs { ec := salticidae.NewEventContext() apps[i] = AppContext { addr: addr, ec: ec, net: salticidae.NewPeerNetwork(ec, netconfig), tcall: salticidae.ThreadCall(ec), tc: make(map[uint64] *TestContext), } _i := (C.malloc(C.sizeof_int)) *(*C.int)(_i) = C.int(i) net := apps[i].net.AsMsgNetwork() net.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i) net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i) net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), _i) net.Start() } threads.Add(len(apps)) for i, _ := range apps { go func() { a := &apps[i] a.net.Listen(a.addr) for _, addr := range addrs { if !addr.IsEq(a.addr) { a.net.AddPeer(addr) } } a.ec.Dispatch() threads.Done() }() } threads.Wait() ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm)) ev_int.Add(salticidae.SIGINT) ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm)) ev_term.Add(salticidae.SIGTERM) ec.Dispatch() ev_int.Free() ev_term.Free() ec.Free() }