用于我们目的的示例 shell 脚本(确保使其可执行),将其放置在 clojure 项目的根目录中以便于测试:
$ cat dumb.sh
#!/bin/bash
for i in 1 2 3 4 5
do
echo "Loop iteration $i"
sleep 2
done
现在我们将定义要执行的进程,启动它并获取标准输出((.getInputStream process)
),一次读取一行并循环,直到完成。实时读取。
(defn run-proc
[proc-name arg-string callback]
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(callback line)
(recur))))))
To test:
(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil
该函数将被阻止,对您的调用也将被阻止callback
;你可以包裹在一个future
如果你希望它在单独的线程中运行:
(future (callback line))
对于基于 core.async 的方法:
(defn run-proc-async
[proc-name arg-string callback]
(let [ch (async/chan 1000 (map callback))]
(async/thread
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(async/>!! ch line)
(recur))))))
ch))
这适用于您的callback
函数作为通道上的转换器,结果被放置在函数返回的通道上:
(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
(println "Counted" cnt "characters")
cnt))
#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
(async/<!! *1)
=> 16
在此示例中,通道上有 1000 的缓冲区。因此,除非您开始从频道中获取信息,否则请致电>!!
读取 1000 行后将阻塞。您也可以使用put!
使用回调,但这里有一个内置的 1024 限制,无论如何你都应该处理结果。