AsyncContext优雅实现HTTP长轮询接口

2023-11-05

一、背景

接到一个需求,实现方案时需要提供一个HTTP接口,接口需要hold住5-8秒,轮询查询数据库,一旦数据库中值有变化,取出变化的值进行处理,处理完成后返回响应。这不就是长轮询吗,如何优雅的实现呢?

二、方案设计

在 Spring 中,AsyncContext 是用于支持异步处理的一个重要的特性。它允许我们在 servlet 请求处理过程中,将长时间运行的操作放在一个单独的线程中执行,而不会阻塞其他请求的处理。

AsyncContext 在以下两种情况下特别有用:

  1. 长时间运行的操作:当我们需要执行一些耗时的操作,例如网络请求、数据库查询或其他 I/O 操作时,通过将这些操作放在一个新的线程中,可以避免阻塞 servlet 容器中的线程,提高应用的并发性能。

  2. 推送异步响应:有时候,我们可能需要推送异步产生的响应,而不是等到所有操作都完成后再下发响应。通过 AsyncContext,我们可以在任何时间点上触发异步响应,将结果返回给客户端。

使用 AsyncContext 的步骤如下:

  1. 在 servlet 中启用异步模式:在 servlet 中,通过调用 startAsync() 方法,可以获取到当前请求的 AsyncContext 对象,从而启用异步处理模式。
HttpServletRequest request = ...;
AsyncContext asyncContext = request.startAsync();
  1. 指定异步任务:通过调用 AsyncContext 对象的 start() 方法,在新的线程中执行需要异步处理的任务。
asyncContext.start(() -> {
    // 异步任务逻辑
});
  1. 提交响应:在异步任务完成后,可以调用 AsyncContext 对象的 complete() 方法,以表示异步操作完成。
asyncContext.complete();

需要注意的是,我们在使用 AsyncContext 时需要特别注意线程安全。由于异步任务在单独的线程中执行,所以可能存在并发问题。因此,在编写异步任务逻辑时,需要注意线程安全性,使用合适的同步措施。

另外,AsyncContext 也支持超时设置、错误处理、事件监听等功能,这些可以通过相应的方法和回调进行配置。可以根据具体的需求使用这些功能来优化异步处理的逻辑。

总结来说,Spring 的 AsyncContext 提供了方便的异步处理机制,可以提高应用的并发性能,并支持推送异步响应,使得应用更具有响应性和可伸缩性。

三、代码1

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.*;

