aboutsummaryrefslogblamecommitdiff
path: root/test_p2p_stress/main.go
blob: d80699dbb43495105adc23540f9daa9e39acd63b (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16















                                                            
                                

                                                                                    



                        







                   
             










                                                                      
                   



















































                                                                     









                                                                 


                                                                          
                                        











                                                           
                                        





                                                                                   




                                                                                                      








                                                                                                     
                                                   




                                            
                         
 

                                                                





                                      
                                               


                                                      
                                                                                                                    














                                                                                                



                                                          
                                          




















                                                                    
                  







                                     
                             
                                                                                       








                                                          
                                                












                                                                                              
                   
                   
                              









                                        











                                                                                
package main

// #cgo CFLAGS: -I${SRCDIR}/../salticidae/include/
// #include <stdlib.h>
// #include <arpa/inet.h>
// #include "salticidae/network.h"
// void onTerm(int sig);
// void onReceiveRand(msg_t *, msgnetwork_conn_t *, void *);
// void onReceiveAck(msg_t *, msgnetwork_conn_t *, void *);
// void onStopLoop(threadcall_handle_t *, void *);
// void connHandler(msgnetwork_conn_t *, bool, void *);
// void onTimeout(timerev_t *, void *);
// typedef struct timeout_callback_context_t {
//     int app_id;
//     uint64_t addr_id;
//     msgnetwork_conn_t *conn;
// } timeout_callback_context_t;
// static timeout_callback_context_t *timeout_callback_context_new() {
//     timeout_callback_context_t *ctx = malloc(sizeof(timeout_callback_context_t));
//     ctx->conn = NULL;
//     return ctx;
// }
//
import "C"

import (
    "salticidae-go"
    "math/rand"
    "fmt"
    "sync"
    "unsafe"
    "strconv"
)

var ec salticidae.EventContext
const (
    MSG_OPCODE_RAND salticidae.Opcode = iota
    MSG_OPCODE_ACK
)

func msgRandSerialize(size int) (salticidae.Msg, salticidae.UInt256) {
    buffer := make([]byte, size)
    _, err := rand.Read(buffer)
    if err != nil {
        panic("rand source failed")
    }
    serialized := salticidae.NewDataStreamFromBytes(buffer)
    hash := serialized.GetHash()
    return salticidae.NewMsg(
        MSG_OPCODE_RAND,
        salticidae.NewByteArrayMovedFromDataStream(serialized)), hash
}

func msgRandUnserialize(msg salticidae.Msg) salticidae.DataStream {
    return msg.GetPayload()
}

func msgAckSerialize(hash salticidae.UInt256) salticidae.Msg {
    serialized := salticidae.NewDataStream()
    hash.Serialize(serialized)
    return salticidae.NewMsg(
        MSG_OPCODE_ACK,
        salticidae.NewByteArrayMovedFromDataStream(serialized))
}

func msgAckUnserialize(msg salticidae.Msg) salticidae.UInt256 {
    p := msg.GetPayload()
    hash := salticidae.NewUInt256()
    hash.Unserialize(p)
    p.Free()
    return hash
}

type TestContext struct {
    timer salticidae.TimerEvent
    state int
    hash salticidae.UInt256
    ncompleted int
}

type AppContext struct {
    addr salticidae.NetAddr
    ec salticidae.EventContext
    net salticidae.PeerNetwork
    tcall salticidae.ThreadCall
    tc map[uint64] *TestContext
}

func NewTestContext() TestContext {
    return TestContext { ncompleted: 0 }
}

func addr2id(addr salticidae.NetAddr) uint64 {
    return uint64(addr.GetIP()) | (uint64(addr.GetPort()) << 32)
}

func (self AppContext) getTC(addr_id uint64) (_tc *TestContext) {
    if tc, ok := self.tc[addr_id]; ok {
        _tc = tc
    } else {
        _tc = new(TestContext)
        self.tc[addr_id] = _tc
    }
    return
}

func sendRand(size int, app *AppContext, conn salticidae.MsgNetworkConn) {
    msg, hash := msgRandSerialize(size)
    addr := conn.GetAddr()
    app.getTC(addr2id(addr)).hash = hash
    app.net.AsMsgNetwork().SendMsg(msg, conn)
}

var apps []AppContext
var threads sync.WaitGroup

var seg_buff_size = 4096

//export onTimeout
func onTimeout(_ *C.timerev_t, userdata unsafe.Pointer) {
    ctx := (*C.struct_timeout_callback_context_t)(userdata)
    app := &apps[ctx.app_id]
    tc := app.getTC(uint64(ctx.addr_id))
    tc.ncompleted++
    app.net.AsMsgNetwork().Terminate(salticidae.MsgNetworkConn(ctx.conn))
    var s string
    for addr_id, v := range app.tc {
        s += fmt.Sprintf(" %d(%d)", C.ntohs(C.ushort(addr_id >> 32)), v.ncompleted)
    }
    fmt.Printf("INFO: %d completed:%s\n", C.ntohs(C.ushort(app.addr.GetPort())), s)
}

//export onReceiveRand
func onReceiveRand(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
    msg := salticidae.Msg(_msg)
    hash := msgRandUnserialize(msg).GetHash()
    conn := salticidae.MsgNetworkConn(_conn)
    net := conn.GetNet()
    net.SendMsg(msgAckSerialize(hash), conn)
}

//export onReceiveAck
func onReceiveAck(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, userdata unsafe.Pointer) {
    hash := msgAckUnserialize(salticidae.Msg(_msg))
    id := *(* int)(userdata)
    app := &apps[id]
    conn := salticidae.MsgNetworkConn(_conn)
    _addr := conn.GetAddr()
    addr := addr2id(_addr)
    tc := app.getTC(addr)

    if !hash.IsEq(tc.hash) {
        //fmt.Printf("%s %s\n", hash.GetHex(), tc.hash.GetHex())
        panic("corrupted I/O!")
    }

    if tc.state == seg_buff_size * 2 {
        sendRand(tc.state, app, conn)
        tc.state = -1
        ctx := C.timeout_callback_context_new()
        ctx.app_id = C.int(id)
        ctx.addr_id = C.uint64_t(addr)
        ctx.conn = (*C.struct_msgnetwork_conn_t)(conn)
        tc.timer = salticidae.NewTimerEvent(app.ec, salticidae.TimerEventCallback(C.onTimeout), unsafe.Pointer(ctx))
        t := rand.Float64() * 10
        tc.timer.Add(t)
        fmt.Printf("rand-bomboard phase, ending in %.2f secs\n", t)
    } else if tc.state