更新:我发现如果我设置ThreadPoolExecutor's
核心池大小与最大池大小相同(29 个线程)。但是,如果我将核心池大小设置为 11,最大池大小设置为 29,那么 Actor 系统只会创建 11 个线程。我该如何配置ActorSystem
/ ThreadPoolExecutor
继续创建线程以超过核心线程数并保持在最大线程数之内?我不想将核心线程计数设置为最大线程计数,因为我只需要额外的线程来取消作业(这应该是一个罕见的事件)。
我有一个针对 Oracle 数据库运行的批处理程序,使用 Java/Akka 类型的 actor 和以下 actor 来实现:
-
BatchManager
负责与 REST 控制器对话。它管理着一个Queue
未初始化的批处理作业;当从队列中轮询未初始化的批处理作业时,它将变成JobManager
演员并被处决。
-
JobManager
维护一个存储过程队列和一个池Workers
;它初始化每个Worker
使用存储过程,并且当Worker
完成后,它将过程的结果发送到JobManager
,以及JobManager
发送另一个存储过程到Worker
。当作业队列为空且所有作业均结束时,批处理终止Workers
处于空闲状态,此时JobManager
将其结果报告给BatchManager
,关闭其工作人员(通过TypedActor.context().stop()
),然后自行关闭。这JobManager
has a Promise<Status> completion
当作业成功完成时,或者当作业由于取消或致命异常而终止时,即完成。
-
Worker
执行存储过程。它创造了Oracle连接 https://docs.oracle.com/cd/E18283_01/appdev.112/e13995/oracle/jdbc/OracleConnection.html and a 可调用语句 https://docs.oracle.com/javase/7/docs/api/java/sql/CallableStatement.html用于执行存储过程,并注册一个onFailure
回调与JobManager.completion
to abort
连接和cancel
该声明。此回调不使用参与者系统的执行上下文,而是使用从创建的缓存执行器服务创建的执行上下文BatchManager
.
我的配置是
{"akka" : { "actor" : { "default-dispatcher" : {
"type" : "Dispatcher",
"executor" : "default-executor",
"throughput" : "1",
"default-executor" : { "fallback" : "thread-pool-executor" }
"thread-pool-executor" : {
"keep-alive-time" : "60s",
"core-pool-size-min" : coreActorCount,
"core-pool-size-max" : coreActorCount,
"max-pool-size-min" : maxActorCount,
"max-pool-size-max" : maxActorCount,
"task-queue-size" : "-1",
"task-queue-type" : "linked",
"allow-core-timeout" : "on"
}}}}}
目前worker的数量是在其他地方配置的workerCount = 8
; coreActorCount
is workerCount + 3
while maxActorCount
is workerCount * 3 + 5
。我正在配备两个核心和 8GB 内存的 Macbook Pro 10 上进行测试;生产服务器要大得多。我正在谈论的数据库位于速度极慢的 VPN 后面。我使用 Oracle 的 JavaSE 1.8 JVM 运行所有这些。本地服务器是 Tomcat 7。Oracle JDBC 驱动程序是版本 10.2(我也许能够说服当局使用更新版本)。所有方法要么返回void
or Future<>
并且应该是非阻塞的。
当一批成功终止时,就没有问题了——下一批立即开始,并有完整的工作人员。但是,如果我通过终止当前批次JobManager#completion.tryFailure(new CancellationException("Batch cancelled"))
那么onFailure
注册的回调Workers
熄火,然后系统变得无响应。调试 printlns 指示新批次以八个正常运行的工作线程中的三个开始,并且BatchManager
变得完全没有反应(我添加了一个Future<String> ping
只返回一个命令Futures.successful("ping")
并且这也会超时)。这onFailure
回调在单独的线程池上执行,即使它们在参与者系统的线程池上,我也应该有足够高的max-pool-size
为了容纳原来的JobManager
, its Workers
, its onFailure
回调,以及第二个JobManager
and is Workers
。相反,我似乎在适应原来的JobManager
和它的Workers
, 新的JobManager
以及不到一半的Workers
,并且没有剩下任何东西BatchManager.
我运行它的计算机资源不足,但看起来它应该能够运行十几个线程。
这是配置问题吗?这是由于 JVM 施加的限制和/或 Tomcat 施加的限制吗?这是由于我处理阻塞 IO 的方式有问题吗?可能还有其他几件事我可能做错了,这些只是我想到的。
Cancellable 语句的要点 https://gist.github.com/kcleereman/48d3bfaa4a6599160867哪里的CallableStatement
and OracleConnection
被取消
不变的要点 https://gist.github.com/kcleereman/f8f11d010f88df5b36ce where CancellableStatements
被创建
JobManager 清理代码要点 https://gist.github.com/kcleereman/d729d47d80262961f1a6
配置转储 https://gist.github.com/kcleereman/ee28474af7b4bc2c1f9e通过获得System.out.println(mergedConfig.toString());
编辑:我相信我已经将问题范围缩小到参与者系统(它的配置或其与阻塞数据库调用的交互)。我消除了Worker
演员并将他们的工作量转移到Runnables
以固定大小执行ThreadPoolExecutor
,其中每个JobManager
创建自己的ThreadPoolExecutor
并在批次完成时将其关闭(shutDown
正常终止时,shutDownNow
异常终止)。取消在实例化的缓存线程池上运行BatchManager
。 Actor系统的调度程序仍然是ThreadPoolExecutor
但只分配了六个线程。使用此替代设置,取消将按预期执行 - 当数据库连接中止时,工作线程终止,并且新的连接将终止。JobManager
立即执行完整的工作线程。这向我表明这不是硬件/JVM/Tomcat 问题。
更新:我使用了线程转储Eclipse 的内存分析器 https://eclipse.org/mat/。我发现取消线程挂在CallableStatement.close()
,所以我重新排序取消,以便OracleConnection.abort()
之前CallableStatement.cancel()
这解决了问题 - 取消全部(显然)正确执行。这Worker
不过,线程继续坚持他们的声明 - 我怀疑我的 VPN 可能部分或全部归咎于此。
PerformanceAsync-akka.actor.default-dispatcher-19
at java.net.SocketInputStream.socketRead0(Ljava/io/FileDescriptor;[BIII)I (Native Method)
at java.net.SocketInputStream.read([BIII)I (SocketInputStream.java:150)
at java.net.SocketInputStream.read([BII)I (SocketInputStream.java:121)
at oracle.net.ns.Packet.receive()V (Unknown Source)
at oracle.net.ns.DataPacket.receive()V (Unknown Source)
at oracle.net.ns.NetInputStream.getNextPacket()V (Unknown Source)
at oracle.net.ns.NetInputStream.read([BII)I (Unknown Source)
at oracle.net.ns.NetInputStream.read([B)I (Unknown Source)
at oracle.net.ns.NetInputStream.read()I (Unknown Source)
at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1()S (T4CMAREngine.java:1109)
at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1()B (T4CMAREngine.java:1080)
at oracle.jdbc.driver.T4C8Oall.receive()V (T4C8Oall.java:485)
at oracle.jdbc.driver.T4CCallableStatement.doOall8(ZZZZ)V (T4CCallableStatement.java:218)
at oracle.jdbc.driver.T4CCallableStatement.executeForRows(Z)V (T4CCallableStatement.java:971)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout()V (OracleStatement.java:1192)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal()I (OraclePreparedStatement.java:3415)
at oracle.jdbc.driver.OraclePreparedStatement.execute()Z (OraclePreparedStatement.java:3521)
at oracle.jdbc.driver.OracleCallableStatement.execute()Z (OracleCallableStatement.java:4612)
at com.util.CPProcExecutor.execute(Loracle/jdbc/OracleConnection;Ljava/sql/CallableStatement;Lcom/controller/BaseJobRequest;)V (CPProcExecutor.java:57)
然而,即使在修复取消顺序之后,我仍然遇到演员系统没有创建足够线程的问题:我仍然只在新批次中获得八分之三的工作人员,并且随着取消的工作人员的增加而添加新的工作人员他们的网络连接超时。我总共有 11 个线程 - 我的核心池大小,29 个线程 - 我的最大池大小。显然,演员系统忽略了我的最大池大小参数,或者我没有正确配置最大池大小。