【Linux】异步网络库dyad代码阅读

2023-05-16

简介

dyad是一个基于C编写的异步网络库,非常精简,单C文件,仅实现TCP,很适合用来学习Linux网络编程和异步非阻塞处理

链接

Github链接

基于Dyad的echo server实现

我写了一些注释

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "dyad.h"

/* An echo server: Echos any data received by a client back to the client */

static void onData(dyad_Event *e) {
  dyad_write(e->stream, e->data, e->size);
}

static void onAccept(dyad_Event *e) {
  dyad_addListener(e->remote, DYAD_EVENT_DATA, onData, NULL);
  dyad_writef(e->remote, "echo server\r\n");
}

static void onError(dyad_Event *e) {
  printf("server error: %s\n", e->msg);
}

int main(void) {
  dyad_Stream *s;
  dyad_init();//初始化dyad

  s = dyad_newStream();//新建一个流
  dyad_addListener(s, DYAD_EVENT_ERROR,  onError,  NULL);//添加监听
  dyad_addListener(s, DYAD_EVENT_ACCEPT, onAccept, NULL);//添加监听
  dyad_listen(s, 8000);//监听8000端口

  while (dyad_getStreamCount() > 0) {
    dyad_update();//更新
  }

  return 0;
}

设计思想

整体实现是基于select事件链表的方式(也就是它定义的stream)实现的,核心思想还是链表,所以分析也是从它的基础结构dyad_Stream结构体开始,我会试着分析下echo server实现中涉及到的函数.

dyad_Stream结构体

struct dyad_Stream {
  int state, flags;//状态标志
  dyad_Socket sockfd;//文件描述符
  char *address;//地址
  int port;//端口号
  int bytesSent, bytesReceived;//字节发送与接收
  double lastActivity, timeout;//上一次的活动时间,超时时间
  Vec(Listener) listeners;//listeners链表
  Vec(char) lineBuffer;//lineBuffer链表
  Vec(char) writeBuffer;//writeBuffer链表
  dyad_Stream *next;//链表结构
};

作者是这样定义TCP流的,可以看到该结构是存在链表数据结构的next指针,指向下一个dyad_Stream
Vec是实现模板动态数组的一组宏,应该说是这个网络库的核心数据结构了,可以看到这个“模板动态数组”是支持任意数据结构的。

Vec动态数组

static void vec_expand(char **data, int *length, int *capacity, int memsz) {
  if (*length + 1 > *capacity) {
    if (*capacity == 0) {
      *capacity = 1;
    } else {
      *capacity <<= 1;
    }
    *data = dyad_realloc(*data, *capacity * memsz);
  }
}

static void vec_splice(
  char **data, int *length, int *capacity, int memsz, int start, int count
) {
  (void) capacity;
  memmove(*data + start * memsz,
          *data + (start + count) * memsz,
          (*length - start - count) * memsz);
}


#define Vec(T)\
  struct { T *data; int length, capacity; }


#define vec_unpack(v)\
  (char**)&(v)->data, &(v)->length, &(v)->capacity, sizeof(*(v)->data)


#define vec_init(v)\
  memset((v), 0, sizeof(*(v)))


#define vec_deinit(v)\
  dyad_free((v)->data)

//清除长度
#define vec_clear(v)\
  ((v)->length = 0)


#define vec_push(v, val)\
  ( vec_expand(vec_unpack(v)),\
    (v)->data[(v)->length++] = (val) )

//分割
#define vec_splice(v, start, count)\
  ( vec_splice(vec_unpack(v), start, count),\
    (v)->length -= (count) )

这里我举一个例子(就下面这句),简单分析下它是如何实现入队列和扩容的

Listener listener;
vec_push(&stream->listeners, listener);

这里涉及到vec_push,也就是把listener变量压入链表中

#define vec_push(v, val)\
  ( vec_expand(vec_unpack(v)),\
    (v)->data[(v)->length++] = (val) )

可以看到这个宏,里面又包含了vec_unpack这个宏和vec_expand这个函数,展开就是这样

( vec_expand(vec_unpack(&stream->listeners)), (&stream->listeners)->data[(&stream->listeners)->length++] = (listener) )

再展开的话

vec_expand((char**)&(&stream->listeners)->data, &(&stream->listeners)->length, &(&stream->listeners)->capacity, sizeof(*(&stream->listeners)->data));

不得不感叹,作者把宏定义玩出了花~
我们着重看vec_expand函数

static void vec_expand(char **data, int *length, int *capacity, int memsz) {
  if (*length + 1 > *capacity) {//判断容量是不是超
    if (*capacity == 0) {//如果容量是0
      *capacity = 1;//设定容量为1
    } else {
      *capacity <<= 1;//将容量翻倍(右移1表示乘以2)
    }
    *data = dyad_realloc(*data, *capacity * memsz);//重新分配内存,扩容
  }
}

dyad_init

void dyad_init(void) {
#ifdef _WIN32
  WSADATA dat;
  int err = WSAStartup(MAKEWORD(2, 2), &dat);
  if (err != 0) {
    panic("WSAStartup failed (%d)", err);
  }
#else
  /* 当对一个已经停止了的socket写操作时,停止SIGPIPE信号*/
  signal(SIGPIPE, SIG_IGN);
#endif
}

