aboutsummaryrefslogblamecommitdiff
path: root/test_p2p_stress/main.go
blob: 103ec4b156c5845f8329863d63763ec3f765009f (plain) (tree)
1
2
3
4
5
6
7
8
9


                                                  
                                                                                          
                      
                      

                                  
                                


                                                            
                                                        
                                                                




                                              
                                

                                                                                    



                        


          
                                          
               
        


            
             

 




                                            
                                                                                   
                                                                          
                                                         

                                
                   

                                   
                              

                                                                                                  
                                                                                                 

 
                                                                                    
                                     
                


                                                                                      
          

 
                                                                           
                                                                          
                                                         
                              
                                                                                                  
                                                                              

 
                                                                                   

                                       
                

                                                                
          

 






                                                                                        

                               
                                                  
             
               











                               







                                                
                                   
                                                 





                                                                









                                                                 
                                                                                           
                                                                  
                  
                                             



                          

                              
                



                                                           
                                 
                                        
                   


                                                  



                                                                                   



                                                                                   
                                                                                               
                                                     
                                                                             
                        

                                                            
                          



                                                                                                     
                                                                                                  
                                  
                    
                                                                             




                                                                                                   
 
                        
                                             


              
                            


                               
                                    
                                         
                     
                                               
                              
                                        
                                                
                            
                                                     
                                                
                          
         
                                                                                                                    
                          



                                                                   
                                                                

                  
                                         


     

                                                                                                 
                  


                                                                                                
                        




                                                           


     
                     
                                                                                                       


                                                    
                                                                                                                 

 

                                                                    

                                  



               
                                        



                                                        
                                   
     
                  




                                     
                                

                                  
                            
                             
                                                                                                      

                                                  
                                        
                               


                            
                                         
                                      

                                          
                                                                               


                              
                     
                                                

                                               


                                                 


                                                                                               
                                                                                 

                                                                                    



                            
                   
                   
                                        
                              
                                                        





                                        
                                       

                                               


                          
 
                                                                                    
                                 
                                                                                     


                                   
 
package main

// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/
// #cgo LDFLAGS: ${SRCDIR}/../salticidae/libsalticidae.so -Wl,-rpath=${SRCDIR}/salticidae/
// #include <stdlib.h>
// #include <stdint.h>
// #include <arpa/inet.h>
// #include "salticidae/network.h"
// void onTerm(int sig, void *);
// void onReceiveRand(msg_t *, msgnetwork_conn_t *, void *);
// void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *);
// void onStopLoop(threadcall_handle_t *, void *);
// void peerHandler(peernetwork_conn_t *, bool, void *);
// void errorHandler(SalticidaeCError *, bool, int32_t, void *);
// void onTimeout(timerev_t *, void *);
// typedef struct timeout_callback_context_t {
//     int app_id;
//     uint64_t addr_id;
//     msgnetwork_conn_t *conn;
// } timeout_callback_context_t;
// static timeout_callback_context_t *timeout_callback_context_new() {
//     timeout_callback_context_t *ctx = malloc(sizeof(timeout_callback_context_t));
//     ctx->conn = NULL;
//     return ctx;
// }
//
import "C"

import (
    "github.com/Determinant/salticidae-go"
    "math/rand"
    "os"
    "fmt"
    "sync"
    "unsafe"
    "strconv"
)

const (
    MSG_OPCODE_RAND salticidae.Opcode = iota
    MSG_OPCODE_ACK
)

func msgRandSerialize(view uint32, size int) (salticidae.Msg, salticidae.UInt256) {
    serialized := salticidae.NewDataStream(false); defer serialized.Free()
    serialized.PutU32(salticidae.ToLittleEndianU32(view))
    buffer := make([]byte, size)
    _, err := rand.Read(buffer)
    if err != nil {
        panic("rand source failed")
    }
    serialized.PutData(buffer)
    ba := salticidae.NewByteArrayFromBytes(buffer, false); defer ba.Free()
    payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free()
    return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_RAND, payload, false), ba.GetHash(true)
}

func msgRandUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
    payload := msg.GetPayloadByMove()
    succ := true
    view = salticidae.FromLittleEndianU32(payload.GetU32(&succ))
    ba := salticidae.NewByteArrayCopiedFromDataStream(payload, false); defer ba.Free()
    hash = ba.GetHash(false)
    return
}

