如何在客户端使用 Java 读取 gRPC 中的元数据

2024-03-29

我正在使用 Java 和 Protoc 3.0 编译器,我的 proto 文件如下所述。https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

syntax = "proto3";

package Telemetry;

// Interface exported by Agent
service OpenConfigTelemetry {
    // Request an inline subscription for data at the specified path.
    // The device should send telemetry data back on the same
    // connection as the subscription request.
    rpc telemetrySubscribe(SubscriptionRequest)                     returns (stream OpenConfigData) {}

    // Terminates and removes an exisiting telemetry subscription
    rpc cancelTelemetrySubscription(CancelSubscriptionRequest)      returns (CancelSubscriptionReply) {}

    // Get the list of current telemetry subscriptions from the
    // target. This command returns a list of existing subscriptions
    // not including those that are established via configuration.
    rpc getTelemetrySubscriptions(GetSubscriptionsRequest)          returns (GetSubscriptionsReply) {}

    // Get Telemetry Agent Operational States
    rpc getTelemetryOperationalState(GetOperationalStateRequest)    returns (GetOperationalStateReply) {}

    // Return the set of data encodings supported by the device for
    // telemetry data
    rpc getDataEncodings(DataEncodingRequest)                       returns (DataEncodingReply) {}
}

// Message sent for a telemetry subscription request
message SubscriptionRequest {
    // Data associated with a telemetry subscription
    SubscriptionInput input                                 = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;

    // The below configuration is not defined in Openconfig RPC.
    // It is a proposed extension to configure additional
    // subscription request features.
    SubscriptionAdditionalConfig additional_config          = 3;
}

// Data associated with a telemetry subscription
message SubscriptionInput {
    // List of optional collector endpoints to send data for
    // this subscription.
    // If no collector destinations are specified, the collector
    // destination is assumed to be the requester on the rpc channel.
    repeated Collector  collector_list                      = 1;
}

// Collector endpoints to send data specified as an ip+port combination.
message Collector {
    // IP address of collector endpoint
    string address                                          = 1;

    // Transport protocol port number for the collector destination.
    uint32 port                                             = 2;
}

// Data model path
message Path {
    // Data model path of interest
    // Path specification for elements of OpenConfig data models
    string path                                             = 1;

    // Regular expression to be used in filtering state leaves
    string filter                                           = 2;

    // If this is set to true, the target device will only send
    // updates to the collector upon a change in data value
    bool suppress_unchanged                                 = 3;

    // Maximum time in ms the target device may go without sending
    // a message to the collector. If this time expires with
    // suppress-unchanged set, the target device must send an update
    // message regardless if the data values have changed.
    uint32 max_silent_interval                              = 4;

    // Time in ms between collection and transmission of the
    // specified data to the collector platform. The target device
    // will sample the corresponding data (e.g,. a counter) and
    // immediately send to the collector destination.
    //
    // If sample-frequency is set to 0, then the network device
    // must emit an update upon every datum change.
    uint32 sample_frequency                                 = 5;
}

// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
    // limit the number of records sent in the stream
    int32 limit_records                                     = 1;

    // limit the time the stream remains open
    int32 limit_time_seconds                                = 2;
}

// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
//    subscription request.

// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
    // Response message to a telemetry subscription creation or
    // get request.
    SubscriptionResponse response                           = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;
}

// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
    // Unique id for the subscription on the device. This is
    // generated by the device and returned in a subscription
    // request or when listing existing subscriptions
    uint32 subscription_id = 1;
}

// 2. Telemetry data send back on the same connection as the
//    subscription request.
message OpenConfigData {
    // router name:export IP address
    string system_id                                        = 1;

    // line card / RE (slot number)
    uint32 component_id                                     = 2;

    // PFE (if applicable)
    uint32 sub_component_id                                 = 3;

    // Path specification for elements of OpenConfig data models
    string path                                             = 4;

    // Sequence number, monotonically increasing for each
    // system_id, component_id, sub_component_id + path.
    uint64 sequence_number                                  = 5;

    // timestamp (milliseconds since epoch)
    uint64 timestamp                                        = 6;

    // List of key-value pairs
    repeated KeyValue kv                                    = 7;
}

// Simple Key-value, where value could be one of scalar types
message KeyValue {
    // Key
    string key                                              =  1;

    // One of possible values
    oneof value {
        double double_value                                 =  5;
        int64  int_value                                    =  6;
        uint64 uint_value                                   =  7;
        sint64 sint_value                                   =  8;
        bool   bool_value                                   =  9;
        string str_value                                    = 10;
        bytes  bytes_value                                  = 11;
    }
}

// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
    // Return code
    ReturnCode code                                         = 1;

    // Return code string
    string     code_str                                     = 2;
};

// Result of the operation
enum ReturnCode {
    SUCCESS                                                 = 0;
    NO_SUBSCRIPTION_ENTRY                                   = 1;
    UNKNOWN_ERROR                                           = 2;
}

