From ea27f0de65f79d7ee56e964f6c966e7c43c66f86 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 28 Jun 2019 15:41:55 -0400 Subject: ... --- netaddr.go | 4 +- network.go | 20 ++++--- salticidae | 2 +- stream.go | 138 ++++++++++++++++++++++++++++++++++++------------ test_msgnet/main.go | 15 +++--- test_msgnet_tls/main.go | 15 +++--- test_p2p_stress/main.go | 43 ++++++++++----- 7 files changed, 163 insertions(+), 74 deletions(-) diff --git a/netaddr.go b/netaddr.go index 070bbe2..1b2eb63 100644 --- a/netaddr.go +++ b/netaddr.go @@ -28,9 +28,9 @@ type netAddrArray struct { type NetAddrArray = *netAddrArray // Create NetAddr from a TCP socket format string (e.g. 127.0.0.1:8888). -func NewAddrFromIPPortString(addr string) (res NetAddr) { +func NewAddrFromIPPortString(addr string, err *Error) (res NetAddr) { c_str := C.CString(addr) - res = &netAddr{ inner: C.netaddr_new_from_sipport(c_str) } + res = &netAddr{ inner: C.netaddr_new_from_sipport(c_str, err) } C.free(rawptr_t(c_str)) runtime.SetFinalizer(res, func(self NetAddr) { self.free() }) return diff --git a/network.go b/network.go index bb2ba4c..86c94dd 100644 --- a/network.go +++ b/network.go @@ -263,8 +263,8 @@ func PeerNetworkConnFromC(ptr CPeerNetworkConn) PeerNetworkConn { } var ( - ID_MODE_IP_BASED = PeerNetworkIdMode(C.ID_MODE_IP_BASED) - ID_MODE_IP_PORT_BASED = PeerNetworkIdMode(C.ID_MODE_IP_PORT_BASED) + ADDR_BASED = PeerNetworkIdMode(C.ID_MODE_ADDR_BASED) + CERT_BASED = PeerNetworkIdMode(C.ID_MODE_CERT_BASED) ) // The identity mode. ID_MODE_IP_BASED: a remote peer is identified by the IP @@ -333,15 +333,19 @@ func (self PeerNetwork) free() { C.peernetwork_free(self.inner) } // Add a peer to the list of known peers. The P2P network will try to keep // bi-direction connections to all known peers in the list (through // reconnection and connection deduplication). -func (self PeerNetwork) AddPeer(paddr NetAddr) { C.peernetwork_add_peer(self.inner, paddr.inner) } +func (self PeerNetwork) AddPeer(addr NetAddr) { C.peernetwork_add_peer(self.inner, addr.inner) } + +// Remove a peer from the list of known peers. The P2P network will teardown +// the existing bi-direction connection and the PeerHandler callback will not be called. +func (self PeerNetwork) DelPeer(addr NetAddr) { C.peernetwork_del_peer(self.inner, addr.inner) } // Test whether a peer is already in the list. -func (self PeerNetwork) HasPeer(paddr NetAddr) bool { return bool(C.peernetwork_has_peer(self.inner, paddr.inner)) } +func (self PeerNetwork) HasPeer(addr NetAddr) bool { return bool(C.peernetwork_has_peer(self.inner, addr.inner)) } // Get the connection of the known peer. The connection handle is returned. The // returned connection handle could be kept in your program. -func (self PeerNetwork) GetPeerConn(paddr NetAddr, err *Error) PeerNetworkConn { - res := PeerNetworkConnFromC(C.peernetwork_get_peer_conn(self.inner, paddr.inner, err)) +func (self PeerNetwork) GetPeerConn(addr NetAddr, err *Error) PeerNetworkConn { + res := PeerNetworkConnFromC(C.peernetwork_get_peer_conn(self.inner, addr.inner, err)) if res != nil { runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() }) } @@ -407,8 +411,8 @@ func (self PeerNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) { // Send a message to the given list of peers. The payload contained in the // given msg will be moved and sent. Thus, no methods of msg involving the // payload should be called afterwards. -func (self PeerNetwork) MulticastMsgByMove(msg Msg, paddrs []NetAddr) { - na := NewAddrArrayFromAddrs(paddrs) +func (self PeerNetwork) MulticastMsgByMove(msg Msg, addrs []NetAddr) { + na := NewAddrArrayFromAddrs(addrs) C.peernetwork_multicast_msg_by_move(self.inner, msg.inner, na.inner) } diff --git a/salticidae b/salticidae index d28bf1b..710e9e8 160000 --- a/salticidae +++ b/salticidae @@ -1 +1 @@ -Subproject commit d28bf1b0c8baec3c5ab40cfb988ff974f98da439 +Subproject commit 710e9e8961cf5039b425e66d2042942b7e4af0c8 diff --git a/stream.go b/stream.go index bbc7680..1cbe8ba 100644 --- a/stream.go +++ b/stream.go @@ -2,19 +2,45 @@ package salticidae // #include // #include "salticidae/stream.h" +// uint16_t _salti_htole16(uint16_t x) { return htole16(x); } +// uint32_t _salti_htole32(uint32_t x) { return htole32(x); } +// uint64_t _salti_htole64(uint64_t x) { return htole64(x); } +// +// uint16_t _salti_letoh16(uint16_t x) { return le16toh(x); } +// uint32_t _salti_letoh32(uint32_t x) { return le32toh(x); } +// uint64_t _salti_letoh64(uint64_t x) { return le64toh(x); } +// +// uint16_t _salti_htobe16(uint16_t x) { return htobe16(x); } +// uint32_t _salti_htobe32(uint32_t x) { return htobe32(x); } +// uint64_t _salti_htobe64(uint64_t x) { return htobe64(x); } +// +// uint16_t _salti_betoh16(uint16_t x) { return be16toh(x); } +// uint32_t _salti_betoh32(uint32_t x) { return be32toh(x); } +// uint64_t _salti_betoh64(uint64_t x) { return be64toh(x); } +// import "C" import "runtime" -type byteArray struct { - inner *C.bytearray_t -} +type CByteArray = *C.bytearray_t +// The C pointer to a ByteArray object. +type byteArray struct { inner CByteArray } // Array of binary data. type ByteArray = *byteArray +func ByteArrayFromC(ptr CByteArray) ByteArray { + return &byteArray{ inner: ptr } +} + +func byteArraySetFinalizer(res ByteArray) { + if res != nil { + runtime.SetFinalizer(res, func(self ByteArray) { self.free() }) + } +} + // Create an empty byte array (with zero contained bytes). func NewByteArray() ByteArray { - res := &byteArray{ inner: C.bytearray_new() } - runtime.SetFinalizer(res, func(self ByteArray) { self.free() }) + res := ByteArrayFromC(C.bytearray_new()) + byteArraySetFinalizer(res) return res } @@ -23,19 +49,39 @@ func (self ByteArray) free() { C.bytearray_free(self.inner) } // Create a byte array by taking out all data from src. Notice this is a // zero-copy operation that consumes and invalidates the data in src ("move" // semantics) so that no more operation should be done to src after this -// function call. -func NewByteArrayMovedFromDataStream(src DataStream) ByteArray { - res := &byteArray{ inner: C.bytearray_new_moved_from_datastream(src.inner) } - runtime.SetFinalizer(res, func(self ByteArray) { self.free() }) - return res +// function call. Also notice unlike copying, the entire DataStream buffer is +// moved (including the possibily consumed part). +func NewByteArrayMovedFromDataStream(src DataStream) (res ByteArray) { + res = ByteArrayFromC(C.bytearray_new_moved_from_datastream(src.inner)) + byteArraySetFinalizer(res) + return +} + +// Create a byte array by copying the remaining data from src. +func NewByteArrayCopiedFromDataStream(src DataStream) (res ByteArray) { + res = ByteArrayFromC(C.bytearray_new_copied_from_datastream(src.inner)) + byteArraySetFinalizer(res) + return } -func NewByteArrayFromHex(hex string) ByteArray { +func NewByteArrayFromHex(hex string) (res ByteArray) { c_str := C.CString(hex) - res := &byteArray{ inner: C.bytearray_new_from_hex(c_str) } + res = ByteArrayFromC(C.bytearray_new_from_hex(c_str)) C.free(rawptr_t(c_str)) - runtime.SetFinalizer(res, func(self ByteArray) { self.free() }) - return res + byteArraySetFinalizer(res) + return +} + +func NewByteArrayFromBytes(bytes []byte) (res ByteArray) { + size := len(bytes) + if size > 0 { + base := (*C.uint8_t)(&bytes[0]) + res = ByteArrayFromC(C.bytearray_new_from_bytes(base, C.size_t(size))) + } else { + res = ByteArrayFromC(C.bytearray_new()) + } + byteArraySetFinalizer(res) + return } // The C pointer to a DataStream object. @@ -55,17 +101,17 @@ func DataStreamFromC(ptr CDataStream) DataStream { } } -func dataStreamSetFinalizer(res DataStream) DataStream { +func dataStreamSetFinalizer(res DataStream) { if res != nil { runtime.SetFinalizer(res, func(self DataStream) { self.free() }) } - return res } // Create an empty DataStream. func NewDataStream() DataStream { res := DataStreamFromC(C.datastream_new()) - return dataStreamSetFinalizer(res) + dataStreamSetFinalizer(res) + return res } // Create a DataStream with data copied from bytes. @@ -77,7 +123,8 @@ func NewDataStreamFromBytes(bytes []byte) (res DataStream) { } else { res = DataStreamFromC(C.datastream_new()) } - return dataStreamSetFinalizer(res) + dataStreamSetFinalizer(res) + return } // Create a DataStream with content moved from a ByteArray. @@ -100,9 +147,10 @@ func (self DataStream) attach(ptr rawptr_t, obj interface{}) { self.attached[uin func (self DataStream) detach(ptr rawptr_t) { delete(self.attached, uintptr(ptr)) } // Make a copy of the object. -func (self DataStream) Copy() DataStream { - res := DataStreamFromC(C.datastream_copy(self.inner)) - return dataStreamSetFinalizer(res) +func (self DataStream) Copy() (res DataStream) { + res = DataStreamFromC(C.datastream_copy(self.inner)) + dataStreamSetFinalizer(res) + return } // TODO: datastream_data @@ -157,6 +205,23 @@ func (self DataStream) GetI32(succ *bool) int32 { return int32(C.datastream_get_ // Parse an int64 integer by consuming the stream (no byte order conversion). func (self DataStream) GetI64(succ *bool) int64 { return int64(C.datastream_get_i64(self.inner, (*C.bool)(succ))) } +func ToLittleEndianU16(x uint16) uint16 { return uint16(C._salti_htole16(C.uint16_t(x))); } +func ToLittleEndianU32(x uint32) uint32 { return uint32(C._salti_htole32(C.uint32_t(x))); } +func ToLittleEndianU64(x uint64) uint64 { return uint64(C._salti_htole64(C.uint64_t(x))); } + +func FromLittleEndianU16(x uint16) uint16 { return uint16(C._salti_letoh16(C.uint16_t(x))); } +func FromLittleEndianU32(x uint32) uint32 { return uint32(C._salti_letoh32(C.uint32_t(x))); } +func FromLittleEndianU64(x uint64) uint64 { return uint64(C._salti_letoh64(C.uint64_t(x))); } + +func ToBigEndianU16(x uint16) uint16 { return uint16(C._salti_htobe16(C.uint16_t(x))); } +func ToBigEndianU32(x uint32) uint32 { return uint32(C._salti_htobe32(C.uint32_t(x))); } +func ToBigEndianU64(x uint64) uint64 { return uint64(C._salti_htobe64(C.uint64_t(x))); } + +func FromBigEndianU16(x uint16) uint16 { return uint16(C._salti_betoh16(C.uint16_t(x))); } +func FromBigEndianU32(x uint32) uint32 { return uint32(C._salti_betoh32(C.uint32_t(x))); } +func FromBigEndianU64(x uint64) uint64 { return uint64(C._salti_betoh64(C.uint64_t(x))); } + + // The handle returned by GetDataInPlace. The Go slice returned by Get() is // valid only during the lifetime of the handle. type dataStreamBytes struct { @@ -182,30 +247,35 @@ func (self DataStream) GetDataInPlace(length int) DataStreamBytes { return res } -type uint256 struct { - inner *C.uint256_t -} - +// The C pointer to a UInt256 object. +type CUInt256 = *C.uint256_t +type uint256 struct { inner CUInt256 } // 256-bit integer. type UInt256 = *uint256 -// Create a 256-bit integer. -func NewUInt256() UInt256 { - res := &uint256{ inner: C.uint256_new() } - if res != nil { - runtime.SetFinalizer(res, func(self UInt256) { self.free() }) - } - return res +func UInt256FromC(ptr CUInt256) UInt256 { + return &uint256{ inner: ptr } } -func NewUInt256FromByteArray(bytes ByteArray) UInt256 { - res := &uint256{ inner: C.uint256_new_from_bytearray(bytes.inner) } +func uint256SetFinalizer(res UInt256) { if res != nil { runtime.SetFinalizer(res, func(self UInt256) { self.free() }) } +} + +// Create a 256-bit integer. +func NewUInt256() UInt256 { + res := &uint256{ inner: C.uint256_new() } + uint256SetFinalizer(res) return res } +func NewUInt256FromByteArray(bytes ByteArray) (res UInt256) { + res = &uint256{ inner: C.uint256_new_from_bytearray(bytes.inner) } + uint256SetFinalizer(res) + return +} + func (self UInt256) free() { C.uint256_free(self.inner) } func (self UInt256) IsNull() bool { return bool(C.uint256_is_null(self.inner)) } diff --git a/test_msgnet/main.go b/test_msgnet/main.go index cad1697..292fb8f 100644 --- a/test_msgnet/main.go +++ b/test_msgnet/main.go @@ -11,7 +11,6 @@ package main import "C" import ( - "encoding/binary" "os" "fmt" "unsafe" @@ -25,9 +24,7 @@ const ( func msgHelloSerialize(name string, text string) salticidae.Msg { serialized := salticidae.NewDataStream() - t := make([]byte, 4) - binary.LittleEndian.PutUint32(t, uint32(len(name))) - serialized.PutData(t) + serialized.PutU32(salticidae.ToLittleEndianU32(uint32(len(name)))) serialized.PutData([]byte(name)) serialized.PutData([]byte(text)) return salticidae.NewMsgMovedFromByteArray( @@ -36,8 +33,9 @@ func msgHelloSerialize(name string, text string) salticidae.Msg { func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) { p := msg.GetPayloadByMove() - t := p.GetDataInPlace(4); length := binary.LittleEndian.Uint32(t.Get()); t.Release() - t = p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release() + succ := true + length := salticidae.FromLittleEndianU32(p.GetU32(&succ)) + t := p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release() t = p.GetDataInPlace(p.Size()); text = string(t.Get()); t.Release() return } @@ -139,9 +137,10 @@ func genMyNet(ec salticidae.EventContext, func main() { ec = salticidae.NewEventContext() + err := salticidae.NewError() - aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345") - bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346") + aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err) + bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err) alice = genMyNet(ec, "alice", aliceAddr, bobAddr) bob = genMyNet(ec, "bob", bobAddr, aliceAddr) diff --git a/test_msgnet_tls/main.go b/test_msgnet_tls/main.go index b14cedd..1d613fd 100644 --- a/test_msgnet_tls/main.go +++ b/test_msgnet_tls/main.go @@ -11,7 +11,6 @@ package main import "C" import ( - "encoding/binary" "os" "fmt" "unsafe" @@ -25,9 +24,7 @@ const ( func msgHelloSerialize(name string, text string) salticidae.Msg { serialized := salticidae.NewDataStream() - t := make([]byte, 4) - binary.LittleEndian.PutUint32(t, uint32(len(name))) - serialized.PutData(t) + serialized.PutU32(salticidae.ToLittleEndianU32(uint32(len(name)))) serialized.PutData([]byte(name)) serialized.PutData([]byte(text)) return salticidae.NewMsgMovedFromByteArray( @@ -36,8 +33,9 @@ func msgHelloSerialize(name string, text string) salticidae.Msg { func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) { p := msg.GetPayloadByMove() - t := p.GetDataInPlace(4); length := binary.LittleEndian.Uint32(t.Get()); t.Release() - t = p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release() + succ := true + length := salticidae.FromLittleEndianU32(p.GetU32(&succ)) + t := p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release() t = p.GetDataInPlace(p.Size()); text = string(t.Get()); t.Release() return } @@ -151,9 +149,10 @@ func genMyNet(ec salticidae.EventContext, func main() { ec = salticidae.NewEventContext() + err := salticidae.NewError() - aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345") - bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346") + aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err) + bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err) alice = genMyNet(ec, "alice", "ed5a9a8c7429dcb235a88244bc69d43d16b35008ce49736b27aaa3042a674043", aliceAddr, bobAddr) bob = genMyNet(ec, "bob", "ef3bea4e72f4d0e85da7643545312e2ff6dded5e176560bdffb1e53b1cef4896", bobAddr, aliceAddr) diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go index 69f9d6c..06a1407 100644 --- a/test_p2p_stress/main.go +++ b/test_p2p_stress/main.go @@ -3,6 +3,7 @@ package main // #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/ // #cgo LDFLAGS: ${SRCDIR}/../salticidae/libsalticidae.so -Wl,-rpath=${SRCDIR}/salticidae/ // #include +// #include // #include // #include "salticidae/network.h" // void onTerm(int sig, void *); @@ -40,36 +41,45 @@ const ( MSG_OPCODE_ACK ) -func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) { +func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) { + serialized := salticidae.NewDataStream() + serialized.PutU32(salticidae.ToLittleEndianU32(view)) buffer := make([]byte, size) _, err := rand.Read(buffer) if err != nil { panic("rand source failed") } - serialized := salticidae.NewDataStreamFromBytes(buffer) - hash := serialized.GetHash() + serialized.PutData(buffer) + hash := salticidae.NewByteArrayFromBytes(buffer).GetHash() return salticidae.NewMsgMovedFromByteArray( MSG_OPCODE_RAND, salticidae.NewByteArrayMovedFromDataStream(serialized)), hash } -func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream { - return msg.GetPayloadByMove() +func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { + d := msg.GetPayloadByMove() + succ := true + view = salticidae.FromLittleEndianU32(d.GetU32(&succ)) + hash = salticidae.NewByteArrayCopiedFromDataStream(d).GetHash() + return } -func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg { +func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg { serialized := salticidae.NewDataStream() + serialized.PutU32(salticidae.ToLittleEndianU32(view)) hash.Serialize(serialized) return salticidae.NewMsgMovedFromByteArray( MSG_OPCODE_ACK, salticidae.NewByteArrayMovedFromDataStream(serialized)) } -func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 { +func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) { p := msg.GetPayloadByMove() - hash := salticidae.NewUInt256() + hash = salticidae.NewUInt256() + succ := true + view = salticidae.FromLittleEndianU32(p.GetU32(&succ)) hash.Unserialize(p) - return hash + return } func checkError(err *salticidae.Error) { @@ -83,6 +93,7 @@ type TestContext struct { timer salticidae.TimerEvent timer_ctx *C.struct_timeout_callback_context_t state int + view uint32 hash salticidae.UInt256 ncompleted int } @@ -123,8 +134,8 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) { } func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) { - msg, hash := msgRandSerialize(size) tc := app.getTC(addr2id(conn.GetAddr())) + msg, hash := msgRandSerialize(tc.view, size) tc.hash = hash app.net.AsMsgNetwork().SendMsg(msg, conn) } @@ -156,19 +167,25 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user msg := salticidae.MsgFromC(salticidae.CMsg(_msg)) conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) net := conn.GetNet() - ack := msgAckSerialize(msgRandUnserialize(msg).GetHash()) + view, hash := msgRandUnserialize(msg) + ack := msgAckSerialize(view, hash) net.SendMsg(ack, conn) } //export onReceiveAck func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) { - hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg))) + view, hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg))) id := *(* int)(userdata) app := &apps[id] conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) addr := addr2id(conn.GetAddr()) tc := app.getTC(addr) + if view != tc.view { + fmt.Printf("dropping stale MsgAck") + return + } + if !hash.IsEq(tc.hash) { panic("corrupted I/O!") } @@ -245,7 +262,7 @@ func main() { var addrs []salticidae.NetAddr for i := 0; i < 5; i++ { addrs = append(addrs, - salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i))) + salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), &err)) } netconfig := salticidae.NewPeerNetworkConfig() nc := netconfig.AsMsgNetworkConfig() -- cgit v1.2.3