HTTP协议
- 无状态,使得HTTP服务轻量级
- HTTP消息冗长:HTTP消息包含消息头,消息体,换行符等。且大多采用文本传输。所以HTTP消息会有很多冗余消息并且消息占用字节数大,消耗过多的带宽
- 半双工通信:同一时刻,数据只能往同一方向传输。比如向服务器发送消息时,服务器此时不可以向客户端发送消息。(不过目前HTTP2已经支持了全双工通信)
1、WebSocket可以基于Tomcat或Netty实现,这里选择用Netty实现
WebSocket
- 底层采用单一TCP链接,全双工通信
- 对代理、防火墙、路由器透明
- 握手完成后,数据帧中不再携带HTTP头部、Cookie和身份验证信息
- 通过ping/pong消息保持心跳
代码演示
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
 * Starts a Netty WebSocket server after the Spring Boot application has started and stops it
 * when the application context that owns this bean is closed.
 *
 * <p>The bind address, WebSocket path and maximum frame size are injected from the
 * {@code netty.websocket.*} properties.
 */
@Component
public class NettyBooter implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyBooter.class);

    /** Largest aggregated HTTP request, in bytes, accepted before the WebSocket upgrade. */
    private static final int MAX_HTTP_CONTENT_LENGTH = 65536;

    @Value("${netty.websocket.port}")
    private int port;

    @Value("${netty.websocket.ip}")
    private String ip;

    @Value("${netty.websocket.path}")
    private String path;

    @Value("${netty.websocket.max-frame-size}")
    private int maxFrameSize;

    private ApplicationContext applicationContext;

    // Written by the runner thread and read by whichever thread publishes the close event.
    private volatile Channel serverChannel;
    private volatile EventLoopGroup bossGroup;
    private volatile EventLoopGroup workerGroup;

    /**
     * Stores the application context used to look up the WebSocket message handler bean.
     *
     * @param applicationContext the context this bean lives in
     * @throws BeansException declared by the callback contract; never thrown here
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Binds the WebSocket server to the configured address and returns once the bind succeeded.
     *
     * <p>This method deliberately does not wait for the server channel to close: blocking here
     * would stall the runner thread. The event-loop threads keep serving after it returns, and
     * {@link #onApplicationEvent(ContextClosedEvent)} performs the shutdown.
     *
     * @param args application arguments (unused)
     * @throws Exception if binding fails or the calling thread is interrupted while binding
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // One acceptor thread is enough for a single listening socket.
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(this.bossGroup, this.workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.localAddress(new InetSocketAddress(this.ip, this.port));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new HttpServerCodec());
                    pipeline.addLast(new ChunkedWriteHandler());
                    pipeline.addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
                    pipeline.addLast(newPathFilter());
                    pipeline.addLast(new WebSocketServerCompressionHandler());
                    pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));
                    pipeline.addLast(applicationContext.getBean(WebsocketMessageHandler.class));
                }
            });
            this.serverChannel = serverBootstrap.bind().sync().channel();
            started = true;
            LOGGER.info("websocket 服务启动,ip={},port={}", this.ip, this.port);
        } finally {
            if (!started) {
                // Bind failed or was interrupted: do not leave event-loop threads behind.
                shutdownEventLoops();
            }
        }
    }

    /**
     * Closes the server channel and shuts down the event loops when this bean's own
     * application context is closed.
     *
     * @param contextClosedEvent the close event published by Spring
     */
    @Override
    public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
        // Close events of other (e.g. child) contexts also reach this listener; ignore them.
        if (contextClosedEvent.getApplicationContext() != this.applicationContext) {
            return;
        }
        Channel channel = this.serverChannel;
        if (channel != null) {
            channel.close();
        }
        shutdownEventLoops();
        LOGGER.info("websocket 服务停止");
    }

    /**
     * Creates the per-connection handler that answers requests for any URI other than the
     * configured WebSocket path with {@code 404 Not Found} and then closes the connection.
     */
    private ChannelHandler newPathFilter() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof FullHttpRequest) {
                    FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
                    if (!fullHttpRequest.uri().equals(path)) {
                        // The request is consumed here and not passed on, so it must be
                        // released by this handler or its buffer leaks.
                        fullHttpRequest.release();
                        // Write from this handler's position so the plain HTTP response does
                        // not travel through the WebSocket handlers further down the pipeline.
                        ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
                                .addListener(ChannelFutureListener.CLOSE);
                        return;
                    }
                }
                super.channelRead(ctx, msg);
            }
        };
    }

    /** Requests a graceful shutdown of both event-loop groups, if they were created. */
    private void shutdownEventLoops() {
        EventLoopGroup boss = this.bossGroup;
        EventLoopGroup worker = this.workerGroup;
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (worker != null) {
            worker.shutdownGracefully();
        }
    }
}
- WebsocketMessageHandler.java
package com.canbot.enterprise.websocket.netty;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Shared WebSocket frame handler.
 *
 * <p>It tracks every active connection in {@link #channelGroup}, broadcasts a message to all
 * of them once a day, and starts a 30-second periodic push to a connection after that
 * connection sends a text frame. Non-text frames are answered with a close frame.
 */
@Sharable
@Component
public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketMessageHandler.class);

    /** Local wall-clock time ({@code HH:mm:ss}) of the daily broadcast. */
    private static final String DAILY_BROADCAST_TIME = "15:48:00";

    /** Delay, in seconds, between two periodic pushes to a single connection. */
    private static final long PERIODIC_PUSH_INTERVAL_SECONDS = 30;

    /** All currently active client channels. */
    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Scheduler of the daily broadcast; created at most once, guarded by the class lock. */
    private static ScheduledExecutorService dailyBroadcastScheduler;

    /** Ids of the channels that already have a periodic push task running. */
    private final Set<String> periodicPushChannelIds = ConcurrentHashMap.newKeySet();

    /**
     * Starts the periodic push for text frames; rejects every other frame type with a
     * close frame carrying {@code INVALID_MESSAGE_TYPE} and then closes the connection.
     *
     * @param ctx the context of the channel that received the frame
     * @param msg the received WebSocket frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
        if (msg instanceof TextWebSocketFrame) {
            sendMessage(ctx);
        } else {
            // The status has to be wrapped in a frame: a bare WebSocketCloseStatus is not a
            // WebSocket frame, so writing it would fail and the peer would never see it.
            ctx.channel().writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.INVALID_MESSAGE_TYPE))
                    .addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Starts pushing a text message to the given connection immediately and then every
     * 30 seconds. Calling it again for a connection that is already being pushed to is a
     * no-op, and the task is cancelled when the connection closes.
     *
     * @param ctx the context of the channel to push to
     */
    public void sendMessage(ChannelHandlerContext ctx) {
        final String channelId = ctx.channel().id().asLongText();
        if (!periodicPushChannelIds.add(channelId)) {
            // Already scheduled for this connection; do not stack another timer.
            return;
        }
        final ScheduledFuture<?> pushTask;
        try {
            pushTask = ctx.channel().eventLoop().scheduleWithFixedDelay(
                    () -> ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器发送消息了" + System.currentTimeMillis())),
                    0, PERIODIC_PUSH_INTERVAL_SECONDS, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            // Scheduling was rejected (e.g. event loop shutting down): allow a later retry.
            periodicPushChannelIds.remove(channelId);
            throw e;
        }
        // Stop the timer once the connection is gone; the listener also fires right away
        // if the channel is already closed.
        ctx.channel().closeFuture().addListener((ChannelFutureListener) closed -> {
            pushTask.cancel(false);
            periodicPushChannelIds.remove(channelId);
        });
    }

    /**
     * Starts the daily broadcast to all active connections. The first run happens at the
     * next occurrence of {@code 15:48:00} local time, then every 24 hours. Only the first
     * call has an effect; later calls return immediately.
     */
    public void sendMessage() {
        synchronized (WebsocketMessageHandler.class) {
            if (dailyBroadcastScheduler != null) {
                return;
            }
            // Daemon thread so that the scheduler never keeps the JVM from exiting.
            dailyBroadcastScheduler = Executors.newSingleThreadScheduledExecutor(task -> {
                Thread thread = new Thread(task, "websocket-daily-broadcast");
                thread.setDaemon(true);
                return thread;
            });
            long oneDay = TimeUnit.DAYS.toMillis(1);
            long initDelay = getTimeMillis(DAILY_BROADCAST_TIME) - System.currentTimeMillis();
            if (initDelay <= 0) {
                // Today's slot has already passed: wait for tomorrow's.
                initDelay += oneDay;
            }
            dailyBroadcastScheduler.scheduleAtFixedRate(
                    () -> {
                        try {
                            channelGroup.writeAndFlush(new TextWebSocketFrame("服务器主动推送消息。当前服务器时间:" + System.currentTimeMillis()));
                        } catch (RuntimeException e) {
                            // An escaping exception would silently cancel all future runs.
                            LOGGER.warn("daily websocket broadcast failed", e);
                        }
                    },
                    initDelay,
                    oneDay,
                    TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Registers the new connection in {@link #channelGroup} and makes sure the daily
     * broadcast is running.
     *
     * @param ctx the context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        channelGroup.add(ctx.channel());
        sendMessage();
        LOGGER.info("链接创建:{}", ctx.channel().remoteAddress());
    }

    /**
     * Removes the closed connection from {@link #channelGroup}.
     *
     * @param ctx the context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channelGroup.remove(ctx.channel());
        LOGGER.info("链接断开:{}", ctx.channel().remoteAddress());
        super.channelInactive(ctx);
    }

    /**
     * Returns the epoch milliseconds of the given time of day on the current date, in the
     * system default time zone.
     *
     * @param time time of day in {@code HH:mm:ss} form
     * @return epoch milliseconds of today at {@code time}
     * @throws java.time.format.DateTimeParseException if {@code time} cannot be parsed
     */
    private static long getTimeMillis(String time) {
        return LocalDate.now()
                .atTime(LocalTime.parse(time))
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
    }
}
总结:
sendMessage()实现每天定时(15:48)向所有在线的连接广播消息,sendMessage(ctx)实现向指定连接每隔30秒周期性推送消息。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)