tcp业务层粘包和半包理解及处理

2023-11-11

tcp粘包处理

tcp是流式传输的,是安全的, 可靠的,顺序的。

udp是数据报协议,是不可靠的。

面试中经常被问到tcp粘包是如何处理的,通过百度和自己的理解,这里做笔记记录。

如果有不对,请指正~

参考:https://bbs.csdn.net/topics/380167545

业务层认识tcp传输及分析方案

tcp是流式传输的,tcp协议栈可以保证传输过程的顺序,可靠性

​ 也就是说,发送端调用send发送,接收端肯定能按照发送顺序依次接收到。

tcp是流式传输的,协议栈只是把接收到的数据放入到对应连接fd的缓冲区中。(待从源码确定)

1:分析tcp协议栈的接收:

1:协议栈只是负责把接收到的数据放入应用层缓冲区(很大的文件传输时,可能一次放不下)。

​ ==》自己的误区,tcp是安全的流式传输,缓冲区接收到的数据必然是完整的,有序的,只会涉及到粘包处理,不涉其他,这里要考虑大包的多次接收。

2:获取缓冲区中的包,可能是多个小包粘合在一起的大包(需要做粘包处理)

​ ==》小包的粘包问题,需要再业务层做处理,nagle算法

3:获取缓冲区中的包,可能是一个大包拆分出的多个小包进行传输 (接收到半包)

​ ==》默认是MTU大小,协议栈会拆包,这个应该是协议栈做了处理,收到还是一个大包,我们只需要循环接收做业务处理(并不需要组包处理)。
​ ==》recv时,根据长度用while循环做接收,确保包的完整。(拆包后的包应该是依次接收到,从缓冲区中获取到)

4:放入缓冲区中数据的完整性,协议栈是否是一次把一整个包放进来的,应该是的(看看源码吧,协议栈是如何处理的?)
==》recv时,需要用while循环做多次的接收。

2:tcp的接收,我们需要关注的问题:

1:对粘包问题做拆包处理。(需要业务层定义协议,增加结束符,固定长度,或者指定长度)

2:对协议栈大包拆包后的数据做组包处理/持续接收半包)。 ==》流式可靠顺序传输的

​ ==》同一个连接,用while循环一直进行接收,应该时可以保证持续接收到的数据时这个大包的数据。

​ ==》根据业务,需要在业务层,对大包进行自定义协议拆包后组包(用户控制发小包),或者根据长度持续接收(确保一个大包的接收完整),或者用缓冲区保存(可能多个连接同时处理的业务)等处理。

3:recv一次时,不一定能获取到一整个包。 (半包)==》应该还是只有大于MTU时拆包场景

​ ==》同2,while多次循环接收,通过长度/校验码确保数据的完整可靠。

4:如果要关注业务层的消息机制,消息的完整性,正确性,需要业务层对消息做一定的协议处理。

3:总结及方案

1:tcp是流式安全的,传输接收到的数据必然是顺序的,有序的:

​ ==》1:半包问题(协议栈拆包),多次while循环进行接收。

​ ==》2:粘包问题,需要在业务层定义协议处理。

2:处理粘包问题的方案:

​ 1:发送固定长度的字节,不够时补充特定字符如0

​ 2:末尾终结符 如\r\n,如FTP协议

​ 3:区分消息为头部和消息体,收到足够的数据确定包的完整性

​ 4:混合使用在应用层做拆包和粘包的处理。(头+类型+长度+数据+尾)

实现一个tcp粘包处理的demo

这里没有考虑大包的拆包时,缓冲区的细节机制。

简单描述:

1:定义用户层协议,使用“头+data+尾部”(这里的头部和尾部自己定义比较随意)的方案,方便粘包处理以及保证数据完整性。(data数据区可以扩展协议)

2:用一个ringbuffer暂存接收到的数据(暂时不考虑半包问题,可以用多个ringbuffer保存不同的fd的缓存做处理/发送端做拆包处理,ringbuffer接收后做业务层作保处理)。

3:接收到数据后放入ringbuffer中,并根据头+尾检测ringbuffer中数据的完整性,这里只是做打印(业务处理以及组包协议定义等都可以自由扩展)

测试demo:

tcp_ringbuffer.h

/*************************************
实现针对tcp接收处理的业务ringbuffer
	1:ringbuffer中的数据肯定是完整的包,然后处理粘包问题
	2:作为tcp的服务端,所有的接收放入包中,来自不同的客户端和不同的消息需要扩展处理
	3:如果发送很大很大的文件,tcp是如何处理的? 会是一次性一直接收吗?
*************************************/
#ifndef __RINGBUFFER_H_
#define __RINGBUFFER_H_

typedef struct RINGBUFF_T{
	void * data;
	unsigned int size;
	unsigned int read_pos;   //数据起始位置
	unsigned int write_pos;  //数据终止位置
}ringbuffer_t;

