From 1dfc133c0167fbba818b46883950bf87554f5dc1 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 17 Jun 2019 02:14:13 -0400 Subject: add unknown peer callback --- msg.go | 4 ++-- network.go | 7 ++++++- salticidae | 2 +- stream.go | 4 ++-- test_msgnet/main.go | 38 ++++++++++++++------------------------ test_p2p_stress/main.go | 24 +++++++++--------------- 6 files changed, 34 insertions(+), 45 deletions(-) diff --git a/msg.go b/msg.go index eb02fc3..0f0267d 100644 --- a/msg.go +++ b/msg.go @@ -5,11 +5,11 @@ package salticidae import "C" import "runtime" -type CMsg = *C.struct_msg_t +type CMsg = *C.msg_t type msg struct { inner CMsg } type Msg = *msg -func MsgFromC(ptr *C.struct_msg_t) Msg { return &msg{ inner: ptr } } +func MsgFromC(ptr *C.msg_t) Msg { return &msg{ inner: ptr } } func NewMsgMovedFromByteArray(opcode Opcode, _moved_payload ByteArray) Msg { res := &msg{ inner: C.msg_new_moved_from_bytearray(C._opcode_t(opcode), _moved_payload.inner) } diff --git a/network.go b/network.go index 5c63eb1..c30e3d3 100644 --- a/network.go +++ b/network.go @@ -115,7 +115,6 @@ func (self MsgNetwork) RegErrorHandler(callback MsgNetworkErrorCallback, userdat C.msgnetwork_reg_error_handler(self.inner, callback, userdata) } - func (self MsgNetworkConn) Copy() MsgNetworkConn { res := MsgNetworkConnFromC(C.msgnetwork_conn_copy(self.inner)) runtime.SetFinalizer(res, func(self MsgNetworkConn) { self.free() }) @@ -240,3 +239,9 @@ func (self PeerNetwork) MulticastMsgByMove(_moved_msg Msg, paddrs []NetAddr) { func (self PeerNetwork) Listen(listenAddr NetAddr, err *Error) { C.peernetwork_listen(self.inner, listenAddr.inner, err) } + +type MsgNetworkUnknownPeerCallback = C.msgnetwork_unknown_peer_callback_t + +func (self PeerNetwork) RegUnknownPeerHandler(callback MsgNetworkUnknownPeerCallback, userdata rawptr_t) { + C.peernetwork_reg_unknown_peer_handler(self.inner, callback, userdata) +} diff --git a/salticidae b/salticidae index 4dc7feb..c69b015 160000 --- a/salticidae +++ b/salticidae @@ -1 +1 @@ -Subproject commit 4dc7feb8b15e0a76cc95e5a80fce363cca637856 +Subproject commit c69b0150fe0c50b91c7c8a406f096de8e9cf62fc diff --git a/stream.go b/stream.go index 4fe7d7b..7d42020 100644 --- a/stream.go +++ b/stream.go @@ -6,7 +6,7 @@ import "C" import "runtime" type byteArray struct { - inner *C.struct_bytearray_t + inner *C.bytearray_t } type ByteArray = *byteArray @@ -25,7 +25,7 @@ func NewByteArrayMovedFromDataStream(src DataStream) ByteArray { } type dataStream struct { - inner *C.struct_datastream_t + inner *C.datastream_t } type DataStream = *dataStream diff --git a/test_msgnet/main.go b/test_msgnet/main.go index c2ae70d..4e52982 100644 --- a/test_msgnet/main.go +++ b/test_msgnet/main.go @@ -29,9 +29,8 @@ func msgHelloSerialize(name string, text string) salticidae.Msg { serialized.PutData(t) serialized.PutData([]byte(name)) serialized.PutData([]byte(text)) - s := salticidae.NewByteArrayMovedFromDataStream(serialized) return salticidae.NewMsgMovedFromByteArray( - MSG_OPCODE_HELLO, s) + MSG_OPCODE_HELLO, salticidae.NewByteArrayMovedFromDataStream(serialized)) } func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) { @@ -43,8 +42,7 @@ func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) { } func msgAckSerialize() salticidae.Msg { - s := salticidae.NewByteArray() - return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, s) + return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, salticidae.NewByteArray()) } func checkError(err *salticidae.Error) { @@ -108,7 +106,7 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, _ unsafe.P if conn.GetMode() == salticidae.CONN_MODE_ACTIVE { fmt.Printf("[%s] Connected, sending hello.\n", name) hello := msgHelloSerialize(name, "Hello there!") - (*n.net).SendMsgByMove(hello, conn) + n.net.SendMsgByMove(hello, conn) } else { fmt.Printf("[%s] Accepted, waiting for greetings.\n", name) } @@ -127,7 +125,8 @@ func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, _ unsafe.Pointe fmt.Printf("Captured %s error during an async call: %s\n", s, salticidae.StrError(err.GetCode())) } -func genMyNet(ec salticidae.EventContext, name string) MyNet { +func genMyNet(ec salticidae.EventContext, name string, + myAddr salticidae.NetAddr, otherAddr salticidae.NetAddr) MyNet { netconfig := salticidae.NewMsgNetworkConfig() n := MyNet { net: salticidae.NewMsgNetwork(ec, netconfig), @@ -137,31 +136,22 @@ func genMyNet(ec salticidae.EventContext, name string) MyNet { n.net.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), nil) n.net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), nil) n.net.RegErrorHandler(salticidae.MsgNetworkErrorCallback(C.errorHandler), nil) + + n.net.Start() + err := salticidae.NewError() + n.net.Listen(myAddr, &err); checkError(&err) + n.net.Connect(otherAddr, &err); checkError(&err) return n } func main() { ec = salticidae.NewEventContext() - err := salticidae.NewError() - - alice_addr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345") - bob_addr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346") - - alice = genMyNet(ec, "Alice") - bob = genMyNet(ec, "Bob") - - alice.net.Start() - bob.net.Start() - alice.net.Listen(alice_addr, &err) - checkError(&err) - bob.net.Listen(bob_addr, &err) - checkError(&err) + aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345") + bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346") - alice.net.Connect(bob_addr, &err) - checkError(&err) - bob.net.Connect(alice_addr, &err) - checkError(&err) + alice = genMyNet(ec, "Alice", aliceAddr, bobAddr) + bob = genMyNet(ec, "Bob", bobAddr, aliceAddr) ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil) ev_int.Add(salticidae.SIGINT) diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index d6f3b71..251f2e1 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -87,17 +87,6 @@ type TestContext struct { ncompleted int } -func (self TestContext) Free() { - if self.timer != nil { - C.free(unsafe.Pointer(self.timer_ctx)) - } -} - -func (self AppContext) Free() { - for _, tc:= range self.tc { - tc.Free() - } -} type AppContext struct { addr salticidae.NetAddr @@ -107,6 +96,14 @@ type AppContext struct { tc map[uint64] *TestContext } +func (self AppContext) Free() { + for _, tc := range self.tc { + if tc.timer != nil { + C.free(unsafe.Pointer(tc.timer_ctx)) + } + } +} + func NewTestContext() TestContext { return TestContext { ncompleted: 0 } } @@ -157,11 +154,9 @@ func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) { //export onReceiveRand func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { msg := salticidae.MsgFromC(salticidae.CMsg(_msg)) - bytes := msgRandUnserialize(msg) - hash := bytes.GetHash() conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) net := conn.GetNet() - ack := msgAckSerialize(hash) + ack := msgAckSerialize(msgRandUnserialize(msg).GetHash()) net.SendMsgByMove(ack, conn) } @@ -175,7 +170,6 @@ func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userd tc := app.getTC(addr) if !hash.IsEq(tc.hash) { - //fmt.Printf("%s %s\n", hash.GetHex(), tc.hash.GetHex()) panic("corrupted I/O!") } -- cgit v1.2.3