socket(二)–Tcp同步非阻塞式
文章目录
- socket(二)--Tcp同步非阻塞式
- 一、简介
- 二、关键类
- 2.1 ServerSocketChannel
- 2.2 SocketChannel
- 2.3 Selector
- 2.4 SelectionKey
- 2.5 Buffer
- 三、示例
-
- 四、注意
一、简介
同步阻塞式通信,工作线程一次只能处理一个连接请求,服务完成后,才可处理下一个连接请求。针对这种情况,有两种解决方案:
- 采用多线程编程,即是:主线程负责接收连接请求,连接成功后,服务的过程(即是数据交互的过程)交由其它多个线程处理;
- 采用非阻塞式通信,主要由java.nio包中的类实现。实现方式是,工作线程采用轮询的方式,不停监控连接事件、读事件、写事件,当某事件发生时,就进行相应事件的处理。
这里对第2种方式的tcp同步非阻塞通信方式进行介绍。
二、关键类
2.1 ServerSocketChannel
ServerSocketChannel是ServerSocket的替代类,支持阻塞和非阻塞两种方式,默认是阻塞式的。
2.2 SocketChannel
SocketChannel是Socket的替代类,同样支持阻塞和非阻塞两种式,默认是阻塞式的。
2.3 Selector
监听器,用于监听连接就绪事件、读就绪事件、写就绪事件。方法有:
keys():注册的所有事件;
selectedKeys():已经发生的事件;
2.4 SelectionKey
事件的句柄,当某事件的selectionKey对象位于Selector对象的selectedKeys()集合中时,表示事件发生。
具体事件有:
SelectionKey.OP_ACCEPT:接收连接就绪事件,表示至少有一个连接进来;
SelectionKey. OP_CONNECT:连接就绪事件,表示与服务连接成功;
SelectionKey.OP_READ:读就绪事件,表示输入流有数据了,可进行数据读;
SelectionKey.OP_WRITE:写就绪事件,表示可以向输出流写数据了。
2.5 Buffer
由于数据输入输出比较耗时,buffer用于缓冲,buffer有三个属性:
capacity:即容量,表示缓冲区可以保存多少数据;
limit:即极限,表示当前缓冲区的终点,读取操作只有在极限范围内。极限可以修改,可用于重用缓冲区,是非负整数,不能大于容量;
position:即位置,表示下一次读写的位置,是非负整数,不能大于极限。
另外buffer提供了改变上述属性的方法:
clear():极限设为容量,位置设为0;
flip():极限设为位置,位置设为0;
rewind():位置设为0;
compact():压缩,即删除0到位置间的数据,然后将位置与极限之前的数据移动,使位置再次为0;
三、示例
这里以创建服务端和客户端,两者可自由发送消息给对方为例。解释请查看注释。
3.1 服务端代码
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class TcpNioServerSocketChannelPaperMain {
public static void main(String[] args) throws Exception {
int port = 7001;
Charset charset = Charset.forName("utf-8");
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind(new InetSocketAddress(port));
System.out.println("服务启动");
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
Selector selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
if (selector.select(1000) < 1) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
System.out.println("receive connection, address:" + sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
ByteBuffer attachBuffer = ByteBuffer.allocateDirect(1024);
sc.register(selector, SelectionKey.OP_READ, attachBuffer);
new WriteThread(sc).start();
} else if (key.isReadable()) {
ByteBuffer attachBuffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
try {
sc.read(readBuffer);
readBuffer.flip();
String msg = charset.decode(readBuffer).toString();
System.out.println("server receive msg : " + msg);
readBuffer.clear();
if ("bye".equals(msg) || StringUtils.isBlank(msg)) {
key.interestOps(key.interestOps() & SelectionKey.OP_READ);
sc.close();
}
} catch (Exception e) {
sc.close();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/
* 发送消息线程
*/
public static class WriteThread extends Thread {
private SocketChannel sc;
public WriteThread(SocketChannel sc) {
this.sc = sc;
}
@Override
public void run() {
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (true) {
try {
if(!sc.isConnected()){
break;
}
System.out.print("server send msg:");
String msg = scanner.nextLine();
if ("bye".equals(msg)) {
sc.close();
break;
}
if (StringUtils.isBlank(msg)) {
continue;
}
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
sc.write(writeBuffer);
writeBuffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3.2 客户端代码
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class TcpNioSocketChannelPaperMain {
public static void main(String[] args) throws Exception {
Charset charset = Charset.forName("utf-8");
String host = "127.0.0.1";
int port = 7001;
SocketChannel sc = SocketChannel.open();
InetSocketAddress isa = new InetSocketAddress(host, port);
sc.configureBlocking(false);
sc.connect(isa);
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
Selector selector = Selector.open();
sc.register(selector, SelectionKey.OP_CONNECT);
while (true) {
if (selector.select() < 1) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
try {
if (key.isConnectable()) {
System.out.println("连接服务器成功");
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.register(selector, SelectionKey.OP_READ);
if (socketChannel.isConnectionPending()) {
socketChannel.finishConnect();
}
new WriteThread(socketChannel).start();
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
socketChannel.read(readBuffer);
readBuffer.flip();
String msg = charset.decode(readBuffer).toString();
if (StringUtils.isBlank(msg)) {
continue;
}
System.out.println("client receive msg : " + msg);
readBuffer.clear();
if ("bye".equals(msg) || StringUtils.isBlank(msg)) {
key.interestOps(key.interestOps() & SelectionKey.OP_READ);
socketChannel.close();
}
} catch (Exception e) {
socketChannel.close();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/
* 发送消息线程
*/
public static class WriteThread extends Thread {
private SocketChannel sc;
public WriteThread(SocketChannel sc) {
this.sc = sc;
}
@Override
public void run() {
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (true) {
try {
if (!sc.isConnected()) {
break;
}
System.out.print("client send msg:");
String msg = scanner.nextLine();
if ("bye".equals(msg)) {
sc.close();
break;
}
if (StringUtils.isBlank(msg)) {
continue;
}
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
sc.write(writeBuffer);
writeBuffer.clear();
} catch (IOException e) {
e.printStackTrace();
try {
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
}
四、注意
1、通常写事件不进行注册,可直接调socketChannel.write()直接发送。
写就绪事件,监控的是内核写缓冲区是否可写,当内核写缓冲区有空闲空间时,就会发生写就缓绪事件,而一般情况下内核写缓冲区是空闲的,这会导致cpu空转。所以一般不注册写就绪事件,或者写时再注册,写完就取消。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)