//创建ringbuffer
ringbuffer_t * ringbuffer_create(unsigned int size);
//销毁ringbuffer
void ringbuffer_destroy(ringbuffer_t * ring_buffer);

//往ringbuffer中存数据 写入
int ringbuffer_put(ringbuffer_t * ring_buffer, const char* buffer, unsigned int len);
//判断是否是完整的数据  然后进行处理
int ringbuffer_get_len(ringbuffer_t *ring_buffer);
//依赖ringbuffer_get_len 返回值申请内存,取出ring_buffer中的数据
int ringbuffer_get(ringbuffer_t * ring_buffer, char * buffer, unsigned int len);

//基本接口 外部基本不用,但是函数内部有使用
//重置缓冲区
void ringbuffer_reset(ringbuffer_t * ring_buffer);
//ringbuffer已经使用的内存空间的大小
int ringbuffer_use_len(ringbuffer_t * ring_buffer);
//ringbuffer没有使用的内存的大小
int ringbuffer_space_len(ringbuffer_t * ring_buffer);

//基本的判空和判满接口
int ringbuffer_isempty(ringbuffer_t * ring_buffer);
int ringbuffer_isfull(ringbuffer_t * ring_buffer);

// int get_ringbuffer_size(ringbuffer_t * ring_buffer);
#endif //__RINGBUFFER_H_

tcp_ringbuffer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>


#include "tcp_ringbuffer.h"

static inline __attribute__((const))
int is_power_of_2(unsigned long n)
{
	return (n != 0 && ((n & (n - 1)) == 0));
}

static unsigned long roundup_power_of_two(unsigned long n)
{
    if((n & (n-1)) == 0)
        return n;
    
    unsigned long maxulong = (unsigned long)((unsigned long)~0);
    unsigned long andv = ~(maxulong&(maxulong>>1));

    while((andv & n) == 0)
        andv = andv>>1;

    return andv<<1;
}

//创建ringbuffer 
ringbuffer_t * ringbuffer_create(unsigned int size)
{
	//对入参进行校验 并且是2的次方
	if (!is_power_of_2(size)) {
        size = roundup_power_of_two(size);
	}

	ringbuffer_t * ring_buffer;
	ring_buffer = (ringbuffer_t*)malloc(sizeof(*ring_buffer));
	if(ring_buffer == NULL)
	{
		printf("create ringbuffer error \n");
		return NULL;
	}

	ring_buffer->data = (void*)malloc(size);
	if(ring_buffer->data == NULL)
	{
		printf("create ringbuffer data error \n");
		free(ring_buffer);
		return NULL;
	}

	ring_buffer->size = size;
	ring_buffer->read_pos = 0;
	ring_buffer->write_pos = 0;
	return ring_buffer;
}

//销毁ringbuffer
void ringbuffer_destroy(ringbuffer_t * ring_buffer)
{
	if(ring_buffer)
	{
		if(ring_buffer->data)
		{
			free(ring_buffer->data);
			ring_buffer->data = NULL;
		}
		free(ring_buffer);
		ring_buffer = NULL;
	}
}


// typedef struct RINGBUFF_T{
// 	void * data;
// 	unsigned int size;
// 	unsigned int read_pos;   //数据起始位置
// 	unsigned int write_pos;  //数据终止位置
// }ringbuffer_t;


//往ringbuffer中存数据 写入
int ringbuffer_put(ringbuffer_t * ring_buffer, const char* buffer, unsigned int len)
{
	if(ring_buffer->write_pos >=ring_buffer->read_pos &&(len <(ring_buffer->size - ring_buffer->write_pos +ring_buffer->read_pos)))
	{
		//进行拷贝
		if(ring_buffer->size - ring_buffer->write_pos >len)
		{
			memcpy(ring_buffer->data + ring_buffer->write_pos, buffer, len);
			ring_buffer->write_pos += len;
		}else
		{
			unsigned int right_space_len = ring_buffer->size - ring_buffer->write_pos;
			memcpy(ring_buffer->data + ring_buffer->write_pos, buffer, right_space_len);
			memcpy(ring_buffer->data, buffer+right_space_len, len - right_space_len);
			ring_buffer->write_pos = len - right_space_len;
		}
		return 0;
	}

	if(ring_buffer->write_pos <ring_buffer->read_pos && (ring_buffer->read_pos - ring_buffer->write_pos) >len)
	{
		memcpy(ring_buffer->data + ring_buffer->write_pos, buffer, len);
		ring_buffer->write_pos += len;
		return 0;
	}

	return -1;
}

