package main
// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/
// #include <stdlib.h>
// #include <arpa/inet.h>
// #include "salticidae/network.h"
// void onTerm(int sig);
// void onReceiveRand(msg_t *, msgnetwork_conn_t *, void *);
// void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *);
// void onStopLoop(threadcall_handle_t *, void *);
// void connHandler(msgnetwork_conn_t *, bool, void *);
// void onTimeout(timerev_t *, void *);
// typedef struct timeout_callback_context_t {
// int app_id;
// uint64_t addr_id;
// msgnetwork_conn_t *conn;
// } timerout_callback_context_t;
import "C"
import (
"salticidae-go"
"math/rand"
"fmt"
"sync"
"unsafe"
)
var ec salticidae.EventContext
const (
MSG_OPCODE_RAND salticidae.Opcode = iota
MSG_OPCODE_ACK
)
func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
buffer := make([]byte, size)
_, err := rand.Read(buffer)
if err == nil {
panic("rand source failed")
}
serialized := salticidae.NewDataStreamFromBytes(buffer)
hash := serialized.GetHash()
return salticidae.NewMsg(
MSG_OPCODE_RAND,
salticidae.NewByteArrayMovedFromDataStream(serialized)), hash
}
func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream {
return msg.GetPayload()
}
func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
serialized := salticidae.NewDataStream()
hash.Serialize(serialized)
return salticidae.NewMsg(
MSG_OPCODE_ACK,
salticidae.NewByteArrayMovedFromDataStream(serialized))
}
func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
p := msg.GetPayload()
hash := salticidae.NewUInt256()
hash.Unserialize(p)
p.Free()
return hash
}
type TestContext struct {
timer salticidae.TimerEvent
state int
hash salticidae.UInt256
ncompleted int
}
type AppContext struct {
addr salticidae.NetAddr
ec salticidae.EventContext
net salticidae.PeerNetwork
tcall salticidae.ThreadCall
tc map[uint64] *TestContext
}
func NewTestContext() TestContext {
return TestContext { ncompleted: 0 }
}
func addr2id(addr salticidae.NetAddr) uint64 {
return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32)
}
func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
msg, hash := msgRandSerialize(size)
addr := conn.GetAddr()
app.tc[addr2id(addr)].hash = hash
app.net.AsMsgNetwork().SendMsg(msg, conn)
}
var apps []AppContext
var threads sync.WaitGroup
var seg_buff_size = 4096
//export onTimeout
func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
ctx := (*C.struct_timeout_callback_context_t)(userdata)
app := &apps[ctx.app_id]
tc := app.tc[uint64(ctx.addr_id)]
tc.ncompleted++
app.net.AsMsgNetwork().Terminate(salticidae.MsgNetworkConn(ctx.conn))
var s string
for addr_id, v := range app.tc {
s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted)
}
C.free(unsafe.Pointer(ctx))
fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s)
}
//export onReceiveRand
func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
C.free(userdata)
msg := salticidae.Msg(_msg)
hash := msgRandUnserialize(msg).GetHash()
conn := salticidae.MsgNetworkConn(_conn)
net := conn.GetNet()
net.SendMsg(msgAckSerialize(hash), conn)
}
//export onReceiveAck
func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
msg := salticidae.Msg(_msg)
hash := msgAckUnserialize(msg)
id := *(* int)(userdata)
app := &apps[id]
conn := salticidae.MsgNetworkConn(_conn)
_addr := conn.GetAddr()
addr := addr2id(_addr)
tc := app.tc[addr]
C.free(userdata)
if hash.IsEq(tc.hash) {
panic("corrupted I/O!")
}
if tc.state == seg_buff_size * 2 {
sendRand(tc.state, app, conn)
tc.state = -1
ctx := (*C.struct_timeout_callback_context_t)(C.malloc(C.sizeof_struct_timeout_callback_context_t))
ctx.app_id = C.int(id)
ctx.addr_id = C.uint64_t(addr)
ctx.conn = (*C.struct_msgnetwork_conn_t)(conn)
tc.timer = salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
t := rand.Float64() * 10
tc.timer.Add(t)
fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
} else if tc.state == -1 {
sendRand(rand.Int() % (seg_buff_size * 10), app, conn)
} else {
tc.state++
sendRand(tc.state, app, conn)
}
}
//export connHandler
func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) {
conn := salticidae.MsgNetworkConn(_conn)
id := *(*int)(userdata)
C.free(userdata)
app := &apps[id]
if connected {
if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
addr := conn.GetAddr()
tc := app.tc[addr2id(addr)]
tc.state = 1
fmt.Printf("INFO: increasing phase\n")
sendRand(tc.state, app, conn)
}
}
}
//export onStopLoop
func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
ec := salticidae.EventContext(userdata)
ec.Stop()
}
//export onTerm
func onTerm(_ C.int) {
for i, _ := range apps {
a := &apps[i]
a.tcall.AsyncCall(
salticidae.ThreadCallCallback(C.onStopLoop),
unsafe.Pointer(a.ec))
}
ec.Stop()
}
func main() {
ec = salticidae.NewEventContext()
var addrs []salticidae.NetAddr
for i := 0; i < 4; i++ {
fmt.Println("%s", "127.0.0.1:" + string(12345 + i))
addrs = append(addrs,
salticidae.NewAddrFromIPPortString("127.0.0.1:" + string(12345 + i)))
}
netconfig := salticidae.NewPeerNetworkConfig()
apps = make([]AppContext, len(addrs))
for i, addr := range addrs {
ec := salticidae.NewEventContext()
apps[i] = AppContext {
addr: addr,
ec: ec,
net: salticidae.NewPeerNetwork(ec, netconfig),
tcall: salticidae.ThreadCall(ec),
tc: make(map[uint64] *TestContext),
}
_i := (C.malloc(C.sizeof_int))
*(*C.int)(_i) = C.int(i)
net := apps[i].net.AsMsgNetwork()
net.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i)
net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i)
net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), _i)
net.Start()
}
threads.Add(len(apps))
for i, _ := range apps {
go func() {
a := &apps[i]
a.net.Listen(a.addr)
for _, addr := range addrs {
if !addr.IsEq(a.addr) {
a.net.AddPeer(addr)
}
}
a.ec.Dispatch()
threads.Done()
}()
}
threads.Wait()
ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm))
ev_int.Add(salticidae.SIGINT)
ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm))
ev_term.Add(salticidae.SIGTERM)
ec.Dispatch()
ev_int.Free()
ev_term.Free()
ec.Free()
}