Kafka拉取某一个时间段內的消息

2023-11-10

一般来说我们都使用Kafka来记录用户的操作记录以便后续分析。
但是通常使用的时候需要按天来统计每天的去重用户数、点击量之类的。
这个时候如果直接拉某个topic的数据的话,就需要判断每个消息的时间戳,还要兼顾把所有的Partition都拉完才能保证数据的完整。
因此如果能只拉取某一个时间段内的消息,就能极大的简化后续的处理逻辑。

拉取时段内消息实现

为了实现这个目的借助于根据时间戳获取Partition内部偏移的方法,获取两个时间点在Partition内部的偏移,然后从第一个时间点的偏移开始拉取指定Partition的消息,当偏移超过第二个时间点的偏移的时候取消订阅。逐个partition操作拉全topic所有的数据。

实验例子,python+confluence kafka
具体代码如下:

#coding=utf8

from confluent_kafka import Consumer, KafkaError, TopicPartition, Message
import datetime

conf = {
  'bootstrap.servers': 'xxx',
  'group.id': 'xxx',
  'session.timeout.ms': 6000,
  'security.protocol': 'SASL_PLAINTEXT',
  'sasl.mechanism' : 'PLAIN',
  'sasl.username': 'xxx',
  'sasl.password': 'xxx',
  'auto.offset.reset': 'earliest'
}

topic = 'topic'

consumer = Consumer(conf)

# 拉取昨天一天的数据,start_time、end_time这两个时间可以随便设置
now = datetime.datetime.now() - datetime.timedelta(days=1)
start_time = datetime.datetime.strptime(now.strftime('%Y-%m-%d 00:00:00'),'%Y-%m-%d %H:%M:%S')
end_time = datetime.datetime.strptime(now.strftime('%Y-%m-%d 23:59:59'),'%Y-%m-%d %H:%M:%S')

# 5 是partition的数量
for index in range(5):
  # 查询开始时间的针对于某个partition的偏移
  start_tps = [TopicPartition(topic, index, int(start_time.timestamp() * 1000))]
  start_offset = consumer.offsets_for_times(start_tps)
  # 查询结束时间的针对于某个partition的偏移
  end_tps = [TopicPartition(topic, index, int(end_time.timestamp() * 1000))]
  end_offset = consumer.offsets_for_times(end_tps)
  # 从拉取指定partition的offset开始拉取数据
  consumer.assign(start_offset)

  while True:
    try:
      msg = consumer.poll(1.0)
      if msg == None:
        break

      offset = msg.offset()
      if offset > end_offset[0].offset:
        # 如果超过当前partition的偏移之后不再继续订阅当前的topic
        consumer.unassign()
        break

      pass
    except:
      pass
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka拉取某一个时间段內的消息 的相关文章