//判断是否是完整的数据  然后进行处理
int ringbuffer_get_len(ringbuffer_t *ring_buffer)
{
	//对ringbuffer中的数据做判断解析  如果是完整的数据  则提取出去
	if(ringbuffer_use_len(ring_buffer) < strlen("FFFF0D0A<header><tail>0D0AFEFE"))
	{
		printf("ringbuffer data is error [%d], [%ld]\n", ringbuffer_use_len(ring_buffer),  strlen("FFFF0D0A<header><tail>0D0AFEFE"));
		return -1;
	}
	//判断是否是终结的字段
	const char* end_str = "<tail>0D0AFEFE";
	char check_end_str[20] = {0};
	if(ring_buffer->write_pos >strlen(end_str))
	{
		memcpy(check_end_str, ring_buffer->data+ring_buffer->write_pos - (strlen(end_str)),  strlen(end_str));
	}else
	{
		unsigned int left_len = ring_buffer->write_pos;
		memcpy(check_end_str, ring_buffer->data +ring_buffer->size - (strlen(end_str) - left_len), ring_buffer->size - (strlen(end_str) - left_len));
		memcpy(check_end_str + (strlen(end_str) - left_len), ring_buffer->data, left_len);
	}
	printf("get check_end_str is %s \n", check_end_str);


	char * ret_addr = strstr(check_end_str, end_str);
	if(ret_addr == NULL)
	{
		return -1;
	}

	if(check_end_str - ret_addr != 0)
	{
		printf("DDDDD :why end string is error");
		return -1;
	}

	return ringbuffer_use_len(ring_buffer);
}

//从ringbuffer中取数据做处理, 判断接收到的字符是否是终结符号,就可以去做处理
//取完数据后重置ringbuffer的位置  读取
int ringbuffer_get(ringbuffer_t * ring_buffer, char * buffer, unsigned int len)
{
	//这里建立在ringbuffer_get_len 的基础上,传入入参,取出数据
	int data_len = ringbuffer_use_len(ring_buffer);
	if(data_len >= len)
	{
		printf("para buffer is not enough space \n");
		return -1;
	}

	if(ring_buffer->write_pos >ring_buffer->read_pos )
	{
		printf("get data from ringbuffer len: [%d] \n", ring_buffer->write_pos - ring_buffer->read_pos);
		memcpy(buffer, ring_buffer->data + ring_buffer->read_pos, data_len);
	}else
	{
		memcpy(buffer, ring_buffer->data+ring_buffer->read_pos, ring_buffer->size - ring_buffer->read_pos);
		memcpy(buffer+ring_buffer->size - ring_buffer->read_pos, ring_buffer->data, data_len - (ring_buffer->size - ring_buffer->read_pos));
	}

	ring_buffer->write_pos = 0;
	ring_buffer->read_pos = 0;
	return 0;
}

//直接从socket中读数据放入ringbuffer中也可以
int ringbuffer_get_from_dev()
{
	return 0;
}
//直接从ringbuffer中取数据用socket进行发送
int ringbuffer_put_to_dev()
{
	return 0;
}

void ringbuffer_reset(ringbuffer_t * ring_buffer)
{
	ring_buffer->read_pos = ring_buffer->write_pos = 0;
}

int ringbuffer_use_len(ringbuffer_t * ring_buffer)
{
	if(ring_buffer->write_pos >= ring_buffer->read_pos)
	{
		return ring_buffer->write_pos-ring_buffer->read_pos;
	}

	return ring_buffer->write_pos + ring_buffer->size - ring_buffer->read_pos;
}

int ringbuffer_space_len(ringbuffer_t * ring_buffer)
{
	if(ring_buffer->write_pos >= ring_buffer->read_pos)
	{
		return ring_buffer->read_pos +(ring_buffer->size - ring_buffer->write_pos);
	}

	return ring_buffer->read_pos - ring_buffer->write_pos;
}

int ringbuffer_isempty(ringbuffer_t * ring_buffer)
{
	return ringbuffer_use_len(ring_buffer) == 0? 0 :-1;
}

int ringbuffer_isfull(ringbuffer_t * ring_buffer)
{
	return ringbuffer_space_len(ring_buffer) == 0? 0 :-1;
}

// int get_ringbuffer_size(ringbuffer_t * ring_buffer)
// {
// 	return ring_buffer->size;
// }

tcp_sticky_bag.c

