aboutsummaryrefslogblamecommitdiff
path: root/test_p2p_stress/main.go
blob: 095dc561e30575a81f2af9751eb0c1d474b2f67c (plain) (tree)
1
2
3
4
5
6
7
8
9
10

            
                      
                      

                                  
                                


                                                            
                                                        
                                                                




                                              
                                

                                                                                    



                        


          






                                              

 
       

                                                

 
                                                                                   













                                                                                                     

 
                                                                                    






                                                                         

 
                                                                           






                                                                                  

 
                                                                                   





                                                                    

 
                                        



                                                                                                

 
                         





                                                       


                        




                                     

 
                               




                                                            

 
                                   
                                                  


                                              
                                                                    

 
                                                                 






                                           

 
                                                                                           



                                                    



                          

                              
                


                                                         











                                                                                         


                      
                                                                                               







                                                                                 



                                                                                                     













                                                                                   
 



                                                     
 


                                       
 






















                                                                                                                            

 

                                                                                                 












                                                                                            

 
                     
                                                                                                       





                                                                                                                     

 

                                                                    


                                      


               
                                        







                                                                    


             

                                         
 

































                                                                                                              
 



















                                                           
 



                                                                                         
 
                     
 
package main

// #include <stdlib.h>
// #include <stdint.h>
// #include <arpa/inet.h>
// #include "salticidae/network.h"
// void onTerm(int sig, void *);
// void onReceiveRand(msg_t *, msgnetwork_conn_t *, void *);
// void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *);
// void onStopLoop(threadcall_handle_t *, void *);
// void peerHandler(peernetwork_conn_t *, bool, void *);
// void errorHandler(SalticidaeCError *, bool, int32_t, void *);
// void onTimeout(timerev_t *, void *);
// typedef struct timeout_callback_context_t {
//     int app_id;
//     uint64_t addr_id;
//     msgnetwork_conn_t *conn;
// } timeout_callback_context_t;
// static timeout_callback_context_t *timeout_callback_context_new() {
//     timeout_callback_context_t *ctx = malloc(sizeof(timeout_callback_context_t));
//     ctx->conn = NULL;
//     return ctx;
// }
//
import "C"

import (
	"fmt"
	"github.com/Determinant/salticidae-go"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"unsafe"
)

// Opcodes of the two message types exchanged by the stress test.
const (
	// MSG_OPCODE_RAND tags the messages built by msgRandSerialize: a view
	// number followed by a random payload.
	MSG_OPCODE_RAND salticidae.Opcode = iota
	// MSG_OPCODE_ACK tags the messages built by msgAckSerialize: a view
	// number followed by a hash.
	MSG_OPCODE_ACK
)

// msgRandSerialize builds a MSG_OPCODE_RAND message whose payload is the
// little-endian view number followed by size random bytes.
//
// It returns the message together with the hash of the random bytes alone
// (the view prefix is not part of the hashed data). It panics if the random
// source reports an error.
func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) {
	stream := salticidae.NewDataStream(false)
	defer stream.Free()
	stream.PutU32(salticidae.ToLittleEndianU32(view))

	body := make([]byte, size)
	if _, err := rand.Read(body); err != nil {
		panic("rand source failed")
	}
	stream.PutData(body)

	// Hash is taken over a separate byte array holding only the random body.
	bodyBytes := salticidae.NewByteArrayFromBytes(body, false)
	defer bodyBytes.Free()

	// The stream content is moved into the byte array that backs the message.
	packed := salticidae.NewByteArrayMovedFromDataStream(stream, false)
	defer packed.Free()

	msg := salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_RAND, packed, false)
	digest := bodyBytes.GetHash(true)
	return msg, digest
}

// msgRandUnserialize decodes a message produced by msgRandSerialize.
//
// It returns the view number read from the front of the payload and the hash
// of the bytes that follow it. The returned hash is created with the flag
// false; callers in this file release it with Free.
func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
	stream := msg.GetPayloadByMove()

	// NOTE(review): the success flag written by GetU32 is never inspected, so
	// a truncated payload is not detected here.
	ok := true
	rawView := stream.GetU32(&ok)

	// Whatever is left in the stream after the view is the random body.
	rest := salticidae.NewByteArrayCopiedFromDataStream(stream, false)
	defer rest.Free()

	return salticidae.FromLittleEndianU32(rawView), rest.GetHash(false)
}

// msgAckSerialize builds a MSG_OPCODE_ACK message whose payload is the
// little-endian view number followed by the serialized hash.
func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg {
	stream := salticidae.NewDataStream(false)
	defer stream.Free()

	stream.PutU32(salticidae.ToLittleEndianU32(view))
	hash.Serialize(stream)

	// The stream content is moved into the byte array that backs the message.
	packed := salticidae.NewByteArrayMovedFromDataStream(stream, false)
	defer packed.Free()

	ack := salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, packed, false)
	return ack
}

