这是我之前问题的延续如何在 Clojure 中按部分生成惰性序列? https://stackoverflow.com/questions/47049581/how-to-produce-a-lazy-sequence-by-portion-in-clojure
我想从数据库中分部分下载数据。最初,我下载前 500 行,然后发送请求以获取接下来的 500 行,依此类推,直到收到来自服务器的所有数据。
我写了代码:
(jdbc/atomic conn
(with-open [cursor (jdbc/fetch-lazy conn [sql_query])]
(let [lazyseq (jdbc/cursor->lazyseq cursor)
counter (atom 1)]
(swap! lazyseq_maps assoc :session_id {:get_next? (chan 1) :over_500 (chan 1) :data []})
(>!! (:get_next? (:session_id @lazyseq_maps)) true)
(go
(doseq [row lazyseq]
(swap! counter inc)
(when (<! (:get_next? (:session_id @lazyseq_maps)))
(swap! lazyseq_maps update-in [:session_id :data] conj row)
(if (not= 0 (mod @counter 500))
(>! (:get_next? (:session_id @lazyseq_maps)) true)
(>! (:over_500 (:session_id @lazyseq_maps)) true))))
;
(close! (:get_next? (:session_id @lazyseq_maps)))
(close! (:over_500 (:session_id @lazyseq_maps)))
(.close conn))
(when (<!! (:over_500 (:session_id @lazyseq_maps))) {:message "over 500 rows"
:id :session_id
:data (:data (:session_id @lazyseq_maps))}))))
我在doseq循环的帮助下获取行。当doseq超过500行时我停放自行车(when (<! (:get_next? (:session_id @lazyseq_maps)))
并等待来自外部的信号来检索接下来的 500 行。
但这里我有一个问题。当我发送信号时,程序抛出错误“结果集已关闭”。即连接在开放范围之外关闭。但我不明白为什么,因为 go 块被放置在 with-open 范围内。你能帮我解决这个问题吗?
(go ...)
立即返回,因此,也是如此(with-open ...)
.
您可能想以相反的方式进行操作:
(go (with-open ...))
但是,请注意,此进程将保留数据库连接(稀缺资源!)很长一段时间,这可能是不可取的,并且有点违背了“轻量级”线程的好处,因为go
块。以下是一些可供考虑的替代方案:
- 也许您可以为每个批次重新打开数据库连接?
- 也许您可以急切地将整个结果集流式传输到外部存储(例如AWS S3)并让客户端对此进行轮询?
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)