To read data from a socket, you can use a RecordParser. On a socket connection, data is usually delimited by newline characters:
RecordParser parser = RecordParser.newDelimited("\n", sock);
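To make the idea of delimiter framing concrete, here is a minimal plain-Java sketch with no Vert.x involved. Chunks arrive with arbitrary boundaries, and a record is emitted only once its delimiter has been seen. The class name `LineFramer` and its `feed` method are hypothetical, chosen for illustration; this is not how RecordParser is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of delimiter-based framing; not Vert.x code.
class LineFramer {
    // Characters received so far that do not yet form a complete record.
    private final StringBuilder pending = new StringBuilder();

    // Feed one network chunk; returns the records completed by this chunk.
    List<String> feed(String chunk) {
        List<String> records = new ArrayList<>();
        pending.append(chunk);
        int idx;
        while ((idx = pending.indexOf("\n")) >= 0) {
            records.add(pending.substring(0, idx)); // record without the delimiter
            pending.delete(0, idx + 1);             // drop the record and its delimiter
        }
        return records;
    }
}
```

Feeding "hel" and then "lo\nwor" yields nothing for the first call and a single record "hello" for the second, while "wor" stays buffered until the next newline arrives.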
A RecordParser is a Vert.x ReadStream, so it can be converted to a Flowable:
FlowableHelper.toFlowable(parser)
Now, if an EchoRequestMessage can be created from a Buffer:
public class EchoRequestMessage {
  private String message;

  public static EchoRequestMessage fromBuffer(Buffer buffer) {
    // Deserialize
  }

  public String getMessage() {
    return message;
  }
}
And an EchoResponseMessage can be converted to a Buffer:
public class EchoResponseMessage {
  private final String message;

  public EchoResponseMessage(String message) {
    this.message = message;
  }

  public Buffer toBuffer() {
    // Serialize
  }
}
You can use RxJava operators to implement the echo server flow:
vertx.createNetServer().connectHandler(sock -> {
  RecordParser parser = RecordParser.newDelimited("\n", sock);
  FlowableHelper.toFlowable(parser)
    .map(EchoRequestMessage::fromBuffer)
    .map(echoRequestMessage -> {
      return new EchoResponseMessage(echoRequestMessage.getMessage());
    })
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
      throwable.printStackTrace();
      sock.close();
    }, sock::close);
}).listen(1234);
[Edit] If the messages in your protocol are length-prefixed rather than line-delimited, you can create a custom ReadStream:
class LengthPrefixedStream implements ReadStream<Buffer> {
  final RecordParser recordParser;
  // True while the next record is the 4-byte length prefix; the stream starts with a prefix.
  boolean prefix = true;

  // Not private, so that callers outside this class can instantiate the stream.
  LengthPrefixedStream(ReadStream<Buffer> stream) {
    recordParser = RecordParser.newFixed(4, stream);
  }

  @Override
  public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public ReadStream<Buffer> handler(Handler<Buffer> handler) {
    if (handler == null) {
      recordParser.handler(null);
      return this;
    }
    recordParser.handler(buffer -> {
      if (prefix) {
        // The buffer holds the length prefix: read the body size and switch to it.
        prefix = false;
        recordParser.fixedSizeMode(buffer.getInt(0));
      } else {
        // The buffer holds a message body: expect a prefix next and emit the body.
        prefix = true;
        recordParser.fixedSizeMode(4);
        handler.handle(buffer);
      }
    });
    return this;
  }

  @Override
  public ReadStream<Buffer> pause() {
    recordParser.pause();
    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    recordParser.resume();
    return this;
  }

  @Override
  public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
    recordParser.endHandler(endHandler);
    return this;
  }
}
And convert it to a Flowable:
FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
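
The answer does not show the wire format itself, so here is a minimal plain-Java sketch of length-prefixed framing, with no Vert.x involved. It assumes a 4-byte big-endian length followed by a UTF-8 body, matching the 4-byte prefix read with getInt above. The class name `LengthPrefixFraming` and its methods are hypothetical and only illustrate the protocol; on the Vert.x side the same state machine is driven by RecordParser.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of length-prefixed framing; not Vert.x code.
class LengthPrefixFraming {

    // Encode a message as a 4-byte big-endian length followed by the UTF-8 body.
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length)
                .putInt(body.length)
                .put(body)
                .array();
    }

    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feed one chunk of bytes; returns the message bodies completed by it.
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<String> messages = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("Negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // body incomplete: rewind to the prefix and wait for more data
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        // Keep only the unconsumed bytes for the next call.
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return messages;
    }
}
```

A sender would write the result of `encode` for each message; the receiver can be fed chunks split at any byte boundary and still recovers whole messages.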