如何评估kafka流应用程序的消耗时间

2024-04-22

我有 1.0.0 kafka 流应用程序,有两个类,如下所示“class FilterByPolicyStreamsApp”和“class FilterByPolicyTransformerSupplier”。在我的应用程序中,我读取事件,执行一些条件检查并转发到另一个主题中的同一个 kafka。我能够使用 FilterByPolicyTransformerSupplier 类中的“eventsForwardTimeInMs”变量获取生成时间。但我无法获得耗时(有或没有(反)序列化)。这次我要怎样才能得到呢?请帮我。

FilterByPolicyStreamsApp .java:

public class FilterByPolicyStreamsApp implements CommandLineRunner {
    String policyKafkaTopicName="policy";
    String policyFilterDataKafkaTopicName = "policy.filter.data";
    String bootstrapServers="11.1.1.1:9092";
    String sampleEventsKafkaTopicName = 'sample-.*";
    String applicationId="filter-by-policy-app";
    String policyFilteredEventsKafkaTopicName = "policy.filter.events";

    public static void main(String[] args) {
        SpringApplication.run(FilterByPolicyStreamsApp.class, args);
    }

    @Override
    public void run(String... arg0) {
        String policyGlobalTableName = policyKafkaTopicName + ".table";
        String policyFilterDataGlobalTable = policyFilterDataKafkaTopicName + ".table";

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        KStreamBuilder builder = new KStreamBuilder();
        builder.globalTable(Serdes.String(), new JsonSerde<>(List.class), policyKafkaTopicName,
                policyGlobalTableName);
        builder.globalTable(Serdes.String(), new JsonSerde<>(PolicyFilterData.class), policyFilterDataKafkaTopicName,
                policyFilterDataGlobalTable);

        KStream<String, SampleEvent> events = builder.stream(Serdes.String(),
                new JsonSerde<>(SampleEvent.class), Pattern.compile(sampleEventsKafkaTopicName));
        events = events.transform(new FilterByPolicyTransformerSupplier(policyGlobalTableName,
                policyFilterDataGlobalTable));

        events.to(Serdes.String(), new JsonSerde<>(SampleEvent.class), policyFilteredEventsKafkaTopicName);

        KafkaStreams streams = new KafkaStreams(builder, config);

        streams.start();
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                logger.error(e.getMessage(), e);
            }
        });
    }
}

FilterByPolicyTransformerSupplier.java:

