kafka 多消费者实现

2023-05-16

kafka官网: http://kafka.apache.org/quickstart

目录

kafka简单介绍:

实现方式   

1:kafka分区

2: 实现结果

3:kafka的consumer代码

4:kafka生产者


kafka简单介绍(网上找的):

     

实现方式   

必要条件: 

        kafka配置:

        > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic TEST

        topic存在多个分区(--partitions 5), 才会实现多个consumer消费一个topic, 注意:consumer的数量应小于partitions数量, 要不然会浪费。

误区: 多线程多个消费者, 在kafka多个线程消费者不安全

 KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?
      每个线程一个消费者

Kafka中是怎么体现消息顺序性的?
      kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序

Kafka中的分区器如何处理数据?
    分区器:根据键值确定消息应该处于哪个分区中,默认情况下使用轮询分区,可以自行实现分区器接口自定义分区逻辑

实现方式:

        方法一: 开启多个进程消费者, 在每个进程里使用线程池异步做业务处理。

        方法二: 多个Consumer且每一个Consumer有自己的线程,

这里主要讲的方法一, 方法二(优秀人的博客): http://www.cnblogs.com/qizhelongdeyang/p/7355309.html

 

1:kafka分区

bin/kafka-topics.sh --describe --zookeeper localhost:2181

 

2: 实现结果

开启5个进程如下:

 

 

开启一个进程:

 

3:kafka的consumer代码

Kafka_Consumer.java
 

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;


public final class Kafka_Consumer {

    /**
     * kafka消费者不是线程安全的
     */
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executorService;

    public Kafka_Consumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "180.108.64.146:9099");    //180.108.64.146:9099 kafka的服务器和端口号
        props.put("group.id", "12334");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset","latest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("TEST"));
    }


    public void execute() {
        executorService = Executors.newFixedThreadPool(6);     //线程池做异步清洗数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (null != records) {
                executorService.submit(new ConsumerThread(records));
            }
        }
    }

    public void shutdown() {
        try {
            if (consumer != null) {
                consumer.close();
            }

            if (executorService != null) {
                executorService.shutdown();
            }
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("Timeout");
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }
}



/**
*   线程池做业务处理,   将kakfa接收消息和业务分离开来
*/
class ConsumerThread implements Runnable {
    
    private ConsumerRecords<String, String> records;
    public ConsumerThread(ConsumerRecords<String, String> records) {
        this.records = records;
    }

    @Override
    public void run() {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("当前线程:" + Thread.currentThread() + ","
                        + "偏移量:" + record.offset() + "," + "主题:"
                        + record.topic() + "," + "分区:" + record.partition()
                        + "," + "获取的消息:" + record.value());

        }

    }

}

ConsumerMain.java

public class ConsumerMain {

    public static void main(String[] args) {
        Kafka_Consumer kafka_Consumer = new Kafka_Consumer();
        try {

            kafka_Consumer.execute();
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafka_Consumer.shutdown();
        }
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>data</groupId>
    <artifactId>analyticCore</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>analyticCore</name>
    <url>http://maven.apache.org</url>

    <properties>
        <drools.version>5.3.1.Final</drools.version>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-core</artifactId>
            <version>${drools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-compiler</artifactId>
            <version>${drools.version}</version>
        </dependency>
        <!-- required for drools and spring integration -->
        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-spring</artifactId>
            <version>${drools.version}</version>
        </dependency>
        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.9</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20180130</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore-nio</artifactId>
            <version>4.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                    <skipTests>true</skipTests>
                    <verbose>true</verbose>
                    <showWarnings>true</showWarnings>
                    <fork>true</fork>
                    <meminitial>128m</meminitial>
                    <maxmem>512m</maxmem>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <configuration>
                    <mainClass>data.analyticCore.consumerMain</mainClass>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/rules</directory>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
    </build>
</project>

4:kafka生产者

python代码:

from kafka import KafkaProducer
import json
import time
import random
import threading

producer = KafkaProducer(
                            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                            bootstrap_servers=['180.108.64.146:9099']
                         )
sj = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))


def send_msg(Num):
    for i in range(Num):
        time.sleep(1)
        data = {
            "name": "李四",
            "age": 23,
            "gender": "男",
            "id": i
        }
        producer.send('TEST', data)
        print("============%s" % i)
    end = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print("===开始时间:%s" % sj)
    print("=====截止时间: %s" % end)
    producer.close()


def thread_start():
    num = 1
    Num = 2000
    Threads = []

    for i in range(num):
        Threads.append(threading.Thread(target=send_msg, args=(Num,)))
    for t in Threads:
        # t.setDaemon(True)
        t.start()

if __name__ == "__main__":
    send_msg(100000)


 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka 多消费者实现 的相关文章

  • symfonos 2

    目录 扫描 SMB SSH nbsp 提权 扫描 由于端口80是打开的 我们试图在浏览器中打开IP地址 但在网页上没有找到任何有用的信息 我们还尝试了dirb和其他目录暴力工具 但没有找到任何东西 SMB 为了进一步枚举 我们使用Enum4
  • Metasploit渗透测试:工作区使用帮助

    目录 编辑 help 查看工作区 添加工作区 显示工作区详情
  • 生成密码字典(特殊字符)

