python 自建kafka消息生成和消费小工具

2023-11-17

要将 Kafka 的消息生产和消费转换为 API 接口,我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子,使用 Flask 创建 API 来生成和消费 Kafka 消息。

1. 安装所需的库

pip install kafka-python flask

2. 创建 Flask API

from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumer

app = Flask(__name__)

# 配置 Kafka
KAFKA_BROKER_URL = 'localhost:9092'
TOPIC = 'test_topic'
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)


@app.route('/send', methods=['POST'])
def send_message():
    message = request.json.get('message')
    if message:
        producer.send(TOPIC, value=message.encode('utf-8'))
        return jsonify({"status": "success", "message": "Message sent!"}), 200
    else:
        return jsonify({"status": "error", "message": "Message cannot be empty!"}), 400


@app.route('/receive', methods=['GET'])
def receive_message():
    consumer = KafkaConsumer(TOPIC, bootstrap_servers=KAFKA_BROKER_URL, auto_offset_reset='earliest')
    messages = []
    for message in consumer:
        messages.append(message.value.decode('utf-8'))
        if len(messages) > 5:  # 只收集最近的5条消息,可以根据需要调整
            break
    return jsonify(messages)


if __name__ == '__main__':
    app.run(debug=True, port=5000)

这个 Flask 应用程序定义了两个端点:

  • /send: 它接受 POST 请求并发送消息到 Kafka。
  • /receive: 它返回 Kafka 主题中的最近消息。

3. 使用 API

  • 发送消息:
curl -X POST http://localhost:5000/send -H "Content-Type: application/json" -d '{"message": "Hello, Kafka!"}'
  • 接收消息:
curl http://localhost:5000/receive

这只是一个简单的示例,您可能需要添加错误处理、日志记录、认证、消息序列化和反序列化等功能,以满足更复杂的需求。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python 自建kafka消息生成和消费小工具 的相关文章

