python-kafka多线程快速读取consumer消费者数据,同时使用批读取与无限流读取改进

2023-11-15

python单线程循环读取consumer会很浪费时间,而且速率远远低于生产者可容纳的速率,因此我们使用多线程来处理IO密集型的读取操作

极简的示例

我们直接上一个极简示例,没有任何花里胡哨的部分:

1. 生产者(先运行)

先运行生产者,再运行消费者部分哈:

from kafka import KafkaProducer
import datetime
import json

# 启动生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"

for i in range(100):
    data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)

2. 消费者部分多线程读取

from kafka import KafkaConsumer
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from kafka.structs import TopicPartition


class MultiThreadKafka(object):

    def __init__(self):
        self.seek = 0  # 偏移量

    def operate(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
        tp = TopicPartition("python_test", 0)
        consumer.assign([tp])
        for i in range(10):
            consumer.seek(tp, self.seek)
            self.seek += 1
            consumer_data = next(consumer)
            print(threading.current_thread().name) # 打印线程名
            print(consumer_data) # 打印
            time.sleep(1)

    def main(self):
        thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="threading_") # 我们使用线程池统一管理线程
        for i in range(4):
            thread_pool.submit(self.operate, ) # 把四个线程全部投入读取数据的部分


if __name__ == '__main__':
    thread_kafka = MultiThreadKafka()
    thread_kafka.main()

打印效果如下,我们可以看到每个线程都在运行,而且根据offset可以发现没有重复读取的问题,因此我们可以放心的使用这个模板来进行数据读取