// Message sent for a telemetry get request
message GetSubscriptionsRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription get request
message GetSubscriptionsReply {
    // List of current telemetry subscriptions
    repeated SubscriptionReply subscription_list            = 1;
}

// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
    // Per-subscription_id level operational state can be requested.
    //
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers including agent-level
    // operational stats
    // --- or ---
    // If subscription_id is not present then sent only agent-level
    // operational stats
    uint32 subscription_id                                  = 1;

    // Control verbosity of the output
    VerbosityLevel verbosity                                = 2;
}

// Verbosity Level
enum VerbosityLevel {
    DETAIL                                                  = 0;
    TERSE                                                   = 1;
    BRIEF                                                   = 2;
}

// Reply to telemetry agent operational states request
message GetOperationalStateReply {
    // List of key-value pairs where
    //     key      = operational state definition
    //     value    = operational state value
    repeated KeyValue kv                                    = 1;
}

// Message sent for a data encoding request
message DataEncodingRequest {
}

// Reply to data encodings supported request
message DataEncodingReply {
    repeated EncodingType  encoding_list                    = 1;
}

// Encoding Type Supported
enum EncodingType {
    UNDEFINED                                               = 0;
    XML                                                     = 1;
    JSON_IETF                                               = 2;
    PROTO3                                                  = 3;
}

为了首先进行服务调用(rpc TelemetrySubscribe),我需要读取具有订阅 ID 的标头,然后开始读取消息。现在,使用 Java 我可以连接到服务,我确实引入了拦截器,但是当我打印/检索标头时,它是空的。我调用拦截器的代码如下,

 ClientInterceptor interceptor = new HeaderClientInterceptor();
      originChannel = OkHttpChannelBuilder.forAddress(host, port)
        .usePlaintext(true)
        .build();
     Channel channel =  ClientInterceptors.intercept(originChannel, interceptor);
      telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

这是读取元数据的拦截器代码。

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
      CallOptions callOptions, Channel next) {
    return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {

        super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
          @Override
          public void onHeaders(Metadata headers) {

             Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);

            System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));

想知道是否还有其他方法可以读取其中包含订阅 ID 的元数据或第一条消息?我需要读取具有订阅 ID 的第一条消息,并将相同的订阅 ID 返回到服务器,以便流式传输可以启动我有使用相同 proto 文件的等效 Python 代码,并且它通过下面提到的代码与服务器进行通信,仅供参考:

     sub_req = SubscribeRequestMsg("host",port)
     data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
     metadata = data_itr.initial_metadata()

                   if metadata[0][0] == "responseKey":
                    metainfo = metadata[0][1]
                    print metainfo

                    subreply = agent_pb2.SubscriptionReply()
                    subreply.SetInParent()
                    google.protobuf.text_format.Merge(metainfo, subreply)

                    if subreply.response.subscription_id:
                    SUB_ID = subreply.response.subscription_id

从上面的 python 代码我可以轻松检索元数据对象,不知道如何使用 Java 检索相同的元数据对象?

阅读元数据后我得到的是:Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

但我知道从元数据到它还有一行,那就是

response {
  subscription_id: 2
}

如何从其中包含订阅 ID 的标头中提取最后一个响应。我确实尝试了很多选择,但我在这里迷失了。


您使用的方法是为了request元数据,不是response元数据:

