使用 Spring 批处理文件项读取器进行多线程处理

2024-03-06

在 Spring Batch 中,我尝试读取 CSV 文件,并希望将每一行分配给一个单独的线程并处理它。我尝试通过使用 TaskExecutor 来实现它,但是所有线程正在发生的事情是一次选择同一行。我也尝试使用 Partioner 来实现这个概念,也发生了同样的事情。请参阅下面我的配置 Xml。

步骤说明

    <step id="Step2">
        <tasklet task-executor="taskExecutor">
            <chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
            </chunk>
        </tasklet> 
    </step>

              <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:cvs/user.csv" />

<property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
      <!-- split it -->
      <property name="lineTokenizer">
            <bean
          class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
            <property name="names" value="userid,customerId,ssoId,flag1,flag2" />
        </bean>
      </property>
      <property name="fieldSetMapper">   

          <!-- map to an object -->
          <bean
            class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
            <property name="prototypeBeanName" value="user" />
          </bean>           
      </property>

      </bean>
  </property>

       </bean>

      <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
 <property name="concurrencyLimit" value="4"/>   

我尝试过不同类型的任务执行器,但它们的行为方式都相同。如何将每一行分配给单独的线程?


FlatFileItemReader 不是线程安全的。在您的示例中,您可以尝试将 CSV 文件拆分为更小的 CSV 文件,然后使用多资源分区器 http://docs.spring.io/spring-batch/apidocs/org/springframework/batch/core/partition/support/MultiResourcePartitioner.html处理它们中的每一个。这可以分两步完成,一个用于分割原始文件(如 10 个较小的文件),另一个用于处理分割后的文件。这样您就不会遇到任何问题,因为每个文件将由一个线程处理。

Example:

<batch:job id="csvsplitandprocess">
     <batch:step id="step1" next="step2master">
    <batch:tasklet>
        <batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500">
        </batch:chunk>
    </batch:tasklet>
    </batch:step>
    <batch:step id="step2master">
    <partition step="step2" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</batch:step>
</batch:job>

<batch:step id="step2">
    <batch:tasklet>
        <batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100">
        </batch:chunk>
    </batch:tasklet>
</batch:step>


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10" />
            <property name="maxPoolSize" value="10" />
    </bean>

<bean id="partitioner" 
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="file:cvs/extracted/*.csv" />
</bean>