//在业务层实现对tcp的粘包,拆包处理逻辑
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#include "tcp_ringbuffer.h"
/*********************************************************************
https://bbs.csdn.net/topics/380167545
梳理一下有关tcp数据包的定义方案:
	TCP的可靠性是到哪种程度,在传输过程中数据的内容或长度有没有可能出错,不管是意外还是人为,数据的内容是可能出错的。 
	首先LZ的问题很单纯,	===》对于粘包情况,是记录长度拆分好呢,还是按特定分隔符来拆分好。 
	1楼、2楼的回答,		===》通常是用固定长度,或者记录长度 
	3楼就问,				===》结束符的判断不受欢迎?以前的老代码用过结束符,其中难道有什么玄机 
	接着zhao4zhong1			===》就给出个简单的方法base64,将二进制数据流转化成文本来,然后用'\0'作为结束符,
		也就是要将二进制流中的'\0'转译掉,又懒得自己写代码,所以用了现成的base64,虽然很浪费资源,但也凑合着可以用。 
		单到这点上,还是个很好的答案的。 
	不过接下来的问题就奇怪了,竟然奇怪的问了用包头加包体几个问题。一下子引得无数人进攻。然后就是口水了。 
	
	使用分隔符和包头加包体的方案,都是为了处理数据的安全性的。
		1、仅收到一个字节时怎么办。 :你用base64时仅收到一个字母而没有受到'\0'时怎么办? :你怎么办这里就怎么办 
		2、收到两个字节的包头比如FF FF(表示后续包体长度为65535,或者包头+包体长度65535即后续包体长度65533),但后续收到数据长度迟迟达不到要求怎么办? :你用base64时一直收到字母和数字,就是收不到'\0'时怎么办? :你怎么办这里就怎么办 
		3、比如约定每个包最大长度为0x1000,但收到的包头中长度超过0x1000怎么办? :base64中可能出现的字符范围是固定的,但收到的数据里有超过范围的字符怎么办?比如收到个值为00000001的字节 :对于这种数据出错的情况,你怎么办的这里就这么办 
		4、应用需要发送超过65535个字节的包怎么办 :base64的字符只有64种来表示数据,对于一个值>64的字节要怎么表示? :base64可以把数据分成很多个字符来表示,这里为什么就不能把大数据分成很多个包后再传? 
	……………………………… 接下来大多都是无谓的口水了(我板凳看了2小时的热闹) ……………………………… 
	还是回到LZ这个很单纯的问题上来吧,是记录长度拆分好呢,还是按特定分隔符来拆分好? 
	从计算机逻辑上来讲,其实没什么区别,都是要有校验有分割,而从人的思维逻辑上讲,我还是喜欢记录长度的方式。 
		因为如果用分隔符,就要对每个字节都进行转译,发送前转译,收到后还要转译回来,虽然对于编程来说,就是一个简单的循环语句,但在心理上,却有一种对所有数据都要进行变换的这种很麻烦的感觉。
		而按长度来,只要处理少数几个字节,真正的数据体是原样copy的,虽然从编程语句上来说没什么变化,但从心理上,却有一种数据就是原样搬来搬去,这样清爽的感觉。 
		所以,我建议大家用长度。
	qq120848369在22楼说的就是大众格式,LZ在26楼也赞同了。 
		--------原文: 头和尾基本用来做校验, 不是拿来做边界的. 头+类型+长度+数据+尾, 这种结构就可以. 拆包就是: 检验头, 然后拆出类型+长度, 然后根据长度拆数据, 然后检验尾巴. 
		-------- 顺便再啰嗦一下,我自己现在这个应用中,只传输控制命令,而且是加密成128字节的数据段,加解密的算法也是自己写的,本身就有校验,上层逻辑还有数据是否合法的判断,
			所以在网络这块什么多余的都不用考虑,就一次128字节的收发,超时设为10秒,有错就重来,连错3次就重连,连续3次重连就拒绝此IP五分钟,逻辑上单纯又直白。

*****************************************************************************/

/****************************************************************************
一直不理解面试时为什么会问到tcp粘包相关知识,以及业务层对tcp粘包需要做的处理到底有哪些。
	然后通过探索,总结,按照自己的思路理解一下:
	1:协议栈只是负责把接收到的数据放入应用层缓冲区 				==》自己的误区,tcp是安全的流式传输,缓冲区接收到的数据必然是完整的,有序的,只会涉及到粘包处理,不涉其他,这里要考虑大包的多次接收。
	2:获取缓冲区中的包,可能是多个小包粘合再一起的大包 			==》小包的粘包问题,需要再业务层做处理,nagle算法
	3:获取缓冲区中的包,可能是一个大包拆分出的多个小包进行传输  ==》默认是MTU大小,协议栈会拆包,这个应该是协议栈做了处理,收到还是一个大包,我们只需要循环接收做业务处理(并不需要组包处理)。
		==》recv时,根据长度用while循环做处理。
	4:放入缓冲区中数据的完整性,协议栈是否是一次把一整个包放进来的,会有接收到一个大包,先放入缓冲区一半,有位置再放入下一半?(看看源码吧,协议栈是如何处理的?)
		==》recv时,需要用while循环做多次的接收。

	综上所述,我们需要关注的问题有:
		1:对粘包问题做拆包处理。
		2:对协议栈大包拆包后的数据做组包处理。 ==》协议栈会做,业务层不需要关注。 ==》流式可靠顺序传输的
		3:recv一次时,不一定能获取到一整个包。
			1:recv缓冲区中的数据,一次不一定全获取到包数据,需要根据返回值做处理。 ==》用while+业务实现包的完整性
			2:tcp是流式的,安全的,所以面对的是字符流,能保证各个字节按顺序到达,不会乱序。  ==》直接接收,协议栈的拆包多次发送我们不关注。
		4:如果要关注业务层的消息机制,消息的完整性,正确性,需要对消息做一定的协议处理。

	思考一下处理方案:
		1:发送固定长度的字节,不够时补充特定字符如0
		2:末尾终结符 如\r\n,如FTP协议
		3:区分消息为头部和消息体,收到足够的数据确定包的完整性
		4:再应用层做拆包和粘包的处理。

		可以参考netty对包进行处理。

	总结:tcp是流式安全的,传输接收到的数据必然是顺序的,有序的,但是会有粘包,recv一次性接收不完全的现象,需要我们处理。
*****************************************************************************/

