Java架构直通车——ThreadLocal实现RabbitMQ消息的批量发送

2023-11-18

引入

之前,我们完成了单个消息的发送,以及单个消息发送的多线程池化
这里,我们继续完成批量发送消息的封装。

因为rabbitMq本身是不支持批量发消息的,所以我们可以直接使用上文所创建的连接池来发送。

最简单的代码是这样的:

# ProducerClient.class
	@Override
    public void send(List<Message> messages) throws MessageRuntimeException {
        messages.forEach(message -> {
            message.setMessageType(MessageType.RAPID);
        });
        rabbitBroker.sendMessages(messages);
    }
# RabbitBrokerImpl.class
    @Override
    public void sendMessages(List<Message> messages) {
        messages.forEach(message -> {
            sendKernel(message);
        });
    }

这里使用ThreadLocal。为什么要将批量发送的消息存储到ThreadLocal里呢?这样做的好处相比于直接调用rabbitBroker.sendMessages(List<Message> messages)把批量消息直接放到参数变量里,优点是什么?

这样做是一个请求线程里来做,好处就是在请求链路的任何时候都可以往里面put消息,而不是集中在一起put。

什么是ThreadLocal

ThreadLocal一般称为线程本地变量,它是一种特殊的线程绑定机制,将变量与线程绑定在一起,为每一个线程维护一个独立的变量副本。通过ThreadLocal可以将对象的可见范围限制在同一个线程内。

  • 举例说明:
    对于教师判分来说,每个教师登陆后会从答题记录表中抽取20道学生的答案进行阅卷,为了避免同一道题被多个教师抽到,需要加锁进行控制,保证当时只有一个教师在抽题,其他教师只能等待,只有当这名教师抽题完成,等待的教师才可以进行抽题。同步机制就是为了同步多个线程对相同资源的并发访问,解决了多个线程之间进行通信的问题。
    ThreadLocal就从另一个角度来解决多线程的并发访问,ThreadLocal会为每一个线程维护一个和该线程绑定的变量的副本,从而隔离了多个线程的数据,每一个线程都拥有自己的变量副本,从而也就没有必要对该变量进行同步了。最明显的,ThreadLoacl变量的活动范围为某线程,并且我的理解是该线程“专有的,独自霸占”,对该变量的所有操作均有该线程完成!ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的整个变量封装进ThreadLocal,或者把该对象的特定于线程的状态封装进ThreadLocal。

ThreadLocal是怎么进行线程绑定的
thread里有个全局变量threadlocalmap,范型为<ThreadLocal,Object>,threadlocal在执行set方法时先获取当前线程,拿到threadlocalmap,以以身为key把值放到map中,从而实现变量与线程的绑定。
源码如下:

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

可以看到内部有一个ThreadLocalMap,每次都会获取当前线程,在从当前线程中插入或拿取值,这样就保证了只能当前线程能插入和拿取自己的值,其他线程是访问不到的。这样也就避免了传参问题。

所以一般做多数据源切换时,会使用到ThreadLocal。

private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {  
    public Connection initialValue() {  
        return DriverManager.getConnection(DB_URL);  
    }  
};  
  
public static Connection getConnection() {  
    return connectionHolder.get();  
}  

在多数据源中,会频繁的调用数据库,采用传参来决定使用什么数据源肯定是不可取的,采用ThreadLocal完美解决数据库切换问题。缺点就是以空间换时间的方式(与synchronized相反),以耗费内存为代价,但是大大减少了线程同步(如synchronized)所带来性能消耗以及减少了线程并发控制的复杂度。

关于ThreadLocal以后会做一个详尽的专题讲解,在此不做赘述。

使用ThreadLocal

首先实现一个ThreadLocal类:

public class MessageHolder {
    private List<Message> messages=  new ArrayList<>();

    @SuppressWarnings({"rawtypes","unchecked"})
    public static final ThreadLocal<MessageHolder> holder=new ThreadLocal(){
        @Override
        protected Object initialValue() {
            return new MessageHolder();
        }
    };

