是否可以使用Kafka传输文件?

2024-05-09

我每天都会生成数千个文件,我想使用 Kafka 进行流式传输。 当我尝试读取该文件时,每一行都被视为一条单独的消息。

我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息,以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中。


您可以编写自己的序列化器/反序列化器来处理文件。 例如 :

制作人道具:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);  
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, YOUR_FILE_SERIALIZER_URI);

消费道具:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YOUR_FILE_DESERIALIZER_URI);

串行器

public class FileMapSerializer implements Serializer<Map<?,?>> {

@Override
public void close() {

}

@Override
public void configure(Map configs, boolean isKey) {
}

@Override
public byte[] serialize(String topic, Map data) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutput out = null;
    byte[] bytes = null;
    try {
        out = new ObjectOutputStream(bos);
        out.writeObject(data);
        bytes = bos.toByteArray();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException ex) {
            // ignore close exception
        }
        try {
            bos.close();
        } catch (IOException ex) {
            // ignore close exception
        }
    }
    return bytes;
}
}

解串器

public class MapDeserializer implements Deserializer<Map> {

@Override
public void close() {

}

@Override
public void configure(Map config, boolean isKey) {

}

@Override
public Map deserialize(String topic, byte[] message) {
    ByteArrayInputStream bis = new ByteArrayInputStream(message);
    ObjectInput in = null;
    try {
        in = new ObjectInputStream(bis);
        Object o = in.readObject();
        if (o instanceof Map) {
            return (Map) o;
        } else
            return new HashMap<String, String>();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            bis.close();
        } catch (IOException ex) {
        }
        try {
            if (in != null) {
                in.close();
            }
        } catch (IOException ex) {
            // ignore close exception
        }
    }
    return new HashMap<String, String>();
}
}

按以下形式撰写消息

final Object kafkaMessage = new ProducerRecord<String, Map>((String) <TOPIC>,Integer.toString(messageId++), messageMap);

messageMap 将包含文件名作为键,文件内容作为值。 值可以是可序列化的对象。 因此,每条消息将包含一个带有 File_Name 与 FileContent 映射的映射。可以是单值或多值。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以使用Kafka传输文件? 的相关文章

  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • Strimzi 运算符 Kafka 集群 ACL 未启用类型:简单

    我们知道要启用Kafka ACL属性authorizer class name kafka security auth SimpleAclAuthorizer要添加到server properties但是如果 Kafka 集群由 Strim
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • 当我们在 Apache Spark 中使用时,无法找到 Set([TOPIC NAME,0])) 的领导者

    我们使用 Apache Spark 1 5 1 和 kafka 2 10 0 8 2 1 以及 Kafka DirectStream API 通过 Spark 从 Kafka 获取数据 我们使用以下设置在 Kafka 中创建了主题 复制因子
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这

