Kafka 快速入门:我需要哪些依赖项?

2024-04-08

我正在研究卡夫卡快速入门:

http://kafka.apache.org/07/quickstart.html http://kafka.apache.org/07/quickstart.html

以及基本的消费者组示例:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

我已经对 Consumer 和 ConsumerThreadPool 进行了编码,如下所示:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private KafkaStream m_stream;
    private Integer m_threadNumber;

    public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

其他几个方面:我正在使用 spring 来管理我的动物园管理员:

import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {

    @Bean
    @Named("consumerConfig")
    private static ConsumerConfig createConsumerConfig() {
        String zookeeperAddress = "127.0.0.1:2181";
        String groupId = "inventory";
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperAddress);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
}

我正在使用 Maven 和 OneJar Maven 插件进行编译。但是,我编译并运行生成的一个 jar 后出现以下错误:

Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 27 more

现在,我对 Kafka 知之甚少,对 Scala 也一无所知。我该如何解决?接下来我应该尝试什么?这是一个已知的问题?我还需要其他依赖吗?这是我的 pom.xml 中的 kafka 版本:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.0-beta1</version>
</dependency> 

更新:我联系了 Kafka 开发者邮件列表,他们让我知道了 scala 依赖项的一些特定版本要求。然而,还有一个未记录的 log4j 依赖项,这会导致另一个运行时异常,而不是编译时异常。

Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)

另一个更新:

我找到了正确的 log4j 依赖项:

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

但现在我遇到了一个更加神秘的运行时异常:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

此时我有一种 WTF 的感觉。所以我添加了另一个依赖项:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>

但这暴露了另一个运行时异常:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

我希望能够启动并运行这个婴儿示例,但这也许就是使用测试版产品的代价?也许我应该切换到 Apache Active MQ。但这听起来不太有趣。我错过了什么吗?


问题是kafka beta 的构建方式是用 jar 生成的 pom 无效,maven 无法识别它并正确解析 https://issues.apache.org/jira/browse/KAFKA-1064,从而获取传递依赖。我们通过在 pom 定义中加入该 pom(scala、zk 等)的所有依赖项,成功地缓解了这个问题。我们正在等待 kafka 的下一个 beta 版本,其中问题将得到解决。

完整的依赖项列表如下。请注意,您必须根据您的 kafka 工件的后缀更改 scala 版本依赖项。

<dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.sf.jopt-simple</groupId>
            <artifactId>jopt-simple</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-annotation</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.easymock</groupId>
            <artifactId>easymock</artifactId>
            <version>3.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest</artifactId>
            <version>1.2</version>
            <scope>test</scope>
        </dependency>

至于

也许我应该切换到 Apache Active MQ。但这听起来不太有趣。 我错过了什么吗?

好吧,你别忘了这是beta发布?确实发生了一些不好的事情,但目前我们正在运行 kafka 0.7,但没有任何努力.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 快速入门:我需要哪些依赖项? 的相关文章