随机推荐

  • WPF CommunityToolkit.Mvvm

    文章目录 前言 Toolkit Nuget安装 简单使用 SetProperty 通知更新 RealyCommand CanExecute 新功能 代码生成器 ObservableProperty NotifyCanExecuteChang
  • eclipse的下载及安装教程

    第一步 从官网下载eclipse 进行安装 下载链接https www eclipse org downloads packages 根据自己的电脑和需求去选择对应的版本 点击download 进行下载 然后会跳转到一个打赏页面 无需理会
  • You must set CMAKE_CUDA_ARCHITECTURES to e.g. ‘native‘, ‘all-major‘, ‘70‘,

    You must set CMAKE CUDA ARCHITECTURES to e g native all major 70 cmake 报错 CMake Error at CMakeLists txt 255 message You
  • Python并发编程之多线程

    前言 本文介绍并发编程中另一个重要的知识 线程 线程介绍 我们知道一个程序的运行过程是一个进程 在操作系统中每个进程都有一个地址空间 而且每个进程默认有一个控制线程 打个比方 在一个车间中有很多原材料通过流水线加工产品 而线程就是这个车间中
  • 小程序代码体积优化

    微信小程序在发布的时候 对提交的代码有 2 MB 大小的限制 开发之前就需要提前有个心理准备 由于我也是第一次做小程序开发代码大小就超过了2MB 开发者工具都无法预览了 这就很尴尬了 我自己的优化代码积的方式也不多 如果你有更好的方法 可以
  • linux分区满了,如何进行扩容

    图片中可以看到挂载点 的利用率移到100 空间不够 所以要对其进行分区 1 先进入虚拟机设置里增大磁盘空间 注意 将25改成50 以扩大空间 这里一定要写比25大的数 因为他是 增加到 50GB 而不是 增加了25GB 2 下图可以看到 硬
  • 统计学习方法(九)EM算法

    参考博客 https www cnblogs com bigmoyan p 4550375 html https en wikipedia org wiki Expectation E2 80 93maximization algorith
  • 【网络编程】协议定制+Json序列化与反序列化

    需要云服务器等云产品来学习Linux的同学可以移步 gt 腾讯云 lt gt 阿里云 lt gt 华为云 lt 官网 轻量型云服务器低至112元 年 新用户首次下单享超低折扣 目录 一 序列化与反序列化的概念 二 自定义协议设计一个网络计算
  • 软件测试重点

    第1章 软件测试概述 什么是软件测试 广义的概念 指软件生存周期中所有的检查 评审和确认工作 其中包括了对分析 设计阶段 以及完成开发后维护阶段的各类文档 代码的审查和确认 狭义概念 识别软件缺陷的过程 即实际结果与预期结果的不一致 软件测
  • 二项分布和泊松分布,二者的关系

    离散型随机变量中 经典的两个分布为二项分布和泊松分布 二项分布的定义 泊松分布的定义 注意 一 对泊松分布定义的右边式子 对k 0 1 2 求和的结果为1 即所有事件的概率之和为1 这可以从我们熟知的公式 e k 0 kk begin eq
  • 毕业设计-基于深度学习的人脸识别方法

    目录 前言 课题背景和意义 实现技术思路 一 人脸识别介绍 二 基于深度学习的人脸识别方法 实现效果图样例 最后 前言 大四是整个大学期间最忙碌的时光 一边要忙着备考或实习为毕业后面临的就业升学做准备 一边要为毕业设计耗费大量精力 近几年各
  • Pytorch——关于模型调用(推理)时是否需要GPU的理解

    如果模型在训练的时候 是纯CPU训练的 那么调用 推理 时 就只CPU参与 如果模型在训练的时候 是用到了GPU训练的 那么调用 推理 时 就CPU GPU都会参与
  • 【教程】Pytorch DDP 分布式训练详解

    Pytorch 分布式训练 知乎 Getting Started with Distributed Data Parallel PyTorch Tutorials 2 0 1 cu117 documentation
  • SpringBoot系列十二:SpringBoot整合 Shiro

    1 概念 SpringBoot 整合 Shiro 2 具体内容 Shiro 是现在最为流行的权限认证开发框架 与它起名的只有最初的 SpringSecurity 这个开发框架非常不好用 但是千万不要 以为 SpringSecurity 没有
  • Java程序控制语句

    1 Java程序控制语句 程序控制可以定义为对程序语句执行的顺序进行的规定 有三种结构 1 1顺序结构 1 2分支语句 条件分支语句 switch case语句 单分支条件语句 二分之条件语句 嵌套条件语句 多分支条件语句 if 条件表达式
  • 不要再狂按空格键了!Word 里文字对齐推荐这4种方法

    到底如何才能快速对齐Word文字呢 今天就教大家4个好方法 不用敲空格键 2秒对齐所有文字 1 Tab键对齐 首先选中要对齐的文本 点击 视图 选中 标尺 在文字需要对齐的位置设置 制表位 接着鼠标点击文本前 然后再按一个 Tab键 立马就
  • Java List 根据ID 去重复

    在实战业务场景中 可能需要去重List 重复数据 直接看如下示例 1 代码示例 import cn hutool core collection ListUtil import cn hutool json JSONUtil import
  • mysql 自增键的上限后异常处理

    一般情况下对于自增键 会使用 int 自增键上限 2的32次方 1 4294967295 达到自增上限后 数据就无法继续插入 报 ERROR 1062 23000 Duplicate entry 4294967295 for key PRI
  • tcc分布式事务源码解析系列(一)之项目结构

    happylifeplat tcc 是什么 有什么功能 这是碧桂园旺生活解决分布式事务的TCC开源方案 github地址 支持dubbo springcloud等rpc框架进行分布式事务 本地事务存储 支持redis mogondb zoo
  • python 自建kafka消息生成和消费小工具

    要将 Kafka 的消息生产和消费转换为 API 接口 我们可以使用 Python 的 Web 框架 其中 Flask 是一个轻量级且易于使用的选择 下面是一个简单的例子 使用 Flask 创建 API 来生成和消费 Kafka 消息 1