aboutsummaryrefslogblamecommitdiff
path: root/bench_network/main.go
blob: d7d09256eec01f3bbbd1a9f5b6ffa3f0a790da8b (plain) (tree)























                                                             

                                                       

                                                                 




                                                                          











































                                                                                           


                                          



















                                                                                                       
                                        





                                                                                        
                                         










































                                                                                                                      
                                                                                     












                                                         
                                  















                                                                                     
package main

// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/
// #include <stdlib.h>
// #include "salticidae/network.h"
// void onTerm(int sig, void *);
// void onBobStop(threadcall_handle_t *, void *);
// void onTrigger(threadcall_handle_t *, void *);
// bool connHandler(msgnetwork_conn_t *, bool, void *);
// void onReceiveBytes(msg_t *, msgnetwork_conn_t *, void *);
// void onPeriodStat(timerev_t *, void *);
import "C"

import (
    "os"
    "fmt"
    "unsafe"
    "github.com/Determinant/salticidae-go"
)

const (
    MSG_OPCODE_BYTES salticidae.Opcode = iota
)

func msgBytesSerialize(size int) (res salticidae.Msg) {
    serialized := salticidae.NewDataStream(false)
    serialized.PutU32(salticidae.ToLittleEndianU32(uint32(size)))
    serialized.PutData(make([]byte, size))
    ba := salticidae.NewByteArrayMovedFromDataStream(serialized, false)
    serialized.Free()
    res = salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_BYTES, ba, false)
    ba.Free()
    return
}

func checkError(err *salticidae.Error) {
    if err.GetCode() != 0 {
        fmt.Printf("error during a sync call: %s\n", salticidae.StrError(err.GetCode()))
        os.Exit(1)
    }
}

type MyNet struct {
    id *C.int
    net salticidae.MsgNetwork
    conn salticidae.MsgNetworkConn
    name string
    evPeriodStat salticidae.TimerEvent
    tcall salticidae.ThreadCall
    nrecv uint32
    statTimeout float64
}

var (
    mynets []MyNet
    ec salticidae.EventContext
    tec salticidae.EventContext
    bobThread chan struct{}
)

//export onBobStop
func onBobStop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
    tec.Stop()
}

//export onTerm
func onTerm(_ C.int, _ unsafe.Pointer) {
    bob := &mynets[1]
    bob.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onBobStop), unsafe.Pointer(bob.id))
    ec.Stop()
    <-bobThread
}

//export onTrigger
func onTrigger(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
    id := *(*int)(userdata)
    mynet := &mynets[id]
    payload := msgBytesSerialize(256)
    mynet.net.SendMsg(payload, mynet.conn)
    payload.Free()
    if !mynet.conn.IsTerminated() {
        mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata)
    }
}

//export onReceiveBytes
func onReceiveBytes(_ *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
    id := *(*int)(userdata)
    mynet := &mynets[id]
    mynet.nrecv++
}

//export connHandler
func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) C.bool {
    conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
    id := *(*int)(userdata)
    mynet := &mynets[id]
    if connected {
        if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
            fmt.Printf("[%s] connected, sending hello.\n", mynet.name)
            mynet.conn = conn.Copy(true)
            mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata)
        } else {
            fmt.Printf("[%s] passively connected, waiting for greetings.\n", mynet.name)
        }
    } else {
        fmt.Printf("[%s] disconnected, retrying.\n", mynet.name)
        mynet.net.Connect(conn.GetAddr())
    }
    return true
}

//export onPeriodStat
func onPeriodStat(_ *C.timerev_t, userdata unsafe.Pointer) {
    id := *(*int)(userdata)
    mynet := &mynets[id]
    fmt.Printf("%.2f mps\n", float64(mynet.nrecv) / mynet.statTimeout)
    mynet.nrecv = 0
    mynet.evPeriodStat.Add(mynet.statTimeout)
}

func genMyNet(ec salticidae.EventContext, name string, statTimeout float64, _id int) MyNet {
    err := salticidae.NewError()
    nc := salticidae.NewMsgNetworkConfig()
    nc.QueueCapacity(65536)
    nc.BurstSize(1000)
    net := salticidae.NewMsgNetwork(ec, nc, &err); checkError(&err)
    id := (*C.int)(C.malloc(C.sizeof_int))
    *id = C.int(_id)
    n := MyNet {
        id: id,
        net: net,
        conn: nil,
        name: name,
        evPeriodStat: salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onPeriodStat), unsafe.Pointer(id)),
        tcall: salticidae.NewThreadCall(ec),
        nrecv: 0,
        statTimeout: statTimeout,
    }
    n.net.RegHandler(MSG_OPCODE_BYTES, salticidae.MsgNetworkMsgCallback(C.onReceiveBytes), unsafe.Pointer(id))
    n.net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), unsafe.Pointer(id))
    if statTimeout > 0 {
        n.evPeriodStat.Add(0)
    }
    return n
}

func main() {
    ec = salticidae.NewEventContext()
    err := salticidae.NewError()

    aliceAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12345", true, &err)

    mynets = append(mynets, genMyNet(ec, "alice", 10, 0))
    alice := &mynets[0]
    alice.net.Start()
    alice.net.Listen(aliceAddr, &err); checkError(&err)
    bobThread = make(chan struct{})

    tec = salticidae.NewEventContext()
    mynets = append(mynets, genMyNet(tec, "bob", -1, 1))

    go func() {
        bob := &mynets[1]
        bob.net.Start()
        bob.net.Connect(aliceAddr)
        tec.Dispatch()
        bobThread <-struct{}{}
    }()

    ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
    ev_int.Add(salticidae.SIGINT)
    ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
    ev_term.Add(salticidae.SIGTERM)

    ec.Dispatch()

    for i, _ := range mynets {
        mynets[i].net.Stop()
        C.free(unsafe.Pointer(mynets[i].id))
    }
}