我有两个线程正在处理非阻塞套接字的 Java NIO。这就是线程正在做的事情:
主题 1:
调用选择器的 select() 方法的循环。如果有可用的密钥,则会对它们进行相应的处理。
话题2:
偶尔会通过调用 register() 将 SocketChannel 注册到选择器。
问题是,除非 select() 的超时时间非常小(比如 100 毫秒左右),否则对 register() 的调用将无限期地阻塞。即使通道配置为非阻塞,并且 javadoc 声明 Selector 对象是线程安全的(但我知道它的选择键不是)。
那么有人对问题可能是什么有任何想法吗?如果我将所有内容都放在一个线程中,该应用程序将完美运行。然后就不会出现问题了,但我真的很想有单独的线程。任何帮助表示赞赏。我在下面发布了我的示例代码:
将 select(1000) 更改为 select(100) 即可。将其保留为 select() 或 select(1000) ,但它不会。
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
公共类 UDPSocket
{
私有 DatagramChannel 客户端通道;
私有字符串 dstHost;
私有 int dstPort;
私有静态选择器recvSelector;
私有静态易失性布尔值已初始化;
私有静态ExecutorService eventQueue = Executors.newSingleThreadExecutor();
公共静态无效初始化()
{
初始化=真;
尝试
{
recvSelector = Selector.open();
}
捕获(IOException e)
{
System.err.println(e);
}
线程 t = 新线程(新 Runnable()
{
@覆盖
公共无效运行()
{
同时(初始化)
{
读取数据();
Thread.yield();
}
}
});
t.start();
}
公共静态无效关闭()
{
初始化=假;
}
私有静态无效readData()
{
尝试
{
int numKeys = recvSelector.select(1000);
if (numKeys > 0)
{
迭代器 i = recvSelector.selectedKeys().iterator();
while(i.hasNext())
{
SelectionKey key = i.next();
i.remove();
if (key.isValid() && key.isReadable())
{
DatagramChannel channel = (DatagramChannel) key.channel();
// allocate every time we receive so that it's a copy that won't get erased
final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
channel.receive(buffer);
buffer.flip();
final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();
// let user handle event on a dedicated thread
eventQueue.execute(new Runnable()
{
@Override
public void run()
{
subscriber.onData(buffer);
}
});
}
}
}
}
捕获(IOException e)
{
System.err.println(e);
}
}
公共 UDPSocket(字符串 dstHost, int dstPort)
{
尝试
{
this.dstHost = dstHost;
this.dstPort = dstPort;
clientChannel = DatagramChannel.open();
clientChannel.configureBlocking(假);
}
捕获(IOException e)
{
System.err.println(e);
}
}
公共无效addListener(SocketSubscriber订阅者)
{
尝试
{
DatagramChannel serverChannel = DatagramChannel.open();
serverChannel.configureBlocking(假);
DatagramSocket套接字 = serverChannel.socket();
socket.bind(new InetSocketAddress(dstPort));
SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
key.attach(订阅者);
}
捕获(IOException e)
{
System.err.println(e);
}
}
公共无效发送(ByteBuffer缓冲区)
{
尝试
{
clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
}
捕获(IOException e)
{
System.err.println(e);
}
}
公共无效关闭()
{
尝试
{
clientChannel.close();
}
捕获(IOException e)
{
System.err.println(e);
}
}
}