// msgAckUnserialize decodes a message produced by msgAckSerialize and returns
// the view number and the hash it carries. The hash is a fresh UInt256
// created with the flag false; callers in this file release it with Free.
func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
	stream := msg.GetPayloadByMove()
	digest := salticidae.NewUInt256(false)

	// NOTE(review): the success flag written by GetU32 is never inspected, so
	// a truncated payload is not detected here.
	ok := true
	rawView := stream.GetU32(&ok)
	digest.Unserialize(stream)

	return salticidae.FromLittleEndianU32(rawView), digest
}

// checkError terminates the process with exit status 1 when err carries a
// non-zero code, after printing the corresponding error string. It returns
// normally when the code is zero.
func checkError(err *salticidae.Error) {
	code := err.GetCode()
	if code == 0 {
		return
	}
	fmt.Printf("error during a sync call: %s\n", salticidae.StrError(code))
	os.Exit(1)
}

// TestContext is the per-peer state of the stress test. One instance is kept
// for every peer address in AppContext.tc.
type TestContext struct {
	// timer is the timer armed by onReceiveAck when the random-size phase
	// starts; it is nil until then. It fires onTimeout.
	timer      salticidae.TimerEvent
	// timer_ctx is the C-allocated userdata handed to the timer callback.
	timer_ctx  *C.struct_timeout_callback_context_t
	// state is the payload size used for the next sequential message, or -1
	// once the peer has entered the random-size phase.
	state      int
	// view is incremented on every new connection to the peer; acks carrying
	// a different view are dropped as stale.
	view       uint32
	// hash is the hash of the last random payload sent to the peer, compared
	// against the hash echoed back in the ack.
	hash       salticidae.UInt256
	// ncompleted counts how many times the timer fired for this peer.
	ncompleted int
}

// AppContext bundles everything belonging to one simulated node of the test.
type AppContext struct {
	// addr is the address this node listens on.
	addr  salticidae.NetAddr
	// ec is the event context whose loop is dispatched by this node's goroutine.
	ec    salticidae.EventContext
	// net is the peer network created on ec.
	net   salticidae.PeerNetwork
	// tcall is used by onTerm to schedule onStopLoop for this node.
	tcall salticidae.ThreadCall
	// tc maps a peer id (see addr2id) to the test state kept for that peer.
	tc    map[uint64]*TestContext
}

// Free releases the C-side resources owned by the per-peer timer contexts of
// this app: the connection handle copied into each context and the context
// struct itself. Peers that never armed a timer own nothing and are skipped.
//
// It is meant to be called once, after the app's event loop has returned.
func (self AppContext) Free() {
	for _, tc := range self.tc {
		if tc.timer != nil {
			// timer_ctx.conn is a copy made with msgnetwork_conn_copy in
			// onReceiveAck; release it before the struct that holds it, the
			// same way onReceiveAck does when it replaces a timer. Freeing
			// only the struct would leak the connection handle.
			C.msgnetwork_conn_free(tc.timer_ctx.conn)
			C.free(unsafe.Pointer(tc.timer_ctx))
		}
	}
}

// NewTestContext returns a TestContext with every field at its zero value,
// in particular view and ncompleted both start at 0.
func NewTestContext() TestContext {
	var fresh TestContext
	return fresh
}

// addr2id packs an address into a single map key: the value of GetIP in the
// low 32 bits and the value of GetPort shifted into the bits above them.
func addr2id(addr salticidae.NetAddr) uint64 {
	ip := uint64(addr.GetIP())
	port := uint64(addr.GetPort())
	return port<<32 | ip
}

// getTC returns the TestContext registered for addr_id, creating and
// registering a zero-valued one on first use.
func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
	if existing, found := self.tc[addr_id]; found {
		return existing
	}
	created := &TestContext{}
	self.tc[addr_id] = created
	return created
}

// sendRand sends a random message of the given payload size over conn, tagged
// with the peer's current view, and remembers the payload hash in tc.hash so
// the matching ack can be verified.
func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn, tc *TestContext) {
	randMsg, digest := msgRandSerialize(tc.view, size)
	defer randMsg.Free()

	tc.hash = digest
	mnet := app.net.AsMsgNetwork()
	mnet.SendMsg(randMsg, conn)
}

// Package-level state shared between main and the exported cgo callbacks.
var (
	// apps holds one AppContext per simulated node, indexed by app id.
	apps []AppContext

	// threads tracks the goroutines that run the per-node event loops.
	threads sync.WaitGroup

	// segBuffSize is the segment buffer size given to the network config; the
	// test also derives its payload-size thresholds from it.
	segBuffSize = 4096

	// ec is the event context dispatched by main; the signal events are
	// registered on it.
	ec salticidae.EventContext

	// ids[i] points to a C-allocated int holding i, passed as userdata to the
	// callbacks registered for app i.
	ids []*C.int
)

