From 85b9affbce70ac4b5922158802b227a42c42a203 Mon Sep 17 00:00:00 2001
From: Determinant <ted.sybil@gmail.com>
Date: Mon, 1 Jul 2019 18:36:41 -0400
Subject: preserve outgoing messages with best effort

---
 include/salticidae/conn.h    |   1 +
 include/salticidae/network.h | 159 +++++++++++++++++++++++++++----------------
 2 files changed, 101 insertions(+), 59 deletions(-)

diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 316d9cf..7f74a87 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -513,6 +513,7 @@ class ConnPool {
         {
             auto &conn = it.second;
             conn->stop();
+            conn->set_terminated();
             release_conn(conn);
         }
     }
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 4725b95..c59bbb3 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -271,13 +271,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
     };
 
     private:
-    struct Peer;
+    struct PeerConn;
 
     public:
     class Conn: public MsgNet::Conn {
         friend PeerNetwork;
-        Peer *peer;
-        BoxObj<Peer> _dead_peer;
+        PeerConn *peer;
+        BoxObj<PeerConn> _dead_peer;
         TimerEvent ev_timeout;
 
         void reset_timeout(double timeout);
@@ -308,7 +308,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
     using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>;
 
     private:
-    class Peer {
+    class PeerConn {
         friend PeerNetwork;
         /** connection addr, may be different due to passive mode */
         uint256_t peer_id;
@@ -326,8 +326,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
         bool inbound_handshake;
         double ping_period;
 
-        Peer() = delete;
-        Peer(const uint256_t &peer_id,
+        PeerConn() = delete;
+        PeerConn(const uint256_t &peer_id,
             conn_t conn, conn_t inbound_conn, conn_t outbound_conn,
             const PeerNetwork *pn):
                 peer_id(peer_id),
@@ -335,13 +335,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
                 inbound_conn(inbound_conn),
                 outbound_conn(outbound_conn),
                 ev_ping_timer(
-                    TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))),
+                    TimerEvent(pn->disp_ec, std::bind(&PeerConn::ping_timer, this, _1))),
                 connected(false),
                 outbound_handshake(false),
                 inbound_handshake(false),
                 ping_period(pn->ping_period) {}
-        Peer &operator=(const Peer &) = delete;
-        Peer(const Peer &) = delete;
+        PeerConn &operator=(const PeerConn &) = delete;
+        PeerConn(const PeerConn &) = delete;
 
         void reset_ping_timer();
         void send_ping();
@@ -351,15 +351,24 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
                 ev_ping_timer.del();
         }
         public:
-        ~Peer() {
+        ~PeerConn() {
             if (inbound_conn) inbound_conn->peer = nullptr;
             if (outbound_conn) outbound_conn->peer = nullptr;
         }
     };
 
+    struct Peer {
+        uint256_t peer_id;
+        TimerEvent ev_retry_timer;
+        conn_t peer_conn;
+
+        Peer(const conn_t &conn):
+            peer_id(), ev_retry_timer(), peer_conn(conn) {}
+    };
+
     std::unordered_map<NetAddr, conn_t> pending_peers;
-    std::unordered_map<NetAddr, std::pair<uint256_t, TimerEvent>> known_peers;
-    std::unordered_map<uint256_t, BoxObj<Peer>> pid2peer;
+    std::unordered_map<NetAddr, BoxObj<Peer>> known_peers;
+    std::unordered_map<uint256_t, BoxObj<PeerConn>> pid2peer;
 
     using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>;
     using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>;
@@ -407,9 +416,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
     void pong_handler(MsgPong &&msg, const conn_t &conn);
     void _ping_msg_cb(const conn_t &conn, uint16_t port);
     void _pong_msg_cb(const conn_t &conn, uint16_t port);
-    bool check_handshake(Peer *peer);
+    void check_handshake(PeerConn *peer);
+    void move_peer_buffer(conn_t &old_conn, const conn_t &new_conn);
     void replace_conn(const conn_t &conn);
-    void start_active_conn(const NetAddr &addr);
+    conn_t start_active_conn(const NetAddr &addr);
     static void tcall_reset_timeout(ConnPool::Worker *worker,
                                     const conn_t &conn, double timeout);
     inline conn_t _get_peer_conn(const NetAddr &addr) const;