func msgAckSerialize(view uint32, hash salticidae.UInt256) salticidae.Msg {
    serialized := salticidae.NewDataStream(false); defer serialized.Free()
    serialized.PutU32(salticidae.ToLittleEndianU32(view))
    hash.Serialize(serialized)
    payload := salticidae.NewByteArrayMovedFromDataStream(serialized, false); defer payload.Free()
    return salticidae.NewMsgMovedFromByteArray(MSG_OPCODE_ACK, payload, false)
}

func msgAckUnserialize(msg salticidae.Msg) (view uint32, hash salticidae.UInt256) {
    payload := msg.GetPayloadByMove()
    hash = salticidae.NewUInt256(false)
    succ := true
    view = salticidae.FromLittleEndianU32(payload.GetU32(&succ))
    hash.Unserialize(payload)
    return
}

func checkError(err *salticidae.Error) {
    if err.GetCode() != 0 {
        fmt.Printf("error during a sync call: %s\n", salticidae.StrError(err.GetCode()))
        os.Exit(1)
    }
}

type TestContext struct {
    timer salticidae.TimerEvent
    timer_ctx *C.struct_timeout_callback_context_t
    state int
    view uint32
    hash salticidae.UInt256
    ncompleted int
}

type AppContext struct {
    addr salticidae.NetAddr
    ec salticidae.EventContext
    net salticidae.PeerNetwork
    tcall salticidae.ThreadCall
    tc map[uint64] *TestContext
}

func (self AppContext) Free() {
    for _, tc := range self.tc {
        if tc.timer != nil {
            C.free(unsafe.Pointer(tc.timer_ctx))
        }
    }
}

func NewTestContext() TestContext {
    return TestContext { view: 0, ncompleted: 0 }
}

func addr2id(addr salticidae.NetAddr) uint64 {
    return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32)
}

func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
    if tc, ok := self.tc[addr_id]; ok {
        _tc = tc
    } else {
        _tc = new(TestContext)
        self.tc[addr_id] = _tc
    }
    return
}

func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn, tc *TestContext) {
    msg, hash := msgRandSerialize(tc.view, size); defer msg.Free()
    tc.hash = hash
    app.net.AsMsgNetwork().SendMsg(msg, conn)
}

var apps []AppContext
var threads sync.WaitGroup
var segBuffSize = 4096
var ec salticidae.EventContext
var ids []*C.int

//export onTimeout
func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
    ctx := (*C.struct_timeout_callback_context_t)(userdata)
    app := &apps[int(ctx.app_id)]
    tc := app.getTC(uint64(ctx.addr_id))
    tc.ncompleted++
    app.net.AsMsgNetwork().Terminate(
        salticidae.MsgNetworkConnFromC(
            salticidae.CMsgNetworkConn(ctx.conn)))
    var s string
    for addr_id, v := range app.tc {
        s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted)
    }
    fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s)
}

//export onReceiveRand
func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.Pointer) {
    msg := salticidae.MsgFromC(salticidae.CMsg(_msg))
    conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
    net := conn.GetNet()
    view, hash := msgRandUnserialize(msg); defer hash.Free()
    ack := msgAckSerialize(view, hash); defer ack.Free()
    net.SendMsg(ack, conn)
}

//export onReceiveAck
func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
    view, hash := msgAckUnserialize(salticidae.MsgFromC(salticidae.CMsg(_msg))); defer hash.Free()
    id := int(*(*C.int)(userdata))
    app := &apps[id]
    conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
    pconn := salticidae.NewPeerNetworkConnFromMsgNetworkConnUnsafe(conn, false); defer pconn.Free()
    addr := pconn.GetPeerAddr(false); defer addr.Free()
    if addr.IsNull() { return }
    addrID := addr2id(addr)
    tc := app.getTC(addrID)

    if view != tc.view {
        fmt.Printf("dropping stale MsgAck\n")
        return
    }

    if !hash.IsEq(tc.hash) {
        panic("corrupted I/O!")
    }

    if tc.state == segBuffSize * 2 {
        sendRand(tc.state, app, conn, tc)
        tc.state = -1
        ctx := C.timeout_callback_context_new()
        ctx.app_id = C.int(id)
        ctx.addr_id = C.uint64_t(addrID)
        ctx.conn = C.msgnetwork_conn_copy(_conn)
        if tc.timer != nil {
            C.msgnetwork_conn_free(tc.timer_ctx.conn)
            C.free(unsafe.Pointer(tc.timer_ctx))
            tc.timer.Del()
        }
        tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
        tc.timer_ctx = ctx
        t := rand.Float64() * 10
        tc.timer.Add(t)
        fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
    } else if tc.state == -1 {
        sendRand(rand.Int() % (segBuffSize * 10), app, conn, tc)
    } else {
        tc.state++
        sendRand(tc.state, app, conn, tc)
    }
}