// onTimeout is the timer callback armed by onReceiveAck. userdata points to
// the timeout_callback_context_t describing which app and peer the timer
// belongs to. It bumps the peer's completion counter, terminates the stored
// connection and prints the completion counters of all peers of that app.
//
//export onTimeout
func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
	tctx := (*C.struct_timeout_callback_context_t)(userdata)
	app := &apps[int(tctx.app_id)]
	app.getTC(uint64(tctx.addr_id)).ncompleted++

	mnet := app.net.AsMsgNetwork()
	peer := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(tctx.conn))
	mnet.Terminate(peer)

	// The port sits above the low 32 bits of the peer id (see addr2id).
	var summary string
	for peerID, peerTC := range app.tc {
		peerPort := C.ntohs(C.ushort(peerID >> 32))
		summary += fmt.Sprintf(" %d(%d)", peerPort, peerTC.ncompleted)
	}
	ownPort := C.ntohs(C.ushort(app.addr.GetPort()))
	fmt.Printf("INFO: %d completed:%s\n", ownPort, summary)
}

// onReceiveRand handles an incoming MSG_OPCODE_RAND message: it hashes the
// received payload and answers on the same connection with an ack carrying
// the received view and that hash.
//
//export onReceiveRand
func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
	incoming := salticidae.MsgFromC(salticidae.CMsg(_msg))
	conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
	mnet := conn.GetNet()

	view, digest := msgRandUnserialize(incoming)
	defer digest.Free()

	reply := msgAckSerialize(view, digest)
	defer reply.Free()
	mnet.SendMsg(reply, conn)
}

// onReceiveAck handles an incoming MSG_OPCODE_ACK message. userdata points to
// the C int holding the id of the receiving app.
//
// Acks from a connection without a peer address are ignored, acks for an old
// view are dropped, and a hash mismatch panics. Otherwise the next random
// message is sent according to the peer's state:
//   - state == segBuffSize*2: send one more message of that size, switch to
//     the random-size phase (state -1) and arm a timer with a random delay
//     below 10 seconds that fires onTimeout;
//   - state == -1: send a message of random size below segBuffSize*10;
//   - otherwise: grow the size by one and send.
//
//export onReceiveAck
func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
	ackView, ackHash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg)))
	defer ackHash.Free()

	appID := int(*(*C.int)(userdata))
	app := &apps[appID]

	conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
	peerConn := salticidae.NewPeerNetworkConnFromMsgNetworkConnUnsafe(conn, false)
	defer peerConn.Free()
	peerAddr := peerConn.GetPeerAddr(false)
	defer peerAddr.Free()
	if peerAddr.IsNull() {
		return
	}

	peerID := addr2id(peerAddr)
	tc := app.getTC(peerID)

	if ackView != tc.view {
		fmt.Printf("dropping stale MsgAck\n")
		return
	}
	if !ackHash.IsEq(tc.hash) {
		panic("corrupted I/O!")
	}

	switch tc.state {
	case segBuffSize * 2:
		// Last message of the sequential phase; from now on sizes are random.
		sendRand(tc.state, app, conn, tc)
		tc.state = -1

		tctx := C.timeout_callback_context_new()
		tctx.app_id = C.int(appID)
		tctx.addr_id = C.uint64_t(peerID)
		tctx.conn = C.msgnetwork_conn_copy(_conn)

		// Release whatever a previous round left behind before re-arming.
		if tc.timer != nil {
			C.msgnetwork_conn_free(tc.timer_ctx.conn)
			C.free(unsafe.Pointer(tc.timer_ctx))
			tc.timer.Del()
		}

		tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(tctx))
		tc.timer_ctx = tctx

		delay := rand.Float64() * 10
		tc.timer.Add(delay)
		fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", delay)
	case -1:
		sendRand(rand.Int()%(segBuffSize*10), app, conn, tc)
	default:
		tc.state++
		sendRand(tc.state, app, conn, tc)
	}
}

// peerHandler is the peer callback of the network. userdata points to the C
// int holding the id of the app. Calls with connected == false are ignored;
// on a new connection the peer's sequence is restarted at size 1, its view is
// advanced and the first random message is sent.
//
//export peerHandler
func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, userdata unsafe.Pointer) {
	if !connected {
		return
	}

	peerConn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
	msgConn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(peerConn, false)
	defer msgConn.Free()

	appID := int(*(*C.int)(userdata))
	app := &apps[appID]

	peerAddr := peerConn.GetPeerAddr(false)
	defer peerAddr.Free()

	tc := app.getTC(addr2id(peerAddr))
	tc.state = 1
	tc.view++
	sendRand(tc.state, app, msgConn, tc)
}

