Spring R2dbc:有没有办法从postgresql数据库获取恒定流并处理它们?

2023-12-24

我想将 postgresql 中的表中新创建的记录作为实时/连续流获取。可以使用spring r2dbc吗?如果是这样,我有什么选择?

Thanks


你需要使用pg_notify并开始在上面列出。您想要看到的任何更改都应该包含在简单的触发器中,该触发器将向pg_notify.

我的上有一个这样的例子github https://github.com/Koziolek/wjug-r2dbc/,但长话短说:

准备函数和触发器:

CREATE OR REPLACE FUNCTION notify_member_saved()
    RETURNS TRIGGER
AS $$
BEGIN
    PERFORM pg_notify('MEMBER_SAVED',  row_to_json(NEW)::text);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER member_saved_trigger
    AFTER INSERT OR UPDATE
    ON members
    FOR EACH ROW
EXECUTE PROCEDURE notify_member_saved();

在java代码中准备监听器


@Service
@RequiredArgsConstructor
@Slf4j
class NotificationService {


    private final ConnectionFactory connectionFactory;
    private final Set<NotificationTopic> watchedTopics = Collections.synchronizedSet(new HashSet<>());

    @Qualifier("postgres-event-mapper")
    private final ObjectMapper objectMapper;

    private PostgresqlConnection connection;


    @PreDestroy
    private void preDestroy() {
        this.getConnection().close().subscribe();
    }

