package main // #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/ // #include // #include "salticidae/network.h" // void onTerm(int sig, void *); // void onBobStop(threadcall_handle_t *, void *); // void onTrigger(threadcall_handle_t *, void *); // bool connHandler(msgnetwork_conn_t *, bool, void *); // void onReceiveBytes(msg_t *, msgnetwork_conn_t *, void *); // void onPeriodStat(timerev_t *, void *); import "C" import ( "os" "fmt" "unsafe" "github.com/Determinant/salticidae-go" ) const ( MSG_OPCODE_BYTES salticidae.Opcode = iota ) func msgBytesSerialize(size int) (res salticidae.Msg) { serialized := salticidae.NewDataStream(false) serialized.PutU32(salticidae.ToLittleEndianU32(uint32(size))) serialized.PutData(make([]byte, size)) ba := salticidae.NewByteArrayMovedFromDataStream(serialized, false) serialized.Free() res = salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_BYTES, ba, false) ba.Free() return } func checkError(err *salticidae.Error) { if err.GetCode() != 0 { fmt.Printf("error during a sync call: %s\n", salticidae.StrError(err.GetCode())) os.Exit(1) } } type MyNet struct { id *C.int net salticidae.MsgNetwork conn salticidae.MsgNetworkConn name string evPeriodStat salticidae.TimerEvent tcall salticidae.ThreadCall nrecv uint32 statTimeout float64 } var ( mynets []MyNet ec salticidae.EventContext tec salticidae.EventContext bobThread chan struct{} ) //export onBobStop func onBobStop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { tec.Stop() } //export onTerm func onTerm(_ C.int, _ unsafe.Pointer) { bob := &mynets[1] bob.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onBobStop), unsafe.Pointer(bob.id)) ec.Stop() <-bobThread } //export onTrigger func onTrigger(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { id := *(*int)(userdata) mynet := &mynets[id] payload := msgBytesSerialize(256) mynet.net.SendMsg(payload, mynet.conn) payload.Free() if !mynet.conn.IsTerminated() { mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata) } } //export onReceiveBytes func onReceiveBytes(_ *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { id := *(*int)(userdata) mynet := &mynets[id] mynet.nrecv++ } //export connHandler func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) C.bool { conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) id := *(*int)(userdata) mynet := &mynets[id] if connected { if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { fmt.Printf("[%s] connected, sending hello.\n", mynet.name) mynet.conn = conn.Copy(true) mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata) } else { fmt.Printf("[%s] passively connected, waiting for greetings.\n", mynet.name) } } else { fmt.Printf("[%s] disconnected, retrying.\n", mynet.name) mynet.net.Connect(conn.GetAddr()) } return true } //export onPeriodStat func onPeriodStat(_ *C.timerev_t, userdata unsafe.Pointer) { id := *(*int)(userdata) mynet := &mynets[id] fmt.Printf("%.2f mps\n", float64(mynet.nrecv) / mynet.statTimeout) mynet.nrecv = 0 mynet.evPeriodStat.Add(mynet.statTimeout) } func genMyNet(ec salticidae.EventContext, name string, statTimeout float64, _id int) MyNet { err := salticidae.NewError() nc := salticidae.NewMsgNetworkConfig() nc.QueueCapacity(65536) nc.BurstSize(1000) net := salticidae.NewMsgNetwork(ec, nc, &err); checkError(&err) id := (*C.int)(C.malloc(C.sizeof_int)) *id = C.int(_id) n := MyNet { id: id, net: net, conn: nil, name: name, evPeriodStat: salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onPeriodStat), unsafe.Pointer(id)), tcall: salticidae.NewThreadCall(ec), nrecv: 0, statTimeout: statTimeout, } n.net.RegHandler(MSG_OPCODE_BYTES, salticidae.MsgNetworkMsgCallback(C.onReceiveBytes), unsafe.Pointer(id)) n.net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), unsafe.Pointer(id)) if statTimeout > 0 { n.evPeriodStat.Add(0) } return n } func main() { ec = salticidae.NewEventContext() err := salticidae.NewError() aliceAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12345", true, &err) mynets = append(mynets, genMyNet(ec, "alice", 10, 0)) alice := &mynets[0] alice.net.Start() alice.net.Listen(aliceAddr, &err); checkError(&err) bobThread = make(chan struct{}) tec = salticidae.NewEventContext() mynets = append(mynets, genMyNet(tec, "bob", -1, 1)) go func() { bob := &mynets[1] bob.net.Start() bob.net.Connect(aliceAddr) tec.Dispatch() bobThread <-struct{}{} }() ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil) ev_int.Add(salticidae.SIGINT) ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil) ev_term.Add(salticidae.SIGTERM) ec.Dispatch() for i, _ := range mynets { mynets[i].net.Stop() C.free(unsafe.Pointer(mynets[i].id)) } }