threading__0
threading__3
ConsumerRecord(topic='python_test', partition=0, offset=1, timestamp=1646103355117, timestamp_type=0, key=None, value=b'{"num": 1, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=1621741028, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=3, timestamp=1646103355120, timestamp_type=0, key=None, value=b'{"num": 3, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=3977437705, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__2
ConsumerRecord(topic='python_test', partition=0, offset=0, timestamp=1646103355013, timestamp_type=0, key=None, value=b'{"num": 0, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=4023108697, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__1
ConsumerRecord(topic='python_test', partition=0, offset=2, timestamp=1646103355119, timestamp_type=0, key=None, value=b'{"num": 2, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=149040431, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__0
ConsumerRecord(topic='python_test', partition=0, offset=4, timestamp=1646103355121, timestamp_type=0, key=None, value=b'{"num": 4, "ts": "2022-03-01 10:55:55"}', headers=[], checksum=4162098402, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
threading__3
threading__2
ConsumerRecord(topic='python_test', partition=0, offset=6, timestamp=1646473931267, timestamp_type=0, key=None, value=b'{"num": 1, "data": "2022-03-05 17:52:11"}', headers=[], checksum=2205992463, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=5, timestamp=1646473931131, timestamp_type=0, key=None, value=b'{"num": 0, "data": "2022-03-05 17:52:11"}', headers=[], checksum=13992341, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
threading__1
ConsumerRecord(topic='python_test', partition=0, offset=7, timestamp=1646473931268, timestamp_type=0, key=None, value=b'{"num": 2, "data": "2022-03-05 17:52:11"}', headers=[], checksum=4062066255, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
threading__0
ConsumerRecord(topic='python_test', partition=0, offset=8, timestamp=1646473931269, timestamp_type=0, key=None, value=b'{"num": 3, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1088274253, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
....

消费者改进 1:批次读取,并将读取到的数据返回

上面的程序是打印结果,但是我们往往不需要打印出来,而是通过一个函数获取消费端的数据:

from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor, as_completed
from kafka.structs import TopicPartition


class MultiThreadKafka(object):

    def __init__(self):
        self.seek = 0  # 偏移量
        self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
        self.tp = TopicPartition("python_test", 0)
        self.consumer.assign([self.tp])

    def operate(self):
        data_list = []
        for i in range(3):  # 每个批次读取3条数据
            self.consumer.seek(self.tp, self.seek)
            self.seek += 1
            consumer_data = next(self.consumer)
            consumer_value: dict = eval(consumer_data.value.decode("utf-8"))  # 这里把核心数据提取出来
            data_list.append(consumer_value)
        return data_list  # 读取3个数据了

    def main(self):
        thread_pool = ThreadPoolExecutor(max_workers=4)
        thread_mission_list = []
        for i in range(20):
            run_thread = thread_pool.submit(self.operate)
            thread_mission_list.append(run_thread)
        for mission in as_completed(thread_mission_list):  # 这里会等待线程执行完毕,先完成的会先显示出来
            yield mission.result() # 获取线程执行的结果,打包成一个迭代器返回


if __name__ == '__main__':
    thread_kafka = MultiThreadKafka()
    kafka_data_generator = thread_kafka.main()  # 迭代器
    for i in kafka_data_generator: # 这里可以打印出结果,每个批次的值都附在了这个i上
        print(i)

这样所有的批处理数据都会保存在变量i

消费者改进 2:无限读取kafka数据

main函数中使用while True即可:

from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor, as_completed
from kafka.structs import TopicPartition


class MultiThreadKafka(object):

    def __init__(self):
        self.seek = 0  # 偏移量
        self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
        self.tp = TopicPartition("python_test", 0)
        self.consumer.assign([self.tp])

    def operate(self):
        data_list = []
        for i in range(3):  # 每个批次读取3条数据
            self.consumer.seek(self.tp, self.seek)
            self.seek += 1
            consumer_data = next(self.consumer)
            consumer_value: dict = eval(consumer_data.value.decode("utf-8"))  # 这里把核心数据提取出来
            data_list.append(consumer_value)
        return data_list  # 读取3个数据了

    def main(self):
        thread_pool = ThreadPoolExecutor(max_workers=4)
        thread_mission_list = []
        while True: # 这里无限循环,就可以无限读取
            for i in range(5):
                run_thread = thread_pool.submit(self.operate)
                thread_mission_list.append(run_thread)
            for mission in as_completed(thread_mission_list):  # 这里会等待线程执行完毕,先完成的会先显示出来
                yield mission.result()


if __name__ == '__main__':
    thread_kafka = MultiThreadKafka()
    kafka_data_generator = thread_kafka.main()  # 迭代器
    for i in kafka_data_generator:
        print(i)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python-kafka多线程快速读取consumer消费者数据,同时使用批读取与无限流读取改进 的相关文章

  • 指定的可执行文件不是该操作系统平台的有效应用程序。

    我不断收到错误消息System ComponentModel Win32Exception The specified executable is not a valid application for this OS platform 当
  • Conda 无法在 Powershell 中激活

    我已经在我的 Windows 10 笔记本电脑上安装了 anaconda 我正在尝试激活名为的Python环境pyenv 首先 我检查conda env list在我的笔记本电脑中 这是 powershell 上的输出 PS C Users
  • @monthly cron 作业不可靠

    我们的客户希望我们每月创建一份报告 过去 我们使用 monthly cron 作业来完成此任务 但这并不可靠 服务器可能会在这一分钟内宕机 Cron 不会重新运行这些作业 如果服务器已启动 此时数据库可能无法访问 如果服务器已启动且数据库已
  • Python 包?

    好吧 我认为无论我做错了什么 它可能都是显而易见的 但我无法弄清楚 我已经阅读并重新阅读了有关包的教程部分 我唯一能想到的是这不起作用 因为我直接执行它 这是目录设置 eulerproject init py euler1 py euler
  • Ttk Treeview:跟踪键盘选择

    这是一个带有 ttk 树视图的 Tk 小部件 当用户单击该行时 会执行某些功能 此处仅打印项目文本 我需要的是以下内容 最初的重点是文本输入 当用户按下 Tab 键时 焦点应该转到第一行 并且应该执行绑定到 Click 事件的函数 当用户使
  • bash双括号问题

    我对 bash 脚本非常陌生 在使用双括号时遇到了问题 我似乎无法让它们在 Ubuntu Server 11 10 中工作 我的下面的脚本位于 if test sh 中 bin bash if 14 14 then echo FOO fi
  • Python ttk.combobox 强制发布/打开

    我正在尝试扩展 ttk 组合框类以允许自动建议 我到目前为止的代码运行良好 但我想让它在输入一些文本后显示下拉列表 而不从小部件的输入部分移除焦点 我正在努力解决的部分是找到一种强制下拉的方法 在 python 文档中我找不到任何提及这一点
  • 使用 string.whitespace 删除 Python 中的空格

    Python 的 string whitespace 很棒 gt gt gt string whitespace t n x0b x0c r 如何在不手动输入 t n 等正则表达式的情况下将其与字符串一起使用 例如 它应该能够转动 请不要伤
  • 如何判断Python对象是否是字符串?

    如何检查 Python 对象是否是字符串 常规字符串或 Unicode Python 2 Use isinstance obj basestring 对于要测试的对象obj Docs https docs python org 2 7 li
  • Python Pandas:将参数传递给 agg() 中的函数

    我试图通过使用不同类型的函数和参数值来减少 pandas 数据框中的数据 但是 我无法更改聚合函数中的默认参数 这是一个例子 gt gt gt df pd DataFrame x 1 np nan 2 1 y a a b b gt gt g
  • 如何在python中合并具有相同键的嵌套字典

    我有一个这样的数据结构 SNAPSHOT SnapshotVersion 304 SNAPSHOT SnapshotCreationDate 2015 06 21 17 33 41 CafeData CafeVersion 2807 Caf
  • 使用 Python gdata 和 oAuth 2 对日历进行身份验证

    我正在将一个 Python 应用程序从 oAuth 1 迁移到 oAuth 2 该应用程序读取用户的 Google 日历提要 使用 oAuth 1 如果用户可以使用他的 GMail 进行身份验证 我的应用程序将打开浏览器 帐户并授权访问 我
  • 无法运行bjam编译boost python教程

    我正在尝试跟随本教程 http www boost org doc libs 1 55 0 libs python doc tutorial doc html python hello html关于为 Windows 的 python 包装
  • 解释 scipy.stats.entropy 值

    我正在尝试使用scipy stats 熵来估计库尔巴克 莱布勒 KL 两个分布之间的散度 更具体地说 我想使用 KL 作为衡量标准来确定两个分布的一致性 但是 我无法解释 KL 值 例如 t1 numpy random normal 2 5
  • Python:多重分配与单独分配速度

    我一直在寻求从我的代码中挤出更多的性能 最近 在浏览时这个 Python 维基页面 https wiki python org moin PythonSpeed 我发现了这个说法 多重分配比单独分配慢 例如 x y a b 比 x a y
  • Twitter 不再使用请求库 python

    我有一个 python 函数 它使用 requests 库和 BeautifulSoup 来抓取特定用户的推文 import requests from bs4 import BeautifulSoup contents requests
  • numpy 中的分层抽样

    在 numpy 中我有一个这样的数据集 前两列是索引 我可以通过索引将数据集分成多个块 即第一个块是 0 0 第二个块是 0 1 第三个块 0 2 然后是 1 0 1 1 1 2 等等 每个块至少有两个元素 索引列中的数字可能会有所不同 我
  • 开始使用 Python 在 CSV 的特定行上读写

    我有一个 CSV 文件 如下所示 COL A COL B 12345 A 1 B 2 C 3 如何读取该文件并将其写回新文件 但只写第二行 行 我希望输出文件包含 12345 A 1 B 2 C 3 Thanks 下面读取您的 csv 提取
  • 如何在Python中从stdin中逐行读取

    每个人都知道如何在 C 中计算 STDIN 中的字符 但是 当我尝试在 python3 中执行此操作时 我发现这是一个难题 计数器 py import sys chrCounter 0 for line in sys stdin readl
  • Python正则表达式:如何用不同的值替换出现的每个实例?

    假设我有这个字符串 s blah blah blah 使用Python正则表达式 如何用不同的值替换 blah 的每个实例 例如 我有一个值列表v 1 2 3 你可以使用re sub打回来 http docs python org libr

随机推荐

  • 更换持续集成工具,从 Travis 到 Github Actions

    我真傻 真的 单单受文档的推荐就选择了 Travis 作为部分项目的持续集成工具 没有料到它早已于 2020 年 12 月更换了免费政策 不再为开源项目提供免费的用于持续集成使用的 Credits 了 当赠送的 10000 个点数用完 就需
  • 【踩坑经历】Java Long 类型传给前端损失精度的问题

    最近在做一个 SpringBoot Vue 的项目 持久层框架用的是 MyBatis Plus 然后遇到了一个问题 一起来看下怎么回事 这个项目就是一个文章收藏器 可以收藏一些技术文章 然后可以选择星标 以便查找这篇文章 那么点击星标的按钮
  • 服务器的tomcat调优和jvm调化

    下面讲述的是tomcat的优化 及jvm的优化 Tomcat 的缺省配置是不能稳定长期运行的 也就是不适合生产环境 它会死机 让你不断重新启动 甚至在午夜时分唤醒你 对于操作系统优化来说 是尽可能的增大可使用的内存容量 提高CPU 的频率
  • 操作系统12----进程间通信IPC

    进程间通信IPC 1 进程通信 IPC Inter Process Communication 1 1直接通信 1 2间接通信 1 3阻塞通信 1 4非阻塞通信 2 信号 Signal 3 管道 pipe 4 消息队列 5 共享内存 1 进
  • 基于面板数据的熵值法介绍与实现

    熵值法是一种基于信息熵理论的客观赋值方法 即数据越离散 所含信息量越多 对综合评价影响越大 目录 一 基于面板数据熵值法介绍 二 R语言实现 参考文献 一 基于面板数据熵值法介绍 传统的熵值法有个弊端 只能针对于截面数据 即根据某一年 k
  • MySQL创建表时提示:1067 - Invalid default value for ‘sex‘

    问题 在创建表的时候如果有中文 则会提示 1067 Invalid default value for sex 比如 创建信息表 create table userInfo card id int primary key auto incr
  • unity 内嵌网页简单流程(3D WebView 3.14.1)

    我是用于 web 平台 特此记录 3D WebView 主要实现在unity 中制作网页浏览器 可使用平台 很强大 其他类似插件都有平台缺陷 Android iOS UWP Hololens Windows macOS WebGL 0 插件
  • 制造行业主数据同步集成

    主数据是描述企业核心业务实体的数据 是企业核心业务的主要构成 各个订单 合同以及业务的主体 在企业内部被重复 共享应用的数据 主数据跨越企业各个业务部门以及各类业务系统 是应用系统间数据交互的基础 近期一直北方某制造业进行主数据治理工作 谈
  • React Router源码解析

    虽然React Router已经到了V6版本了 但在我们项目中 目前主要用的还是React Router的V5版本 所以此处我们从V5版本着手 去了解前端路由的实现原理 目标 希望收获 前端路由的基本原理 React Router 的实现原
  • Scanner的.next()以及.nextLine()各自代表什么意思

    String str new Scanner System in nextLine String str new Scanner System in next next 方法一次读取一个无间隔子串 比如 TAB 空格 回车符 的时候 会终止
  • Chromium OS 初体验

    Chromium OS可是早有耳闻 但是一直没有尝试 最近很多评论甚至认为会对Windows和Mac都能够造成压力 于是迫不及待的想尝试一下了 百度下了官网 官网很贴心 不光给了用于写入U盘的镜像文件 最初是针对上网本的 所以自然不是刻录成
  • Python 基础知识6 字典

    字典 定义字典 d key1 22 key2 meng print d print type d 访问字典里的值 dict Name Runoob Age 7 Class First print dict Name dict Name pr
  • 在Unity中编写单元测试

    最近在我忙于我的最新项目时 我一直在思考 我如何能单元测试代码 我知道如果我先把它搁一边 在编写一大段游戏代码后 我可能再也不会回头来写测试了 编写单元测试对我有两个挑战 首先 游戏不同于其他类型的软件 没有好的代码分段来处理好输入 以及图
  • Hello World

    编写思路 创建 Java 源文件 将源文件编译为 class 文件 运行 class 文件 编写代码 代码块 我的第一个 Java 程序 class 类 Java 程序基本组成单位 HelloWorld 为类的名称 public class
  • 企业微信自建内部应用Demo源码,附在线Demo及视频讲解,创建测试公司及测试应用简单配置即可使用

    自建内部应用Demo源码 前端vite vuejs https github com liyuexi qywx vuejs qywx vuejs 企业微信自建内部应用demo源码前端vite vuejs https github com l
  • 最简单的实现[三栏布局中间自适应]方法

    一 float margin 左盒子 左浮动 右盒子 右浮动 中间盒子 左右加margin 注意 盒子的书写顺序是左右中 div class container div class left w div div class right w
  • docker 安装 nginx1.23.2

    注意 nginx 不能提前挂载配置 html 目录可以提前挂载 但提前挂载了访问默认nginx页面就没有了 部署前端时可提前挂载 所以我们先提前创建副本 1 提前创建挂载文件 创建容器副本 主要作用与获取配置文件 先创建一个没有的nginx
  • 错误隐藏学习手记(五)

    在H 264的研究中 我们可以看到目前有三种开源编码 很多测试都是在一个开源编码中实现的 这个开源编码就叫做 JM86 在这里呢具体介绍一下这三种开源编码 一 三大开源编码器介绍 1 JM H 264的官方测试源码 由德国hhi研究所负责开
  • oracle提高数据移植速度.

    author skatetime 2008 07 21 提高数据移植速度 序列也能影响数据迁移的速度 1 测试表 test skate1 SQL gt select count from test skate1 COUNT 5841920
  • python-kafka多线程快速读取consumer消费者数据,同时使用批读取与无限流读取改进

    python单线程循环读取consumer会很浪费时间 而且速率远远低于生产者可容纳的速率 因此我们使用多线程来处理IO密集型的读取操作 文章目录 极简的示例 1 生产者 先运行 2 消费者部分多线程读取 消费者改进 1 批次读取 并将读取