#define SIG_IGN (__p_sig_fn_t)1

这个是dyad库初始化的函数
在linux下这个函数就是把SIGPIPE信号重定向到SIG_IGN(忽略)
这里我把它原来的注释翻译了下,应该是确保socket如果停止了后,如果再对其写操作,不会触发SIGPIPE

就是客户端程序向服务器端程序发送了消息,然后关闭客户端,服务器端返回消息的时候就会收到内核给的SIGPIPE信号

dyad_newStream

初始化一个新的流

static dyad_Stream *dyad_streams;//链表头
....
//新建一个流
dyad_Stream *dyad_newStream(void) {
  dyad_Stream *stream = dyad_realloc(NULL, sizeof(*stream));
  memset(stream, 0, sizeof(*stream));
  stream->state = DYAD_STATE_CLOSED;
  stream->sockfd = INVALID_SOCKET;
  stream->lastActivity = dyad_getTime();
  /* Add to list and increment count */
  stream->next = dyad_streams;//新的元素的下一个指向链表头
  dyad_streams = stream;//链表头指向新的元素
  dyad_streamCount++;
  return stream;
}

可以看到它将新分配的stream插入到的链表中,dyad_streams是链表头,是一个全局静态变量.
在这里插入图片描述

dyad_addListener

添加一个新的监听事件(^ ^就这样翻译吧)

//添加监听
void dyad_addListener(
  dyad_Stream *stream, int event, dyad_Callback callback, void *udata
) {
  Listener listener;
  listener.event = event;
  listener.callback = callback;
  listener.udata = udata;
  vec_push(&stream->listeners, listener);//压入链表(先扩容,然后把数据加进去)
}

将指定的事件,指定的回调函数加到流结构体的listener链表上
这个库包含的事件有

enum {
  DYAD_EVENT_NULL,
  DYAD_EVENT_DESTROY,
  DYAD_EVENT_ACCEPT,
  DYAD_EVENT_LISTEN,
  DYAD_EVENT_CONNECT,
  DYAD_EVENT_CLOSE,
  DYAD_EVENT_READY,
  DYAD_EVENT_DATA,
  DYAD_EVENT_LINE,
  DYAD_EVENT_ERROR,
  DYAD_EVENT_TIMEOUT,
  DYAD_EVENT_TICK
};

dyad_listen

指定流的监听端口号
这个函数主要涉及到socket初始化,底层还套了一层函数dyad_listenEx

//监听
int dyad_listen(dyad_Stream *stream, int port) {
  return dyad_listenEx(stream, NULL, port, 511);
}

///listen底层初始化
int dyad_listenEx(
  dyad_Stream *stream, const char *host, int port, int backlog
) {
  struct addrinfo hints, *ai = NULL;
  int err, optval;
  char buf[64];
  dyad_Event e;

  /* 获取地址信息 Get addrinfo */
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;
  sprintf(buf, "%d", port);
  err = getaddrinfo(host, buf, &hints, &ai);
  if (err) {
    stream_error(stream, "could not get addrinfo", errno);
    goto fail;
  }
  /* 初始化socket Init socket */
  err = stream_initSocket(stream, ai->ai_family, ai->ai_socktype,
                          ai->ai_protocol);
  if (err) goto fail;
  /* Set SO_REUSEADDR so that the socket can be immediately bound without
   * having to wait for any closed socket on the same port to timeout */
  optval = 1;
  setsockopt(stream->sockfd, SOL_SOCKET, SO_REUSEADDR,
             &optval, sizeof(optval));
  /* Bind and listen */
  err = bind(stream->sockfd, ai->ai_addr, ai->ai_addrlen);
  if (err) {
    stream_error(stream, "could not bind socket", errno);
    goto fail;
  }
  err = listen(stream->sockfd, backlog);
  if (err) {
    stream_error(stream, "socket failed on listen", errno);
    goto fail;
  }
  stream->state = DYAD_STATE_LISTENING;//修改流状态
  stream->port = port;
  stream_initAddress(stream);
  /* Emit listening event */
  e = createEvent(DYAD_EVENT_LISTEN);//创建事件
  e.msg = "socket is listening";
  stream_emitEvent(stream, &e);//触发事件
  freeaddrinfo(ai);
  return 0;
  fail:
  if (ai) freeaddrinfo(ai);
  return -1;
}

这个函数主要干了这些事,我这里不贴其他涉及的函数了,比较多

  1. 获取地址信息
  2. 初始化socket并设置为非阻塞和端口复用
  3. socket绑定和监听
  4. 修改流的状态,设置流的地址信息
  5. 创建事件并触发事件

dyad_getStreamCount

获取流的总数,这个没啥好说的

dyad_update

这个库的核心函数,着重分析这个函数
这个函数就是用来处理整个事件触发机制的

