kafka 使用python消费consumer

2023-11-13

参考

python kafka 使用
大数据:kafka常见问题
kafka
python之操作kafka

Kafka基本了解

使用python读取consumer中的数据

安装kafka-python

pip install kafka-python

简单使用

import kafka import KafkaConsumer
#消费kafka中最新的数据 并且自动提交offsets[消息的偏移量]
consumer = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers=['localhost:9092'])
for message in consumer:
    #注意: message ,value都是原始的字节数据,需要decode
    #例如: message.value.decode('utf-8')
    #完成对每条数据中的操作
    print ("%s:%d:%d: key=%s value=%s" %s (message.topic, message.partition,
                                               message.offset, message.key,
                                               message.value))

其他使用

topic="****"
groupid="****"
brokerlist="*:9092,*:9092"

#读取目前可读最早的消息
consumer = KafkaConsumer(topic, auto_offset_reset='earliest', bootstrap_servers=brokerlist)
#获取topic主题的分区信息
consumer.partitions_for_topic(topic)  
#获取主题列表
print consumer.topics()  
#获取当前消费者订阅的主题
print consumer.subscription()  
#获取当前消费者topic、分区信息
print consumer.assignment()  
#获取当前消费者可消费的偏移量
print consumer.beginning_offsets(consumer.assignment()) 
#重置偏移量,从第5个偏移量消费
consumer.seek(TopicPartition(topic=topic, partition=0), 5)
#获取当前主题的最新偏移量
print consumer.position(TopicPartition(topic=u'test', partition=0)) 

#消费多个主题
consumer = KafkaConsumer(bootstrap_servers=brokerlist)
consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
for message in consumer:
。。。。

#手动拉取消息
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg
    time.sleep(1)

