Spring Kafka 中的 Kafka 消费者/生产者测试

2023-12-10

我目前正在研究我正在使用的 Kafka 模块spring-kafka卡夫卡通信的抽象。我能够从实际实现的角度集成生产者和消费者,但是,我不确定如何测试(特别是集成测试)消费者周围的业务逻辑@KafkaListener。我试着跟随spring-kafk有关该主题的文档和各种博客,但没有一个回答我想要的问题。

Spring Boot 测试类

//imports not mentioned due to brevity

@RunWith(SpringRunner.class)
@SpringBootTest(classes = PaymentAccountUpdaterApplication.class,
                webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class CardUpdaterMessagingIntegrationTest {

    private final static String cardUpdateTopic = "TP.PRF.CARDEVENTS";

    @Autowired
    private ObjectMapper objectMapper;

    @ClassRule
    public static KafkaEmbedded kafkaEmbedded =
            new KafkaEmbedded(1, false, cardUpdateTopic);

    @Test
    public void sampleTest() throws Exception {
        Map<String, Object> consumerConfig =
                KafkaTestUtils.consumerProps("test", "false", kafkaEmbedded);
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerConfig);
        ContainerProperties containerProperties = new ContainerProperties(cardUpdateTopic);
        containerProperties.setMessageListener(new SafeStringJsonMessageConverter());
        KafkaMessageListenerContainer<String, String>
                container = new KafkaMessageListenerContainer<>(cf, containerProperties);

        BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) data -> {
            System.out.println("Added to Queue: "+ data);
            records.add(data);
        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container, kafkaEmbedded.getPartitionsPerTopic());


        Map<String, Object> producerConfig = KafkaTestUtils.senderProps(kafkaEmbedded.getBrokersAsString());
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        ProducerFactory<String, Object> pf =
                new DefaultKafkaProducerFactory<>(producerConfig);
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(pf);

        String payload = objectMapper.writeValueAsString(accountWrapper());
        kafkaTemplate.send(cardUpdateTopic, 0, payload);
        ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);

        assertThat(received).has(partition(0));
    }


    @After
    public void after() {
        kafkaEmbedded.after();
    }

    private AccountWrapper accountWrapper() {
        return AccountWrapper.builder()
                .eventSource("PROFILE")
                .eventName("INITIAL_LOAD_CARD")
                .eventTime(LocalDateTime.now().toString())
                .eventID("8730c547-02bd-45c0-857b-d90f859e886c")
                .details(AccountDetail.builder()
                        .customerId("idArZ_K2IgE86DcPhv-uZw")
                        .vaultId("912A60928AD04F69F3877D5B422327EE")
                        .expiryDate("122019")
                        .build())
                .build();
    }
}

聆听班

@Service
public class ConsumerMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMessageListener.class);

    private ConsumerMessageProcessorService consumerMessageProcessorService;

    public ConsumerMessageListener(ConsumerMessageProcessorService consumerMessageProcessorService) {
        this.consumerMessageProcessorService = consumerMessageProcessorService;
    }


    @KafkaListener(id = "cardUpdateEventListener",
            topics = "${kafka.consumer.cardupdates.topic}",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void processIncomingMessage(Payload<AccountWrapper,Object> payloadContainer,
                                       Acknowledgment acknowledgment,
                                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                                       @Header(KafkaHeaders.OFFSET) String offset) {

        try {
            // business logic to process the message
            consumerMessageProcessorService.processIncomingMessage(payloadContainer);
        } catch (Exception e) {
            LOGGER.error("Unhandled exception in card event message consumer. Discarding offset commit." +
                    "message:: {}, details:: {}", e.getMessage(), messageMetadataInfo);
            throw e;
        }
        acknowledgment.acknowledge();
    }
}

我的问题是:在测试类中,我断言正在轮询的分区、有效负载等BlockingQueue,但是,我的问题是如何验证我的业务逻辑在类中注释@KafkaListener正确执行并根据错误处理和其他业务场景将消息路由到不同的主题。在一些例子中,我看到CountDownLatch断言我不想将其放入我的业务逻辑中以在生产级代码中进行断言。消息处理器也是Async那么,如何断言执行,不确定。

任何帮助,不胜感激。


正确执行并根据错误处理和其他业务场景将消息路由到不同的主题。

集成测试可以使用该“不同”主题来断言侦听器按预期处理它。