///更新
void dyad_update(void) {
  dyad_Stream *stream;
  struct timeval tv;

  destroyClosedStreams();//销毁掉已经关掉的流
  updateTickTimer();//更新计时器
  updateStreamTimeouts();//更新流的超时计时器

  /* Create fd sets for select() */
  select_zero(&dyad_selectSet);
  //遍历每一个流链表的子元素,根据TCP状态处理select事件
  stream = dyad_streams;
  while (stream) {
    switch (stream->state) {
      case DYAD_STATE_CONNECTED:
        select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
        if (!(stream->flags & DYAD_FLAG_READY) ||
            stream->writeBuffer.length != 0
        ) {
          select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        }
        break;
      case DYAD_STATE_CLOSING:
        select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        break;
      case DYAD_STATE_CONNECTING:
        select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        select_add(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd);
        break;
      case DYAD_STATE_LISTENING:
        select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
        break;
    }
    stream = stream->next;
  }

  /* Init timeout value and do select */
  #ifdef _MSC_VER
    #pragma warning(push)
    /* Disable double to long implicit conversion warning,
     * because the type of timeval's fields don't agree across platforms */
    #pragma warning(disable: 4244)
  #endif
  tv.tv_sec = dyad_updateTimeout;
  tv.tv_usec = (dyad_updateTimeout - tv.tv_sec) * 1e6;
  #ifdef _MSC_VER
    #pragma warning(pop)
  #endif
  //监视文件描述符的读/写/异常变化
  select(dyad_selectSet.maxfd + 1,
         dyad_selectSet.fds[SELECT_READ],
         dyad_selectSet.fds[SELECT_WRITE],
         dyad_selectSet.fds[SELECT_EXCEPT],
         &tv);

  /* Handle streams */
  //遍历每一个流链表的子元素,根据select的状态做事情
  stream = dyad_streams;
  while (stream) {
    switch (stream->state) {

      case DYAD_STATE_CONNECTED://连接上
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_handleReceivedData(stream);
          if (stream->state == DYAD_STATE_CLOSED) {
            break;
          }
        }
        /* Fall through */

      case DYAD_STATE_CLOSING://关闭
        if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
          stream_flushWriteBuffer(stream);
        }
        break;

      case DYAD_STATE_CONNECTING://正在连接
        if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
          /* Check socket for error */
          int optval = 0;
          socklen_t optlen = sizeof(optval);
          dyad_Event e;
          getsockopt(stream->sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen);
          if (optval != 0) goto connectFailed;//判断有没有socket错误
          /* Handle succeselful connection */
          stream->state = DYAD_STATE_CONNECTED;
          stream->lastActivity = dyad_getTime();
          stream_initAddress(stream);
          /* Emit connect event */
          e = createEvent(DYAD_EVENT_CONNECT);
          e.msg = "connected to server";
          stream_emitEvent(stream, &e);
        } else if (
          select_has(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd)
        ) {
          /* Handle failed connection */
connectFailed:
          stream_error(stream, "could not connect to server", 0);
        }
        break;

      case DYAD_STATE_LISTENING://正在监听
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_acceptPendingConnections(stream);
        }
        break;
    }

    /* If data was just now written to the stream we should immediately try to
     * send it */
    if (
      stream->flags & DYAD_FLAG_WRITTEN &&
      stream->state != DYAD_STATE_CLOSED
    ) {
      stream_flushWriteBuffer(stream);
    }

    stream = stream->next;
  }
}

在分析它之前,我们先看看SelectSet结构体,因为这个和后面的select有很大的关系

SelectSet

enum {
  SELECT_READ,
  SELECT_WRITE,
  SELECT_EXCEPT,
  SELECT_MAX
};
//Select组
typedef struct {
  int capacity;
  dyad_Socket maxfd;
  fd_set *fds[SELECT_MAX];
} SelectSet;

#define DYAD_UNSIGNED_BIT (sizeof(unsigned) * CHAR_BIT)


static void select_deinit(SelectSet *s) {
  int i;
  for (i = 0; i < SELECT_MAX; i++) {
    dyad_free(s->fds[i]);
    s->fds[i] = NULL;
  }
  s->capacity = 0;
}

//select组扩容
static void select_grow(SelectSet *s) {
  int i;
  int oldCapacity = s->capacity;
  s->capacity = s->capacity ? s->capacity << 1 : 1;
  for (i = 0; i < SELECT_MAX; i++) {
    s->fds[i] = dyad_realloc(s->fds[i], s->capacity * sizeof(fd_set));
    memset(s->fds[i] + oldCapacity, 0,
           (s->capacity - oldCapacity) * sizeof(fd_set));
  }
}


static void select_zero(SelectSet *s) {
  int i;
  if (s->capacity == 0) return;
  s->maxfd = 0;
  for (i = 0; i < SELECT_MAX; i++) {
#if _WIN32
    s->fds[i]->fd_count = 0;
#else
    memset(s->fds[i], 0, s->capacity * sizeof(fd_set));
#endif
  }
}

