Clojure core.async。如何在 with-open 中使用 go block 延迟下载?

2024-01-10

这是我之前问题的延续如何在 Clojure 中按部分生成惰性序列? https://stackoverflow.com/questions/47049581/how-to-produce-a-lazy-sequence-by-portion-in-clojure

我想从数据库中分部分下载数据。最初,我下载前 500 行,然后发送请求以获取接下来的 500 行,依此类推,直到收到来自服务器的所有数据。

我写了代码:

(jdbc/atomic conn
 (with-open [cursor (jdbc/fetch-lazy conn [sql_query])]
   (let [lazyseq (jdbc/cursor->lazyseq cursor)
         counter (atom 1)]
     (swap! lazyseq_maps assoc :session_id {:get_next? (chan 1) :over_500 (chan 1) :data []})
     (>!! (:get_next? (:session_id @lazyseq_maps)) true)
     (go
       (doseq [row lazyseq]
         (swap! counter inc)
         (when (<! (:get_next? (:session_id @lazyseq_maps)))
           (swap! lazyseq_maps update-in [:session_id :data] conj row)
           (if (not= 0 (mod @counter 500))
             (>! (:get_next? (:session_id @lazyseq_maps)) true)
             (>! (:over_500 (:session_id @lazyseq_maps)) true))))
        ;
        (close! (:get_next? (:session_id @lazyseq_maps)))
        (close! (:over_500 (:session_id @lazyseq_maps)))
        (.close conn))

     (when (<!! (:over_500 (:session_id @lazyseq_maps))) {:message "over 500 rows"
                                                          :id :session_id
                                                          :data (:data (:session_id @lazyseq_maps))}))))

我在doseq循环的帮助下获取行。当doseq超过500行时我停放自行车(when (<! (:get_next? (:session_id @lazyseq_maps)))并等待来自外部的信号来检索接下来的 500 行。

但这里我有一个问题。当我发送信号时,程序抛出错误“结果集已关闭”。即连接在开放范围之外关闭。但我不明白为什么,因为 go 块被放置在 with-open 范围内。你能帮我解决这个问题吗?


(go ...)立即返回,因此,也是如此(with-open ...).

您可能想以相反的方式进行操作:

(go (with-open ...))

但是,请注意,此进程将保留数据库连接(稀缺资源!)很长一段时间,这可能是不可取的,并且有点违背了“轻量级”线程的好处,因为go块。以下是一些可供考虑的替代方案:

  • 也许您可以为每个批次重新打开数据库连接?
  • 也许您可以急切地将整个结果集流式传输到外部存储(例如AWS S3)并让客户端对此进行轮询?
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Clojure core.async。如何在 with-open 中使用 go block 延迟下载? 的相关文章

随机推荐