/*************************************
通过测试demo,实现对tcp消息的接收处理:
	1:作为服务端,接收客户端主动上报的各种消息。
	2:自定义协议,做消息构造,方便消息完整性接收,数据校验,粘包处理

实现方案:
	1:实现自定义协议的业务
	2:增加tcp服务端业务,进行demo验证。
*************************************/

//触发accept 连接相关的处理
int demo_accept_exec(int epfd, int listenfd);
//触发数据接收的相关处理
int demo_recv_exec(ringbuffer_t *ringbuff,  int epofd, int connectfd);

//创建服务端的fd
int demo_init_socket();
//加入epoll 真正的处理入口
int demo_server_exec(int listenfd);

//处理接收到的粘包报文
int check_recv_data(char *data,  int len);

int SetNonblock(int fd) {
	int flags;
	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0)
		return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0) 
		return -1;
	return 0;
}

//创建服务端socket fd
int demo_init_socket()
{
	int fd = socket(AF_INET, SOCK_STREAM, 0);
	if(fd < 0)
	{
		printf("create socket fd error. \n");
		return -1;
	}
	SetNonblock(fd);
	struct sockaddr_in server_addr;
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(9000);
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	//设置端口可重用 也可以设置缓冲区大小等
	int optval = 1;
	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(int));
	bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));

	if(listen(fd, 20) < 0)
	{
		printf("listen sock fd error. \n");
	}

	printf(" listen 9000 port,create socket  fd is %d \n", fd);
	return fd;
}


//要么用accpet做连接监听,开始处理,
//用epoll实现连接与接收的消息通信。
int demo_server_exec(int listenfd)
{
	//创建epfd 并把serverfd加入epfd中
	int epfd = -1;
	{
		epfd = epoll_create(1);
		if(epfd == -1)
		{
			printf("create epoll error \n");
			close(listenfd);
			return -1;
		}

		//把我们的fd加入到epoll中,只监听可读事件,连接处理
		struct epoll_event event;
		event.data.fd 	= listenfd;
		event.events	= EPOLLIN|EPOLLET;

		if(epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event) == -1)
		{
			printf("add listenfd to epoll error \n");
			close(listenfd);
			close(epfd);
			return -1;
		}
	}

	//创建缓冲区
	ringbuffer_t * ringbuff = ringbuffer_create(1024*4);
	if(ringbuff == NULL)
	{
		printf("create ringbuff error, ringbuff is null \n");
		close(listenfd);
		close(epfd);
		return -1;
	}

	struct epoll_event event_wait[1024];
	int nready = 0;
	printf("start to exex server: \n");
	while(1)
	{
		nready = epoll_wait(epfd, event_wait, 1024, 1000);
		if(nready < 0) 
    	{
    		if (errno == EINTR)// 信号被中断
    		{
	    		printf("epoll_wait return and errno is EINTR \n");
                continue;
    		}
            printf("epoll_wait error. \n");
            break;
    	}else if(nready == 0) // 超时,继续
    	{
    		// printf("epoll_wait timeout \n");
    		continue;
    	}

    	//已经准备就绪的fd
    	for(int i=0; i< nready; i++)
    	{
    		//处理可读
    		if(event_wait[i].events & EPOLLIN)
    		{
    			if(event_wait[i].data.fd == listenfd)
    			{
    				demo_accept_exec(epfd, listenfd);
    			}else
    			{
    				demo_recv_exec(ringbuff, epfd, event_wait[i].data.fd);
    			}
    		}

    		//这种情况下应该从epoll中移除,并关闭fd
    		//这里如果不是客户端发完就终止的业务,我们是不是不del,只有异常时del
    		if (event_wait[i].events & (EPOLLERR | EPOLLHUP))
    		{
    			printf("epoll error [EPOLLERR | EPOLLHUP].\n");
    			epoll_ctl(epfd, EPOLL_CTL_DEL, event_wait[i].data.fd, NULL);
    			close(event_wait[i].data.fd);
    		}
    	}
	}
	ringbuffer_destroy(ringbuff);

	return 0;
}