您还可以添加一个BeanPostProcessor到你的测试用例并包装ConsumerMessageListener代理中的 bean 来验证输入参数是否符合预期。

EDIT

这是将侦听器包装在代理中的示例......

@SpringBootApplication
public class So53678801Application {

    public static void main(String[] args) {
        SpringApplication.run(So53678801Application.class, args);
    }

    @Bean
    public MessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

@Component
class Listener {

    @KafkaListener(id = "so53678801", topics = "so53678801")
    public void processIncomingMessage(Foo payload,
            Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
            @Header(KafkaHeaders.OFFSET) String offset) {

        System.out.println(payload);
        // ...
        acknowledgment.acknowledge();
    }

}

and

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual

and

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So53678801Application.class,
        So53678801ApplicationTests.TestConfig.class})
public class So53678801ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embededKafka = new EmbeddedKafkaRule(1, false, "so53678801");

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
                embededKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Autowired
    private ListenerWrapper wrapper;

    @Test
    public void test() throws Exception {
        this.template.send("so53678801", "{\"bar\":\"baz\"}");
        assertThat(this.wrapper.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.wrapper.argsReceived[0]).isInstanceOf(Foo.class);
        assertThat(((Foo) this.wrapper.argsReceived[0]).getBar()).isEqualTo("baz");
        assertThat(this.wrapper.ackCalled).isTrue();
    }

    @Configuration
    public static class TestConfig {

        @Bean
        public static ListenerWrapper bpp() { // BPPs have to be static
            return new ListenerWrapper();
        }

    }

    public static class ListenerWrapper implements BeanPostProcessor, Ordered {

        private final CountDownLatch latch = new CountDownLatch(1);

        private Object[] argsReceived;

        private boolean ackCalled;

        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof Listener) {
                ProxyFactory pf = new ProxyFactory(bean);
                pf.setProxyTargetClass(true); // unless the listener is on an interface
                pf.addAdvice(interceptor());
                return pf.getProxy();
            }
            return bean;
        }

        private MethodInterceptor interceptor() {
            return invocation -> {
                if (invocation.getMethod().getName().equals("processIncomingMessage")) {
                    Object[] args = invocation.getArguments();
                    this.argsReceived = Arrays.copyOf(args, args.length);
                    Acknowledgment ack = (Acknowledgment) args[1];
                    args[1] = (Acknowledgment) () -> {
                        this.ackCalled = true;
                        ack.acknowledge();
                    };
                    try {
                        return invocation.proceed();
                    }
                    finally {
                        this.latch.countDown();
                    }
                }
                else {
                    return invocation.proceed();
                }
            };
        }

    }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Kafka 中的 Kafka 消费者/生产者测试 的相关文章

  • 在 Java 中使用 Apache POI XWPF 在同一个 Word 文档中横向和纵向页面

    我正在尝试使用 Java 和 Apache POI 库创建一个包含一些横向页面和一些纵向页面的 Word 文档 我可以更改所有页面的方向 但有没有办法只更改其中某些页面的方向 我尝试过使用不同的部分和主体 但无济于事 目前我已经编写了一个函
  • 如何解析比 Java 中 NumberFormat 更严格的数字?

    我正在验证表单中的用户输入 我解析输入NumberFormat http docs oracle com javase 7 docs api java text NumberFormat html 但它是邪恶的 几乎允许任何事情 有没有办法
  • Eclipse 说“更新 Android Developer Toolkit”

    我不知何故弄乱了我的 Eclipse 和 Android 设置 我不知道如何修复它 问题症状如下 在 首选项 gt Android 中 我尝试选择 android sdk linux 的位置 选择时出现错误 此 Android SDK 需要
  • 如何使用 Apache Camel 路由从授权服务器获取访问令牌?

    我有一个授权服务器 带有注释的简单类 SpringBootApplication RestController Configuration EnableAuthorizationServer oauth2 security 在端口上运行80
  • 使用Optional作为类中的属性是一个好习惯吗? [复制]

    这个问题在这里已经有答案了 我读过一些关于目的的内容Optional 不幸的是我不记得在哪里 在Java 8中 我很惊讶作者没有提到使用Optional作为类中的属性 由于我在课堂上经常使用选项 我想知道这是否是一个好的做法 或者我可以更好
  • 使用java在mysql中插入带有\\的文件路径

    我正在使用java制作一个独立的应用程序 并且我需要插入用户从文件选择器中选择的图像的路径 我正在获取文件的路径 但是当我将其存储在数据库 mysql 中时 它不会存储 所以当我检索该路径时 该文件不会显示 如何存储文件的路径 这样就可以使
  • Maven:缺少工件 org.springframework:spring:jar:4.2.6

    我在 SpringToolSuite 中有一个动态 Web 项目 它被转换为 Maven 项目 我遇到问题 缺少工件 org springframework spring jar 4 2 6 我已经尝试清理 重建和运行该项目 它给 读取文件
  • Apache POI 的 ProGuard 设置

    我正在构建一个使用 Apache POI 库的应用程序 当我调试应用程序 在不运行 Proguard 的情况下编译它 时 一切都运行良好 但是在导出 APK 后 当我运行应用程序并打开 Excel 文件时 出现以下异常 RuntimeExc
  • 内容安全策略:页面设置阻止自行加载资源?

    我有基于 Java 的 Web 应用程序运行在Tomcat http en wikipedia org wiki Apache Tomcat6 我的应用程序在本地主机和端口 9001 上运行 为了使我的应用程序更加安全并降低风险XSS ht
  • 使用 viewModel 从 ChildFragment 访问 ParentFragment 中的 ViewModel

    我正在尝试访问ParentViewModel for ParentFragment from ChildFragment using viewModels 这是我的代码 In ParentFragment class ParentFragm
  • 用dagger 2查看依赖注入

    我有一个自定义视图扩展TextView 我应该在哪里调用我的组件来注入视图 component inject customTextView 因此 我发现我需要在自定义视图的构造函数中添加注入 在所有视图中 或者使一个调用另一个 Exampl
  • 使用 Spring 注入 Google Guava Hashmultimap

    是否可以提供一个创建示例Multimap
  • 如何从 REstAssured 中的 Json 数组获取 JSON 对象

    任何人都可以帮我解决这个场景 我是新来的RestAssured和处理JSON在我们的自动化脚本中 我有一个API谁的回应是JSONArray i e id 1002 entity testcase fieldName TextName di
  • Spark toLocalIterator 和迭代器方法之间的区别

    在编写 Spark 程序时我遇到了这个toLocalIterator 方法 之前我只使用iterator method 如果有人曾经使用过这种方法 请点亮 我在使用时遇到foreach and foreachPartitionSpark程序
  • 使用JPanel绘制直线并获取点坐标

    我现在完全不知所措 我没有太多用 Java 构建 GUI 我一直在阅读有关 swing 和 JPanel 的所有内容 我认为我想做的事情是可能的 我只是还没有弄清楚how 我正在尝试构建一个 GUI 您可以在其中在某个绘图区域内绘制直线 我
  • Spring @Configuration如何缓存对bean的引用

    使用基于 Java 的配置时 Spring 如何防止再次调用 bar 我想知道编译时注释处理或通过代理方法 Configuration public class AppConfig Bean public Foo foo return ne
  • 在 Java Jersey 2 JAX-RS 中初始化单例

    我是泽西岛 2 22 2 的新手 请耐心等待 我正在创建一个与 LDAP 服务器交互的 REST 服务 用于存储 删除和检索用户数据 该服务通过执行加密 解密充当安全中介 在使用 REST 服务之前必须进行相当多的初始化 并且我只想执行此初
  • Selenium Webdriver 中的 IF 语句

    我想知道是否有人可以帮助我解决我正在尝试解决的问题以及 Java 中 Webdriver 的 If 语句 当登录到我正在测试的应用程序时 可以在主页之前进入安全问题页面 如果是新用户等 我希望测试中的代码做的是 如果出现安全问题页面 请填写
  • 如何使用SAXReader解析GPX文件?

    我正在尝试解析GPX file http en wikipedia org wiki GPS eXchange Format 我用 JDOM 尝试过 但效果不太好 SAXBuilder builder new SAXBuilder Docu
  • 如何获取 EC2 实例的 CloudWatch 指标数据

    我想获取我的 EC2 实例的 Cloudmetrics 数据 以便我可以使用这些数据绘制图表并将其显示在我的 Android 设备上 我怎么做 有相同的示例程序或教程吗 提前致谢 这就是我正在做的 private static void f

随机推荐