aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--netaddr.go4
-rw-r--r--network.go20
m---------salticidae0
-rw-r--r--stream.go138
-rw-r--r--test_msgnet/main.go15
-rw-r--r--test_msgnet_tls/main.go15
-rw-r--r--test_p2p_stress/main.go43
7 files changed, 162 insertions, 73 deletions
diff --git a/netaddr.go b/netaddr.go
index 070bbe2..1b2eb63 100644
--- a/netaddr.go
+++ b/netaddr.go
@@ -28,9 +28,9 @@ type netAddrArray struct {
type NetAddrArray = *netAddrArray
// Create NetAddr from a TCP socket format string (e.g. 127.0.0.1:8888).
-func NewAddrFromIPPortString(addr string) (res NetAddr) {
+func NewAddrFromIPPortString(addr string, err *Error) (res NetAddr) {
c_str := C.CString(addr)
- res = &netAddr{ inner: C.netaddr_new_from_sipport(c_str) }
+ res = &netAddr{ inner: C.netaddr_new_from_sipport(c_str, err) }
C.free(rawptr_t(c_str))
runtime.SetFinalizer(res, func(self NetAddr) { self.free() })
return
diff --git a/network.go b/network.go
index bb2ba4c..86c94dd 100644
--- a/network.go
+++ b/network.go
@@ -263,8 +263,8 @@ func PeerNetworkConnFromC(ptr CPeerNetworkConn) PeerNetworkConn {
}
var (
- ID_MODE_IP_BASED = PeerNetworkIdMode(C.ID_MODE_IP_BASED)
- ID_MODE_IP_PORT_BASED = PeerNetworkIdMode(C.ID_MODE_IP_PORT_BASED)
+ ADDR_BASED = PeerNetworkIdMode(C.ID_MODE_ADDR_BASED)
+ CERT_BASED = PeerNetworkIdMode(C.ID_MODE_CERT_BASED)
)
// The identity mode. ID_MODE_IP_BASED: a remote peer is identified by the IP
@@ -333,15 +333,19 @@ func (self PeerNetwork) free() { C.peernetwork_free(self.inner) }
// Add a peer to the list of known peers. The P2P network will try to keep
// bi-direction connections to all known peers in the list (through
// reconnection and connection deduplication).
-func (self PeerNetwork) AddPeer(paddr NetAddr) { C.peernetwork_add_peer(self.inner, paddr.inner) }
+func (self PeerNetwork) AddPeer(addr NetAddr) { C.peernetwork_add_peer(self.inner, addr.inner) }
+
+// Remove a peer from the list of known peers. The P2P network will teardown
+// the existing bi-direction connection and the PeerHandler callback will not be called.
+func (self PeerNetwork) DelPeer(addr NetAddr) { C.peernetwork_del_peer(self.inner, addr.inner) }
// Test whether a peer is already in the list.
-func (self PeerNetwork) HasPeer(paddr NetAddr) bool { return bool(C.peernetwork_has_peer(self.inner, paddr.inner)) }
+func (self PeerNetwork) HasPeer(addr NetAddr) bool { return bool(C.peernetwork_has_peer(self.inner, addr.inner)) }
// Get the connection of the known peer. The connection handle is returned. The
// returned connection handle could be kept in your program.
-func (self PeerNetwork) GetPeerConn(paddr NetAddr, err *Error) PeerNetworkConn {
- res := PeerNetworkConnFromC(C.peernetwork_get_peer_conn(self.inner, paddr.inner, err))
+func (self PeerNetwork) GetPeerConn(addr NetAddr, err *Error) PeerNetworkConn {
+ res := PeerNetworkConnFromC(C.peernetwork_get_peer_conn(self.inner, addr.inner, err))
if res != nil {
runtime.SetFinalizer(res, func(self PeerNetworkConn) { self.free() })
}
@@ -407,8 +411,8 @@ func (self PeerNetwork) SendMsgDeferredByMove(msg Msg, addr NetAddr) {
// Send a message to the given list of peers. The payload contained in the
// given msg will be moved and sent. Thus, no methods of msg involving the
// payload should be called afterwards.
-func (self PeerNetwork) MulticastMsgByMove(msg Msg, paddrs []NetAddr) {
- na := NewAddrArrayFromAddrs(paddrs)
+func (self PeerNetwork) MulticastMsgByMove(msg Msg, addrs []NetAddr) {
+ na := NewAddrArrayFromAddrs(addrs)
C.peernetwork_multicast_msg_by_move(self.inner, msg.inner, na.inner)
}
diff --git a/salticidae b/salticidae
-Subproject d28bf1b0c8baec3c5ab40cfb988ff974f98da43
+Subproject 710e9e8961cf5039b425e66d2042942b7e4af0c
diff --git a/stream.go b/stream.go
index bbc7680..1cbe8ba 100644
--- a/stream.go
+++ b/stream.go
@@ -2,19 +2,45 @@ package salticidae
// #include <stdlib.h>
// #include "salticidae/stream.h"
+// uint16_t _salti_htole16(uint16_t x) { return htole16(x); }
+// uint32_t _salti_htole32(uint32_t x) { return htole32(x); }
+// uint64_t _salti_htole64(uint64_t x) { return htole64(x); }
+//
+// uint16_t _salti_letoh16(uint16_t x) { return le16toh(x); }
+// uint32_t _salti_letoh32(uint32_t x) { return le32toh(x); }
+// uint64_t _salti_letoh64(uint64_t x) { return le64toh(x); }
+//
+// uint16_t _salti_htobe16(uint16_t x) { return htobe16(x); }
+// uint32_t _salti_htobe32(uint32_t x) { return htobe32(x); }
+// uint64_t _salti_htobe64(uint64_t x) { return htobe64(x); }
+//
+// uint16_t _salti_betoh16(uint16_t x) { return be16toh(x); }
+// uint32_t _salti_betoh32(uint32_t x) { return be32toh(x); }
+// uint64_t _salti_betoh64(uint64_t x) { return be64toh(x); }
+//
import "C"
import "runtime"
-type byteArray struct {
- inner *C.bytearray_t
-}
+type CByteArray = *C.bytearray_t
+// The C pointer to a ByteArray object.
+type byteArray struct { inner CByteArray }
// Array of binary data.
type ByteArray = *byteArray
+func ByteArrayFromC(ptr CByteArray) ByteArray {
+ return &byteArray{ inner: ptr }
+}
+
+func byteArraySetFinalizer(res ByteArray) {
+ if res != nil {
+ runtime.SetFinalizer(res, func(self ByteArray) { self.free() })
+ }
+}
+
// Create an empty byte array (with zero contained bytes).
func NewByteArray() ByteArray {
- res := &byteArray{ inner: C.bytearray_new() }
- runtime.SetFinalizer(res, func(self ByteArray) { self.free() })
+ res := ByteArrayFromC(C.bytearray_new())
+ byteArraySetFinalizer(res)
return res
}
@@ -23,19 +49,39 @@ func (self ByteArray) free() { C.bytearray_free(self.inner) }
// Create a byte array by taking out all data from src. Notice this is a
// zero-copy operation that consumes and invalidates the data in src ("move"
// semantics) so that no more operation should be done to src after this
-// function call.
-func NewByteArrayMovedFromDataStream(src DataStream) ByteArray {
- res := &byteArray{ inner: C.bytearray_new_moved_from_datastream(src.inner) }
- runtime.SetFinalizer(res, func(self ByteArray) { self.free() })
- return res
+// function call. Also notice unlike copying, the entire DataStream buffer is
+// moved (including the possibily consumed part).
+func NewByteArrayMovedFromDataStream(src DataStream) (res ByteArray) {
+ res = ByteArrayFromC(C.bytearray_new_moved_from_datastream(src.inner))
+ byteArraySetFinalizer(res)
+ return
+}
+
+// Create a byte array by copying the remaining data from src.
+func NewByteArrayCopiedFromDataStream(src DataStream) (res ByteArray) {
+ res = ByteArrayFromC(C.bytearray_new_copied_from_datastream(src.inner))
+ byteArraySetFinalizer(res)
+ return
}
-func NewByteArrayFromHex(hex string) ByteArray {
+func NewByteArrayFromHex(hex string) (res ByteArray) {
c_str := C.CString(hex)
- res := &byteArray{ inner: C.bytearray_new_from_hex(c_str) }
+ res = ByteArrayFromC(C.bytearray_new_from_hex(c_str))
C.free(rawptr_t(c_str))
- runtime.SetFinalizer(res, func(self ByteArray) { self.free() })
- return res
+ byteArraySetFinalizer(res)
+ return
+}
+
+func NewByteArrayFromBytes(bytes []byte) (res ByteArray) {
+ size := len(bytes)
+ if size > 0 {
+ base := (*C.uint8_t)(&bytes[0])
+ res = ByteArrayFromC(C.bytearray_new_from_bytes(base, C.size_t(size)))
+ } else {
+ res = ByteArrayFromC(C.bytearray_new())
+ }
+ byteArraySetFinalizer(res)
+ return
}
// The C pointer to a DataStream object.
@@ -55,17 +101,17 @@ func DataStreamFromC(ptr CDataStream) DataStream {
}
}
-func dataStreamSetFinalizer(res DataStream) DataStream {
+func dataStreamSetFinalizer(res DataStream) {
if res != nil {
runtime.SetFinalizer(res, func(self DataStream) { self.free() })
}
- return res
}
// Create an empty DataStream.
func NewDataStream() DataStream {
res := DataStreamFromC(C.datastream_new())
- return dataStreamSetFinalizer(res)
+ dataStreamSetFinalizer(res)
+ return res
}
// Create a DataStream with data copied from bytes.
@@ -77,7 +123,8 @@ func NewDataStreamFromBytes(bytes []byte) (res DataStream) {
} else {
res = DataStreamFromC(C.datastream_new())
}
- return dataStreamSetFinalizer(res)
+ dataStreamSetFinalizer(res)
+ return
}
// Create a DataStream with content moved from a ByteArray.
@@ -100,9 +147,10 @@ func (self DataStream) attach(ptr rawptr_t, obj interface{}) { self.attached[uin
func (self DataStream) detach(ptr rawptr_t) { delete(self.attached, uintptr(ptr)) }
// Make a copy of the object.
-func (self DataStream) Copy() DataStream {
- res := DataStreamFromC(C.datastream_copy(self.inner))
- return dataStreamSetFinalizer(res)
+func (self DataStream) Copy() (res DataStream) {
+ res = DataStreamFromC(C.datastream_copy(self.inner))
+ dataStreamSetFinalizer(res)
+ return
}
// TODO: datastream_data
@@ -157,6 +205,23 @@ func (self DataStream) GetI32(succ *bool) int32 { return int32(C.datastream_get_
// Parse an int64 integer by consuming the stream (no byte order conversion).
func (self DataStream) GetI64(succ *bool) int64 { return int64(C.datastream_get_i64(self.inner, (*C.bool)(succ))) }
+func ToLittleEndianU16(x uint16) uint16 { return uint16(C._salti_htole16(C.uint16_t(x))); }
+func ToLittleEndianU32(x uint32) uint32 { return uint32(C._salti_htole32(C.uint32_t(x))); }
+func ToLittleEndianU64(x uint64) uint64 { return uint64(C._salti_htole64(C.uint64_t(x))); }
+
+func FromLittleEndianU16(x uint16) uint16 { return uint16(C._salti_letoh16(C.uint16_t(x))); }
+func FromLittleEndianU32(x uint32) uint32 { return uint32(C._salti_letoh32(C.uint32_t(x))); }
+func FromLittleEndianU64(x uint64) uint64 { return uint64(C._salti_letoh64(C.uint64_t(x))); }
+
+func ToBigEndianU16(x uint16) uint16 { return uint16(C._salti_htobe16(C.uint16_t(x))); }
+func ToBigEndianU32(x uint32) uint32 { return uint32(C._salti_htobe32(C.uint32_t(x))); }
+func ToBigEndianU64(x uint64) uint64 { return uint64(C._salti_htobe64(C.uint64_t(x))); }
+
+func FromBigEndianU16(x uint16) uint16 { return uint16(C._salti_betoh16(C.uint16_t(x))); }
+func FromBigEndianU32(x uint32) uint32 { return uint32(C._salti_betoh32(C.uint32_t(x))); }
+func FromBigEndianU64(x uint64) uint64 { return uint64(C._salti_betoh64(C.uint64_t(x))); }
+
+
// The handle returned by GetDataInPlace. The Go slice returned by Get() is
// valid only during the lifetime of the handle.
type dataStreamBytes struct {
@@ -182,30 +247,35 @@ func (self DataStream) GetDataInPlace(length int) DataStreamBytes {
return res
}
-type uint256 struct {
- inner *C.uint256_t
-}
-
+// The C pointer to a UInt256 object.
+type CUInt256 = *C.uint256_t
+type uint256 struct { inner CUInt256 }
// 256-bit integer.
type UInt256 = *uint256
-// Create a 256-bit integer.
-func NewUInt256() UInt256 {
- res := &uint256{ inner: C.uint256_new() }
- if res != nil {
- runtime.SetFinalizer(res, func(self UInt256) { self.free() })
- }
- return res
+func UInt256FromC(ptr CUInt256) UInt256 {
+ return &uint256{ inner: ptr }
}
-func NewUInt256FromByteArray(bytes ByteArray) UInt256 {
- res := &uint256{ inner: C.uint256_new_from_bytearray(bytes.inner) }
+func uint256SetFinalizer(res UInt256) {
if res != nil {
runtime.SetFinalizer(res, func(self UInt256) { self.free() })
}
+}
+
+// Create a 256-bit integer.
+func NewUInt256() UInt256 {
+ res := &uint256{ inner: C.uint256_new() }
+ uint256SetFinalizer(res)
return res
}
+func NewUInt256FromByteArray(bytes ByteArray) (res UInt256) {
+ res = &uint256{ inner: C.uint256_new_from_bytearray(bytes.inner) }
+ uint256SetFinalizer(res)
+ return
+}
+
func (self UInt256) free() { C.uint256_free(self.inner) }
func (self UInt256) IsNull() bool { return bool(C.uint256_is_null(self.inner)) }
diff --git a/test_msgnet/main.go b/test_msgnet/main.go
index cad1697..292fb8f 100644
--- a/test_msgnet/main.go
+++ b/test_msgnet/main.go
@@ -11,7 +11,6 @@ package main
import "C"
import (
- "encoding/binary"
"os"
"fmt"
"unsafe"
@@ -25,9 +24,7 @@ const (
func msgHelloSerialize(name string, text string) salticidae.Msg {
serialized := salticidae.NewDataStream()
- t := make([]byte, 4)
- binary.LittleEndian.PutUint32(t, uint32(len(name)))
- serialized.PutData(t)
+ serialized.PutU32(salticidae.ToLittleEndianU32(uint32(len(name))))
serialized.PutData([]byte(name))
serialized.PutData([]byte(text))
return salticidae.NewMsgMovedFromByteArray(
@@ -36,8 +33,9 @@ func msgHelloSerialize(name string, text string) salticidae.Msg {
func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) {
p := msg.GetPayloadByMove()
- t := p.GetDataInPlace(4); length := binary.LittleEndian.Uint32(t.Get()); t.Release()
- t = p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release()
+ succ := true
+ length := salticidae.FromLittleEndianU32(p.GetU32(&succ))
+ t := p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release()
t = p.GetDataInPlace(p.Size()); text = string(t.Get()); t.Release()
return
}
@@ -139,9 +137,10 @@ func genMyNet(ec salticidae.EventContext,
func main() {
ec = salticidae.NewEventContext()
+ err := salticidae.NewError()
- aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345")
- bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346")
+ aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err)
+ bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err)
alice = genMyNet(ec, "alice", aliceAddr, bobAddr)
bob = genMyNet(ec, "bob", bobAddr, aliceAddr)
diff --git a/test_msgnet_tls/main.go b/test_msgnet_tls/main.go
index b14cedd..1d613fd 100644
--- a/test_msgnet_tls/main.go
+++ b/test_msgnet_tls/main.go
@@ -11,7 +11,6 @@ package main
import "C"
import (
- "encoding/binary"
"os"
"fmt"
"unsafe"
@@ -25,9 +24,7 @@ const (
func msgHelloSerialize(name string, text string) salticidae.Msg {
serialized := salticidae.NewDataStream()
- t := make([]byte, 4)
- binary.LittleEndian.PutUint32(t, uint32(len(name)))
- serialized.PutData(t)
+ serialized.PutU32(salticidae.ToLittleEndianU32(uint32(len(name))))
serialized.PutData([]byte(name))
serialized.PutData([]byte(text))
return salticidae.NewMsgMovedFromByteArray(
@@ -36,8 +33,9 @@ func msgHelloSerialize(name string, text string) salticidae.Msg {
func msgHelloUnserialize(msg salticidae.Msg) (name string, text string) {
p := msg.GetPayloadByMove()
- t := p.GetDataInPlace(4); length := binary.LittleEndian.Uint32(t.Get()); t.Release()
- t = p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release()
+ succ := true
+ length := salticidae.FromLittleEndianU32(p.GetU32(&succ))
+ t := p.GetDataInPlace(int(length)); name = string(t.Get()); t.Release()
t = p.GetDataInPlace(p.Size()); text = string(t.Get()); t.Release()
return
}
@@ -151,9 +149,10 @@ func genMyNet(ec salticidae.EventContext,
func main() {
ec = salticidae.NewEventContext()
+ err := salticidae.NewError()
- aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345")
- bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346")
+ aliceAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12345", &err)
+ bobAddr := salticidae.NewAddrFromIPPortString("127.0.0.1:12346", &err)
alice = genMyNet(ec, "alice", "ed5a9a8c7429dcb235a88244bc69d43d16b35008ce49736b27aaa3042a674043", aliceAddr, bobAddr)
bob = genMyNet(ec, "bob", "ef3bea4e72f4d0e85da7643545312e2ff6dded5e176560bdffb1e53b1cef4896", bobAddr, aliceAddr)
diff --git a/test_p2p_stress/main.go b/test_p2p_stress/main.go
index 69f9d6c..06a1407 100644
--- a/test_p2p_stress/main.go
+++ b/test_p2p_stress/main.go
@@ -3,6 +3,7 @@ package main
// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/
// #cgo LDFLAGS: ${SRCDIR}/../salticidae/libsalticidae.so -Wl,-rpath=${SRCDIR}/salticidae/
// #include <stdlib.h>
+// #include <stdint.h>
// #include <arpa/inet.h>
// #include "salticidae/network.h"
// void onTerm(int sig, void *);
@@ -40,36 +41,45 @@ const (
MSG_OPCODE_ACK
)
-func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
+func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) {
+ serialized := salticidae.NewDataStream()
+ serialized.PutU32(salticidae.ToLittleEndianU32(view))
buffer := make([]byte, size)
_, err := rand.Read(buffer)
if err != nil {
panic("rand source failed")
}
- serialized := salticidae.NewDataStreamFromBytes(buffer)
- hash := serialized.GetHash()
+ serialized.PutData(buffer)
+ hash := salticidae.NewByteArrayFromBytes(buffer).GetHash()
return salticidae.NewMsgMovedFromByteArray(
MSG_OPCODE_RAND,
salticidae.NewByteArrayMovedFromDataStream(serialized)), hash
}
-func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream {
- return msg.GetPayloadByMove()
+func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
+ d := msg.GetPayloadByMove()
+ succ := true
+ view = salticidae.FromLittleEndianU32(d.GetU32(&succ))
+ hash = salticidae.NewByteArrayCopiedFromDataStream(d).GetHash()
+ return
}
-func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
+func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg {
serialized := salticidae.NewDataStream()
+ serialized.PutU32(salticidae.ToLittleEndianU32(view))
hash.Serialize(serialized)
return salticidae.NewMsgMovedFromByteArray(
MSG_OPCODE_ACK,
salticidae.NewByteArrayMovedFromDataStream(serialized))
}
-func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
+func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
p := msg.GetPayloadByMove()
- hash := salticidae.NewUInt256()
+ hash = salticidae.NewUInt256()
+ succ := true
+ view = salticidae.FromLittleEndianU32(p.GetU32(&succ))
hash.Unserialize(p)
- return hash
+ return
}
func checkError(err *salticidae.Error) {
@@ -83,6 +93,7 @@ type TestContext struct {
timer salticidae.TimerEvent
timer_ctx *C.struct_timeout_callback_context_t
state int
+ view uint32
hash salticidae.UInt256
ncompleted int
}
@@ -123,8 +134,8 @@ func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
}
func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
- msg, hash := msgRandSerialize(size)
tc := app.getTC(addr2id(conn.GetAddr()))
+ msg, hash := msgRandSerialize(tc.view, size)
tc.hash = hash
app.net.AsMsgNetwork().SendMsg(msg, conn)
}
@@ -156,19 +167,25 @@ func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, user
msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
net := conn.GetNet()
- ack := msgAckSerialize(msgRandUnserialize(msg).GetHash())
+ view, hash := msgRandUnserialize(msg)
+ ack := msgAckSerialize(view, hash)
net.SendMsg(ack, conn)
}
//export onReceiveAck
func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
- hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg)))
+ view, hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg)))
id := *(* int)(userdata)
app := &apps[id]
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
addr := addr2id(conn.GetAddr())
tc := app.getTC(addr)
+ if view != tc.view {
+ fmt.Printf("dropping stale MsgAck")
+ return
+ }
+
if !hash.IsEq(tc.hash) {
panic("corrupted I/O!")
}
@@ -245,7 +262,7 @@ func main() {
var addrs []salticidae.NetAddr
for i := 0; i < 5; i++ {
addrs = append(addrs,
- salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i)))
+ salticidae.NewAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), &err))
}
netconfig := salticidae.NewPeerNetworkConfig()
nc := netconfig.AsMsgNetworkConfig()