//开始进行accept处理并且把连接fd放入epoll中
int demo_accept_exec(int epfd, int listenfd)
{
	struct sockaddr_in cliaddr;
	socklen_t clilen = sizeof(cliaddr);

	//进行accept接收,因为是et模式,所以需要循环接收
	int clifd = -1;
	while((clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen)) >= 0)
	{
	
		// if(clifd == -1)
		// {
		// 	if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) 
		// 	{
	 //            /* 资源暂时不可读,再来一遍 */
	 //            break; //这里应该重复读
	 //        }
	 //        //这里其实是异常的
		// 	printf(" accept error: [%s]\n", strerror(errno));
		// 	return -1;
		// }
		SetNonblock(clifd);
		//加入epoll
		struct epoll_event clifd_event;
		clifd_event.data.fd = clifd;
		clifd_event.events = EPOLLIN | EPOLLET; //ET模式要循环读

		if(epoll_ctl(epfd, EPOLL_CTL_ADD, clifd, &clifd_event) == -1)
		{
			printf(" epoll ctl error . \n");
			close(clifd);
			return -1;
		}
		printf("accept success. [%s:%d] connected \n", inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));	
	}
	
	return 0;
}

//这里只是处理了最常规的accept成功加入epoll以及有数据时的加入
// EPOLLERR 是read或者write时知道对端关闭了
// FFFF0D0A<header>len|data<tail>0D0AFEFE
int demo_recv_exec(ringbuffer_t *ringbuff,  int epfd, int connectfd)
{
	//因为这里用了et模式,所以要循环进行读
	//定义一个缓冲区,对数据进行持续接收以及处理
	printf("start to recv \n");
	char recv_data [1024] = {0};
	while(1)
	{
		//ET模式要一次性读完,读到缓冲区没有数据
		//定义一个内存,每次读取,放入缓冲区中

		int datalen = -1;
		//读取所有的数据
		while((datalen = read(connectfd, recv_data, 1024)) > 0)
		{
			printf("recv data is [%s] [%lu] [%d]\n",  recv_data, strlen(recv_data), datalen);
			if(ringbuffer_put(ringbuff, recv_data, datalen)== 0)
			{
				printf("put to ringbuff success \n");
			}
			memset(recv_data, 0 , 1024);
		}

		//应该是对端关闭 关闭fd,从epfd中移除该fd
		//这里可以改为监听可写  写完后删除
		if(datalen == 0)
		{
			if(epoll_ctl(epfd, EPOLL_CTL_DEL, connectfd, 0) == -1)
			{
				printf("client disconnection error from epoll \n");
			}else
			{
				printf("client disconnected success,clientfd is [%d] \n", connectfd);
			}
			
			close(connectfd);//这里做epoll的删除和fd的close,客户端在发送时会重连
			break;
		}

		if(datalen < 0)   //接收到最后一个报文的返回,这里循环接收必然触发
		{
			printf("DDDDD recv data len <0 \n");
			if (errno == EWOULDBLOCK && errno == EINTR) //不做处理
			{
				continue;
			}
			// if(epoll_ctl(epfd, EPOLL_CTL_DEL, connectfd, 0) == -1)
			// {
			// 	printf("client 1 disconnection error from epoll \n");
			// }else
			// {
			// 	printf("client 1 disconnected success,clientfd is [%d] \n", connectfd);
			// }
			
			// close(connectfd);
			break;
		}
	}

	//获取ringbuff中的数据然后进行显示
	int ringbuff_data_len = ringbuffer_get_len(ringbuff);
	if(ringbuff_data_len  == -1)
	{
		printf("get ringbuff dada error. \n");
		return 0;
	}
	printf("get ringbuff data len is %d \n", ringbuff_data_len);

	char * data = (char*)malloc(ringbuff_data_len +1);
	memset(data, 0, ringbuff_data_len+1);
	ringbuffer_get(ringbuff, data, ringbuff_data_len+1);
	printf("get all data is [%ld][%s] \n", strlen(data), data);
	check_recv_data(data, ringbuff_data_len);
	//这里对取出来的数据做业务处理
	free(data);
	data =NULL;
	return 0;
	//每次读取完一次缓冲区的数据后,对读取到的数据做粘包处理的业务逻辑,应该不涉及丢包的
}

int exec_one_package_data(char* data, int len)
{
	char * print_data = malloc(len+1);
	memset(print_data, 0, len+1);
	memcpy(print_data, data, len);
	printf("111 one package data is [%s][%ld][%d] \n", print_data, strlen(print_data), len);
	free(print_data);
	return 0;
}

//对特定格式的数据做业务处理,单包处理
//这里的数据必然是有我们的终结符尾部的数据,
int exec_one_data(char* data, int len)
{
	//为了打印
	char * print_data = malloc(len+1);
	memset(print_data, 0, len+1);
	memcpy(print_data, data, len);
	printf("one package data is [%s][%ld][%d] \n", print_data, strlen(print_data), len);
	free(print_data);

	//真正的业务处理,1:正确提取到内部数据
	//我们的业务应该是只走这个
	const char * start_str = "FFFF0D0A<header>";
	char * ops;
	ops = strstr(data, start_str);
	if(ops == data)
	{
		//正确的一帧数据,去做正常业务处理
		exec_one_package_data(data + strlen("FFFF0D0A<header>"), len -strlen("FFFF0D0A<header><tail>0D0AFEFE"));
	}	

	if(ops == NULL)
	{
		printf("package data is error, not find start data. \n");
		for(int i=0; i<len; i++)
		{
			printf("%c", *(data+i));
		}
		printf("\n");
	}

	if(ops != data)
	{
		printf("recv package data is error. ");
		for(int i=0; i<len; i++)
		{
			printf("%c", *(data+i));
		}
		printf("\n");
		exec_one_package_data(ops+ strlen("FFFF0D0A<header>"), len - strlen("FFFF0D0A<header><tail>0D0AFEFE") - (ops-data));
	}
	//至于中间包含header,通过日志分析把

	return 0;
}
//客户端正常逻辑应该是 连接一次 然后发送,发送完后断开。
//如果客户端是长连接 连接一次,一直发送,上面是否能满足条件?
//如何实现tcp的长连接,以及如何用session管理