//向select组添加select
static void select_add(SelectSet *s, int set, dyad_Socket fd) {
#ifdef _WIN32
  fd_set *f;
  if (s->capacity == 0) select_grow(s);
  while ((unsigned) (s->capacity * FD_SETSIZE) < s->fds[set]->fd_count + 1) {
    select_grow(s);
  }
  f = s->fds[set];
  f->fd_array[f->fd_count++] = fd;
#else
  unsigned *p;
  while (s->capacity * FD_SETSIZE < fd) {
    select_grow(s);//select组扩容
  }
  p = (unsigned*) s->fds[set];
  p[fd / DYAD_UNSIGNED_BIT] |= 1 << (fd % DYAD_UNSIGNED_BIT);//除以DYAD_UNSIGNED_BIT和% DYAD_UNSIGNED_BIT是为了防止数组越界
  if (fd > s->maxfd) s->maxfd = fd;
#endif
}

//判断指定的selectset有没有对应的事件发生
static int select_has(SelectSet *s, int set, dyad_Socket fd) {
#ifdef _WIN32
  unsigned i;
  fd_set *f;
  if (s->capacity == 0) return 0;
  f = s->fds[set];
  for (i = 0; i < f->fd_count; i++) {
    if (f->fd_array[i] == fd) {
      return 1;
    }
  }
  return 0;
#else
  unsigned *p;
  if (s->maxfd < fd) return 0;
  p = (unsigned*) s->fds[set];
  return p[fd / DYAD_UNSIGNED_BIT] & (1 << (fd % DYAD_UNSIGNED_BIT));
#endif
}

涉及到的代码先贴上
在这里插入图片描述
结构体的结构如上图示
后面要用select函数来找我们感兴趣的文件描述符,select函数的参数如下

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

可以看到需要3个fd_set,分别对应感兴趣的读/写/异常的文件描述符集合
那这个文件描述符集合又是啥

其实这个东西就是一个64位的数,每一个位对应一个文件描述符,如果这一位为1说明对应的文件描述符发生了指定的事[读/写/异常],就是位图

为了实现对这些位图的操作,他提供了上面这些函数,包括赋值/清空/判断/初始化/释放操作函数。
其中我这里重点说下select_add
函数原型是

static void select_add(SelectSet *s, int set, dyad_Socket fd)

static void select_add(SelectSet *s, int set, dyad_Socket fd) {
#ifdef _WIN32
  fd_set *f;
  if (s->capacity == 0) select_grow(s);
  while ((unsigned) (s->capacity * FD_SETSIZE) < s->fds[set]->fd_count + 1) {
    select_grow(s);
  }
  f = s->fds[set];
  f->fd_array[f->fd_count++] = fd;
#else
  unsigned *p;
  while (s->capacity * FD_SETSIZE < fd) {
    select_grow(s);//select组扩容
  }
  p = (unsigned*) s->fds[set];
  p[fd / DYAD_UNSIGNED_BIT] |= 1 << (fd % DYAD_UNSIGNED_BIT);//除以DYAD_UNSIGNED_BIT和% DYAD_UNSIGNED_BIT是为了防止数组越界
  if (fd > s->maxfd) s->maxfd = fd;
#endif
}

流程图如下
在这里插入图片描述
把这个弄明白,就可以来分析逐个函数update函数的实现了

destroyClosedStreams

销毁掉已经关掉的流

static void destroyClosedStreams(void) {
  dyad_Stream *stream = dyad_streams;
  while (stream) {
    if (stream->state == DYAD_STATE_CLOSED) {
      dyad_Stream *next = stream->next;
      stream_destroy(stream);//把需要删掉的流从流链表里销毁
      stream = next;//把后一个替补上去
    } else {
      stream = stream->next;
    }
  }
}

这个函数的作用就是将需要删除的流(已经关闭的流)移出链表,然后把后面的元素接上。
移出链表的实现函数在stream_destroy
关闭socket,将待移除的元素移出链表之后,触发销毁流的事件,释放掉分配的内存。

//销毁流
static void stream_destroy(dyad_Stream *stream) {
  dyad_Event e;
  dyad_Stream **next;
  /* Close socket */
  if (stream->sockfd != INVALID_SOCKET) {
    close(stream->sockfd);
  }
  /* Emit destroy event */
  e = createEvent(DYAD_EVENT_DESTROY);
  e.msg = "the stream has been destroyed";
  stream_emitEvent(stream, &e);
  /* 从链表中删掉这个元素 Remove from list and decrement count */
  next = &dyad_streams;
  while (*next != stream) {
    next = &(*next)->next;
  }
  *next = stream->next;
  dyad_streamCount--;
  /* Destroy and free */
  vec_deinit(&stream->listeners);
  vec_deinit(&stream->lineBuffer);
  vec_deinit(&stream->writeBuffer);
  dyad_free(stream->address);
  dyad_free(stream);
}

updateTickTimer

更新计时器
每一个流都有一个时间计数器和超时计数器
这个函数就是用来维护时间计数器这个计时变量的,每次更新时间之后,触发更新时间的事件

static void updateTickTimer(void) {
  /* Update tick timer */
  if (dyad_lastTick == 0) {
    dyad_lastTick = dyad_getTime();
  }
  while (dyad_lastTick < dyad_getTime()) {
    /* 发射给所有流 Emit event on all streams */
    dyad_Stream *stream;
    dyad_Event e = createEvent(DYAD_EVENT_TICK);
    e.msg = "a tick has occured";
    stream = dyad_streams;
    while (stream) {
      stream_emitEvent(stream, &e);
      stream = stream->next;
    }
    dyad_lastTick += dyad_tickInterval;//更新时间戳
  }
}