// errorHandler is the error callback of the network. It only reports the
// error: whether it is fatal or recoverable, the id of the async call it
// belongs to, and the error string for its code.
//
//export errorHandler
func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, asyncID C.int32_t, _ unsafe.Pointer) {
	cerr := (*salticidae.Error)(unsafe.Pointer(_err))

	var severity string
	if fatal {
		severity = "fatal"
	} else {
		severity = "recoverable"
	}

	reason := salticidae.StrError(cerr.GetCode())
	fmt.Printf("Captured %s error during an async call %d: %s\n", severity, asyncID, reason)
}

// onStopLoop stops the event loop of one app. userdata points to the C int
// holding the id of that app; onTerm schedules this through the app's
// ThreadCall.
//
//export onStopLoop
func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
	appID := int(*(*C.int)(userdata))
	apps[appID].ec.Stop()
}

// onTerm is the signal callback registered in main for SIGINT and SIGTERM. It
// asks every app to stop its event loop, waits for all app goroutines to
// finish, and then stops the main event loop.
//
//export onTerm
func onTerm(_ C.int, _ unsafe.Pointer) {
	stopLoop := salticidae.ThreadCallCallback(C.onStopLoop)
	for i := range apps {
		apps[i].tcall.AsyncCall(stopLoop, unsafe.Pointer(ids[i]))
	}
	threads.Wait()
	ec.Stop()
}

// main runs the stress test: it creates five nodes on 127.0.0.1 ports
// 12345-12349, each with its own event context and peer network, makes every
// node a peer of all the others, and runs each node's event loop in its own
// goroutine. The main event loop only waits for SIGINT/SIGTERM, which trigger
// onTerm to shut everything down.
func main() {
	ec = salticidae.NewEventContext()
	err := salticidae.NewError()

	var addrs []salticidae.NetAddr
	for i := 0; i < 5; i++ {
		addr := salticidae.NewNetAddrFromIPPortString("127.0.0.1:"+strconv.Itoa(12345+i), true, &err)
		// Check right away: err is reused by later calls, so a parse failure
		// would otherwise go unnoticed.
		checkError(&err)
		addrs = append(addrs, addr)
	}

	netconfig := salticidae.NewPeerNetworkConfig()
	nc := netconfig.AsMsgNetworkConfig()
	nc.SegBuffSize(segBuffSize)
	nc.NWorker(2)
	netconfig.ConnTimeout(5)
	netconfig.PingPeriod(2)

	apps = make([]AppContext, len(addrs))
	ids = make([](*C.int), len(addrs))
	for i, addr := range addrs {
		// Per-node event context; deliberately not named ec so it cannot be
		// confused with the package-level main event context.
		appEC := salticidae.NewEventContext()
		net := salticidae.NewPeerNetwork(appEC, netconfig, &err)
		checkError(&err)
		apps[i] = AppContext{
			addr:  addr,
			ec:    appEC,
			net:   net,
			tcall: salticidae.NewThreadCall(appEC),
			tc:    make(map[uint64]*TestContext),
		}

		// The app id is handed to the C callbacks through C-allocated memory;
		// it is freed by the app's goroutine once its loop has ended.
		ids[i] = (*C.int)(C.malloc(C.sizeof_int))
		*ids[i] = C.int(i)
		_i := unsafe.Pointer(ids[i])

		mnet := net.AsMsgNetwork()
		mnet.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i)
		mnet.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i)
		net.RegPeerHandler(salticidae.PeerNetworkPeerCallback(C.peerHandler), _i)
		mnet.RegErrorHandler(salticidae.MsgNetworkErrorCallback(C.errorHandler), _i)
		mnet.Start()
	}

	threads.Add(len(apps))
	for i := range apps {
		// Copy the index so each goroutine captures its own value.
		appID := i
		go func() {
			err := salticidae.NewError()
			a := &apps[appID]
			a.net.Listen(a.addr, &err)
			checkError(&err)
			for _, addr := range addrs {
				if !addr.IsEq(a.addr) {
					a.net.AddPeer(addr)
				}
			}
			// Blocks until onStopLoop stops this node's event context.
			a.ec.Dispatch()
			a.net.AsMsgNetwork().Stop()
			a.Free()
			C.free(unsafe.Pointer(ids[appID]))
			threads.Done()
		}()
	}

	ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
	ev_int.Add(salticidae.SIGINT)
	ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
	ev_term.Add(salticidae.SIGTERM)

	// Blocks until onTerm stops the main event context.
	ec.Dispatch()
}