#消息挂起与恢复
consumer.pause(TopicPartition(topic=u'test', partition=0))
print consumer.paused()   #获取当前挂起的消费者
#处理操作
consumer.resume(TopicPartition(topic=u'test', partition=0))
#pause执行后,consumer不能读取,直到调用resume后恢复。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka 使用python消费consumer 的相关文章

  • 从数据框中按索引删除行

    我有一个数组wrong indexes train其中包含我想从数据框中删除的索引列表 0 63 151 469 1008 要删除这些索引 我正在尝试这样做 df train drop wrong indexes train 但是 代码失败
  • Python - 将宽字符字符串从二进制文件转换为 Python unicode 字符串

    这是漫长的一天 我有点困惑 我正在读取一个包含大量宽字符字符串的二进制文件 我想将它们转储为 Python unicode 字符串 为了解压非字符串数据 我使用 struct 模块 但我不知道如何对字符串执行相同的操作 例如 阅读 系列 一
  • 如何正确地将 MIDI 刻度转换为毫秒?

    我正在尝试将 MIDI 刻度 增量时间转换为毫秒 并且已经找到了一些有用的资源 MIDI Delta 时间刻度到秒 http www lastrayofhope co uk 2009 12 23 midi delta time ticks
  • Python模块可以访问英语词典,包括单词的定义[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找一个 python 模块 它可以帮助我从英语词典中获取单词的定义 当然有enchant 这可以帮助我检查该单词是否存在于英语中
  • 如何迭代按值排序的 Python 字典?

    我有一本字典 比如 a 6 b 1 c 2 我想迭代一下by value 不是通过键 换句话说 b 1 c 2 a 6 最直接的方法是什么 sorted dictionary items key lambda x x 1 对于那些讨厌 la
  • 如何使用 Plotly 中的直方图将所有离群值分入一个分箱?

    所以问题是 我可以在 Plotly 中绘制直方图 其中所有大于某个阈值的值都将被分组到一个箱中吗 所需的输出 但使用标准情节Histogram类我只能得到这个输出 import pandas as pd from plotly import
  • 通过列表理解压平列表列表

    我正在尝试使用 python 中的列表理解来展平列表 我的清单有点像 1 2 3 4 5 6 7 8 只是为了打印这个列表列表中的单个项目 我编写了这个函数 def flat listoflist for item in listoflis
  • Django 模型在模板中不可迭代

    我试图迭代模型以获取列表中的第一个图像 但它给了我错误 即模型不可迭代 以下是我的模型和模板的代码 我只需要获取与单个产品相关的列表中的第一个图像 模型 py class Product models Model title models
  • 忽略 Mercurial hook 中的某些 Mercurial 命令

    我有一个像这样的善变钩子 hooks pretxncommit myhook python path to file myhook 代码如下所示 def myhook ui repo kwargs do some stuff 但在我的例子中
  • 对图像块进行多重处理

    我有一个函数必须循环遍历图像的各个像素并计算一些几何形状 此函数需要很长时间才能运行 在 24 兆像素图像上大约需要 5 小时 但似乎应该很容易在多个内核上并行运行 然而 我一生都找不到一个有据可查 解释充分的例子来使用 Multiproc
  • 使用鼻子获取设置中当前测试的名称

    我目前正在使用鼻子编写一些功能测试 我正在测试的库操作目录结构 为了获得可重现的结果 我存储了一个测试目录结构的模板 并在执行测试之前创建该模板的副本 我在测试中执行此操作 setup功能 这确保了我在测试开始时始终具有明确定义的状态 现在
  • 如何从Python中的字符串中提取变量名称和值

    我有一根绳子 data var1 id 12345 name John White python中有没有办法将var1提取为python变量 更具体地说 我对字典变量感兴趣 这样我就可以获得变量的值 id和name python 这是由提供
  • Seaborn Pairplot 图例不显示颜色

    我一直在学习如何在Python中使用seaborn和pairplot 这里的一切似乎都工作正常 但由于某种原因 图例不会显示相关的颜色 我无法找到解决方案 因此如果有人有任何建议 请告诉我 x sns pairplot stats2 hue
  • 将 2D NumPy 数组按元素相乘并求和

    我想知道是否有一种更快的方法 专用 NumPy 函数来执行 2D NumPy 数组的元素乘法 然后对所有元素求和 我目前使用np sum np multiply A B 其中 A B 是相同维度的 NumPy 数组m x n 您可以使用np
  • Python 将日志滚动到变量

    我有一个使用多线程并在服务器后台运行的应用程序 为了无需登录服务器即可监控应用程序 我决定包括Bottle http bottlepy org为了响应一些HTTP端点并报告状态 执行远程关闭等 我还想添加一种查阅日志文件的方法 我可以使用以
  • 使用yield 进行字典理解

    作为一个人为的例子 myset set a b c d mydict item yield join item s for item in myset and list mydict gives as cs bs ds a None b N
  • 默认情况下,Keras 自定义层参数是不可训练的吗?

    我在 Keras 中构建了一个简单的自定义层 并惊讶地发现参数默认情况下未设置为可训练 我可以通过显式设置可训练属性来使其工作 我无法通过查看文档或代码来解释为什么会这样 这是应该的样子还是我做错了什么导致默认情况下参数不可训练 代码 im
  • 当鼠标悬停在上面时,intellisense vscode 不显示参数或文档

    我正在尝试将整个工作流程从 Eclipse 和 Jupyter Notebook 迁移到 VS Code 我安装了 python 扩展 它应该带有 Intellisense 但它只是部分更糟糕 我在输入句点后收到建议 但当将鼠标悬停在其上方
  • Ubuntu 上的 Python 2.7

    我是 Python 新手 正在 Linux 机器 Ubuntu 10 10 上工作 它正在运行 python 2 6 但我想运行 2 7 因为它有我想使用的功能 有人敦促我不要安装 2 7 并将其设置为我的默认 python 我的问题是 如
  • 字典和数组作为类变量与实例变量

    这是赚取积分的简单方法 请解释以下内容 class C a b 0 c def init self self x def d self k v self x k v self a k v self b v self c append v d

随机推荐

  • 【ARM】Linux内核驱动之中断

    作者主页 凉开水白菜 作者简介 共同学习 互相监督 热于分享 多加讨论 一起进步 专栏资料 https gitee com stylle linux code 点赞 收藏 再看 养成习惯 订阅的粉丝可通过PC端文末加我微信 可对文章的内容进
  • Microsoft.Web.WebView2 初体验

    上篇已经介绍了WebView2的背景 今天有时间尝试一下 文档地址 https docs microsoft com zh cn dotnet api microsoft web webview2 winforms webview2 exe
  • Django基于用户画像的电影推荐系统源码(项目源代码)

    一 项目介绍 公众号 yk 坤帝 获取全部源代码 本系统是以Django作为基础框架 采用MTV模式 数据库使用MongoDB MySQL和Redis 以从豆瓣平台爬取的电影数据作为基础数据源 主要基于用户的基本信息和使用操作记录等行为信息
  • 微信分享引导页效果

    span style font size 18px span
  • java的rmi

    写在前面 本文看下如何通过Java原生提供的rmi功能来调用远端JVM对象的方法 并获取其结果 1 定义远端service pojo 使用了lombok Getter Setter ToString public class User im
  • 在较新版pycharm中使用conda虚拟环境的两种方法-保姆级教程

    文章目录 方法一 配置解释器 方法二 命令行直接调用 注意事项 方法一 配置解释器 首先创建new project 之后等待配置索引等文件即可 方法二 命令行直接调用 在终端terminal中直接使用conda activate xxx 但
  • 计算机网络工程毕业设计题目选题大全

    文章目录 0 简介 1 如何选题 2 最新网络工程选题 2 1 Java web SSM 系统 2 2 大数据方向 2 3 人工智能方向 2 4 其他方向 4 最后 0 简介 学长搜集分享最新的网络工程专业毕设毕设选题 难度适中 适合作为毕
  • js复制一个对象的方法,不改变原对象

    复制一个对象 不改变原对象简单方法如下 var obj a 1 b 2 es6新方法 Object assign var newObj Object assign obj es6新方法 扩展运算符 var newObj obj 有个弊端 就
  • 什么是网络编程?

    目录 一 UDP DatagramSocket DatagramPacket 服务器 客户端 二 TCP ServerSocket Socke 服务器 客户端 网络编程指的就是网络上的主机通过不同的进程 以编程的方式实现网络信息传输 而提到
  • CSDN如何调节成黑色主题,手把手教学

    今天师弟来问我 有没有把浏览器调节成黑色主题的东东 每天看网页很久 白色太刺眼了 作为未来的新晋程序员 也显得不专业 想到自己有个插件 就分享了出来 获得好评 有需求的地方就有分享 首先上效果图 随便打开一篇文章看一看 教程 首先 要用谷歌
  • 5G技术详解系列-PDU会话签约数据(6)

    相关文章会在公众号同步更新 公众号 5G通信大家学 持续更新的相关5G内容都是直接根据3GPP整理 保证更新内容的准确性 避免通过二手 甚至多手的资料 以讹传讹误导网友 在介绍完流程详解后 会整理专题内容 比如切片 服务发现 QoS流端到端
  • 我的世界java版怎么加整合包_我的世界Minecraft Mod(模组)安装指南

    前言 Mod的安装方法主要分为核心Jar文件手动覆盖安装和使用Forge加载 现在大多数的Mod基本都是依赖于Forge来加载Mod 不过对于刚接触我的世界的玩家来说 在安装Mod的时候也是一头雾水 导致安装Mod后 出现诸如游戏崩溃 黑屏
  • 【从零开始的Java开发】1-6-2 泛型:概述、泛型作为方法参数、自定义泛型、自定义泛型方法

    文章目录 泛型概述 泛型作为方法参数 自定义泛型 一个参数 两个参数 自定义泛型方法 总结 泛型概述 为什么要有泛型 在Java增加泛型之前 泛型程序设计使用继承来实现 坏处 需要强制转换 可向集合中添加任意类型的对象 存在风险 泛型的使用
  • 主干光缆线路的组网结构

    主干光缆是指连接主干光交与业务汇聚节点 以及主干光交之间的光缆 业务汇聚点指安装了OLT设备的节点 主干光交内的业务端口与业务汇聚点ODF间的光纤链路部分或全部是直连的 中间没有活动连接 主干光缆线路的组网结构一般分为 环形 树形和星形 1
  • 嵌入式Linux设备读取CPU温度的方法

    1 ARM 平台下 cat sys devices virtual thermal thermal zone0 temp62374 cat sys class thermal thermal zone0 temp 64036x86 平台下
  • vue项目配置rem移动端适配

    一 项目介绍 脚手架CLI vue cli Vue版本 2 6 11 移动UI组件库 Vant 2 10 14 CSS预处理器 sass 二 配置lib flexible插件 下载插件 npm i D lib flexible 导入 在sr
  • 据说,80%的人都搞不懂哈希算法 区块链 哈希算法

    本文约9000字 阅读 观看 需要52分钟 聊到区块链的时候也少不了会听到 哈希 哈希函数 哈希算法 是不是听得一头雾水 别急 这一讲我们来讲讲什么是哈希算法 哈希是一种加密算法 哈希函数 Hash Function 也称为散列函数或杂凑函
  • 车辆贷款违约预测

    1 案例介绍 国内某贷款机构的车贷业务面临借款人拖欠还款或拒不还款 导致该机构的不良贷款率居高不下的问题 该机构将部分贷款数据开放 诚邀大家帮助他们建立风险识别模型来预测可能违约的借款人 敏感信息已脱敏 给定某机构实际业务中的相关借款人信息
  • 数据库的4种隔离级别

    数据库事务的隔离级别有4种 由低到高分别为Read uncommitted Read committed Repeatable read Serializable 而且 在事务的并发操作中可能会出现脏读 不可重复读 幻读 下面通过事例一一阐
  • kafka 使用python消费consumer

    参考 python kafka 使用 大数据 kafka常见问题 kafka python之操作kafka Kafka基本了解 使用python读取consumer中的数据 安装kafka python pip install kafka