updateStreamTimeouts

更新流的超时计时器
这个函数就是用来维护超时计时器这个计时变量的,每次更新时间之后,触发时间超时的事件

///更新流的超时计数器
static void updateStreamTimeouts(void) {
  double currentTime = dyad_getTime();
  dyad_Stream *stream;
  dyad_Event e = createEvent(DYAD_EVENT_TIMEOUT);
  e.msg = "stream timed out";
  stream = dyad_streams;
  while (stream) {
    if (stream->timeout) {
      if (currentTime - stream->lastActivity > stream->timeout) {
        stream_emitEvent(stream, &e);
        dyad_close(stream);
      }
    }
    stream = stream->next;
  }
}

dyad_update流程图

点开大图更清晰
请添加图片描述
画出来搞明白了,就是前半部分switch-case注册select事件,select函数之后,后半部分switch-case处理各个状态下事件发生后的具体处理。

事件

在这个库的框架下,事件也是一个重要的概念
当用户调用dyad_addListener添加需要关注的事件的时候,其实就是向对应的流注册事件,将事件挂到对应的流上。
事件的结构体定义如下:

typedef struct {
  int type;//类型
  void *udata;//用户数据
  dyad_Stream *stream;//对应的流
  dyad_Stream *remote;//对应远端的流
  const char *msg;//消息
  char *data;//数据
  int size;//大小
} dyad_Event;

库提供了函数用于触发事件

stream_emitEvent

static void stream_emitEvent(dyad_Stream *stream, dyad_Event *e) {
  int i;
  e->stream = stream;
  for (i = 0; i < stream->listeners.length; i++) {
    Listener *listener = &stream->listeners.data[i];
    if (listener->event == e->type) {//被触发的事件的类型 == 流中注册的事件类型
      e->udata = listener->udata;
      listener->callback(e);//回调函数
    }
    /* Check to see if this listener was removed: If it was we decrement `i`
     * since the next listener will now be in this ones place */
    if (listener != &stream->listeners.data[i]) {
      i--;
    }
  }
}

事件处理子函数

Update函数里还有一类重要的函数,事件处理子函数
这里我重要读了stream_handleReceivedData,stream_acceptPendingConnections和stream_flushWriteBuffer

stream_handleReceivedData

处理数据的接收

//处理接收到的数据
static void stream_handleReceivedData(dyad_Stream *stream) {
  for (;;) {
    /* Receive data */
    dyad_Event e;
    char data[8192];
    int size = recv(stream->sockfd, data, sizeof(data) - 1, 0);
    if (size <= 0) {
      if (size == 0 || errno != EWOULDBLOCK) {
        /* Handle disconnect */
        dyad_close(stream);
        return;
      } else {
        /* No more data */
        return;
      }
    }
    data[size] = 0;
    /* Update status */
    stream->bytesReceived += size;
    stream->lastActivity = dyad_getTime();
    /* Emit data event */
    e = createEvent(DYAD_EVENT_DATA);
    e.msg = "received data";
    e.data = data;
    e.size = size;
    stream_emitEvent(stream, &e);
    /* Check stream state in case it was closed during one of the data event
     * handlers. */
    if (stream->state != DYAD_STATE_CONNECTED) {
      return;
    }

    /* 处理线事件 Handle line event */
    if (stream_hasListenerForEvent(stream, DYAD_EVENT_LINE)) {
      int i, start;
      char *buf;
      for (i = 0; i < size; i++) {
        vec_push(&stream->lineBuffer, data[i]);//逐字节压入lineBuffer
      }
      start = 0;
      buf = stream->lineBuffer.data;
      for (i = 0; i < stream->lineBuffer.length; i++) {
        if (buf[i] == '\n') {
          dyad_Event e;
          buf[i] = '\0';
          e = createEvent(DYAD_EVENT_LINE);
          e.msg = "received line";
          e.data = &buf[start];
          e.size = i - start;
          /* Check and strip carriage return */
          if (e.size > 0 && e.data[e.size - 1] == '\r') {
            e.data[--e.size] = '\0';//字符串分段处理
          }
          stream_emitEvent(stream, &e);//触发事件
          start = i + 1;
          /* Check stream state in case it was closed during one of the line
           * event handlers. */
          if (stream->state != DYAD_STATE_CONNECTED) {
            return;
          }
        }
      }
      if (start == stream->lineBuffer.length) {
        vec_clear(&stream->lineBuffer);
      } else {
        vec_splice(&stream->lineBuffer, 0, start);
      }
    }
  }
}

可以看到函数先是调用了recv函数,接收数据,然后将接收到的数据挂载到流上,并触发数据接收事件。下半部分,作者设计了一个Line事件(可以理解为行事件嘛),专门处理字符串吧

作者将接收到的数据逐个字节压入流的lineBuffer里,然后也是创建对应的“行事件”,触发“行事件”

stream_acceptPendingConnections