//也可以用mod,在recv后,监听可写,read后监听可读

//如果依赖于tcp的拆包,我们在业务层没有做拆包处理,这里直接循环接收即可,
//然后校验头和尾,做完整的包校验即可

//对接收到的数据做校验,拆包处理
int check_recv_data(char *data,  int len)
{
	if(len <= strlen("FFFF0D0A<header><tail>0D0AFEFE"))
	{
		printf("get data from ringbuff is error, and throw away data : %s.", data);
		return -1;
	}

	//对接收到的数据做拆包处理
	int datalen = -1;
	char * onedata;
	char * ops;
	char * temp_data = data;
	//因为包含着开始的头,所以这里用尾处理更方便
	// const char * start_str = "FFFF0D0A<header>";
	// while((ops = strstr(temp_data, start_str)) != NULL)
	// {
	// 	datalen = ops - temp_data;
	// 	exec_one_data(temp_data, datalen);
	// 	temp_data = ops;
	// }

	const char * end_str = "<tail>0D0AFEFE";
	while((ops = strstr(temp_data, end_str)) != NULL)
	{
		datalen = ops - temp_data +strlen(end_str);
		exec_one_data(temp_data, datalen);
		temp_data = ops+strlen(end_str);
	}

	//有剩下的数据
	if(temp_data - data != len)
	{
		printf("there is loss data: [%ld][%s] \n", strlen(temp_data), temp_data);
	}

	return 0;
}
//扔到ringbuff,然后每一次扔完,需要做校验处理


//暂定的tcp包的协议是  FFFF0D0A<header>len|data<tail>0D0AFEFE
//

//1:接收到对端的数据
//2:放入缓冲区中,可以是环形缓冲区
//3:对缓冲区中的数据做校验处理 
//4:缓冲区对完整数据做业务处理,不完整的数据做等待或者丢弃处理

int main()
{
	int fd = demo_init_socket();
	if(fd < 0)
	{
		printf("create fd error is %d \n", fd);
		return -1;
	}
	printf("create fd success is %d \n", fd);

	//开始循环进行处理 
	demo_server_exec(fd);

	printf("main func end\n");
	return 0;
}

//这里假设报文数据够小,每一次对应的缓冲区都能取完,取到异常的数据那就丢弃
//不然,这里的方案就会复杂,需要管理每一个fd对应的缓冲区的ringbuffer
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

tcp业务层粘包和半包理解及处理 的相关文章