public class FilterByPolicyTransformerSupplier
            implements TransformerSupplier<String, SampleEvent, KeyValue<String, SampleEvent>> {
        private String policyGlobalTableName;
        private String policyFilterDataGlobalTable;

        public FilterByPolicyTransformerSupplier(String policyGlobalTableName,
            String policyFilterDataGlobalTable) {
            this.policyGlobalTableName = policyGlobalTableName;
            this.policyFilterDataGlobalTable = policyFilterDataGlobalTable;
        }
        @Override
        public Transformer<String, SampleEvent, KeyValue<String, SampleEvent>> get() {
            return new Transformer<String, SampleEvent, KeyValue<String, SampleEvent>>() {
                private KeyValueStore<String, List<String>> policyStore;
                private KeyValueStore<String, PolicyFilterData> policyMetadataStore;
                private ProcessorContext context;

                @Override
                public void close() {
                }

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    // Call punctuate every 1 second
                    this.context.schedule(1000);
                    policyStore = (KeyValueStore<String, List<String>>) this.context
                            .getStateStore(policyGlobalTableName);
                    policyMetadataStore = (KeyValueStore<String, PolicyFilterData>) this.context
                            .getStateStore(policyFilterDataGlobalTable);
                }

                @Override
                public KeyValue<String, SampleEvent> punctuate(long arg0) {
                    return null;
                }

                @Override
                public KeyValue<String, SampleEvent> transform(String key, SampleEvent event) {
                    long eventsForwardTimeInMs = 0;
                    long forwardedEventCouunt = 0;
                    List<String> policyIds = policyStore.get(event.getCustomerCode().toLowerCase());
                    if (policyIds != null) {
                        for (String policyId : policyIds) {

                            /*
                                PolicyFilterData policyFilterMetadata = policyMetadataStore.get(policyId);

                                Do some condition checks on the event. If it satisfies then will forward them.
                                if(policyFilterMetadata == null){
                                    continue;
                                }
                            */

                            // Using context forward as event can map to multiple policies
                            long startForwardTime = System.currentTimeMillis();
                            context.forward(policyId, event);
                            forwardedEventCouunt++;
                            eventsForwardTimeInMs += System.currentTimeMillis() - startForwardTime;
                        }
                    }
                    return null;
                }
            };
    }
}

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何评估kafka流应用程序的消耗时间 的相关文章

  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • Kafka Connect 进入重新平衡循环

    我刚刚部署了 Kafka Connect 我只使用连接源 MQTT 应用程序位于两个实例的集群上 2 个容器上 机器 现在它似乎进入了一种重新平衡循环 我一开始有一点数据 但没有新数据出现 这就是我在日志中得到的内容 2017 08 11
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 如何在 Spring Kafka 中以编程方式设置 Jsonserializer Type Value 方法

    所以我无法仅使用 yaml 为 JsonSerializer 配置 JavaType 方法 还不确定原因 但与此同时 我如何以编程方式设置它 我在文档中看到了它的代码 但是该代码到底需要在哪里运行 Spring Kafka JsonDese
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 使用Spring Cloud Stream Kafka动态更改instanceindex

    如同 在运行时更改 spring cloud stream 实例索引 计数 https stackoverflow com questions 37579939 changing spring cloud stream instance i
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti

