Spring Batch:组装作业而不是配置它(可扩展作业配置)

2023-12-31

背景

我正在设计一个文件读取层,它可以读取分隔文件并将其加载到List。我决定使用 Spring Batch,因为它提供了许多可扩展性选项,我可以根据文件的大小将其用于不同的文件集。

要求

  1. 我想设计一个通用的作业 API,可用于读取任何分隔文件。
  2. 应该有一个作业结构用于解析每个分隔文件。例如,如果系统需要读取5个文件,则将有5个作业(每个文件一个)。这 5 个作业唯一不同的地方是它们使用不同的FieldSetMapper、列名、目录路径和其他缩放参数,例如commit-interval and throttle-limit.
  3. 该API的用户不需要配置Spring 当系统中引入新的文件类型时,可以自行进行批处理、步骤、分块、分区等。
  4. 用户需要做的就是提供FieldsetMapper与作业一起使用commit-interval, throttle-limit以及每种类型的文件将放置的目录。
  5. 每个文件将有一个预定义目录。每个目录可以包含多个相同类型和格式的文件。 AMultiResourcePartioner将用于查看目录内部。分区数=目录中文件数。

我的要求是构建一个 Spring Batch 基础设施,为我提供一份独特的工作,一旦我掌握了构成该工作的零碎内容,我就可以启动该工作。

我的解决方案:

我创建了一个抽象配置类,它将由具体配置类扩展(每个要读取的文件将有 1 个具体类)。

    @Configuration
    @EnableBatchProcessing
    public abstract class AbstractFileLoader<T> {

    private static final String FILE_PATTERN = "*.dat";

    @Autowired
    JobBuilderFactory jobs;

    @Autowired
    ResourcePatternResolver resourcePatternResolver;

    public final Job createJob(Step s1, JobExecutionListener listener) {
        return jobs.get(this.getClass().getSimpleName())
                .incrementer(new RunIdIncrementer()).listener(listener)
                .start(s1).build();
    }

    public abstract Job loaderJob(Step s1, JobExecutionListener listener);

    public abstract FieldSetMapper<T> getFieldSetMapper();

    public abstract String getFilesPath();

    public abstract String[] getColumnNames();

    public abstract int getChunkSize();

    public abstract int getThrottleLimit();

    @Bean
    @StepScope
    @Value("#{stepExecutionContext['fileName']}")
    public FlatFileItemReader<T> reader(String file) {
        FlatFileItemReader<T> reader = new FlatFileItemReader<T>();
        String path = file.substring(file.indexOf(":") + 1, file.length());
        FileSystemResource resource = new FileSystemResource(path);
        reader.setResource(resource);
        DefaultLineMapper<T> lineMapper = new DefaultLineMapper<T>();
        lineMapper.setFieldSetMapper(getFieldSetMapper());
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
        tokenizer.setNames(getColumnNames());
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1);
        return reader;
    }

    @Bean
    public ItemProcessor<T, T> processor() {
        // TODO add transformations here
        return null;
    }

    @Bean
    @JobScope
    public ListItemWriter<T> writer() {
        ListItemWriter<T> writer = new ListItemWriter<T>();
        return writer;
    }

    @Bean
    @JobScope
    public Step readStep(StepBuilderFactory stepBuilderFactory,
            ItemReader<T> reader, ItemWriter<T> writer,
            ItemProcessor<T, T> processor, TaskExecutor taskExecutor) {

        final Step readerStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:slave")
                .<T, T> chunk(getChunkSize()).reader(reader)
                .processor(processor).writer(writer).taskExecutor(taskExecutor)
                .throttleLimit(getThrottleLimit()).build();

        final Step partitionedStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:master")
                .partitioner(readerStep)
                .partitioner(
                        this.getClass().getSimpleName() + " ReadStep:slave",
                        partitioner()).taskExecutor(taskExecutor).build();

        return partitionedStep;

    }

    /*
     * @Bean public TaskExecutor taskExecutor() { return new
     * SimpleAsyncTaskExecutor(); }
     */

    @Bean
    @JobScope
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        Resource[] resources;
        try {
            resources = resourcePatternResolver.getResources("file:"
                    + getFilesPath() + FILE_PATTERN);
        } catch (IOException e) {
            throw new RuntimeException(
                    "I/O problems when resolving the input file pattern.", e);
        }
        partitioner.setResources(resources);
        return partitioner;
    }

    @Bean
    @JobScope
    public JobExecutionListener listener(ListItemWriter<T> writer) {
        return new JobCompletionNotificationListener<T>(writer);
    }

    /*
     * Use this if you want the writer to have job scope (JIRA BATCH-2269). Also
     * change the return type of writer to ListItemWriter for this to work.
     */
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor() {
            @Override
            protected void doExecute(final Runnable task) {
                // gets the jobExecution of the configuration thread
                final JobExecution jobExecution = JobSynchronizationManager
                        .getContext().getJobExecution();
                super.doExecute(new Runnable() {
                    public void run() {
                        JobSynchronizationManager.register(jobExecution);

                        try {
                            task.run();
                        } finally {
                            JobSynchronizationManager.close();
                        }
                    }
                });
            }
        };
    }

}

