aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h49
-rw-r--r--include/salticidae/network.h4
-rw-r--r--src/conn.cpp47
3 files changed, 47 insertions, 53 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index f352320..a254505 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -46,7 +46,7 @@
namespace salticidae {
-class RingBuffer {
+class SegBuffer {
struct buffer_entry_t {
bytearray_t data;
bytearray_t::iterator offset;
@@ -67,39 +67,39 @@ class RingBuffer {
size_t length() const { return data.end() - offset; }
};
- std::list<buffer_entry_t> ring;
+ std::list<buffer_entry_t> buffer;
size_t _size;
public:
- RingBuffer(): _size(0) {}
- ~RingBuffer() { clear(); }
+ SegBuffer(): _size(0) {}
+ ~SegBuffer() { clear(); }
- void swap(RingBuffer &other) {
- std::swap(ring, other.ring);
+ void swap(SegBuffer &other) {
+ std::swap(buffer, other.buffer);
std::swap(_size, other._size);
}
- RingBuffer(const RingBuffer &other):
- ring(other.ring), _size(other._size) {}
+ SegBuffer(const SegBuffer &other):
+ buffer(other.buffer), _size(other._size) {}
- RingBuffer(RingBuffer &&other):
- ring(std::move(other.ring)), _size(other._size) {
+ SegBuffer(SegBuffer &&other):
+ buffer(std::move(other.buffer)), _size(other._size) {
other._size = 0;
}
- RingBuffer &operator=(RingBuffer &&other) {
+ SegBuffer &operator=(SegBuffer &&other) {
if (this != &other)
{
- RingBuffer tmp(std::move(other));
+ SegBuffer tmp(std::move(other));
tmp.swap(*this);
}
return *this;
}
- RingBuffer &operator=(const RingBuffer &other) {
+ SegBuffer &operator=(const SegBuffer &other) {
if (this != &other)
{
- RingBuffer tmp(other);
+ SegBuffer tmp(other);
tmp.swap(*this);
}
return *this;
@@ -107,13 +107,13 @@ class RingBuffer {
void push(bytearray_t &&data) {
_size += data.size();
- ring.push_back(buffer_entry_t(std::move(data)));
+ buffer.push_back(buffer_entry_t(std::move(data)));
}
bytearray_t pop(size_t len) {
bytearray_t res;
- auto i = ring.begin();
- while (len && i != ring.end())
+ auto i = buffer.begin();
+ while (len && i != buffer.end())
{
size_t copy_len = std::min(i->length(), len);
res.insert(res.end(), i->offset, i->offset + copy_len);
@@ -122,15 +122,16 @@ class RingBuffer {
if (i->offset == i->data.end())
i++;
}
- ring.erase(ring.begin(), i);
+ buffer.erase(buffer.begin(), i);
_size -= res.size();
return std::move(res);
}
size_t size() const { return _size; }
+ bool empty() const { return buffer.empty(); }
void clear() {
- ring.clear();
+ buffer.clear();
_size = 0;
}
};
@@ -162,8 +163,8 @@ class ConnPool {
ConnMode mode;
NetAddr addr;
- RingBuffer send_buffer;
- RingBuffer recv_buffer;
+ SegBuffer send_buffer;
+ SegBuffer recv_buffer;
Event ev_read;
Event ev_write;
@@ -190,7 +191,7 @@ class ConnPool {
int get_fd() const { return fd; }
const NetAddr &get_addr() const { return addr; }
ConnMode get_mode() const { return mode; }
- RingBuffer &read() { return recv_buffer; }
+ SegBuffer &read() { return recv_buffer; }
/** Set the buffer size used for send/receive data. */
void set_seg_buff_size(size_t size) { seg_buff_size = size; }
@@ -212,7 +213,7 @@ class ConnPool {
protected:
/** Close the IO and clear all on-going or planned events. */
- virtual void close() {
+ virtual void on_close() {
ev_read.clear();
ev_write.clear();
ev_connect.clear();
@@ -260,7 +261,7 @@ class ConnPool {
for (auto it: pool)
{
conn_t conn = it.second;
- conn->close();
+ conn->on_close();
}
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 7b05bdb..0ea7455 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -191,9 +191,9 @@ class PeerNetwork: public MsgNetwork<MsgType> {
pn(pn) {}
protected:
- void close() override {
+ void on_close() override {
ev_timeout.clear();
- MsgNet::Conn::close();
+ MsgNet::Conn::on_close();
}
void on_setup() override;
diff --git a/src/conn.cpp b/src/conn.cpp
index ba197a0..bfd7c30 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -48,21 +48,20 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
if (!(events & EV_WRITE)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
- while (ret == (ssize_t)seg_buff_size)
+ while (!send_buffer.empty() && ret == (ssize_t)seg_buff_size)
{
- if (!send_buffer.size()) /* nothing to write */
- break;
bytearray_t buff_seg = send_buffer.pop(seg_buff_size);
ssize_t size = buff_seg.size();
ret = send(fd, buff_seg.data(), size, MSG_NOSIGNAL);
- SALTICIDAE_LOG_DEBUG("socket sent %d bytes", ret);
- if (ret < size)
+ SALTICIDAE_LOG_DEBUG("socket sent %zd bytes", ret);
+ size -= ret;
+ if (size > 0)
{
- if (ret < 0) /* nothing is sent */
+ if (ret < 1) /* nothing is sent */
{
/* rewind the whole buff_seg */
send_buffer.push(std::move(buff_seg));
- if (errno != EWOULDBLOCK)
+ if (ret < 0 && errno != EWOULDBLOCK)
{
SALTICIDAE_LOG_INFO("reason: %s", strerror(errno));
terminate();
@@ -73,8 +72,8 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
{
/* rewind the leftover */
bytearray_t left_over;
- left_over.resize(size - ret);
- memmove(left_over.data(), buff_seg.data() + ret, size - ret);
+ left_over.resize(size);
+ memmove(left_over.data(), buff_seg.data() + ret, size);
send_buffer.push(std::move(left_over));
}
/* wait for the next write callback */
@@ -96,24 +95,18 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
bytearray_t buff_seg;
buff_seg.resize(seg_buff_size);
ret = recv(fd, buff_seg.data(), seg_buff_size, 0);
- SALTICIDAE_LOG_DEBUG("socket read %d bytes", ret);
- if (ret <= 0)
+ SALTICIDAE_LOG_DEBUG("socket read %zd bytes", ret);
+ if (ret < 0 && errno != EWOULDBLOCK)
{
- if (ret < 0 && errno != EWOULDBLOCK)
- {
- SALTICIDAE_LOG_INFO("reason: %s", strerror(errno));
- /* connection err or half-opened connection */
- terminate();
- return;
- }
- if (ret == 0)
- {
- terminate();
- return;
- }
-
- /* EWOULDBLOCK */
- break;
+ SALTICIDAE_LOG_INFO("reason: %s", strerror(errno));
+ /* connection err or half-opened connection */
+ terminate();
+ return;
+ }
+ if (ret == 0)
+ {
+ terminate();
+ return;
}
buff_seg.resize(ret);
recv_buffer.push(std::move(buff_seg));
@@ -215,7 +208,7 @@ void ConnPool::Conn::terminate() {
auto conn = it->second;
assert(conn.get() == this);
pool.erase(it);
- close();
+ on_close();
/* inform the upper layer the connection will be destroyed */
on_teardown();
}