如何在 clojure 中从子进程执行非阻塞读取 stdout?

2024-01-06

我希望从 clojure 生成一个长期运行的子进程 通过标准流与该进程进行通信。

使用conch https://github.com/Raynes/conch图书馆,我可以 生成并读取进程,并从中读取数据out stream:

(def my-process (sh/proc "my_dumb_process"))
  ; read 10 lines from my-process's stdout. Will block until 10 lines taken
  (take 10 (line-seq (clojure.java.io/reader (:out p))))

我想在 my-process 打印时调用异步回调 到 stdout - 只要 stdout 流中有数据可用。

我对 clojure 有点陌生 - 有没有惯用的 clojure 方法 这?我浏览过 core.async 这很好,但我找不到 流的非阻塞解决方案。


用于我们目的的示例 shell 脚本(确保使其可执行),将其放置在 clojure 项目的根目录中以便于测试:

$ cat dumb.sh
#!/bin/bash

for i in 1 2 3 4 5
do
    echo "Loop iteration $i"
    sleep 2
done

现在我们将定义要执行的进程,启动它并获取标准输出((.getInputStream process)),一次读取一行并循环,直到完成。实时读取。

(defn run-proc
  [proc-name arg-string callback]
  (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
        process (.start pbuilder)]
    (with-open [reader (clojure.java.io/reader (.getInputStream process))]
      (loop []
        (when-let [line (.readLine ^java.io.BufferedReader reader)]
          (callback line)
          (recur))))))

To test:

(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil

该函数将被阻止,对您的调用也将被阻止callback;你可以包裹在一个future如果你希望它在单独的线程中运行:

(future (callback line))

对于基于 core.async 的方法:

(defn run-proc-async
  [proc-name arg-string callback]
  (let [ch (async/chan 1000 (map callback))]
    (async/thread
      (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
            process (.start pbuilder)]
        (with-open [reader (clojure.java.io/reader (.getInputStream process))]
          (loop []
            (when-let [line (.readLine ^java.io.BufferedReader reader)]
              (async/>!! ch line)
              (recur))))))
    ch))

这适用于您的callback函数作为通道上的转换器,结果被放置在函数返回的通道上:

(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
                                  (println "Counted" cnt "characters")
                                  cnt))

#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters

(async/<!! *1)
=> 16

在此示例中,通道上有 1000 的缓冲区。因此,除非您开始从频道中获取信息,否则请致电>!!读取 1000 行后将阻塞。您也可以使用put!使用回调,但这里有一个内置的 1024 限制,无论如何你都应该处理结果。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 clojure 中从子进程执行非阻塞读取 stdout? 的相关文章

随机推荐