public void start(Listener<RespT> responseListener, Metadata headers) {

对于响应元数据,您将需要ClientCall.Listener并等待 onHeaders 回调:

public void onHeaders(Metadata headers)

我确实觉得你提到的元数据的使用看起来很奇怪。元数据通常用于额外的错误详细信息或不特定于 RPC 方法的横切功能(如身份验证、跟踪等)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在客户端使用 Java 读取 gRPC 中的元数据 的相关文章

  • 如何替换引号之间出现的任何单词

    我需要能够替换所有出现的单词 and 仅当它出现在单引号之间时 例如 将字符串中的 and 替换为 XXX This and that with you and me and others and not her and him 结果是 T
  • Java中printf左对齐

    当我运行该程序时 阶乘值右对齐 有没有办法让它左对齐 同时保持中间 50 个空格 public class Exercise 5 13 public static void main String args int numbers 1 2
  • 生成固定长度的随机数组[重复]

    这个问题在这里已经有答案了 我只是想更改我的代码 以便每次运行代码时都会生成固定长度 100 个整数的随机数组 而不仅仅是在代码中包含一个预先设置的数组 我对此很陌生 所以只需要正确方向的指导 谢谢 public class Selecti
  • 将命令行参数传递给可运行的 JAR [重复]

    这个问题在这里已经有答案了 我从 Eclipse 项目构建了一个可运行的 JAR 用于处理给定的 XML 文件并提取纯文本 但是 此版本要求将该文件硬编码在代码中 有没有办法做这样的事情 java jar wiki2txt enwiki 2
  • “不能从静态上下文引用非静态方法”JPA Java

    我从这一行收到 无法从静态上下文引用非静态方法 错误 createStudent stu00001 new Date 631152000000 m WB new Type Name Bob Smith 如何正确组成 日期 我查看了 API
  • 线程“main”中出现异常 java.lang.NoClassDefFoundError:无法初始化类 com.sun.jersey.core.header.MediaTypes

    我正在尝试运行一个球衣客户端并面临这个问题 WS级 import javax ws rs GET import javax ws rs Path import javax ws rs Produces import javax ws rs
  • 如何使用 mongoTemplate 实现 Mongodb Collection 的分页

    我是 mongoDb 中的菜鸟 我需要为任何特定集合实现分页 例如说 我有一个 Foo 集合 并且有一个返回 Foo 集合中所有记录的函数 public List
  • 包java.time不存在,jdk1.8

    嗯 我刚刚开始从事代号工作 我对 Java 有相当不错的经验 我的代码一切都很好 没有任何问题 但在编译时我得到了这个 error package java time does not exit import java time Local
  • 当服务器仅从请求中读取标头时,Http 客户端未收到响应

    我在 Java 中搞乱了 HTTP 和套接字 希望你能对此有所了解 当我用 Java SE 11 编写的 HTTP 服务器没有读取整个请求然后响应时 客户端不会收到它或收到错误 这是为什么 在服务器读取整个请求之前 客户端是否无法读取响应
  • Hamcrest 与 MockMvc:检查键是否存在,但值可能为空

    我正在使用 MockMvc 进行一些测试 我想验证 JSON 响应的结构 具体来说 我想确保属性的键存在 并且值是某种类型或为 null keyToNull null This may be null or a String keyToSt
  • 如何使用jsp上传服务器文件夹上的文件[重复]

    这个问题在这里已经有答案了 我正在尝试使用 servlet jsp 将一些图像上传到位于我的服务器上的文件夹中 下面是我的代码 它在我的本地计算机上运行 import java io import java util import java
  • 实现一个java UDF并从pyspark调用它

    我需要创建一个在 pyspark python 中使用的 UDF 它使用 java 对象进行内部计算 如果它是一个简单的 python 我会做类似的事情 def f x return 7 fudf pyspark sql functions
  • Spring:当我的类已经用@RestController注释时,为什么我仍然应该使用@RequestBody?

    我目前正在将 Java 和 Spring 用于我的 Web 服务应用程序 我正在使用 RestController希望消除使用注释的需要 ResponseBody and RequestBody注释 不幸的是 删除 RequestBody注
  • 将 LinkedHashset 内容复制到新的 ArrayList?

    我有一个最初包含一些内容的 listView 如果它得到相同的内容 我通过删除重复linkedhashset 现在 我想复制linkedhashset内容 即没有重复的内容到新的ArrayList 我尝试复制通过 p addAll 0 lh
  • 无法向 openfire 服务器发送消息

    我无法使用 SMACK API 向 openfire 服务器上的 XMPP 客户端发送消息 我不确定我哪里出错了 我在 gtalk 上测试了相同的代码 它工作正常 public class SenderTest public static
  • 从文件执行db语句

    我在我的应用程序中使用嵌入式 Apache derby 我有一个名为的 SQL 脚本创建的数据库 sql创建数据库中的所有表并用初始数据填充它 例如 SET SCHEMA APP CREATE TABLE study study id bi
  • 从 MySql 迁移:MariaDB 服务器意外关闭客户端连接

    由于许可 商业使用原因 我们正在从 MySql 迁移到 MariaDB 我们已经成功用 MariaDB 客户端 jar 替换了 MySql 连接器 jar 第一次更改 现在正在尝试用 MariaDB 服务器替换 MySql 服务器而不更改数
  • 什么触发了java垃圾收集器

    我对 Java 中垃圾收集的工作原理有点困惑 我知道当不再有对某个对象的实时引用时 该对象就有资格进行垃圾回收 但是如果它有对实时对象的引用怎么办 可以说我有一个节点集合 它们再次引用更多节点 List 1 gt Node a gt Nod
  • 使用 Java 重新启动 Tomcat

    我需要从 Java 代码重新启动 tomcat 例如 如果某个查询在一段时间内没有执行 那么它将自动重新启动 tomcat 我已经尝试了以下关闭和启动代码 但是当我们关闭tomcat时 java代码将不会运行并且tomcat不会启动 注意
  • “该选择不能在任何服务器上运行”

    我一直在 Eclipse 中开发一个动态 Web 项目 我收到这个错误 该选择不能在任何服务器上运行 早些时候它工作得很好 但现在我收到了这个错误 我删除了服务器并再次添加 Project gt Right Click gt Propert

随机推荐