1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
package main
// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/
// #include <stdlib.h>
// #include "salticidae/network.h"
// void onTerm(int sig, void *);
// void onBobStop(threadcall_handle_t *, void *);
// void onTrigger(threadcall_handle_t *, void *);
// bool connHandler(msgnetwork_conn_t *, bool, void *);
// void onReceiveBytes(msg_t *, msgnetwork_conn_t *, void *);
// void onPeriodStat(timerev_t *, void *);
import "C"
import (
"os"
"fmt"
"unsafe"
"github.com/Determinant/salticidae-go"
)
// Message opcodes used by this program.
const (
// MSG_OPCODE_BYTES is the opcode attached to the messages built by
// msgBytesSerialize; genMyNet registers onReceiveBytes as its handler.
MSG_OPCODE_BYTES salticidae.Opcode = iota
)
// msgBytesSerialize builds a MSG_OPCODE_BYTES message whose body is a
// little-endian uint32 length followed by size zero bytes.
//
// The intermediate data stream and byte array are freed here; the caller
// receives only the message (created with its auto-free flag set to false, so
// the caller is expected to Free it).
func msgBytesSerialize(size int) (res salticidae.Msg) {
	stream := salticidae.NewDataStream(false)

	// Length header first, then the zero-filled payload.
	header := salticidae.ToLittleEndianU32(uint32(size))
	stream.PutU32(header)
	body := make([]byte, size)
	stream.PutData(body)

	// Move the stream's contents into a byte array, then release the stream.
	payload := salticidae.NewByteArrayMovedFromDataStream(stream, false)
	stream.Free()

	// Move the byte array into the message, then release the byte array.
	res = salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_BYTES, payload, false)
	payload.Free()
	return res
}
// checkError terminates the process with exit status 1 when err carries a
// non-zero code, after printing the corresponding error string to stdout.
// It returns normally when the code is zero.
func checkError(err *salticidae.Error) {
	code := err.GetCode()
	if code == 0 {
		return
	}
	fmt.Printf("error during a sync call: %s\n", salticidae.StrError(code))
	os.Exit(1)
}
// MyNet holds the state of one endpoint ("alice" or "bob" in main): its
// message network, the handles used to schedule work on its event context,
// and the receive counter used for throughput reporting.
type MyNet struct {
// id points to a C-allocated int (see genMyNet) holding this endpoint's
// index in mynets; the same pointer is handed to the C callbacks as userdata.
id *C.int
// net is the endpoint's message network.
net salticidae.MsgNetwork
// conn is the connection onTrigger sends on; connHandler sets it when an
// active-mode connection is established.
conn salticidae.MsgNetworkConn
// name is the label printed in the connection log lines.
name string
// evPeriodStat is the timer whose callback is onPeriodStat.
evPeriodStat salticidae.TimerEvent
// tcall schedules callbacks (onTrigger, onBobStop) via AsyncCall.
tcall salticidae.ThreadCall
// nrecv counts messages received since the last onPeriodStat report.
nrecv uint32
// statTimeout is the reporting period passed to evPeriodStat.Add
// (presumably seconds, given the "mps" output — TODO confirm). genMyNet
// only arms the timer when this is > 0.
statTimeout float64
}
// Package-level state shared between main and the cgo-exported callbacks.
var (
// mynets holds every endpoint; callbacks locate their endpoint by indexing
// this slice with the id carried in userdata.
mynets []MyNet
// ec is the event context main dispatches on; alice is created on it.
ec salticidae.EventContext
// tec is the event context bob is created on; it is dispatched from the
// goroutine started in main.
tec salticidae.EventContext
// bobThread receives one value from bob's goroutine after tec.Dispatch
// returns; onTerm waits on it.
bobThread chan struct{}
)
// onBobStop stops tec, the event context bob runs on. onTerm schedules it
// through bob's ThreadCall. userdata is not used.
//
//export onBobStop
func onBobStop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
tec.Stop()
}
// onTerm is the callback main registers for SIGINT and SIGTERM. It schedules
// onBobStop through bob's ThreadCall, stops the main event context, and then
// blocks until bob's goroutine signals on bobThread.
//
//export onTerm
func onTerm(_ C.int, _ unsafe.Pointer) {
	// bob is always the second endpoint created in main.
	stopBob := salticidae.ThreadCallCallback(C.onBobStop)
	mynets[1].tcall.AsyncCall(stopBob, unsafe.Pointer(mynets[1].id))

	ec.Stop()

	// Wait for bob's goroutine to report that tec.Dispatch has returned.
	<-bobThread
}
// onTrigger sends one 256-byte payload message over the endpoint's stored
// connection and, unless that connection has terminated, schedules itself
// again through the endpoint's ThreadCall.
//
// userdata must be the pointer to the C int allocated in genMyNet, holding the
// endpoint's index in mynets.
//
//export onTrigger
func onTrigger(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
	// The allocation is C.sizeof_int bytes holding a C.int. Reading it as a Go
	// int would load 8 bytes on 64-bit platforms and run past the allocation.
	id := int(*(*C.int)(userdata))
	mynet := &mynets[id]

	payload := msgBytesSerialize(256)
	mynet.net.SendMsg(payload, mynet.conn)
	payload.Free()

	if !mynet.conn.IsTerminated() {
		mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata)
	}
}
// onReceiveBytes is the handler registered for MSG_OPCODE_BYTES. It only
// increments the receiving endpoint's message counter; the message and the
// connection are not inspected.
//
// userdata must be the pointer to the C int allocated in genMyNet, holding the
// endpoint's index in mynets.
//
//export onReceiveBytes
func onReceiveBytes(_ *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
	// The allocation is C.sizeof_int bytes holding a C.int. Reading it as a Go
	// int would load 8 bytes on 64-bit platforms and run past the allocation.
	id := int(*(*C.int)(userdata))
	mynet := &mynets[id]
	mynet.nrecv++
}
// connHandler is the connection-state callback registered in genMyNet.
//
// On connect in active mode it stores a copy of the connection on the endpoint
// and schedules the first onTrigger call. On a passive connect it only logs.
// On disconnect it logs and reconnects to the address of the dropped
// connection. It always returns true.
//
// userdata must be the pointer to the C int allocated in genMyNet, holding the
// endpoint's index in mynets.
//
//export connHandler
func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, userdata unsafe.Pointer) C.bool {
	conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
	// The allocation is C.sizeof_int bytes holding a C.int. Reading it as a Go
	// int would load 8 bytes on 64-bit platforms and run past the allocation.
	id := int(*(*C.int)(userdata))
	mynet := &mynets[id]
	if connected {
		if conn.GetMode() == salticidae.CONN_MODE_ACTIVE {
			fmt.Printf("[%s] connected, sending hello.\n", mynet.name)
			// Keep our own copy: onTrigger uses it after this callback returns.
			mynet.conn = conn.Copy(true)
			mynet.tcall.AsyncCall(salticidae.ThreadCallCallback(C.onTrigger), userdata)
		} else {
			fmt.Printf("[%s] passively connected, waiting for greetings.\n", mynet.name)
		}
	} else {
		fmt.Printf("[%s] disconnected, retrying.\n", mynet.name)
		mynet.net.Connect(conn.GetAddr())
	}
	return true
}
// onPeriodStat is the timer callback for an endpoint's evPeriodStat. It prints
// the number of messages received since the last report divided by
// statTimeout, resets the counter, and re-arms the timer with statTimeout.
//
// userdata must be the pointer to the C int allocated in genMyNet, holding the
// endpoint's index in mynets.
//
//export onPeriodStat
func onPeriodStat(_ *C.timerev_t, userdata unsafe.Pointer) {
	// The allocation is C.sizeof_int bytes holding a C.int. Reading it as a Go
	// int would load 8 bytes on 64-bit platforms and run past the allocation.
	id := int(*(*C.int)(userdata))
	mynet := &mynets[id]
	fmt.Printf("%.2f mps\n", float64(mynet.nrecv)/mynet.statTimeout)
	mynet.nrecv = 0
	mynet.evPeriodStat.Add(mynet.statTimeout)
}
// genMyNet creates one endpoint on the given event context.
//
// It builds a message network (queue capacity 65536, burst size 1000),
// allocates a C int holding _id to serve as callback userdata, creates the
// stat timer and ThreadCall, and registers onReceiveBytes and connHandler.
// When statTimeout > 0 the stat timer is armed immediately with a zero delay.
//
// The process exits via checkError if the network cannot be created. The
// returned MyNet owns the C allocation in its id field; the caller is
// responsible for freeing it.
func genMyNet(ec salticidae.EventContext, name string, statTimeout float64, _id int) MyNet {
	err := salticidae.NewError()

	cfg := salticidae.NewMsgNetworkConfig()
	cfg.QueueCapacity(65536)
	cfg.BurstSize(1000)
	msgNet := salticidae.NewMsgNetwork(ec, cfg, &err)
	checkError(&err)

	// The index lives in C memory so it can be handed to C callbacks as userdata.
	idPtr := (*C.int)(C.malloc(C.sizeof_int))
	*idPtr = C.int(_id)
	userdata := unsafe.Pointer(idPtr)

	// conn and nrecv are left at their zero values (nil and 0).
	var n MyNet
	n.id = idPtr
	n.net = msgNet
	n.name = name
	n.evPeriodStat = salticidae.NewTimerEvent(ec, salticidae.TimerEventCallback(C.onPeriodStat), userdata)
	n.tcall = salticidae.NewThreadCall(ec)
	n.statTimeout = statTimeout

	n.net.RegHandler(MSG_OPCODE_BYTES, salticidae.MsgNetworkMsgCallback(C.onReceiveBytes), userdata)
	n.net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), userdata)

	if statTimeout > 0 {
		n.evPeriodStat.Add(0)
	}
	return n
}
// main runs two endpoints in one process: "alice" listens on 127.0.0.1:12345
// on the main event context and reports receive throughput every 10 time
// units, while "bob" runs on a second event context in its own goroutine,
// connects to alice, and sends messages continuously (see connHandler and
// onTrigger). SIGINT or SIGTERM triggers onTerm, which stops both event
// contexts; main then stops the networks and frees the C-allocated ids.
func main() {
	ec = salticidae.NewEventContext()
	err := salticidae.NewError()

	aliceAddr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:12345", true, &err)
	// The address parse reports failure through err; check it like the other
	// synchronous calls instead of carrying on with a bad address.
	checkError(&err)

	mynets = append(mynets, genMyNet(ec, "alice", 10, 0))
	alice := &mynets[0]
	alice.net.Start()
	alice.net.Listen(aliceAddr, &err)
	checkError(&err)

	bobThread = make(chan struct{})
	tec = salticidae.NewEventContext()
	// A non-positive statTimeout keeps bob's stat timer disarmed.
	mynets = append(mynets, genMyNet(tec, "bob", -1, 1))
	go func() {
		bob := &mynets[1]
		bob.net.Start()
		bob.net.Connect(aliceAddr)
		tec.Dispatch()
		// Tell onTerm that bob's loop has exited.
		bobThread <- struct{}{}
	}()

	evInt := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
	evInt.Add(salticidae.SIGINT)
	evTerm := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
	evTerm.Add(salticidae.SIGTERM)

	// Blocks until onTerm calls ec.Stop.
	ec.Dispatch()

	for i := range mynets {
		mynets[i].net.Stop()
		C.free(unsafe.Pointer(mynets[i].id))
	}
}
|