随机推荐

  • Qt之美(一):D指针/私有实现

    The English version is available at http xizhizhu blogspot com 2010 11 beauty of qt 1 d pointer private html 相信不少刚开始阅读Qt
  • 性能测试持续集成 CICD:JMeter+Jenkins+Ant+jmx

    Java JDK C Users Tommy gt java version java version 1 8 0 341 Java TM SE Runtime Environment build 1 8 0 341 b10 Java Ho
  • Ps如何制作动态图片

    制作动态图片 按操作慢慢来 下面是我们要使用的图片 0 首先我们新建文件 宽 500px 高 500px 1 之后我们简单的设计一下画面 美观一下 需要用的字也先一下 我的比较丑 2 之后重点来了 重点来了 重点来了 从菜单工具 gt 窗口
  • 大数据:频繁项集

    大数据 频繁项集 下面是我 下面是阅读 大数据 互联网大规模数据挖掘与分布式处理 一书第六章笔记 详细请见该书所述 1 购物篮数据 项与购物篮 多对多的关系 项存放于购物篮
  • Book I-IV of Power

    复杂度1 5 机密度3 5 最后更新2021 04 24 任何CPU都有自己的及相关的规范 这些规范用来协调跨公司的软硬件开发者 使用者 共同建设围绕该CPU的软硬件生态体系 Power CPU是IBM所有CPU最终集大成者 从最早的RIS
  • 线性代数(4)——特征值与二次型

  • Realtime_Multi-Person_Pose_Estimation训练问题

    https blog csdn net kkae8643150 article details 102711101 前言 最近在研究Realtime Multi Person Pose Estimation的训练和再训练的过程 参考 htt
  • element -ui table表格内容无限滚动 使用插件vue-seamless-scroll

    使用插件 一 安装组件依赖 npm install vue seamless scroll 二 引入组件 import vueSeamlessScroll from vue seamless scroll components vueSea
  • csdn积分获取攻略

    下载积分攻略 1 个人设置里进行手机绑定CSDN账户 奖励50分 右上角设置 账户安全 手机绑定 2 完成任务送若干分积分 http task csdn net 3 上传有效资源获取积分 上传非法 广告资源用户 将被扣除一定积分 严重者封号
  • matplotlib 画图总结

    1 图片基本设置 import matplotlib pyplot as plt 图片尺寸 plt figure width height 方式1 plt rcParams figure figuresize width height 方式
  • 导入spacy时报错OSError: [E050] Can‘t find model ‘en‘. It doesn‘t seem to be a shortcut link,

    报错如下 File home muli local lib python3 6 site packages spacy util py line 175 in load model raise IOError Errors E050 for
  • element-UI使用el-select做字典映射时label值不显示问题

    问题描述 在使用elementUI的el select组件时做了字典影射 但是在选择option选项后选择框内并没有选中的值出现 这是通过调试发现被绑定的值已经改变 进行别的操作更新完dom后发现选项更新 操作 点击选择test选项 此处是
  • 简单了解YOLOv8

    简单介绍YOLOv8 这里主要关注模型的backbone和后处理的过程 并通过对比YOLOv5的架构来更深入的了解YOLOv8 模型框架 YOLOv5中的C3替换为更精简的C2f 即增加了更多的跳跃连接和split操作 Backbone 中
  • uniapp 自定义标题情况下,让标题和右侧胶囊对齐

    实现效果 无论手机类型怎么切换 自定义标题始终跟胶囊平齐 实现 在pages json文件中配置标题自定义 在index vue页面 编写自定义的标题内容 在onLoad里可以计算高度
  • 【深度学习】入门理解ResNet和他的小姨子们(三)---ResNeXt

    文章名称 Aggregated Residual Transformations for Deep Neural Networks 文章链接 https arxiv org abs 1611 05431 其实ResNeXt这个网络结构严格说
  • 大规模流量下的云边端一体化流量调度体系

    火山引擎是字节跳动旗下的云服务平台 将字节跳动快速发展过程中积累的增长方法 技术能力和工具开放给外部企业 提供云基础 视频与内容分发 数智平台VeDI 人工智能 开发与运维等服务 帮助企业在数字化升级中实现持续增长 LiveVideoSta
  • 构建领域驱动的Java应用

    引言 在现代软件开发中 设计和构建复杂的应用程序是一项充满挑战的任务 为了更好地满足业务需求和提供可维护的代码 软件开发者需要采用一些强大的工具和技术 领域驱动设计 Domain Driven Design 简称DDD 是一种优秀的方法 它
  • Codeforces 1210 D Konrad and Company Evaluation —— 暴力

    This way 题意 现在有n个人 第i个人的工资一开始是i 现在有一些人相互讨厌 然后如果第x个人和第y个人相互讨厌 并且x的工资比y高 那么x就会向y炫耀 x y z这三个人的组合是危险的 当x会向y炫耀 y会向z炫耀 每次修改一个人
  • 用户消费行为分析

    消费品用户行为分析 根据CDNOW的一段用户订单数据进行消费行为分析 CDNow是一家在线音乐零售平台 后被德国波泰尔斯曼娱乐集团公司出资收购 其资产总价值在最辉煌时曾超过10亿美元 下面主要通过分析CDNow网站的用户购买明细来分析该网站
  • Kafka拉取某一个时间段內的消息

    一般来说我们都使用Kafka来记录用户的操作记录以便后续分析 但是通常使用的时候需要按天来统计每天的去重用户数 点击量之类的 这个时候如果直接拉某个topic的数据的话 就需要判断每个消息的时间戳 还要兼顾把所有的Partition都拉完才