@@ -636,6 +646,18 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
     pinfo_ulock_t _g(known_peers_lock);
     auto it = known_peers.find(addr);
     if (it == known_peers.end()) return;
+    auto &pp = it->second;
+    if (conn == pp->peer_conn)
+    {
+        pinfo_ulock_t __g(pid2peer_lock);
+        auto it2 = pid2peer.find(pp->peer_id);
+        if (it2 != pid2peer.end())
+        {
+            auto &p = it2->second;
+            if (p->connected && p->conn != conn)
+                move_peer_buffer(pp->peer_conn, p->conn);
+        }
+    }
     if (p)
     {
         if (conn != p->conn) return;
@@ -645,8 +667,8 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
         p->connected = false;
         p->outbound_handshake = false;
         p->inbound_handshake = false;
-        known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent());
-        Peer *peer = nullptr;
+        pp->peer_id = uint256_t();
+        PeerConn *peer = nullptr;
         {
             pinfo_ulock_t __g(pid2peer_lock);
             auto it2 = pid2peer.find(p->peer_id);
@@ -661,9 +683,9 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
     }
     else
     {
-        if (!it->second.first.is_null()) return;
+        if (!it->second->peer_id.is_null()) return;
     }
-    auto &ev_retry_timer = it->second.second;
+    auto &ev_retry_timer = it->second->ev_retry_timer;
     ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) {
         try {
             start_active_conn(addr);
@@ -673,14 +695,14 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
 }
 
 template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
+void PeerNetwork<O, _, __>::PeerConn::reset_ping_timer() {
     assert(ev_ping_timer);
     ev_ping_timer.del();
     ev_ping_timer.add(gen_rand_timeout(ping_period));
 }
 
 template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::send_ping() {
+void PeerNetwork<O, _, __>::PeerConn::send_ping() {
     auto pn = conn->get_net();
     ping_timer_ok = false;
     pong_msg_ok = false;
@@ -689,7 +711,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
 }
 
 template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
+void PeerNetwork<O, _, __>::PeerConn::ping_timer(TimerEvent &) {
     ping_timer_ok = true;
     if (pong_msg_ok)
     {
@@ -699,10 +721,25 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
 }
 
 template<typename O, O _, O __>
-bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
+void PeerNetwork<O, _, __>::move_peer_buffer(conn_t &old_conn, const conn_t &new_conn) {
+    assert(old_conn->is_terminated() && old_conn != new_conn);
+    for (;;)
+    {
+        bytearray_t buff_seg = old_conn->send_buffer.move_pop();
+        if (!buff_seg.size()) break;
+        new_conn->write(std::move(buff_seg));
+    }
+    old_conn = new_conn;
+    this->user_tcall->async_call([this, new_conn](ThreadCall::Handle &) {
+        if (peer_cb) peer_cb(new_conn, true);
+    });
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) {
     if (!(p->inbound_handshake && p->outbound_handshake) ||
             p->connected)
-        return false;
+        return;
     p->clear_all_events();
     if (p->inbound_conn && p->inbound_conn != p->conn)
         p->inbound_conn->peer = nullptr;
@@ -714,29 +751,33 @@ bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
     p->send_ping();
     {
         pinfo_ulock_t _g(known_peers_lock);
-        known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent());
+        auto &pp = known_peers[p->peer_addr];
+        pp->peer_id = p->peer_id;
+        pp->ev_retry_timer.del();
+        auto &old_conn = pp->peer_conn;
+        auto &conn = p->conn;
+        if (old_conn != conn)
+        {
+            if (old_conn->is_terminated())
+                move_peer_buffer(old_conn, conn);
+            else
+                this->disp_terminate(old_conn);
+        }
     }
     pending_peers.erase(p->conn->get_addr());
-    if (p->connected)
+    auto color_begin = "";
+    auto color_end = "";
+    if (logger.is_tty())
     {
-        auto color_begin = "";
-        auto color_end = "";
-        if (logger.is_tty())
-        {
-            color_begin = TTY_COLOR_BLUE;
-            color_end = TTY_COLOR_RESET;
-        }
-        SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s",
-            color_begin,
-            std::string(listen_addr).c_str(),
-            std::string(p->peer_addr).c_str(),
-            std::string(*(p->conn)).c_str(),
-            color_end);
+        color_begin = TTY_COLOR_BLUE;
+        color_end = TTY_COLOR_RESET;
     }
-    this->user_tcall->async_call([this, conn=p->conn](ThreadCall::Handle &) {
-        if (peer_cb) peer_cb(conn, true);
-    });
-    return true;
+    SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s",
+        color_begin,
+        std::string(listen_addr).c_str(),
+        std::string(p->peer_addr).c_str(),
+        std::string(*(p->conn)).c_str(),
+        color_end);
 }
 
 template<typename O, O _, O __>