随机推荐

  • 当返回类型为 Option[Error] 时处理快速失败

    我已经发布了很多关于 Scala 中的故障处理的问题 我真的感谢大家的回答 我理解在处理 Either 和 Scalaz 或 a 时的选择 以便理解 我还有另一个 最后一个 问题 当操作处理外部非功能世界 例如数据库 时 如何执行快速失败的
  • 在 SQL 数据库中保持 RSS 提要唯一的最佳实践

    我正在开发一个项目 该项目显示来自不同站点的 RSS 提要 我将它们保存在数据库中 我的程序每 3 小时获取一次并将它们插入到 SQL 数据库中 我希望提供者有独特的记录 不要显示重复的内容 但问题是一些提供商不提供 GUID 字段 而其他
  • 全局 $sce.trustAsResourceUrl()

    我怎样才能做这样的事情 sce trustAsResourceUrl URL HERE 在全球范围内 就像在主应用程序中一样config or run 函数 以便任何 iFrame img src 等具有URL HERE将工作 文档对此的解
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • 休眠:不想节省毫秒

    我遇到了一个小问题 我在网上没有找到合适的解决方案 因为我的问题对于搜索引擎来说有点棘手 有很多关于休眠节省毫秒的主题 但我的问题是别的 事实上 我有一个数据库 它保存我的日期 如下所示 2014 03 20 10 58 09 I used
  • 如何使用 cartopy 添加点特征形状文件以进行地图绘制

    我有两个形状文件 一个是点要素形状文件 名为 point shp 另一个是名为 polygon shp 的多边形形状文件 我想使用 cartopy 添加到地图中 我设法添加 polygon shp 但添加 point shp 失败 这是我的
  • android gradle MonkeyTalk 构建问题

    我正在使用 MonkeyTalk 来进行写入测试这个演示 https github com georgepapas android gradle monkey talk demo 我正在使用 AndroidAnnotations 2 7 1
  • gridview 中 nvarchar 附近的语法不正确

    我在更新 gridview 的列时遇到问题 当我尝试更新该字段时出现此错误 nvarchar 附近的语法不正确 必须声明标量变量 pid 这是我的一段代码
  • 使用下拉列表在 Plotly 中交互式过滤数据表

    我正在尝试制作一个交互式表格 其中通过从下拉列表中选择一个值来更改表格的值 这应该只在 Plotly 而不是 Dash 中完成 因为我需要与其他用户共享文件 例如 如果我选择通道 1 那么表格应该是 Date A item B item C
  • 以编程方式切换 UIAlert 控制器

    我正在快速创建一个注册对话框 其中包含 3 个文本字段和一个开关 并且我成功添加了三个文本字段和两个警报 下面的代码显示了相同的内容 let alertController UIAlertController title Register
  • Mendeley API - 如何使用 JavaScript SDK - 隐式流身份验证

    我希望您不会介意我提出问题 因为我不是程序员 需要 傻瓜式 解释 虽然我可以使用基本的 JavaScript 但我从未使用过 API 这就是我想做的 我在硬盘上制作了一个 HTML 页面 我最终希望将其与 PhoneGap 之类的东西打包到
  • 在 iCarouselTypeRotary 视图中隐藏背面图像

    我已将图像设置为轮播 https github com nicklockwood iCarousel 当我滚动轮播时 它会显示正面和背面的图像 我不想在后面显示图像 请帮忙 谢谢 您应该实现委托 CGFloat carousel iCaro
  • HTML 输入值更改

    我有一个 PHP 更新页面 其中显示一个包含数据库中的值的文本字段 就是这样 并且正在工作
  • 按最新回复时间排列论坛帖子

    在你喊之前ORDER BY id 情况就大不相同了 我被要求建立的论坛是一个留言板论坛 未注册的用户可以在其中回复和发帖 对帖子的回复缩进在被回复的帖子下方 当然 这都是无序列表中的 一个简短的例子 主帖 回复帖子 对回复的回复 另一个回复
  • jupyter 笔记本显示 matplotlib 错误键“text.kerning_factor”的错误消息

    import pandas as pd import numpy as np import matplotlib pyplot as plt 第 4 行的错误键 text kerning factor home samyak anacond
  • iOS UICollectionView 原型单元格大小属性被忽略

    我正在使用带有两个原型单元的 UICollectionView 原型单元具有不同的宽度并包含不同的控件 图像视图和网页视图 我肯定会为给定索引返回正确的原型单元格 所有单元格都显示正确的内容 但原型单元格大小将被忽略 而会使用集合视图的项目
  • Android App ID前缀问题

    我正在构建一个适用于 Android 的 Adob e Air 应用程序 但遇到了一个大问题 我使用旧的 Flash Air 扩展向 Android 市场发布了一个应用程序 该扩展保存了带有前缀 app 的应用程序 ID 现在我已经升级了扩
  • 可以忽略 ReactiveObject 的初始值吗?

    使用 ReactiveUI 是否可以忽略给定 ReactiveObject 的初始值 例如 我初始化了一个 ViewModel 然后我WhenAnyValue在视图模型上 我立即收到通知 该值是null为我选择的财产 是的我可以 Where
  • JVM 内存类型

    我正在做一个监控项目 我们有监控软件正在运行并从服务器重新收集指标 一切工作正常 但我们需要一些有关 JVM 内存使用情况详细信息 我们有一些具有不同内存类型的列 我们需要知道这些是什么 Heap Non Heap Usage Peak C
  • Kafka 快速入门:我需要哪些依赖项?

    我正在研究卡夫卡快速入门 http kafka apache org 07 quickstart html http kafka apache org 07 quickstart html 以及基本的消费者组示例 https cwiki a