//export peerHandler
func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, userdata unsafe.Pointer) {
    if connected {
        pconn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
        conn := salticidae.NewMsgNetworkConnFromPeerNetworkConn(pconn, false); defer conn.Free()
        id := int(*(*C.int)(userdata))
        app := &apps[id]
        addr := pconn.GetPeerAddr(false); defer addr.Free()
        tc := app.getTC(addr2id(addr))
        tc.state = 1
        tc.view++
        sendRand(tc.state, app, conn, tc)
    }
}

//export errorHandler
func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, asyncID C.int32_t, _ unsafe.Pointer) {
    err := (*salticidae.Error)(unsafe.Pointer(_err))
    s := "recoverable"
    if fatal { s = "fatal" }
    fmt.Printf("Captured %s error during an async call %d: %s\n", s, asyncID, salticidae.StrError(err.GetCode()))
}

//export onStopLoop
func onStopLoop(_ *C.threadcall_handle_t, userdata unsafe.Pointer) {
    id := int(*(*C.int)(userdata))
    ec := apps[id].ec
    ec.Stop()
}

//export onTerm
func onTerm(_ C.int, _ unsafe.Pointer) {
    for i, _ := range apps {
        a := &apps[i]
        a.tcall.AsyncCall(
            salticidae.ThreadCallCallback(C.onStopLoop),
            unsafe.Pointer(ids[i]))
    }
    threads.Wait()
    ec.Stop()
}

func main() {
    ec = salticidae.NewEventContext()
    err := salticidae.NewError()

    var addrs []salticidae.NetAddr
    for i := 0; i < 5; i++ {
        addrs = append(addrs,
            salticidae.NewNetAddrFromIPPortString("127.0.0.1:" + strconv.Itoa(12345 + i), true, &err))
    }
    netconfig := salticidae.NewPeerNetworkConfig()
    nc := netconfig.AsMsgNetworkConfig()
    nc.SegBuffSize(segBuffSize)
    nc.NWorker(2)
    netconfig.ConnTimeout(5)
    netconfig.PingPeriod(2)
    apps = make([]AppContext, len(addrs))
    ids = make([](*C.int), len(addrs))
    for i, addr := range addrs {
        ec := salticidae.NewEventContext()
        net := salticidae.NewPeerNetwork(ec, netconfig, &err); checkError(&err)
        apps[i] = AppContext {
            addr: addr,
            ec: ec,
            net: net,
            tcall: salticidae.NewThreadCall(ec),
            tc: make(map[uint64] *TestContext),
        }
        ids[i] = (*C.int)(C.malloc(C.sizeof_int))
        *ids[i] = C.int(i)
        _i := unsafe.Pointer(ids[i])
        mnet := net.AsMsgNetwork()
        mnet.RegHandler(MSG_OPCODE_RAND, salticidae.MsgNetworkMsgCallback(C.onReceiveRand), _i)
        mnet.RegHandler(MSG_OPCODE_ACK, salticidae.MsgNetworkMsgCallback(C.onReceiveAck), _i)
        net.RegPeerHandler(salticidae.PeerNetworkPeerCallback(C.peerHandler), _i)
        mnet.RegErrorHandler(salticidae.MsgNetworkErrorCallback(C.errorHandler), _i)
        mnet.Start()
    }

    threads.Add(len(apps))
    for i, _ := range apps {
        app_id := i
        go func() {
            err := salticidae.NewError()
            a := &apps[app_id]
            a.net.Listen(a.addr, &err); checkError(&err)
            for _, addr := range addrs {
                if !addr.IsEq(a.addr) {
                    a.net.AddPeer(addr)
                }
            }
            a.ec.Dispatch()
            a.net.AsMsgNetwork().Stop()
            a.Free()
            C.free(unsafe.Pointer(ids[app_id]))
            threads.Done()
        }()
    }

    ev_int := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
    ev_int.Add(salticidae.SIGINT)
    ev_term := salticidae.NewSigEvent(ec, salticidae.SigEventCallback(C.onTerm), nil)
    ev_term.Add(salticidae.SIGTERM)

    ec.Dispatch()
}