    public static void add(Message message){
        holder.get().messages.add(message);
    }

    public static List<Message> clear(){
        List<Message> tmp=new ArrayList<>(holder.get().messages);
        holder.remove();
        return tmp;

    }

}

我们可以看到,有add()加入方法,也有取出ThreadLocal中的变量的方法clear()

那么我们的代码可以变为:

# ProducerClient.class
	@Override
    public void send(List<Message> messages) throws MessageRuntimeException {
        messages.forEach(message -> {
            message.setMessageType(MessageType.RAPID);
            MessageHolder.add(message);
        });
        rabbitBroker.sendMessages();
    }
# RabbitBrokerImpl.class
	@Override
    public void sendMessages() {
        List<Message> messages=MessageHolder.clear();
        messages.forEach(message -> {
            sendKernel(message);
        });
    }

很好理解,变量直接存入ThreadLocal。sendMessages()的实现方法直接从ThreadLocal中取值即可。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java架构直通车——ThreadLocal实现RabbitMQ消息的批量发送 的相关文章

  • x390拆机 升级内存和硬盘_战66拆机加内存折腾手记

    前言 今年6 18时入HP 战66二代AMD版时就在计划双十一时升级内存到16G 并顺带加个机械硬盘 结果用了4个多月后感觉目前硬盘空间尚无压力 所以只计划加内存 晒单 之前有朋友用过芝奇 说还不错 既有逼格也有保障 没有等到11 11当天
  • openGL增强表面细节----高度贴图

    openGL系列文章目录 文章目录 openGL系列文章目录 前言 一 代码 主程序c 效果 前言 现在我们扩展法线贴图的概念 从纹理图像用于扰动法向量到扰乱顶点位置本身 实 际上 以这种方式修改对象的几何体具有一定的优势 例如使表面特征沿