随机推荐

  • 为什么 ng-mouseover 不能与 ng-if 一起使用

    我试图在具有 ng if 的图像上使用 ng mouseover 指令 但它不起作用 但如果我使用 ng show 指令它起作用 每个人都可以告诉我为什么吗 或者这是 AngularJS 的问题 在 AngularJS 文档中 我无法阅读任
  • WooCommerce 挂钩 woocommerce_cancelled_order

    再会 这是我第一次使用 stackoverflow 很高兴认识大家 不管怎样 我正在为 WooCommerce 编写一个插件 当订单取消时我会自动退款 当我在没有钩子的情况下在单独的文件中手动执行它时 我的代码工作正常 但是 使用我的钩子它
  • Azure 移动服务和 Azure Web 应用身份验证

    当用户通过 Azure Web 应用程序 ASP NET MVC 和 Xamarin iOS 应用程序登录时 我为同一用户获得两个不同的 SID Setup 带有 API 控制器的 Azure WebApp ASP NET 5 带有 Mic
  • 如何使用 proguard 混淆 android 库(.aar)?

    我想混淆 aar使用 proguard 进行分发的库 我在互联网上尝试了很多解决方案 但到目前为止没有任何效果 只有一些代码被混淆了 有人可以帮我解决这个问题吗 在 build gradle 中 在 defaultConfig 下添加 Co
  • 将数据从 ASP.NET MVC 控制器推送到视图

    我正在构建一个网站的后端 该网站的前端将有多个需要实时更新的 小部件 现在我只是有一个加载方法 它用数据填充所有小部件 显然是在页面加载时 我的问题是如何处理进一步更新的实时方面 我想过只进行多个 ajax 调用 它可以每秒左右查询一个服务
  • 运行 Code First 迁移种子方法而不进行迁移

    如何运行代码优先实体框架迁移而不更改数据模型中的任何内容 这会导致创建迁移 我只想再次运行种子方法 因为我向其中添加了一些内容 如果你只需要运行Seed 再次 没有任何改变会导致添加新的迁移 只需调用Update Database再次没有标
  • Linq - 按日期分组并选择计数

    我目前正在解决一个问题 我想运行一个查询 该查询按所选日期对结果进行分组 对于这个例子 想象一个像这样的简单模型 public class User public DateTime LastLogIn get set public stri
  • 搜索事件的 Jquery 选择器

    我需要选择所有已绑定 单击 事件的元素 是否存在这样的选择器 jQuery 本身不支持它 但您可以使用编写自己的自定义选择器有事件插件 http plugins jquery com project hasevent jQuery expr
  • 当 Java 中的集合超出容量时会发生什么?

    我有一个服务 它将所有对其进行的调用暂存在内存中 因为我们不想丢失数据 同时我们需要该服务因任何外部依赖项 例如数据库 而失败 然后 这些分阶段的调用会在后台例行接收和处理 如果出于任何原因 如果调用太多并且内存不足 我们就需要警惕 所以
  • 如何将孤立分支“按原样”附加到 master?

    搬迁使用过程中git我们采用了解决方案的生产版本并将其作为master 然后我们拿了一个开发版本并做了一个孤儿分支 called develop 背景 为什么我们在这里有点纠结是因为从开发版本到生产版本并没有干净的演变 此外 组装所涉及的解
  • 从 cURL 发布:HTTP_X_REQUESTED_WITH

    我正在以编程方式将表单发布到 PHP 表单处理脚本 有没有办法让我的表单处理脚本认为该帖子是由ajax 完成的 表单处理程序当前检查 SERVER 中的 HTTP X REQUESTED WITH 以实现特殊的仅限 ajax 的逻辑 当我使
  • 重新启动 Android MediaRecorder 时出现 IllegalStateException [在无效状态下调用启动:1]

    我正在尝试实现简单的逻辑来开始 停止录制MediaRecorder安卓的 周期为 连接到 localSocket 设置选项 mRecorder prepare mRecorder start mRecorder stop mRecorder
  • Android Studio 3.1:代理配置:无法为git操作设置https用户密码

    我最近将 Android Studio 从 3 0 升级到了 3 1 在3 0中 我曾经在中设置代理配置gradle properties 全局属性 文件 其中包括设置systemProp https proxyPassword除其他外 但
  • 更改 SOLR 默认连接

    我正在使用嵌入 SOLR 的应用程序 SOLR 在 Tomcat 的 webapp 区域中像一场战争一样运行 是否有 SOLR 配置允许我切换搜索的默认 SOLR 行为以假定 AND 而不是 OR 作为连接运算符 在您的模式文件中添加 或修
  • /var/run/docker.sock:在 Python CGI 脚本中运行 docker 时权限被拒绝

    我正在尝试运行 Python CGI 脚本 在其中需要运行 docker 镜像 我使用的是 Docker 版本 1 6 2 用户是 www data 添加到docker组中 www data www data sudo docker 在机器
  • 模型响应包含 swagger 中不同对象类型的数组

    我想建模一个响应对象 其中包含 swagger 中不同类型对象的数组 如下所示 table user customer employee 我尝试了下面的解决方案 但它将所有属性包装在单个对象 user customer 中 response
  • 使用 SSE/AVX 获取 __m256d 中存储的值的总和

    有没有办法获得存储在 m256d 变量中的值的总和 我有这个代码 acc mm256 add pd acc mm256 mul pd row vec acc in this point contains 2 0 8 0 18 0 32 0
  • Rvest XML 网络抓取

    我是一个初学者 我在抓取方面遇到了问题 我需要获取有关一些客户的活动 非活动 VEIS 号码的数据 目前 我只尝试一个 在网站上 我必须 设置值并发送表单 然后浏览器重定向到下一页 我可以在其中找到有趣的日期 下面我发送了我的代码 也许有人
  • Excel动态图表标题(前n个)

    是否可以创建包含 前 n 的动态 Excel 数据透视图标题 其中 n 用户选择的值过滤器行限制 我知道图表标题可以设置为单元格的内容 但不确定如何确定用户选择的前 n 个限制或如何将其添加到单元格公式中 如果可能的话 我们将不胜感激任何帮
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执