除了分区之外,另一种选择可能是自定义线程安全读取器,它将为每一行创建一个线程,但分区可能是您的最佳选择

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Spring 批处理文件项读取器进行多线程处理 的相关文章

  • 通过推送通知唤醒

    Suppose 有一些对象 例如 一个数组a 和依赖于对象的条件 例如 a empty 当前线程以外的某些线程可以操作该对象 a 因此条件评估值的真实性会随着时间的推移而变化 如何让当前线程在代码中的某个时刻休眠 并在条件满足时通过推送通知
  • 除非在后台线程中获取新的引用,否则存在潜在的引用计数问题

    我有一个second https stackoverflow com questions 28898966 prefer property accessor or kvc style for accessing core data prop
  • Spring Boot中ServletContext初始化后如何创建bean?

    我有一个 bean 它实现 ServletContextAware 和 BeanFactoryPostProcessor 接口 我需要在 ServletContext 完成初始化后将此 bean 注册到 applicationContext
  • 有没有一种简单的方法来为每个类创建一个记录器实例?

    我现在使用静态方法来记录 因为我发现在Android中登录非常容易 但是现在我需要为不同的类配置不同的appender 所以我对静态记录方法有一个问题 我读了Log4J 创建 Logger 实例的策略 https stackoverflow
  • Spring Tool Suite 3.5.0 上的 Spring Roo 项目 [已关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我已经安装了 Spring Tool Suite 3 5 0 STS 我期望能够在 STS 中创建一个 Spring Roo 项目 就像过
  • Spring Integration:SecurityContext 传播

    我对 Spring Integration 中的 SecurityContext 传播有一些困惑 这是文档的要点 http docs spring io spring integration reference htmlsingle sec
  • 如何停止提交给 ExecutorService 的 Callable?

    我正在尝试实现一个示例应用程序来测试Callable and ExecutorService接口 在我的应用程序中我已经声明 ExecutorService exSvc Executors newSingleThreadExecutor T
  • 如何在url请求中发送数组

    我的要求如下 我想给出演员姓名 开始日期 结束日期并获取他在该时期出演的所有电影 因此 我的服务请求是这样的 http localhost 8080 MovieDB GetJson name Actor startDate 20120101
  • 调用许多网络服务的最佳方式?

    我有 30 家子公司 每家都实施了他们的 Web 服务 使用不同的技术 我需要实现一个Web服务来聚合它们 例如 所有子公司的Web服务都有一个名为的Web方法GetUserPoint int nationalCode 我需要实现我的网络服
  • 用于层次结构树角色的 Spring Security / Java EE 解决方案

    我知道 Spring Security 非常适合标准角色和基于权限的授权 我不确定的是这种情况 系统中管理着 10 000 名员工 员工被组织成组织结构图 跨部门的谁向谁报告的树 其中一些员工是用户 这些用户仅被允许访问其职责范围内的员工
  • 使用单独的线程在java中读取和写入文件

    我创建了两个线程并修改了 run 函数 以便一个线程读取一行 另一个线程将同一行写入新文件 这种情况会发生直到整个文件被复制为止 我遇到的问题是 即使我使用变量来控制线程一一执行 但线程的执行仍然不均匀 即一个线程执行多次 然后控制权转移
  • java中使用多线程调用同一类的不同方法

    我有一个类 如下所示 具有三种方法 public class MyRunnable implements Runnable Override public void run what code need to write here to c
  • 升级到 Spring boot 1.4 后服务无法启动

    我刚刚升级了所有 Spring Boot 服务1 3 6 RELEASE to 1 4 0 RELEASE 现在它们都在启动时崩溃 Caused by java lang NoClassDefFoundError org springfra
  • 如何在 Spring Boot 1.4 中使用 @DataJpaTest 和 SpringFox @EnableSwagger2 进行切片测试

    Re https spring io blog 2016 04 15 testing improvements in spring boot 1 4 https spring io blog 2016 04 15 testing impro
  • 奇怪的跨线程 UI 错误

    我正在编写一个 WinForms 应用程序 它有两种模式 控制台或 GUI 同一解决方案中的三个项目 一个用于控制台应用程序 一个用于 UI 表单 第三个用于保存两个界面也将连接的逻辑 控制台应用程序运行绝对流畅 保存用户选择的模型 它有一
  • 多线程Spring-boot控制器方法

    因此 我的应用程序 spring boot 运行速度非常慢 因为它使用 Selenium 来抓取数据 处理数据并显示在主页中 我遇到了多线程 我认为它对我的应用程序很有用 可以让它运行得更快 但是教程似乎显示在带有 main c 的普通 j
  • Spring Data JPA 选择不同

    我有一个情况 我需要建立一个select distinct a address from Person a 其中地址是 Person 内的地址实体 类型的查询 我正在使用规范动态构建我的 where 子句并使用findAll Specifi
  • 什么时候可以在 Java 中使用 Thead.stop() ?

    Thread stop 的 Java 文档听起来好像如果您调用 Thread stop 世界就会终结 已弃用 这种方法本质上是不安全的 停止线程 Thread stop 导致它解锁所有已锁定的监视器 作为未经检查的 ThreadDeath
  • @Autowire注释的问题(空)

    我在验证器类中自动连接的两个服务有问题 这些服务工作正常 因为在我的控制器中是自动连接的 我有一个 applicationContext xml 文件和 MyApp servlet xml 文件 我的基础包是 es unican meteo
  • 为什么 Web Worker 性能在 30 秒后急剧下降?

    我正在尝试提高在网络工作人员中执行时脚本的性能 它旨在解析浏览器中的大型文本文件而不会崩溃 一切都运行得很好 但我注意到使用网络工作者时大文件的性能存在严重差异 于是我做了一个简单的实验 我在同一输入上运行脚本两次 第一次运行在页面的主线程

随机推荐

  • Keycloak 重定向 URI 正在将端口 0 添加到 url

    在keycloak 中遇到redirect uri 错误 发现 JIRA 记录了相同的问题KEYCLOAK 7237 https issues jboss org browse KEYCLOAK 7237 只是想检查一下是否有解决办法 有人
  • AngularJS:是否可以使控制器中的特定表单输入字段无效?

    我有一个地址文本输入字段 每当输入地址并单击输入字段旁边的搜索按钮时 我都会对地址进行地理编码 并将输入文本替换为地理编码器的结果 我还设置了一个范围变量 addressOk 是否可以根据以下内容使该特定表单输入字段无效 scope add
  • 将原始 JSON 加载到 Pig 中

    我有一个文件 其中每一行都是一个 JSON 对象 实际上 它是 stackoverflow 的转储 我想尽可能轻松地将其加载到 Apache Pig 中 但我无法弄清楚如何告诉 Pig 输入格式是什么 这是一个条目的示例 id oid 50
  • 为什么 C 提供的整数类型对于基本上任何项目来说都不够好?

    我更像是一名系统管理员而不是程序员 但我确实花费了大量的时间研究程序员的代码 试图找出问题所在 以及数量令人不安的that当程序员期望 u ll int32 t 或其他任何定义 是的 我知道这不是真的 但要么期望定义该类型的文件位于其他地方
  • 记录 CMake 脚本

    我发现自己处于一种情况 我想准确记录大量自定义 CMake 宏和函数 并且想知道如何做到这一点 首先想到的是简单地使用内置语法并且仅使用文档脚本 如下所示 FUNCTION NAME MACRO NAME description 这可以 但
  • 从适配器调用片段方法

    我需要在适配器中调用 Fragment 方法 但出现错误 ClassCastException Main MainActivity 无法转换为 PlayPauseClick Interface 我在我的片段中实现了我的界面 但我仍然收到此错
  • 每行字符数和每个文本区域的行数限制

    我正在尝试在 php 页面中创建一个多行文本区域 并且我想验证用户是否无法每行插入超过 50 个字符或超过 50 行 这个想法是 用户可以从电子表格中粘贴某些内容 但如果一行超过 50 个字符 则其余字符将被丢弃 并且不会插入到下一行中 这
  • 如何以编程方式检测位图是否具有 Alpha 通道?

    作为主题 最好使用C代码 MFC 版本 private static Boolean gc BitmapHasAlpha BitmapData gc bmpData if bmpData gt PixelFormat PixelFormat
  • Sitecore 文件夹和 IIS 权限

    设置或移动 Sitecore 解决方案时 您必须记住设置正确的文件夹权限和 IIS 权限 它类似于此处的第 3 3 3 2 3 3 3 9 节 http sdn sitecore net Products Sitecore 20V5 Sit
  • 存储一些大文本时哪个更好:XML资源文件中的字符串或类中的java字符串

    我必须存储一些长文本 以便在文本视图中使用 我一直在 xml 文件中使用字符串 但我不知道每个文本的 java 字符串是否会更好 所以 这是存储它们的最佳方式 在 XML 资源文件中创建字符串 创建一个类并将文本存储在java字符串中 Th
  • 在java中生成随机数列表

    我生成一个随机数 0 或 1 int randomColor Math random lt 0 5 0 1 我需要创建 52 个随机数 其中 26 个为 0 26 个为 1 您可以这样做 创建一个List共 52 个号码 用 26 个零和
  • 使用 sbt 从代码启动 scala repl 循环

    我正在尝试启动 scala repl 循环 使用breakif 并且正在从 SBT 构建 运行 并且我尝试遵循常见问题解答中的建议 但无法使其正常工作 有人可以给出一个用于配置设置的 MyType 示例吗 MyType 是一个代表性类 应包
  • 创建没有管理员访问权限的 venv python

    当我运行 python 时 m venv pathtomyvenv Error Command C Users user manageSQL Scripts python exe Im ensurepip upgrade default p
  • 服务器使用服务器端作为 servlet 发送事件

    我有一个使用 servlet 的简单服务器发送事件的运行实现 protected void doGet HttpServletRequest request HttpServletResponse response throws Servl
  • 在多个存储库中使用相同的 DbContext 是否明智?

    更深入地了解实体框架和存储库 以便更好地进行测试 想知道这是否明智 public interface IRepository int SaveChanges void Dispose using MyContext context new
  • Google colab 笔记本上的柯基模式是什么?

    什么是柯基模式在谷歌合作实验室做什么 可从以下位置访问Tools gt Preferences 柯基犬模式设置在标题中添加动画柯基犬 https twitter com GoogleColab status 1116487177364365
  • 在 RMarkdown 文档的 R 包中包含 TeX 标头

    我想创建一个包含 Latex 头文件的 R 包 然后可以从 RMarkdown 文档中使用该头文件来通过 TeX 创建带有幻灯片的 PDF 当我在 RMarkdown 文档的标题中包含对 Latex 文件的引用时 我可以创建幻灯片 但我不知
  • AVFoundation + AssetWriter:用图像和音频生成电影

    我必须从 iPhone 应用程序导出一部电影 其中包含 NSArray 中的 UIImage 并添加一些 caf 格式的音频文件 这些文件必须在预先指定的时间开始 现在我已经能够使用 AVAssetWriter 在浏览了本网站和其他网站上的
  • 是否可以制作一个按钮作为文件上传按钮?

    我是 php 新手 我有一个表单 在上面放置一个按钮 值作为Upload MB当用户单击此按钮时 它会重定向到一个 Web 表单 我在其中放置文件上传控件和用户上传文件 这是图像 单击此按钮后 用户将重定向到此表单 此处用户上传文件 我的问
  • 使用 Spring 批处理文件项读取器进行多线程处理

    在 Spring Batch 中 我尝试读取 CSV 文件 并希望将每一行分配给一个单独的线程并处理它 我尝试通过使用 TaskExecutor 来实现它 但是所有线程正在发生的事情是一次选择同一行 我也尝试使用 Partioner 来实现