假设为了讨论我必须读取发票数据。因此,我可以扩展上面的类来创建一个InvoiceLoader :

@Configuration
public class InvoiceLoader extends AbstractFileLoader<Invoice>{

    private class InvoiceFieldSetMapper implements FieldSetMapper<Invoice> {

        public Invoice mapFieldSet(FieldSet f) {
            Invoice invoice = new Invoice();
            invoice.setNo(f.readString("INVOICE_NO");
            return e;
        }
    }

    @Override
    public FieldSetMapper<Invoice> getFieldSetMapper() {
        return new InvoiceFieldSetMapper();
    }

    @Override
    public String getFilesPath() {
        return "I:/CK/invoices/partitions/";
    }

    @Override
    public String[] getColumnNames() {
        return new String[] { "INVOICE_NO", "DATE"};
    }


    @Override
    @Bean(name="invoiceJob")
    public Job loaderJob(Step s1,
            JobExecutionListener listener) {
        return createJob(s1, listener);
    }

    @Override
    public int getChunkSize() {
        return 25254;
    }

    @Override
    public int getThrottleLimit() {
        return 8;
    }

}

假设我还有一个名为Inventory延伸AbstractFileLoader.

在应用程序启动时,我可以加载这两个注释配置,如下所示:

AbstractApplicationContext context1 = new   AnnotationConfigApplicationContext(InvoiceLoader.class, InventoryLoader.class);

在我的应用程序的其他地方,两个不同的线程可以启动作业,如下所示:

主题 1:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("invoiceJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

话题2:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("inventoryJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

这种方法的优点是每次有一个新文件要读取时,开发人员/用户所要做的就是子类化AbstractFileLoader并实现所需的抽象方法,而无需深入了解如何组装作业的细节。

问题:

  1. 我是 Spring Batch 的新手,所以我可能忽略了这种方法的一些不太明显的问题,例如 Spring Batch 中的共享内部对象可能会导致两个作业一起运行失败,或者明显的问题,例如 bean 的范围。
  2. 有更好的方法来实现我的目标吗?
  3. The fileName的属性@Value("#{stepExecutionContext['fileName']}")总是被赋值为I:/CK/invoices/partitions/这是返回的值getPath中的方法InvoiceLoader即使 getPathmethod inInventoryLoader`返回不同的值。

一种选择是将它们作为作业参数传递。例如:

@Bean
Job job() {
    jobs.get("myJob").start(step1(null)).build()
}

@Bean
@JobScope
Step step1(@Value('#{jobParameters["commitInterval"]}') commitInterval) {
    steps.get('step1')
            .chunk((int) commitInterval)
            .reader(new IterableItemReader(iterable: [1, 2, 3, 4], name: 'foo'))
            .writer(writer(null))
            .build()
}

@Bean
@JobScope
ItemWriter writer(@Value('#{jobParameters["writerClass"]}') writerClass) {
    applicationContext.classLoader.loadClass(writerClass).newInstance()
}

With MyWriter:

class MyWriter implements ItemWriter<Integer> {

    @Override
    void write(List<? extends Integer> items) throws Exception {
        println "Write $items"
    }
}

然后执行:

def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([
        commitInterval: new JobParameter(3),
        writerClass: new JobParameter('MyWriter'), ]))

输出是:



INFO: Executing step: [step1]
Write [1, 2, 3]
Write [4]
Feb 24, 2016 2:30:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{commitInterval=3, writerClass=MyWriter}] and the following status: [COMPLETED]
Status is: COMPLETED, job execution id 0
  #1 step1 COMPLETED
  

完整示例here https://gist.github.com/131635de24e21b1f905e.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Batch:组装作业而不是配置它(可扩展作业配置) 的相关文章

随机推荐

  • 具有自定义 JAX-B 绑定的 JAX-WS MarshalException:无法将类型“java.lang.String”封送为元素

    我似乎对 Jax WS 和 Jax b 协同工作有疑问 我需要使用一个具有预定义 WSDL 的 Web 服务 执行生成的客户端时 我收到以下错误 javax xml ws WebServiceException javax xml bind
  • Visual Studio 实体框架向导在 MySQL 上崩溃[重复]

    这个问题在这里已经有答案了 在使用实体框架向导对 MySQL 连接执行任何操作期 间 它会在第二页上崩溃而不会出现任何错误 问题与中相同实体框架向导在 MySQL 上崩溃 https stackoverflow com questions
  • Git-SVN 清除身份验证缓存

    如何让 git svn 忘记 svn 身份验证详细信息 我们有一台运行 Windows Server 2008 的配对机器 在该机器上有一个 git 存储库 并且我们签入到中央 subversion 存储库 我希望 git 在每次签入时提示
  • 嵌套目录中的 Symfony 2 项目

    我需要在生产服务器上的嵌套目录中部署 Symfony 2 项目 实际上 这意味着所有 URL 都以 subdirectory 路径为前缀 即 http host com subdirectory project web app php su
  • 导入错误:没有名为 xlwt 的模块

    我的系统 Windows Python 2 7 我下载了一个包并想将其包含在我的脚本中 解压包后 这是我的文件夹结构 Work xlwt 0 7 3 contains a setup py xlwt 包含 init py除其他外 我的脚本从
  • 如何从Python日期时间对象中删除秒? [复制]

    这个问题在这里已经有答案了 我有一个 python 日期时间对象 我想在网站上显示它 但是时间以 hh mm ss 格式显示 我想以 hh mm 格式显示它 我已尝试按照以下方式使用替换方法 message timestamp replac
  • 使用带有附加属性的“styled()”MUI 系统实用程序 (Typescript)

    我正在使用 MUI System v5 开发一个新项目 我在用着styled 这里的实用程序 不是样式组件 用于设计和创建简单的 UI 组件 该项目采用 TypeScript 我现在有很多困难 因为我不知道是否以及如何将道具传递给这些组件
  • jquery 中的 .clone() 方法不复制值[重复]

    这个问题在这里已经有答案了 可能的重复 没有内容的文本框的 Jquery 克隆 https stackoverflow com questions 4366159 jquery clone of a textbox without the
  • 用于将文本复制到剪贴板的独立于平台的工具

    我正在尝试编写一个函数将字符串参数复制到剪贴板 我打算在我一直在编写的 Python 脚本中使用它 这是我到目前为止所拥有的 在另一个堆栈溢出帖子中找到了大部分此片段 from tkinter import Tk def copy to c
  • 用Python在文件中间插入行?

    有没有办法做到这一点 假设我有一个文件 其中包含如下名称列表 Alfred Bill Donald 我如何在第 x 行 本例中为 3 插入第三个名字 Charlie 并自动将所有其他名字发送到一行 我见过其他类似的问题 但没有得到有用的答案
  • PUT 和 DELETE HTTP 请求方法有什么用处?

    我从未使用过 PUT 或 DELETE HTTP 请求方法 我的倾向是 当系统 我的应用程序或网站 的状态可能不受影响 如产品列表 时使用 GET 而当系统状态 如下订单 受到影响时 我倾向于使用 POST 这两个不是总是足够的 还是我错过
  • 布尔玛旋转木马没有响应

    我正在尝试将 bulma carousel 合并到我的 React 应用程序中 但它似乎不起作用 我尝试使用它来实现它布尔玛旋转木马 https wikiki github io components carousel 这个文档也是如此 但
  • Blazor 服务器客户端中的引导工具提示问题

    I am trying to get the formatting right for the tooltips but i cant figure out how to The code below works perfectly
  • 在未安装 Tensorflow 的情况下运行 Tensorflow 模型

    我有一个运行良好的 TF 模型 是用 Python 和 TFlearn 构建的 有没有办法在另一个系统上运行这个模型而不需要安装 Tensorflow 它已经经过预先训练 所以我只需要通过它运行数据即可 我知道 tfcompile 在这里发
  • QLineEdit python 方式大写输入

    我使用 QT Designer 绘制了一个 UI 但发现没有参数可供我将 QLineEdit 输入设置为大写 经过一些在线搜索后 我只看到了极少数满足我需求的结果 但所有结果都是用 Qt 编写的 例如 这个link http www qtf
  • Spring Initializr 项目导致不支持的类文件主要版本 64

    当我使用创建一个新项目时弹簧初始化 https start spring io Gradle 不会构建该项目 我使用 IntelliJ IDEA 错误信息是 Exception is org gradle cache CacheOpenEx
  • 从 Scipy 稀疏矩阵中获取唯一行

    我正在 python 中处理稀疏矩阵 我想知道是否有一种有效的方法来删除稀疏矩阵中的重复行 并且只保留唯一的行 我没有找到与之相关的函数 并且不知道如何在不将稀疏矩阵转换为密集矩阵并使用 numpy unique 的情况下执行此操作 没有快
  • 执行许多数据帧连接时出现 PySpark OutOfMemoryErrors

    关于这个问题的帖子很多 但没有一个回答我的问题 我遇到了OutOfMemoryError在 PySpark 中尝试将许多不同的数据帧连接在一起 我的本地机器有 16GB 内存 我的 Spark 配置如下 class SparkRawCons
  • 一次更改 pandas DataFrame 的多列中的某些值

    假设我有以下数据框 In 1 df Out 1 apple banana cherry 0 0 3 good 1 1 4 bad 2 2 5 good 这按预期工作 In 2 df apple df cherry bad np nan In
  • Spring Batch:组装作业而不是配置它(可扩展作业配置)

    背景 我正在设计一个文件读取层 它可以读取分隔文件并将其加载到List 我决定使用 Spring Batch 因为它提供了许多可扩展性选项 我可以根据文件的大小将其用于不同的文件集 要求 我想设计一个通用的作业 API 可用于读取任何分隔文