随机推荐

  • EMC测试项目——辐射骚扰

    辐射骚扰 Radiation emission 主要是指能量以电磁波的形式由源发射到空间 或能量以电磁波形式在空间传播的现象 辐射骚扰是电磁兼容的重要内容 也是测试最不容易通过且最难整改的项目 辐射骚扰超标的产品可能引起周围装置 设备或系统
  • rust腐蚀怎么建立单机服务器_腐蚀rust新手入门指南 腐蚀rust怎么开始游戏

    如何开始游戏 巴拉巴拉那么多现在开始步入正轨吧 点击find game 就进入了服务器列表 在这里你可以加入官方的服务器 热闹但高延迟 也可以加入玩家自己设置的服务器 有些服务器不怎么友好详情请看贴吧举报贴 1 官方服务器列表 2和3 玩家
  • 解决JDK版本导致JMeter无法启动问题

    最近在做一个秒杀系统练习时 需要使用JMeter进行压力测试 但是安装JMeter后 出现了以下错误 很明显是JDK的版本问题导致的 但是我又不想改变系统的JDK版本 所以可以下载高版本的JDK 无需改变系统的JDK版本 直接在bin jm
  • nginx-代理多个服务

    目录 1 主机多Ip 1 1单网卡多ip主机配置 1 2修改default conf 1 3server1 conf 1 3server2 conf 1 4测试文件 1 4重启测试 2 主机多端口 2 1server1 conf 2 2se
  • 三个不等_高中数学竞赛常用的不等式归纳(续一)

    当 时 代入 23 为减少篇幅就不在此写出完整的 23式 下同 式得 即 25 25 式正是 22 九 加权不等式 9 1若 且 则 26 26 式就是加权的均值不等式 简称加权不等式 26 式形式直接理解为 几何均值不大于算术均值 十 赫
  • 2020第八届“泰迪杯”特等奖(基于 BERT 深度语言模型的“智慧政务”文本挖掘应用)

    目录 1绪论 1 1 智慧政务 文本挖掘的意义 1 2 智慧政务 文本挖掘的目标 1 3语言智能的里程碑技术 BERT 深度语言模型介绍 1 4本文的总体框架 1 5本文主要的创新之处 2基于 BERT 模型的留言自动分类 2 1任务介绍与
  • 数据库连接池C3P0学习

    数据库连接池C3P0框架是个非常优异的开源jar 高性能的管理着数据源 这里只讨论程序本身负责数据源 不讨论容器管理 一 实现方式 C3P0有三种方式实现 1 自己动手写代码 实现数据源 例如 在类路径下配置一个属性文件 config pr
  • 1-2 继承和接口

    1 继承 关键字extends 父类中私有成员可以被继承 只是外界无法访问 父类中公共属性 方法可以被子类继承 支持单继承 多重继承 单链式继承 不支持多继承 一个类继承多个父类 子类中的方法重写必须是父类中已有的方法 重写后再次调用父类的
  • shell 自动备份 MySQL 数据库脚本

    前提 在当前的机器中 已经安装了 MySQL 并且将 MySQL 已经加入到环境当中 安装 MySQL 和配置 MySQL 环境可参考文章 CentOS 8 通过二进制安装 MySQL 需求 编写 shell 脚本 自动备份 MySQL 数
  • 插入排序和选择排序(普通排序)

    我自己的代码 更容易理解 void XuanZePaiXu int a int n int i j k for int i 0 i lt n i k i for int j i 1 j lt n j if a k gt a j k j if
  • Vue2 _ 实现拖拽功能

    老项目重构 其中有一些拖拽功能 不过用的是两个开源 JS 拖拽文件实现的效果 版本太老了 所以需要换代了 然后就查阅了能够用 Vue 来简单快速实现拖拽的功能实现方法 目录 一 HTML 拖放 二 Vue Draggable 强烈推荐 三
  • NFT.net批量生成NFT头像(汉化版+使用文档)

    本程序NFT net可用于批量快速生成NFT头像 相同风格但不尽相同 原程序由老外开发 本人将其汉化 并制作使用文档 工具参考 NFT net 一个可以批量生成NFT头像的工具 素材参考 B站up主 卡司红茶 汉化版 使用文档 本人首发 如
  • maven如何快速查找某个包哪里引入的

    描述 最近项目中遇到一个问题 有个jar包跟项目的中的代码冲突导致一些奇怪的异常 项目是maven项目 问题查找 由maven官网可知道maven的Dependency plugin就有这个问题的解决方案filtering the depe
  • Sybase的客户端工具

    虽然已经离开用Sybase的项目很久了 但今天突然有同事问我Sybase的客户端工具都用什么 我却不记得当时天天用的什么工具了 上网找了半天才找到软件的名字 在此做个小小的总结 以免以后更想不起来 以下几个工具都是当时常用的 他们各有优缺点
  • panda3d虚幻引擎--(1)

    目录 前言 阿巴阿巴 安装 调整窗口 导入环境 前言 阿巴阿巴 前几天无意间看到了一个叫做panda3d的东西 觉得挺好玩 就翻教程 发现现在中文教程似乎没有那么全面成体系 大部分都是复制粘贴过来官网的实例然后就发布出去了 看得云里雾里的
  • 网络安全与密码学

    1 网络安全威胁 破坏网络安全的一些理论方式 窃听 窃听信息 在网路通信双方直接进行窃听 插入 主动在网络连接中插入信息 可以在message中插入恶意信息 假冒 伪造 spoof 分组中的源地址 假冒客户端或服务器 劫持 通过移除 取代发
  • idea的bug导致的项目编译问题。

    项目代码报红 方法一 删掉依赖的子项目target 然后重新install子项目 本项目重新maven reimport 重新install 方法二 删掉 idea 文件夹 重新引入项目 方法三 invalidate Caches 清除缓存
  • 运算放大器的异常总结——震荡和发热

    上图中 运算放大器输出端F1是保险丝 此处的保险丝有2个作用 1 防止输出端短路 保护运放 2 保险丝本身存在电阻 防止运放震荡 对于第一条不需要过多解释 但是第二条 如果此处保险丝去除改为直通 则因为容性负载的存在 运放可能引起震荡 此外
  • 《画解数据结构》(2 - 5)- 堆 与 优先队列

    画解数据结构 2 5 堆
  • tcp业务层粘包和半包理解及处理

    tcp粘包处理 tcp是流式传输的 是安全的 可靠的 顺序的 udp是数据报协议 是不可靠的 面试中经常被问到tcp粘包是如何处理的 通过百度和自己的理解 这里做笔记记录 如果有不对 请指正 参考 https bbs csdn net to