随机推荐

  • 简单的移动阵型补全

    需求 模拟企鹅群以一种三角形的阵型移动 并向目标发动冲击 冲击时候的企鹅不在跟随阵型移动 阵型内的企鹅跟随阵型移动 并且会自动补全阵型 企鹅群体大概是这样的阵型 o o o o o o 可以抽象成一个数组 fromation new int
  • SQL中partition关键字的使用

    最近在写后台语句时候 运用到了partition这样一个关键字 先大致说一下背景 有一种数据表 如下 现在需要取出 每一个人最近的一次打卡时间 思路是 先把数据按照人名分组 然后在每个组里面按照时间排倒叙 最后取出每组的第一条数据即可 pa
  • 【Hadoop学起来】Linux配置$HADOOP_HOME/etc/hadoop/hadoop-env.sh时找不到JAVA_HOME?

    正文之前 今天很气愤 想要学点东西 但是老是被环境所限制 Hadoop这个见鬼的环境 我只是运行单机模式 结果就是都不成功 好不容易磕磕盼盼的终于把啥缺的东西都找出来了结果最后还是失败了 暂时我真的不想去看失败记录 因为快要睡了明天再说吧
  • Premiere Pro 2022有哪些新增功能吸引了你

    Premiere Pro2022正式出现在大家的面前 那么如此大版本的更新 都有哪些变化呢 今天我们就来谈谈pr22版本的变化 安装 Premiere Pro 2022中文 图形和标题 Premiere Pro 中的图形和字幕工作流程具有多
  • 华为星闪联盟:引领无线通信技术创新的先锋

    星闪 NearLink 是由华为倡导并发起的新一代无线短距通信技术 它从零到一全新设计 是为了满足万物互联时代个性化 多样化的极致 创新体验需求而诞生的 这项技术汇聚了中国300多家头部企业和机构的集体智慧 华为更是其中的主要贡献方 在过去
  • 微信小程序scroll-view滚动到指定位置

    自动滚动到指定位置 在小程序开发过程中 为了满足一个特殊的要求 因此需要一个特殊的功能 即在打开小程序数据加载完成的时候 需要页面自动滚动到指定位置 或者页面的底部 在各种度度 狗狗之后 通过各种尝试 算是找到了一个比较完美的方案 微信扫描
  • JVM-类加载详解

    一 JVM类加载过程 JVM类加载过程如下图 JVM类加载过程分为 加载 链接 初始化 使用 卸载 这五个阶段 其中链接阶段又包括 验证 准备 解析 加载 通过类的完全限定名 查找此类的二进制字节码文件 通过该字节码文件创建Class对象
  • springboot 结合 ice(飞冰) 实现上传功能

    ice 前端代码 我用的是一个拖拽的模板
  • Disk /dev/sdb doesn‘t contain a valid partition table

    使用fdisk l grep Disk命令查看磁盘情况提示Disk dev sdb doesn t contain a valid partition table 解决方案如下 使用 命令fdisk dev sdb 然后按照下面的步骤操作即
  • 【蓝桥杯 和与乘积】

    题目描述 解题思路 首先想想可以组成答案的区间有什么性质 很直观可以想到排除长度为1的和长度为2的 构成答案的区间肯定是由几个非1的数加上一堆1构成的 那么可以很容易的想到区间长度k有下面这个等式 k mul sm tot mul为区间非1
  • [Ubuntu]GTest安装和测试

    1 Ubuntu直接通过控制台安装 sudo apt get install libgtest dev 2 编译链接库 2 1进入gtest文件夹 cd usr src gtest 2 2编译 没有安装Cmake的请先安装cmake sud
  • 计算机无法识别华为m3,华为8寸M3(非青春版)电脑连接问题报告

    设备是一上市时候就买了 wifi版32g 具体时间忘了 用到现在整体感觉都挺好 就在这个长假因为某些原因想连电脑 这是第一次连 结果就发现了问题 设备情况 win10和win7电脑各一台 均安装了华为助手 5根usb线 本机EMUI5 01
  • 数据结构(3)— 线性表之顺序存储详解介绍(含代码)

    1 博客代码在 数据结构代码 GitHub仓库 线性表介绍 线性表的基础概念 1 甲骨文表示 线性表是零个或多个数据元素的有限序列 2 线性表 顾名思义 就是说这个数据存储是线性的 而线性的东西具有什么特征呢 lt 1 gt 数据是一对一的
  • JSWebrtc.js +对应视频实例化

    HTML div class context div
  • 大数据技术之 Flink-CDC

    第1章 CDC简介 1 1 什么是 CDC CDC 是 Change Data Capture 变更数据获取 的简称 核心思想是 监测并捕获数据库的变动 包括数据或数据表的插入 更新以及删除等 将这些变更按发生的顺序完整记录下来 写入到消息
  • Java RMI 调用链与源码解析

    本篇简单说一下RMI的调用流程和攻击历史 以及分析RMI的JDK源码 对于RMI攻击的各种反序列化并不做过多的分析 关于JDK源码 详情请参考 https github com frohoff jdk8u jdk tree master s
  • Python(4)list和tuple(类似js里的数组)

    6 list list其实就是js里的数组 放置在中括号里 用逗号分隔 就是数组的元素 例如 foo 1 2 3 print foo 1 2 3 len 访问list的长度 foo 1 2 3 print len foo 3 list n
  • 二叉树层次遍历的相关应用(伪代码)

    1 层次遍历 void LevelOrder BTree t Queue Q initQueue Q EnQueue Q t while Empty Q TNode p DeQueue Q if p gt lchird EnQueue Q
  • Redis主从复制(读写分离)、哨兵(主从切换)配置

    Redis的主从复制功能非常强大 一个master可以拥有多个slave 而一个slave又可以拥有多个slave 如此下去 形成了强大的多级服务器集群架构 官网 https redis io 环境 Master root Master u
  • Java架构直通车——ThreadLocal实现RabbitMQ消息的批量发送

    文章目录 引入 什么是ThreadLocal 使用ThreadLocal 引入 之前 我们完成了单个消息的发送 以及单个消息发送的多线程池化 这里 我们继续完成批量发送消息的封装 因为rabbitMq本身是不支持批量发消息的 所以我们可以直