static void stream_acceptPendingConnections(dyad_Stream *stream) {
  for (;;) {
    dyad_Stream *remote;
    dyad_Event e;
    int err = 0;
    dyad_Socket sockfd = accept(stream->sockfd, NULL, NULL);
    if (sockfd == INVALID_SOCKET) {
      err = errno;
      if (err == EWOULDBLOCK) {
        /* No more waiting sockets */
        return;
      }
    }
    /* Create client stream */
    remote = dyad_newStream();
    remote->state = DYAD_STATE_CONNECTED;
    /* Set stream's socket */
    stream_setSocket(remote, sockfd);
    /* Emit accept event */
    e = createEvent(DYAD_EVENT_ACCEPT);
    e.msg = "accepted connection";
    e.remote = remote;
    stream_emitEvent(stream, &e);
    /* Handle invalid socket -- the stream is still made and the ACCEPT event
     * is still emitted, but its shut immediately with an error */
    if (remote->sockfd == INVALID_SOCKET) {
      stream_error(remote, "failed to create socket on accept", err);
      return;
    }
  }
}

这个函数是处理accept的,可以看到调用了accept,如果accept成功,则创建accept事件,触发事件,执行accept的回调函数。

stream_flushWriteBuffer

//处理写缓存
static int stream_flushWriteBuffer(dyad_Stream *stream) {
  stream->flags &= ~DYAD_FLAG_WRITTEN;
  if (stream->writeBuffer.length > 0) {
    /* Send data */
    int size = send(stream->sockfd, stream->writeBuffer.data,
                    stream->writeBuffer.length, 0);
    if (size <= 0) {
      if (errno == EWOULDBLOCK) {
        /* No more data can be written */
        return 0;
      } else {
        /* Handle disconnect */
        dyad_close(stream);
        return 0;
      }
    }
    if (size == stream->writeBuffer.length) {
      vec_clear(&stream->writeBuffer);
    } else {
      vec_splice(&stream->writeBuffer, 0, size);
    }
    /* Update status */
    stream->bytesSent += size;
    stream->lastActivity = dyad_getTime();
  }

  if (stream->writeBuffer.length == 0) {
    dyad_Event e;
    /* If this is a 'closing' stream we can properly close it now */
    if (stream->state == DYAD_STATE_CLOSING) {
      dyad_close(stream);
      return 0;
    }
    /* Set ready flag and emit 'ready for data' event */
    stream->flags |= DYAD_FLAG_READY;
    e = createEvent(DYAD_EVENT_READY);
    e.msg = "stream is ready for more data";
    stream_emitEvent(stream, &e);
  }
  /* Return 1 to indicate that more data can immediately be written to the
   * stream's socket */
  return 1;
}

这个函数是处理写操作的函数,调用了send函数,在处理完send之后,判断是否空闲下来(待发送数据空)来决定是不是需要触发ready事件,执行对应的回调函数。

总结

