From d247c437f396709f62eadaf863b8a552d7142fc7 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 30 Jun 2019 17:03:51 -0400 Subject: fix NetAddrArray bug; add new example --- Makefile | 5 +- bench_network/main.go | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++ network.go | 5 ++ salticidae | 2 +- 4 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 bench_network/main.go diff --git a/Makefile b/Makefile index 7ac8971..25bcc8b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: all clean -all: build/test_msgnet build/test_p2p_stress build/test_msgnet_tls +all: build/test_msgnet build/test_p2p_stress build/test_msgnet_tls build/bench_network salticidae/libsalticidae.so: @@ -19,6 +19,9 @@ build/test_msgnet_tls: salticidae/libsalticidae.so test_msgnet_tls/main.go build/test_p2p_stress: salticidae/libsalticidae.so test_p2p_stress/main.go make -C salticidae/ go build -o $@ github.com/Determinant/salticidae-go/test_p2p_stress +build/bench_network: salticidae/libsalticidae.so bench_network/main.go + make -C salticidae/ + go build -o $@ github.com/Determinant/salticidae-go/bench_network clean: rm -rf build/ diff --git a/bench_network/main.go b/bench_network/main.go new file mode 100644 index 0000000..842fed2 --- /dev/null +++ b/bench_network/main.go @@ -0,0 +1,179 @@ +package main + +// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/ +// #include +// #include "salticidae/network.h" +// void onTerm(int sig, void *); +// void onBobStop(threadcall_handle_t *, void *); +// void onTrigger(threadcall_handle_t *, void *); +// bool connHandler(msgnetwork_conn_t *, bool, void *); +// void onReceiveBytes(msg_t *, msgnetwork_conn_t *, void *); +// void onPeriodStat(timerev_t *, void *); +import "C" + +import ( + "os" + "fmt" + "unsafe" + "github.com/Determinant/salticidae-go" +) + +const ( + MSG_OPCODE_BYTES salticidae.Opcode = iota +) + +func msgBytesSerialize(size int) salticidae.Msg { + serialized := salticidae.NewDataStream() + serialized.PutU32(salticidae.ToLittleEndianU32(uint32(size))) + serialized.PutData(make([]byte, size)) + return salticidae.NewMsgMovedFromByteArray( + MSG_OPCODE_BYTES, salticidae.NewByteArrayMovedFromDataStream(serialized)) +} + +func checkError(err *salticidae.Error) { + if err.GetCode() != 0 { + fmt.Printf("error during a sync call: %s\n", salticidae.StrError(err.GetCode())) + os.Exit(1) + } +} + +type MyNet struct { + id *C.int + net salticidae.MsgNetwork + conn salticidae.MsgNetworkConn + name string + evPeriodStat salticidae.TimerEvent + tcall salticidae.ThreadCall + nrecv uint32 + statTimeout float64 +} + +var ( + mynets []MyNet + ec salticidae.EventContext + tec salticidae.EventContext + bobThread chan struct{} +) + +//export onBobStop +func onBobStop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { + tec.Stop() +} + +//export onTerm +func onTerm(_ C.int, _ unsafe.Pointer) { + bob := &mynets[1] + bob.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onBobStop), unsafe.Pointer(bob.id)) + ec.Stop() + <-bobThread +} + +//export onTrigger +func onTrigger(_ *C.threadcall_handle_t, userdata unsafe.Pointer) { + id := *(*int)(userdata) + mynet := &mynets[id] + mynet.net.SendMsg(msgBytesSerialize(256), mynet.conn) + if !mynet.conn.IsTerminated() { + mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata) + } +} + +//export onReceiveBytes +func onReceiveBytes(_ *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { + id := *(*int)(userdata) + mynet := &mynets[id] + mynet.nrecv++ +} + +//export connHandler +func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) C.bool { + conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) + id := *(*int)(userdata) + mynet := &mynets[id] + if connected { + if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { + fmt.Printf("[%s] connected, sending hello.\n", mynet.name) + mynet.conn = conn.Copy() + mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata) + } else { + fmt.Printf("[%s] passively connected, waiting for greetings.\n", mynet.name) + } + } else { + fmt.Printf("[%s] disconnected, retrying.\n", mynet.name) + err := salticidae.NewError() + mynet.net.Connect(conn.GetAddr(), false, &err) + } + return true +} + +//export onPeriodStat +func onPeriodStat(_ *C.timerev_t, userdata unsafe.Pointer) { + id := *(*int)(userdata) + mynet := &mynets[id] + fmt.Printf("%.2f mps\n", float64(mynet.nrecv) / mynet.statTimeout) + mynet.nrecv = 0 + mynet.evPeriodStat.Add(mynet.statTimeout) +} + +func genMyNet(ec salticidae.EventContext, name string, statTimeout float64, _id int) MyNet { + err := salticidae.NewError() + nc := salticidae.NewMsgNetworkConfig() + nc.QueueCapacity(65536) + nc.BurstSize(1000) + net := salticidae.NewMsgNetwork(ec, nc, &err); checkError(&err) + id := (*C.int)(C.malloc(C.sizeof_int)) + *id = C.int(_id) + n := MyNet { + id: id, + net: net, + conn: nil, + name: name, + evPeriodStat: salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onPeriodStat), unsafe.Pointer(id)), + tcall: salticidae.NewThreadCall(ec), + nrecv: 0, + statTimeout: statTimeout, + } + n.net.RegHandler(MSG_OPCODE_BYTES, salticidae.MsgNetworkMsgCallback(C.onReceiveBytes), unsafe.Pointer(id)) + n.net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), unsafe.Pointer(id)) + if statTimeout > 0 { + n.evPeriodStat.Add(0) + } + return n +} + +func main() { + ec = salticidae.NewEventContext() + err := salticidae.NewError() + + aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err) + //bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err) + + mynets = append(mynets, genMyNet(ec, "alice", 10, 0)) + alice := &mynets[0] + alice.net.Start() + alice.net.Listen(aliceAddr, &err); checkError(&err) + bobThread = make(chan struct{}) + + tec = salticidae.NewEventContext() + mynets = append(mynets, genMyNet(tec, "bob", -1, 1)) + + go func() { + bob := &mynets[1] + bob.net.Start() + bob.net.Connect(aliceAddr, false, &err); checkError(&err) + tec.Dispatch() + bobThread <-struct{}{} + }() + + ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil) + ev_int.Add(salticidae.SIGINT) + ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil) + ev_term.Add(salticidae.SIGTERM) + + ec.Dispatch() + + for i, _ := range mynets { + mynets[i].net.Stop() + C.free(unsafe.Pointer(mynets[i].id)) + } +} diff --git a/network.go b/network.go index 6e0d8c6..22e0c31 100644 --- a/network.go +++ b/network.go @@ -59,6 +59,11 @@ func (self MsgNetworkConn) GetAddr() NetAddr { return NetAddrFromC(C.msgnetwork_conn_get_addr(self.inner)) } +// Check if the connection has been terminated. +func (self MsgNetworkConn) IsTerminated() bool { + return bool(C.msgnetwork_conn_is_terminated(self.inner)) +} + // Get the certificate of the remote end of this connection. Use Copy() to make a // copy of the certificate if you want to use the certificate object beyond the // lifetime of the connection. diff --git a/salticidae b/salticidae index b06eef5..624550d 160000 --- a/salticidae +++ b/salticidae @@ -1 +1 @@ -Subproject commit b06eef583fb9084f29e416a1af7555de4a73f764 +Subproject commit 624550d7e1c032a4d3896ba002aa3ecb895f447c -- cgit v1.2.3-70-g09d2