随机推荐

  • 自动移动站点重定向

    我刚刚制作了我的第一个 jQuery 移动网站 我想让使用手机查看我的 完整网站 的人自动转移到 移动网站 如果需要 还可以单击链接查看完整站点 我不知道从哪里开始 有一些我可以使用的 JavaScript 吗 如果您想查看这些网站的外观
  • 在 iframe/对象标签内运行时更新初始路由器 url

    我目前正在容器 主 Vue 应用程序的对象标签 iframe 也可以工作 内渲染 Vue 应用程序 首先 我设置一个文件服务器 为该容器或请求的子应用程序提供服务 以在 div 内呈现 为了简单起见 我将仅显示 Node Express 服
  • Redis Cluster 与 Pub/Sub 中的 ZeroMQ,用于水平扩展的分布式系统

    如果我要设计一个巨大的分布式系统 其吞吐量应随系统中的订阅者数量和通道数量线性扩展 哪个会更好 1 Redis集群 仅适用于Redis 3 0 alpha 如果是集群模式 您可以在一个节点上发布并在另一个完全不同的节点上订阅 消息将传播并到
  • 无法为 Python 3.4 创建工作虚拟环境

    I 安装Python 3 4 2 https docs python org 3 using unix html building python和我的 Linux Mint 17 1 中的 Virtualenv 12 0 5 然后我尝试创建
  • 我是否可以通过第三方支付网关为我的 iPhone 应用程序提供付款服务?

    所以我有一个 RESTful Api 服务 它有免费和付费的东西 任何人都可以利用我们的 API 创建 iPhone Andriod MSPhone 应用程序 不好的类比 假设我们正在为 Steam 创建一个聊天 api 服务 并且您可以为
  • 从通用列表中删除项目

    我有以下方法 我希望从我的收藏中删除与产品 ID 匹配的项目 看起来相当简单 但我有一个例外 基本上我的收藏已经不同步了 那么从集合中删除项目的最佳方法是什么 public void RemoveOrderItem Model Order
  • 在最后修改的区域扩展Jtree?

    我正在使用 dom4j 从 dom4j 文档创建 DocumentTreeModel 我在里面显示这个 DocumentTreeModelJScrollPane 我有一个按钮 可以将新节点添加到 dom4j 文档 并重新创建 Documen
  • 元素不存在,尽管它具有 ID 属性

    在 selenium excel vba 中 我试图了解有关如何处理 CSS 选择器的更多信息 我很想知道 因为在检查带有 ID 的元素并运行代码时 我收到一条消息 指出未找到该元素 这是到目前为止的代码 Private bot As Ne
  • DC-sunburst、dc-Menuslect、dc-Non 交互图

    我是 dc js 的新手 我对 dc 的灵活性有一些疑问 首先 我一直在寻找答案 但还没有找到任何答案 1 我正在使用 dc sunburst 图表 我想知道是否可以创建 Zoomable sunburst 因为 d3 js 实际上就是这种
  • `printf()` 中格式说明符“%qd”的用途是什么?

    我看到格式说明符 qd浏览时github https github com Microsoft clang blob master test Sema format strings c代码 然后我检查了 GCC 编译器 它工作正常 incl
  • recvfrom() 中的 addrlen 字段有何用途?

    我在程序中使用 recvfrom 从我在 src addr 中指定的服务器获取 DGRAM 数据 但是 我不确定为什么需要初始化并传入addrlen 我读了手册页 但不太明白它的意思 如果src addr不为NULL 并且底层协议提供了源地
  • Powershell Bash/Zsh 命令中的多个参数

    无法在 Powershell 中运行以下 Bash Zsh 命令 KeyPath Join Path Path this Plate ChildPath install tekton key kubectl create secret do
  • NSUndoManager 会撤消后台发生的更改吗?

    我有一个编辑视图控制器 我正在使用 NSUndoManager 它是我的持久性存储 核心数据项目 的一组 我的应用程序的功能之一是与外部服务器同步 我想知道的是 如果我正在视图中编辑某些内容 同时应用程序正在与服务器同步 如果我改变主意并决
  • 如何在java中访问USB端口[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在尝试编写一个java应用程序来访问USB端口以读取和写入通过USB连接的设备 我面临的问题是我不
  • jQuery UI 滑动轻松同级推送

    我正在使用 jQuery UIslide切换 div 的切换效果 link click function targetDiv toggle slide direction up 1000 幻灯片是唯一具有我想要的动画的效果 本质上是 div
  • 我应该使用 和 吗?如果是,为什么以及如何使用?

    我一直在尝试正确使用 colgroup 和 col 标签 但我不明白 我阅读了规范和所有内容 但我不明白其目的或如何实现它 A colgroup用于table元素来帮助理解具有不规则标题的表中复杂的信息层次结构 WAI 有一个关于如何处理此
  • 在运行时更改蓝图或重新加载 Flask 应用程序

    我正在编写一个支持插件架构的 Flask 应用程序 每个插件都位于一个单独的文件夹中 并且是一个模块 该模块至少具有一个类 该类是一个子类Plugin班级 出于安全原因 我不想在 Flask 应用程序最初运行时加载所有插件 相反 用户可以从
  • Three.js 椭圆

    如何在 Three js 中创建一个椭圆 我看过这个 在 THREE js 中绘制椭圆 https stackoverflow com questions 11419896 drawing an ellipse in three js 但如
  • ndb.StructuredProperty 不调用 ndb.PolyModel 子类方法

    在将 ndb Polymodel 超类存储为 ndb StructuredProperty 时 我无法访问子类方法 相反 调用超类方法并引发 NotImplementedError 这是我想要完成的任务的删节版本 class Recipie
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中