大部分的核心功能差不多就分析完了,对我这个刚刚入门写Linux程序的菜鸡来说,真是学到了不少东西。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Linux】异步网络库dyad代码阅读 的相关文章

  • 步进电机和伺服电机的区别你知道吗?

    在许多领域都需要各种电机 xff0c 包括知名的步进电机和伺服电机 但是 xff0c 对于许多用户而言 xff0c 他们不了解这两种电机的主要区别 xff0c 因此他们始终不知道如何选择 那么 xff0c 步进电机和伺服电机之间的主要区别是
  • 独轮车成功站立

    真是废了不少力 卡了这么久首要原因就是过于青睐串级PID 串级PID可以自主寻找机械中位的特性实在是太优雅了 但动量轮这种对即使性要求极高的系统似乎不能用串级PID实现 昨天沉下心把串级PID推掉换成并联 xff0c 波形一下就朝着正常的方
  • 字节序:大端字节序(Big Endian) & 小端字节序(Little Endian)

    一 什么是字节序 xff1f 多字节数据存储在存储器中的顺序就叫做字节序 字节序又分为俩种 xff0c 一种叫做小端字节序 xff1b 另外一种叫做大端字节序 二 大端字节序 xff08 Big Endian xff09 amp 小端字节序
  • Google doc

    https docs google com spreadsheets d 1lOtc072A0QaJAXormoUeiaqZu5 20BR1ikh0YZe65PI edit gid 61 0
  • Boot Loader启动过程分析

    一 Boot Loader的概念和功能 1 嵌入式Linux软件结构与分布在一般情况下嵌入式Linux系统中的软件主要分为以下及部分 xff1a xff08 1 xff09 引导加载程序 xff1a 其中包括内部ROM中的固化启动代码和Bo
  • eclipse:解决Ctrl+S 无法保存问题

    解决Ctrl 43 S 无法保存问题 工作中有一个同事的eclipse中无法使用Ctrl 43 S保存文件 xff0c 尝试了各种方法 xff0c 禁用了所有其他软件的快捷键 xff0c 依然无法解决 xff0c 最终发现是eclipse自
  • windbg学习笔记 FOR 内核调试(三) --进程句柄表HANDLE_TABLE

    windbg学习笔记 FOR 内核调试 三 进程句柄表HANDLE TABLE 想当年 初学核编 阅读第三章的内核对象的时候跟看天书没什么感觉 死命在想到底内核对象 句柄是个什么东西 干嘛用的 于是我们工作室的老大就对我说 这篇看过就过了
  • tx2 上 安装nvidia Isaac安装记录

    Isaac的功能挺多 xff0c 但是只支持最新的版本 xff0c 所以没办法 xff0c 只能装最新的了 下面就记录一下这个过程 安装环境真是很麻烦 xff0c 不是下载失败就是下载慢 一天基本安装不完 第一步安装1804 因为只支持18
  • 关于进程间的访问权限等进程间控制资料收集

    http www vckbase com DUPLICATEHANDLE函数可以实现将同步内核对象被拷贝并且将原内核对象关闭 xff0c 从而达到可以自由控制内核对象的目的 xff0c 这可以实现使得只能单一启用的进程成为多启用的进程的目的
  • IIC总线基础知识

    IIC总线基础知识 一 简介 IIC xff08 Inter Integrated Circuit xff09 是一个多主从的串行总线 xff0c 又叫I2C xff0c 是由飞利浦公司发明的通讯总线 xff0c 属于半双工同步传输类型总线
  • 常见RISC-V介绍

    当前一颗新出的CPU xff1a RISC V简直火透了半边天 xff0c 无论是财大气粗的阿里系的平头哥 xff0c 还是新创企业 xff0c 似乎只要和RISC V挂上钩就足可以实现赶英超美 那事实上RISC V是什么 xff1f 除了
  • qsort的compare函数

    qsort的compare函数 功能 xff1a 使用快速排序例程进行排序 头文件 xff1a stdlib h 用法 xff1a void qsort void base size t num size t width int cdecl
  • 深度相机(3D相机)

    二维图片 xff0c 人眼可以通过物体的相对位置关系判断物体距离的远近 xff0c 而相机则不可以 深度相机 xff08 3D相机 xff09 就是终端和机器人的眼睛 xff0c 其就是通过该相机能检测出拍摄空间的景深距离 通过深度相机获取
  • ubuntu只有一种分辨率的解决方案——4K显示屏与扩展屏幕

    4K显示屏的ubuntu系统在安装NVIDIA显卡后 xff0c 只有一个分辨率为 xff1a 3840 2160 xff0c 如图所示 xff0c 该分辨率在扩展显示器的使用时 xff0c 会出现4K主屏幕分辨率很高 xff0c 而扩展显
  • 使用RTSO-9003拓展板的TX2镜像备份与恢复

    须知 TX2进入recovery模式 在给TX2通电的时候 xff0c 马上按住板子上的recovery键 xff0c 持续3秒左右 xff0c 继续保持按住recovery键并按住reset键 xff0c 便可以进入TX2的recover
  • MATLAB学习笔记

    MATLAB学习笔记 一级目录备忘录HELP文件路径MATLAB函数定义与表达MATLAB特殊变量和常量MATLAB文件读写数组数组的创建常用操作常用操作常用操作 MATLAB画图plot函数 xff1a 改变图像中线的颜色和线条形式leg
  • ubuntu16.04 boot空间不足 no space left on device

    Linux 中 boot 是存放系统启动文件的地方 xff0c 安装 ubuntu 时单独分区给 200M 足够 xff0c 但是系统内核更新后 xff0c 老的内核依然保存在 boot 分区内 xff0c 几次升级后 xff0c 就会提示
  • 什么是死锁,产生死锁的原因及必要条件

    什么是死锁 xff1f 所谓死锁 xff0c 是指多个进程在运行过程中因争夺资源而造成的一种僵局 xff0c 当进程处于这种僵持状态时 xff0c 若无外力作用 xff0c 它们都将无法再向前推进 因此我们举个例子来描述 xff0c 如果此
  • nvidia jetson TX2 踩坑解决记录

    最近拿着一张多年前实验室买的Jetson想刷个软路由玩 xff0c 奈何折腾了一周才把clash meta内核装好 xff0c 记录一下自己踩的坑 xff0c 整理一下以免其他玩jetson TX2的兄弟掉大坑 已经过去一周了很多都记不太清
  • 接收灵敏度

    接收灵敏度是检验基站接收机接收微弱信号的能力 xff0c 它是制约基站上行作用距离的决定性技术指标 xff0c 也是RCR STD 28协议中 xff0c 空中接口标准要求测试的技术指标之一 合理地确定接收灵敏度直接地决定了大基站射频收发信