    目录 配置字典规则 含义 生成字典 特殊字符表 配置字典规则 打开john路径 nbsp cat etc john john conf 在 List Rules Wordlist 下添加 0 9 0 9 0 9 0 9 amp lt gt
  • Ubuntu系统下卸载命令apt-get remove/purge/autoremove/clean/autoclean的区别

    1 特意提醒新手注意一下 xff0c 下面的在桌面版的Ubuntu系统下尽量不要使用 xff1a apt get autoremove 删除已安装的软件包 xff08 保留配置文件 xff09 xff0c 不会删除依赖软件包 xff0c 且
  • FTP Entering Extended Passive Mode

    目录 原因 两种方法解决 哪个行用哪种 方法一 方法二 原因 FTP的连接建立有两种模式PORT
  • windows文件传输到linux

    Windows上传使用Windows脚本语言 在某些情况下 我们可能需要使用Windows客户端从目标网络中泄漏数据 这可能很复杂 因为Windows上默认很少启用标准的TFTP FTP和HTTP服务器 幸运的是 如果允许出站HTTP流量
  • 下载后直接运行ps1脚本(脚本文件不存入本地硬盘)

    kali 64 kali var www html sudo cat helloworld ps1 Write Output 34 Hello World 34 C Users Offsec gt powershell exe IEX Ne
  • Linux安装FTP服务(Pure-FTPd)

    目录 介绍 安装 客户端使用 介绍 攻击机器上快速安装Pure FTPd服务器 如果您已经在Kali系统上配置了FTP服务器 则可以跳过这些步骤 安装 kali kali sudo apt update amp amp sudo apt i
  • 环形缓存队列

    单片机开发中经常碰到需要用到缓存的地方 xff0c 例如串口 xff0c DMA等设备工作时 xff0c 下面介绍一种简单的环形缓存队列 定义数据结构 span class token keyword typedef span span c
  • Spring笔记(一):Ioc 之 Bean的管理

    前提 xff1a 1 需要 spring dom4j junit commons logging等的jar包 xff0c 配置web xml xff0c 新增 applicationContext xml 2 Spring主要核心是 xff
  • Cesium球心坐标与本地坐标系经纬转换的数学原理—矩阵变换

    之前整理过 xff1a 透析矩阵 xff0c 由浅入深娓娓道来 高数 线性代数 矩阵 三维旋转笔记 欧拉角 四元数 旋转矩阵 轴角 记忆点整理 xff0c 这次转载 FuckGIS的 Cesium之球心坐标与本地坐标 xff0c 算是线性代
  • 关于几种排序算法的时间性能比较

    以前经常看到各种排序算法 xff0c 今天也对以下6种排序算法的时间性能做了一次测试 xff1a 测试代码地址 1 冒泡排序 O n 2 span class hljs comment 冒泡排序 64 Param int arr 整形切片
  • VMware虚拟机系统没有声音?

    问题 有时 xff0c 我们使用VMware Workstation安装了系统 xff0c 但发现虚拟机系统播放视频或音乐时没有声音 xff0c 怎么办 xff1f 处理 点击菜单栏的 虚拟机 gt 设置 或虚拟机选项卡中的 编辑虚拟机设置
  • virtualbox启动虚拟机报错Failed to open/create the internal network 'HostInterfaceNetworking-VirtualBox Host

    VirtualBox 是6 0 14版本 在删除原有VirtualBox Host Only虚拟网卡并重新添加后 xff0c 虚拟机可能会无法启动 xff0c 启动虚拟机报以下错误 xff1a Failed to open create t
  • 几十万换来的Ddos攻击防护经验分享(转载)

    发布时间 xff1a 2017 01 05 来源 xff1a 服务器之家 本人从事网络安全行业20年 有15年防ddos攻击防护经验 被骗了很多回 xff08 都说能防300G xff0c 500G xff0c 买完就防不住了 xff09
  • C++语法学习笔记六十七:重载全局new、delete,定位new及重载等

    实例代码 xff1a span class token macro property span class token directive keyword include span span class token string lt io
  • python图像处理之scikit-image基本用法

    本文介绍Python语言用于数字图像处理 xff0c 那么要使用python进行各种开发和科学计算 xff0c 需要对应相对的python包 xff0c python有很多的数字图像处理相关的包 xff0c 像 PILPillowOpenC
  • inflate函数及其使用例子 笔记

    LayoutInflater的inflate函数用法详解 LayoutInflater作用是将layout的xml布局文件实例化为View类对象 获取LayoutInflater的方法有如下三种 LayoutInflater inflate
  • Linux学习之----socket网络编程基础

    分层模型 OSI七层模型 1 物理层 xff1a 主要定义物理设备标准 xff0c 如网线的接口类型 光纤的接口类型 各种传输介质的传输速率等 它的主要作用是传输比特流 xff08 就是由1 0转化为电流强弱来进行传输 xff0c 到达目的
  • SpringMvc常用注解

    1 64 Controller 用于标记一个类 xff0c 即一个SpringMVC Controller对象 xff0c 一个控制器类 Spring使用扫描机制查找应用程序中所有基于注解的控制器类 为了保证Spring能找到控制器 xff

随机推荐