@RestController
@RequestMapping("/api/test")
@Slf4j
public class AsyncTestController {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private final ExecutorService timeoutChecker = new ThreadPoolExecutor(1,1,1000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();


//    private static boolean result = false;

    @PostMapping("/async")
    public void async(HttpServletRequest request, HttpServletResponse response) {
        // 创建AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        // 设置处理超时时间8s
        asyncContext.setTimeout(8000L);
        // asyncContext监听
        JdAsyncTestListener asyncListener = new JdAsyncTestListener(redisTemplate,asyncContext);
        asyncContext.addListener(asyncListener);
        // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
        asyncContext.start(asyncListener);
    }

    // 模拟业务处理完成
    @PostMapping("/set")
    public ResultModel notify(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
        return ResultModel.success();
    }

    @PostMapping("/get")
    public ResultModel get(String key) {
        String s = redisTemplate.opsForValue().get(key);
        return ResultModel.success(s);
    }

    @PostMapping("/del")
    public ResultModel del(String key) {
        redisTemplate.delete(key);
        return ResultModel.success();
    }
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import java.io.IOException;


@Slf4j
public class JdAsyncTestListener implements AsyncListener,Runnable {
    boolean isComplete;

    private RedisTemplate<String, String> redisTemplate;
    private AsyncContext asyncContext;
    public JdAsyncTestListener(RedisTemplate<String, String> redisTemplate, AsyncContext asyncContext) {
        this.redisTemplate = redisTemplate;
        this.asyncContext = asyncContext;
    }

    @Override
    public void run() {
        try {
            while(true){
                if(isComplete){
                    log.info("已经退出");
                    break;
                }
                boolean b = redisTemplate.opsForValue().get(1) != null;
                log.info("获取标志位:"+b);
                Thread.sleep(300);
                if (b) {
                    asyncContext.getResponse().getWriter().print(1);
                    asyncContext.complete();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onComplete(AsyncEvent asyncEvent) throws IOException {
        log.info("结束了");
        isComplete = true;

    }

    @Override
    public void onTimeout(AsyncEvent asyncEvent) throws IOException {
        log.info("超时了");
        isComplete = true;
    }

    @Override
    public void onError(AsyncEvent asyncEvent) throws IOException {

    }

    @Override
    public void onStartAsync(AsyncEvent asyncEvent) throws IOException {

    }
}

四、代码二

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

@Validated
@RestController
@RequestMapping("/api/test")
@Slf4j
public class TestController {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private final ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(10, threadFactory);
    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();

    private static boolean result = false;

    private final boolean isTimeout = false;


    /**
     * 消息
     *
     * @return
     */
    @PostMapping("/test")
    public void callback(@RequestBody TestLongPollRequest testLongPollRequest, HttpServletRequest request, HttpServletResponse response) {
        // 创建AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        String jdCustomerId = jdLongPollRequest.getJdCustomerId();
        // 设置处理超时时间8s
        asyncContext.setTimeout(8000L);
        // asyncContext监听
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent asyncEvent) throws IOException {
                log.info("onComplete={}", asyncEvent);
            }

            @Override
            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
                log.info("onTimeout={}", asyncEvent);
                ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
                map.put("code", "500");                asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
                asyncContext.complete();
            }

            @Override
            public void onError(AsyncEvent asyncEvent) throws IOException {
                log.info("onError={}", asyncEvent);
            }

            @Override
            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
                log.info("onStartAsync={}", asyncEvent);
            }
        });
        // 定时处理业务,处理成功后asyncContext.complete();完成异步请求
        timeoutChecker.scheduleAtFixedRate(() -> {
            try {
                String redisKey = getRedisKey(customerId);
                String redisValue = redisTemplate.opsForValue().get(redisKey);
                result = StringUtils.isNotBlank(redisValue);
                if (result) {
                    //todo 长轮询查询数据库。通过customerId查询
                    send(customerId, redisValue);
                    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
                    map.put("code", "200");
                    map.put("msg", redisValue);
                    asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
                    asyncContext.complete();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, 0, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * 发送消息
     */
    private void send(String customerId, String content) {
        
    }
  
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

AsyncContext优雅实现HTTP长轮询接口 的相关文章

  • 尝试克隆一个 git 存储库,但它卡在克隆到中

    我使用的是 Windows 10版本 10 0 19042 内部版本 19042 GIT Ver 2 32当尝试使用 git bash 执行以下命令时git clone depth 1 b carla https github com Ca
  • 如何在java中以编程方式访问网页

    有一个网页 我想从中检索某个字符串 为此 我需要登录 单击一些按钮 填充文本框 单击另一个按钮 然后就会出现字符串 我怎样才能编写一个java程序来自动执行此操作 是否有任何有用的库用于此目的 Thanks Try HtmlUnit htt
  • Go客户端程序生成大量TIME_WAIT状态的socket

    我有一个 Go 程序 它从多个 goroutine 生成大量 HTTP 请求 运行一段时间后 程序报错 connect cannot allocaterequestedaddress 当检查时netstat 我得到大量 28229 个连接T
  • 如何设置响应文件名而不强制“另存为”对话框

    我在某些响应中返回一个流 设置适当的content type标头 我正在寻找的行为是这样的 如果浏览器能够呈现给定内容类型的内容 那么它应该将其显示在浏览器窗口中 如果浏览器不知道如何呈现内容 那么它应该显示 另存为 对话框 其中文件名应该
  • C# HTTP 请求解析器[重复]

    这个问题在这里已经有答案了 可能的重复 将原始 HTTP 请求转换为 HTTPWebRequest 对象 https stackoverflow com questions 318506 converting raw http reques
  • 是否可以使用 http url 作为 DirectShow .Net 中源过滤器的源位置?

    我正在使用 DirectShow Net 库创建一个过滤器图 该过滤器图通过使用 http 地址和 WM Asf Writer 来流式传输视频 然后 在网页上 我可以使用对象元素在 Windows Media Player 对象中呈现视频源
  • 为 REST API 编写单元测试的最佳方法是什么?

    在为 API 包装器编写单元测试时 我应该对 REST API 端点进行真正的调用 还是应该使用 mocl 响应来模拟成功和错误的调用 单元测试意味着只测试你的unit API 包装器 仅此而已 因此 不幸的是 您应该模拟整个 API 另一
  • Node.js:在检索 http 请求正文之前断开 http 请求连接

    我正在用 Node js 编写一个 http 服务器 我有一个客户端通过 HTTP POST 多部分 数据 将大文件上传到该服务器 我想接受唯一使用有效文件名上传文件的连接 我有一些条件 在服务器检索数据之前应断开无效文件名连接 我不知道如
  • Spring RestTemplate - 带有请求正文的http GET [重复]

    这个问题在这里已经有答案了 可能的重复 带请求正文的 HTTP GET https stackoverflow com questions 978061 http get with request body 我在这里读过一些不提倡通过 HT
  • 如何解决 302 重定向上的 POST 更改为 GET 的问题?

    我网站的某些部分只能通过 HTTPS 访问 不是整个网站 安全与性能妥协 并且如果通过纯 HTTP 发送请求 则 HTTPS 是通过对安全部分的请求进行 302 重定向来强制执行的 问题是对于所有主流浏览器来说 如果您在 POST 上执行
  • 如何让 HttpClient 返回状态码和响应正文?

    我试图让 Apache HttpClient 触发 HTTP 请求 然后显示 HTTP 响应代码 200 404 500 等 以及 HTTP 响应正文 文本字符串 重要的是要注意我正在使用v4 2 2因为大多数 HttpClient 示例都
  • Chrome 问题 - 视频流和会话冲突

    我在使用 javascript 和 PHP 实现视频时遇到问题 索引 php session start do other stuff include video php 视频 php
  • 无法在 git 上获取 Http 工作

    我在拇指驱动器上使用 gitbash 作为 git 我的防火墙阻止了我 并且想设置我的 git 以进行 http 访问 我使用 github 并且已经看到了有关如何执行此操作的各种信息 但我还不够了解 无法让它为我自己工作 我在 php i
  • Django:如何测试“HttpResponsePermanentRedirect”

    我正在为我的 django 应用程序编写一些测试 在我看来 它使用 HttpResponseRedirect 重定向到其他一些网址 那么我该如何测试呢 姜戈TestCase类有一个方法assertRedirects https docs d
  • 为什么 Firefox 会忽略缓存标头并在刷新时重新验证?

    我有一些不可变的图像资源 可以永久缓存 Chrome 似乎尊重我的响应标头 并且不会重新验证资源 以下是 Chrome 中其中一项资源的示例 正如你所看到的 我包括cache control public max age expires e
  • 如何在GO中执行HEAD请求?

    我想使用 GO net http 获取页面的内容长度 我可以在终端中使用curl i X HEAD https golang org然后检查内容长度字段 use http Head https golang org pkg net http
  • 读取 GetResponseStream() 的最佳方式是什么?

    从 GetResponseStream 读取 HTTP 响应的最佳方法是什么 目前我正在使用以下方法 Using SReader As StreamReader New StreamReader HttpRes GetResponseStr
  • 在 Python 中发送 100,000 个 HTTP 请求的最快方法是什么?

    我正在打开一个包含 100 000 个 URL 的文件 我需要向每个 URL 发送 HTTP 请求并打印状态代码 我正在使用 Python 2 6 到目前为止 我已经了解了 Python 实现线程 并发的许多令人困惑的方式 我什至看过蟒蛇一
  • Java Servlet 中限制 HTTP 请求

    在 java servlet 中 如何根据客户端的 IP 地址限制来自用户的 http 请求 我不想每秒处理来自特定源 IP 地址的超过 X 个请求 其中 X 是可配置的并且具有 0 1 中的实际值 10 范围 从 10 秒内 1 个请求到
  • 当响应有正文时,内容长度或传输编码是必需的吗

    如果响应有正文 可以有正文 即状态代码不是 204 或 304 则响应标头中是否应始终具有内容长度或传输编码 在规范中 它不是很清楚 在我的场景中 我的主体没有内容长度或传输编码标头 因此卷曲继续等待no chunk no close no

随机推荐

  • Nacos配置中心原理(一)客户端部分

    基本概念 配置服务 在服务或者应用运行过程中 提供动态配置或者元数据以及配置管理的服务提供者 配置项 个具体的可配置的参数与其值域 通常以 param key param value 的形式存在 例如我们常 配置系统的日志输出级别 logL
  • OpenCV3.3中主成分分析(Principal Components Analysis, PCA)接口简介及使用

    OpenCV3 3中给出了主成分分析 Principal Components Analysis PCA 的实现 即cv PCA类 类的声明在include opencv2 core hpp文件中 实现在modules core src p
  • SAS9.4安装简易教程(保姆级)附带报错处理

    SAS安装教程 正常安装 环境准备 文件准备及安装 增强编辑器问题 一些报错处理方法 1 安装后处理 解决方案1 解决方案2 2 日期超过了SAS系统的最后截至日期 解决方案 3 逻辑库问题 解决方案 4 sid及产品许可问题 解决方案 卸
  • JT格式介绍(转换)

    JT Jupiter Tessellation 是一种高效 专注于行业且灵活的 ISO 标准化 3D 数据格式 由 Siemens PLM Software 开发 航空航天 汽车工业和重型设备的机械 CAD 领域使用 JT 作为其最领先的
  • 我的世界服务器无限刷东西指令,我的世界无限刷物品命令方块指令

    发布时间 2015 09 11 精华文章推荐 合成表大全 前期生存图文指南 怪物图鉴及属性一览 敖厂长生存解说视频 新手建筑指导班 豪华建筑建造教程 俾斯麦号建造方法 WE建筑辅助教程 创建服务器方法指南 加入服务器联机教程 多 标签 攻略
  • 学习实践-Alpaca-Lora (羊驼-Lora)(部署+运行+微调-训练自己的数据集)

    Alpaca Lora模型GitHub代码地址 1 Alpaca Lora内容简单介绍 三月中旬 斯坦福发布的 Alpaca 指令跟随语言模型 火了 其被认为是 ChatGPT 轻量级的开源版本 其训练数据集来源于text davinci
  • elasticsearch介绍

    什么是elasticsearch Elasticsearch是一个基于Lucene的搜索服务器 它提供了一个分布式多用户能力的全文搜索引擎 基于RESTful web接口 Elasticsearch是用Java语言开发的 并作为Apache
  • 知道这10个让你的API接口突然超时的原因吗?

    前言 不知道你有没有遇到过这样的场景 我们提供的某个API接口 响应时间原本一直都很快 但在某个不经意的时间点 突然出现了接口超时 也许你会有点懵 到底是为什么呢 今天跟大家一起聊聊接口突然超时的10个原因 希望对你会有所帮助 1 网络异常
  • CSS高级用法

    收藏一些css的高级用法 1 黑白图像 这段代码会让你的彩色照片显示为黑白照片 1 2 3 4 5 6 7 img desaturate filter grayscale 100 webkit filter grayscale 100 mo
  • java出现圅_java获取汉字拼音首字母A

    public class GetChinessFirstSpell 汉字拼音首字母列表 本列表包含了20901个汉字 用于配合 GetChineseSpell 函数使用 本表收录的字符的Unicode编码范围为19968至40869 南京
  • mac移动硬盘未装载解决方案

    一 现象 外置移动硬盘桌面不显示 只在磁盘工具应用中置灰显示 坑爹的是你无法进行任何操作只能查看详细信息 二 尝试解决方法 1 尝试了很多修复工具也没有用 包括Tuxera 因为你压根没有装载成功谈何其它操作 2 尝试手动装载 卸载 1 使
  • IC新人必看:芯片设计流程最全讲解!

    对于消费者而言 一个可以使用的系统 有数字集成电路部分 模拟集成电路部分 系统软件及上层应用部分 关于各个部分的功能 借用IC 咖啡胡总的精品图可以一目了然 外部世界是一个模拟世界 故所有需要与外部世界接口的部分都需要模拟集成电路 模拟集成
  • Kafka 入门三问

    目录 1 Kafka 是什么 1 1 背景 1 2 定位 1 3 产生的原因 1 4 Kafka 有哪些特征 消息和批次 模式 主题和分区 生产者和消费者 broker 和 集群 1 5 Kafka 可以做什么 Kafka作为消息系统 Ka
  • java开发中手动设置logback、jvm、容器的时区

    一 Logback xml配置日志输出时区为东八区 1 在日志格式配置后添加 CTT 或 GMT 8
  • electron 调试、问题追踪

    文章目录 前言 一 调试工具 1 生产环境调试工具 bugtron 2 日志 1 业务日志 2 网络日志 3 崩溃报告 二 捕获全局异常 1 开发网页时 2 在electron中全局异常捕获 3 从异常中恢复 保护用户界面 总结 前言 开发
  • Unity-世界坐标与屏幕坐标

    transform position x和transform position y的值含义是世界坐标 世界坐标与屏幕坐标有时一样 有时不同 这和Canvas的渲染模式有关 Canvas共有三种渲染模式 Screen Space Overla
  • 预处理等等

    预处理 define 宏定义是个演技非常高超的替身演员 但也会经常耍大牌的 所以我们用它要慎之又慎 它可以出现在代码的任何地方 从本行宏定义开始 以后的代码就就都认识这个宏了 也可以把任何东西定义成宏 因为编译器会在预编译的时候用真身替换替
  • Qt浅谈之一:内存泄露

    一 简介 Qt内存管理机制 Qt 在内部能够维护对象的层次结构 对于可视元素 这种层次结构就是子组件与父组件的关系 对于非可视元素 则是一个对象与另一个对象的从属关系 在 Qt 中 在 Qt 中 删除父对象会将其子对象一起删除 C 中del
  • 目标检测算法中,COCO评价指标的解析

    与图像分类的评价指标有所不同 图像分类是在所有的图像中分类正确和错误的概率 而目标检测显然不能这样来 那怎样才算检测正确 1 loU大于指定阈值 2 类别正确 3 confidence大于指定阈值 其实这三点在COCO评价指标当中都会运用到
  • AsyncContext优雅实现HTTP长轮询接口

    一 背景 接到一个需求 实现方案时需要提供一个HTTP接口 接口需要hold住5 8秒 轮询查询数据库 一旦数据库中值有变化 取出变化的值进行处理 处理完成后返回响应 这不就是长轮询吗 如何优雅的实现呢 二 方案设计 在 Spring 中