@@ -756,9 +797,10 @@ void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) {
 }
 
 template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
+typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
     auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
     replace_conn(conn);
+    return conn;
 }
 
 template<typename O, O _, O __>
@@ -766,12 +808,7 @@ inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_c
     auto it = known_peers.find(addr);
     if (it == known_peers.end())
         throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
-    const auto &peer_id = it->second.first;
-    if (peer_id.is_null())
-        throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
-    auto it2 = pid2peer.find(peer_id);
-    assert(it2 != pid2peer.end());
-    return it2->second->conn;
+    return it->second->peer_conn;
 }
 /* end: functions invoked by the dispatcher */
 
@@ -827,7 +864,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
                     else
                     {
                         it = pid2peer.insert(std::make_pair(peer_id,
-                            new Peer(peer_id, conn, conn, nullptr, this))).first;
+                            new PeerConn(peer_id, conn, conn, nullptr, this))).first;
                     }
                     auto p = it->second.get();
                     p->inbound_handshake = true;
@@ -895,7 +932,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
                     else
                     {
                         it = pid2peer.insert(std::make_pair(peer_id,
-                            new Peer(peer_id, conn, nullptr, conn, this))).first;
+                            new PeerConn(peer_id, conn, nullptr, conn, this))).first;
                     }
                     auto p = it->second.get();
                     p->outbound_handshake = true;
@@ -960,11 +997,15 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
     this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
         try {
             pinfo_ulock_t _g(known_peers_lock);
-            if (!known_peers.insert(std::make_pair(addr,
-                    std::make_pair(uint256_t(), TimerEvent()))).second)
+            if (known_peers.count(addr))
                 throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
-            if (!pending_peers.count(addr))
-                start_active_conn(addr);
+            auto it = pending_peers.find(addr);
+            conn_t conn;
+            if (it == pending_peers.end())
+                conn = start_active_conn(addr);
+            else
+                conn = it->second;
+            known_peers.insert(std::make_pair(addr, new Peer(conn)));
         } catch (const PeerNetworkError &) {
             this->recoverable_error(std::current_exception());
         } catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -980,7 +1021,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
             auto it = known_peers.find(addr);
             if (it == known_peers.end())
                 throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
-            auto peer_id = it->second.first;
+            auto peer_id = it->second->peer_id;
             known_peers.erase(it);
             auto it2 = pending_peers.find(addr);
             if (it2 != pending_peers.end())
@@ -992,7 +1033,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
             auto it3 = pid2peer.find(peer_id);
             if (it3 != pid2peer.end())
             {
-                auto p = it3->second.get();
+                auto &p = it3->second;
                 this->disp_terminate(p->conn);
                 pid2peer.erase(it3);
             }
@@ -1015,12 +1056,12 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
             auto it = known_peers.find(addr);
             if (it == known_peers.end())
                 throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
-            if (it->second.first.is_null())
+            if (it->second->peer_id.is_null())
             {
                 conn = nullptr;
                 return;
             }
-            auto it2 = pid2peer.find(it->second.first);
+            auto it2 = pid2peer.find(it->second->peer_id);
             assert(it2 != pid2peer.end());
             conn = it2->second->conn;
         } catch (const PeerNetworkError &) {
-- 
cgit v1.2.3-70-g09d2