    private PostgresqlConnection getConnection() {
        if(connection == null) {
            synchronized(NotificationService.class) {
                if(connection == null) {
                    try {
                        connection = Mono.from(connectionFactory.create())
                                .cast(Wrapped.class)
                                .map(Wrapped::unwrap)
                                .cast(PostgresqlConnection.class)
                                .toFuture().get();
                    } catch(InterruptedException e) {
                        throw new RuntimeException(e);
                    } catch(ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }
        return this.connection;
    }

    public <T> Flux<T> listen(final NotificationTopic topic, final Class<T> clazz) {

        if(!watchedTopics.contains(topic)) {
            executeListenStatement(topic);
        }

        return getConnection().getNotifications()
                .log("notifications")
                .filter(notification -> topic.name().equals(notification.getName()) && notification.getParameter() != null)
                .handle((notification, sink) -> {
                    final String json = notification.getParameter();
                    if(!StringUtils.isBlank(json)) {
                        try {
                            sink.next(objectMapper.readValue(json, clazz));
                        } catch(JsonProcessingException e) {
                            log.error(String.format("Problem deserializing an instance of [%s] " +
                                    "with the following json: %s ", clazz.getSimpleName(), json), e);
                            Mono.error(new DeserializationException(topic, e));
                        }
                    }
                });
    }

    private void executeListenStatement(final NotificationTopic topic) {
        getConnection().createStatement(String.format("LISTEN \"%s\"", topic)).execute()
                .doOnComplete(() -> watchedTopics.add(topic))
                .subscribe();
    }

    public void unlisten(final NotificationTopic topic) {
        if(watchedTopics.contains(topic)) {
            executeUnlistenStatement(topic);
        }
    }

    private void executeUnlistenStatement(final NotificationTopic topic) {
        getConnection().createStatement(String.format("UNLISTEN \"%s\"", topic)).execute()
                .doOnComplete(() -> watchedTopics.remove(topic))
                .subscribe();
    }
}

从控制器开始列出

@GetMapping("/events")
    public Flux<ServerSentEvent<Object>> listenToEvents() {

        return Flux.merge(listenToDeletedItems(), listenToSavedItems())
                .map(o -> ServerSentEvent.builder()
                        .retry(Duration.ofSeconds(4L))
                        .event(o.getClass().getName())
                        .data(o).build()
                );

    }

    @GetMapping("/unevents")
    public Mono<ResponseEntity<Void>> unlistenToEvents() {
        unlistenToDeletedItems();
        unlistenToSavedItems();
        return Mono.just(
                ResponseEntity
                        .status(HttpStatus.I_AM_A_TEAPOT)
                        .body(null)
        );
    }

    private Flux<Member> listenToSavedItems() {
        return this.notificationService.listen(MEMBER_SAVED, Member.class);
    }


    private void unlistenToSavedItems() {
        this.notificationService.unlisten(MEMBER_SAVED);
    }

但请记住,如果有什么东西坏了,那么你就输了pg_notify事件已经持续了一段时间,因此它适用于非关键任务解决方案。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring R2dbc:有没有办法从postgresql数据库获取恒定流并处理它们? 的相关文章

随机推荐

  • PHP:将文档/文本解析为句子

    我正在寻找类似的 PHP 版本 http journals ecs soton ac uk java tutorial intl collat ion textBound html http journals ecs soton ac uk
  • vim:执行编辑器命令列表

    vim 有没有办法给出编辑器命令列表 我想执行一系列 全局 命令 并且这些命令有一些模式 因此 我理想地希望生成命令列表 使用正则表达式搜索和替换 然后运行它们 而不必键入每个命令 谢谢 高拉夫 更新 s buffer register g
  • 是否可以在 Mountain Lion 上当前的 Xcode 4.6.1 工具链中启用 _LIBCPP_DEBUG2?

    这个线程 http comments gmane org gmane comp compilers clang devel 16838是对 clang 的 libc 调试模式的早期讨论 该模式通过定义来启用 LIBCPP DEBUG2在编译
  • 在 R 中添加新行

    我需要在后面添加一个新行 我的数据如下所示 1 60112 486 100 xxx BS 1 1 486 100 yyy TE I need 1 60112 486 100 xxx BS 1 1 486 100 yyy TE 我怎样才能实现
  • 无法达到最佳性能

    我正在努力达到每个人的最佳表现SM从下面的代码 峰值位于 25 GFlops GTX275 GT200 Arch 之间 此代码最多提供 8 GFlops global void new ker float x int index threa
  • 适用于 GAE 的 Python 无头浏览器

    我正在尝试将 Angular js 客户端与 Google Appengine 上的 webapp2 一起使用 为了解决 SEO 问题 我们的想法是使用无头浏览器在服务器端运行 javascript 并将生成的 html 提供给爬虫 有没有
  • 如何覆盖 Nixos configuration.nix 中的 2(两个)包

    我的configuration nix 中有一些需要覆盖的包 所以我写的代码如下 nixpkgs config allowUnfree true packageOverrides pkgs rec mumble pulse audio mu
  • 在 JavaScript 中获取字体规格?

    我目前正在开发一个使用 HTML5 画布作为渲染目标的 JavaScript 项目 为了使我的代码能够很好地与我提供的 严格指定的 接口配合使用 我需要能够获取一种字体并提取该字体的上升和下降高度 这将使客户能够更准确地定位文本 我知道我可
  • 如何去掉多余的双引号?

    在格式错误的 csv 文件中 有一行数据带有额外的双引号 例如最后一行 Name Comment Peter Nice singer Paul Love folk songs 如何删除周围的双引号folk并将字符串替换为 Name Comm
  • 在 3NF 中找到关系,但在 BCNF 中找不到关系

    我一直在阅读许多不同的资料来了解如何区分 3NF BCNF 中的关系 到目前为止 这是我的理解 我将使用这种关系作为例子 R A B C D E and F A gt B B C gt E E D gt A 首先我们必须找到关系的关键 我用
  • 你如何解析悬空的 else ?

    我正在为 C 语言编写一个编译器 我只剩下一个问题需要解决 如何处理悬空的 else 原来的规则是这样的 A gt if 表达式 语句 if 表达式 语句 else 语句 摆脱左递归后 A gt if 表达式 语句 B B gt else
  • 如何有效地比较两个无序列表(而不是集合)?

    a 1 2 3 1 2 3 b 3 2 1 3 2 1 a 和 b 应该被认为是相等的 因为它们具有完全相同的元素 只是顺序不同 问题是 我的实际列表将由对象 我的类实例 组成 而不是整数 O n The 柜台 https docs pyt
  • 如何绘制多列 CSV 文件?

    我对 R 很陌生 所以请原谅我问一个可能很愚蠢的问题 我有一个多列 CSV 纯逗号分隔 无引号 文件 其中第一行是标题 第一列是连续的整数索引 其他 17 列是函数的浮点值 任务是在同一张图表上绘制所有 17 条线 具有相同的轴 听起来很简
  • 在不使用 startwith 函数的情况下从字符串中间提取特定单词?

    我正在尝试从列表中提取变量的名称 我已将列表声明为list end 并且在for循环中 我希望for循环读取列表中的每个字符串 如果在字符串中找到 int 则它从int开始并以list end结束 并且添加到空list variable d
  • 为什么这个渐进式 jpeg 不逐步加载?

    我的网站上有一些大图像 因此我将它们保存为渐进式 jpeg 这样用户应该可以在下载时看到正在发生的事情 但在下载整个 jpeg 之前的几秒钟内什么也没有显示 我究竟做错了什么 网站 大图应该很明显 http www heylookthats
  • Rails 5 白名单 css 属性用于清理助手

    我需要允许内联 style position absolute 输出由sanitize post content 我发现Rails 4 的文档 http apidock com rails v4 0 2 ActionView Helpers
  • WebSocket 版本和向后兼容性

    过去几天我一直在尝试 WebSocket 并且对这项非常酷的新技术有一些复杂的体验 我编写了一个简单的聊天客户端 它使用最新版本HTML5 实验室 http html5labs interoperabilitybridges com pro
  • 如何在java中使条形图中的条形从x轴开始? [关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 正如您在图片中看到的 图表中的条形从 x 轴下方开始 如何修复它以使其从 x 轴线开始 请建议我修复 我也会帮助知道我哪
  • WebRTC 视频流无法通过 http 工作

    我正在尝试使用 WebRTC Peer js 创建游戏 而且我无法通过 http 进行视频通话 也许它只能通过https 附 所有 我看到的 媒体通话的工作示例都使用 https 1 http cdn peerjs com demo vid
  • Spring R2dbc:有没有办法从postgresql数据库获取恒定流并处理它们?

    我想将 postgresql 中的表中新创建的记录作为实时 连续流获取 可以使用spring r2dbc吗 如果是这样 我有什么选择 Thanks 你需要使用pg notify并开始在上面列出 您想要看到的任何更改都应该包含在简单的触发器中