随机推荐

  • 16行,使用Python制作简易版QQ自动回复机器人(windows版)

    目录 1 安装go cqhttp 2 使用go cqhttp 2 1 发送信息 2 1 1发送 你好 2 1 2 在群里 64 人 2 2获取群成员列表 2 3 实现QQ机器人 1 安装go cqhttp 点此安装go cqhttp xff
  • FPGA在线升级实战应用篇

    FPGA在线升级实战应用篇 1 摘要 项目在运营过程中可能需要根据应用需求更改固件 xff0c 或者对现有产品进行升级及在产品使用过程出现的故障进行分析 xff0c 故需要对产品进行升级维护 以往的产品出现的故障或BUG问题只能通过产品寄回
  • (xTaskNotify)- assert failed! 错误的修复

    今日在测试ESP32代码的时候 xff0c 使用xTaskNotify发生错误 xff0c 提示如下 xff1a xTaskNotify assert failed xff0c 然后系统重启 找了一下原因 xff0c 在xTaskNotif
  • kubernetes dashboard用户界面安装使用

    原文 xff1a https www toocruel net kubernetes dashboardyong hu jie mian an zhuang shi yong 1 下载kubernetes dashboard yaml文件
  • 网络通信编程学习笔记(四):在Ubuntu下创建新用户、用puTTY/VNCViewer远程登录、用ftp上传和下载、用Xming远程连接

    前言 真的用不惯VNCViewer xff0c 树莓派还是外接显示屏来的舒服 xff0c 分辨率也是1080p xff0c 只有全高清壁纸才可以慰籍学习之痛 xff01 Xming也是不如按开机键来的方便 笑哭 目录 一 用puTTY VN
  • node.js和npm离线安装

    离线安装node js和npm 1 下载官方安装包并拷贝到离线机器上 官方下载地址 xff1a https nodejs org en download 2 解压文件 xff1a tar xJf node v8 9 4 linux x64
  • Github 创建新分支

    一 clone Repository clone Github 上的Repository xff0c 如下 xff1a git clone git 64 github span class hljs preprocessor com spa
  • ARM平台基于嵌入式Linux部署ROS

    By Toradex 秦海 随着ARM平台处理能力的日益强大 xff0c 越来越多的工业智能 机器人应用在ARM平台上面实现 xff0c 在这个过程中不可避免的就涉及到将机器人应用开发框架移植到ARM平台来运行 xff0c 因此本文就着重示
  • 如何设计一款低成本的计算机载板- 第一部分

    By Toradex Peter Lischer 1 简介 在以前的博客文章中 xff0c 我们已经在一个硬件项目中使用计算机模块提出了许多讨论 xff0c 因此 xff0c 这里我们假设你已经在项目中决定采用计算机模块SoM xff0c
  • git rebase后commit id的变化

    经测试发现 xff0c 在执行完 git rebase 之后 xff0c 1 xff09 会生成的新的 commit id 2 xff09 新 commit 与旧 commit 的父节点不相同 3 xff09 旧 commit 的父节点保持
  • 嵌入式Linux下串口调试

    By Toradex秦海 1 简介 UART串口是嵌入式设备最为常用的调试和通讯接口之一 xff0c 无论是RS232还是RS422 485都有着非常广泛的应用 xff0c 因此本文就基于嵌入式Linux演示在User Space进行串口调
  • [LeetCode刷题笔记] 关于LeetCode的前言

    原创文章 转载请注册来源http blog csdn net tostq 又到了一年毕业就业季了 xff0c 三年前的校招季我逃避了 xff0c 可这一次终于还是要轮到我了 61 61 作为要准备踏入码农行业的人来说 xff0c 要准备校招
  • 关于机器视觉标定的pnp问题

    https blog csdn net cocoaqin article details 77485436 https blog csdn net cocoaqin article details 77848588利用二维码求解相机世界坐标
  • kvaser在linux中的应用

    本文主要讲解 xff0c kvaser如何使用简单socketcan 1 硬件 kvaser USBcan Pro 2xHS v2 2 准备系统 ubuntu 16 04 由于项目需要在程序中使用socketcan xff0c 所以需要将k
  • 编译错误-build stopped: subcommand failed. 解决方法

    make 1 Leaving directory 96 home sunhz sl8541e out target product sp8541e srvm obj u boot15 39 make Leaving directory 96
  • 如何备份jetson nano 的u盘系统?

    使用工具 xff1a Win32DiskImager 备份步骤 xff1a 1 在本地盘 xff08 C盘或D盘都行 xff0c 盘符剩余内存大于u盘系统内存就行 xff09 新建文本文档 xff0c 连后缀名字一起改成backup img
  • Security Onboard Communication-SecOC

    一 通讯加密的必要性 随着汽车电子的发展及整车功能复杂性的提高 xff0c 车载控制器数量从之前的寥寥几个增加至规模复杂的上百个 基于功能的需求 xff0c 各个控制器每时每刻需要进行大量数据的交互 xff0c 数据交互的方式也多种多样 x
  • 解决VNC远程连接树莓派,窗口显示不全的问题,亲测可行!!

    哇 xff0c 就在刚刚才百度到解决VNC远程连接树莓派 xff0c 窗口显示不全的问题 xff0c 昨晚上查了一晚上都没搞定 xff0c xff0c xff0c 首先说下问题吧 xff0c 就是用VNC远程连接树莓派后 xff0c 会出现
  • Avoid mutating a prop directly since the value will be overwritten whenever

    在vue中 父组件向子组件传值 并在子组件改变了父组件的值 就会发出警告 所有的 prop 都使得其父子 prop 之间形成了一个单向下行绑定 xff1a 父级 prop 的更新会向下流动到子组件中 xff0c 但是反过来则不行 这样会防止
  • 【Linux】异步网络库dyad代码阅读

    简介 dyad是一个基于C编写的异步网络库 xff0c 非常精简 xff0c 单C文件 xff0c 仅实现TCP xff0c 很适合用来学习Linux网络编程和异步非阻塞处理 链接 Github链接 基于Dyad的echo server实现