Spring Batch: Assembling a job rather than configuring it (extensible job configuration)



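I am designing a file-reading layer that reads delimited files and loads them into a List. I decided to use Spring Batch because it offers many scalability options that I can apply to different sets of files depending on their size.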


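  1. I want to design a generic job API that can be used to read any delimited file.
  2. There should be one job structure for parsing each delimited file. For example, if the system needs to read 5 files, there will be 5 jobs (one per file). The only things that differ between these 5 jobs are the FieldSetMapper, the column names, the directory path and other scaling parameters such as commit-interval and throttle-limit.
  3. Users of this API should not have to configure Spring Batch jobs, steps, chunking, partitioning, etc. themselves when a new file type is introduced into the system.
  4. All the user needs to provide is the FieldSetMapper to use with the job, the commit-interval, the throttle-limit, and the directory in which each type of file will be placed.
  5. Each file type will have a predefined directory. Each directory can contain multiple files of the same type and format. A MultiResourcePartitioner will be used to look inside the directory, so the number of partitions equals the number of files in the directory.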
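Requirement 5 amounts to "one partition per file". The plain-Java sketch below illustrates that idea without Spring: it lists the files matching a glob in a directory and builds one small context map per file, storing the file's URL under the key fileName (the key the reader later looks up through stepExecutionContext['fileName']). It is a hypothetical stand-in, not MultiResourcePartitioner's source; the class and method names are made up, and the partition0, partition1, ... naming only mirrors what I believe that class uses by default.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spring-free sketch of "one partition per file in a directory".
final class FilePartitionSketch {

    // Key under which each partition's file is stored (cf. stepExecutionContext['fileName']).
    static final String KEY_NAME = "fileName";

    // Returns partition name -> that partition's context; one partition per matching file.
    static Map<String, Map<String, String>> partition(Path dir, String glob) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path file : stream) {
                files.add(file);
            }
        }
        Collections.sort(files); // stable numbering: partition0 is the first file by name
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < files.size(); i++) {
            Map<String, String> context = new LinkedHashMap<>();
            context.put(KEY_NAME, files.get(i).toUri().toString()); // e.g. file:///.../a.dat
            partitions.put("partition" + i, context);
        }
        return partitions;
    }
}
```

Pointed at an invoice directory holding three .dat files, this would return three partitions, i.e. three executions of the worker step, each reading exactly one file.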

My goal is to build a Spring Batch infrastructure that gives me a distinct job, which I can launch once I have supplied the pieces that make up that job.


I created an abstract configuration class that is extended by concrete configuration classes (one concrete class per file to be read):

    public abstract class AbstractFileLoader<T> {

        private static final String FILE_PATTERN = "*.dat";

        @Autowired
        JobBuilderFactory jobs;

        @Autowired
        ResourcePatternResolver resourcePatternResolver;

        public final Job createJob(Step s1, JobExecutionListener listener) {
            return jobs.get(this.getClass().getSimpleName())
                    .incrementer(new RunIdIncrementer()).listener(listener)
                    .start(s1).build();
        }

        public abstract Job loaderJob(Step s1, JobExecutionListener listener);

        public abstract FieldSetMapper<T> getFieldSetMapper();

        public abstract String getFilesPath();

        public abstract String[] getColumnNames();

        public abstract int getChunkSize();

        public abstract int getThrottleLimit();

        // Step scope: each partition gets its own reader, bound to its own file.
        @Bean
        @StepScope
        public FlatFileItemReader<T> reader(
                @Value("#{stepExecutionContext['fileName']}") String file) {
            FlatFileItemReader<T> reader = new FlatFileItemReader<T>();
            // drop the URL scheme ("file:") that the partitioner stores with the path
            String path = file.substring(file.indexOf(":") + 1, file.length());
            FileSystemResource resource = new FileSystemResource(path);
            reader.setResource(resource);
            DefaultLineMapper<T> lineMapper = new DefaultLineMapper<T>();
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
            tokenizer.setNames(getColumnNames());
            lineMapper.setLineTokenizer(tokenizer);
            lineMapper.setFieldSetMapper(getFieldSetMapper());
            reader.setLineMapper(lineMapper);
            return reader;
        }

        @Bean
        public ItemProcessor<T, T> processor() {
            // TODO add transformations here; until then, pass items through unchanged
            return new PassThroughItemProcessor<T>();
        }

        @Bean
        @JobScope
        public ListItemWriter<T> writer() {
            ListItemWriter<T> writer = new ListItemWriter<T>();
            return writer;
        }

        @Bean
        public Step readStep(StepBuilderFactory stepBuilderFactory,
                ItemReader<T> reader, ItemWriter<T> writer,
                ItemProcessor<T, T> processor, TaskExecutor taskExecutor) {

            // worker step: reads one file in chunks of getChunkSize() items
            final Step readerStep = stepBuilderFactory
                    .get(this.getClass().getSimpleName() + " ReadStep:slave")
                    .<T, T> chunk(getChunkSize()).reader(reader)
                    .processor(processor).writer(writer)
                    .taskExecutor(taskExecutor).throttleLimit(getThrottleLimit())
                    .build();

            // master step: one partition (one worker step execution) per file
            final Step partitionedStep = stepBuilderFactory
                    .get(this.getClass().getSimpleName() + " ReadStep:master")
                    .partitioner(
                            this.getClass().getSimpleName() + " ReadStep:slave",
                            partitioner())
                    .step(readerStep).taskExecutor(taskExecutor).build();

            return partitionedStep;
        }

        /*
         * @Bean public TaskExecutor taskExecutor() { return new
         * SimpleAsyncTaskExecutor(); }
         */

        @Bean
        public Partitioner partitioner() {
            MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
            Resource[] resources;
            try {
                resources = resourcePatternResolver.getResources("file:"
                        + getFilesPath() + FILE_PATTERN);
            } catch (IOException e) {
                throw new RuntimeException(
                        "I/O problems when resolving the input file pattern.", e);
            }
            partitioner.setResources(resources);
            return partitioner;
        }

        @Bean
        public JobExecutionListener listener(ListItemWriter<T> writer) {
            return new JobCompletionNotificationListener<T>(writer);
        }

        /*
         * Use this if you want the writer to have job scope (JIRA BATCH-2269). Also
         * change the return type of writer to ListItemWriter for this to work.
         */
        @Bean
        public TaskExecutor taskExecutor() {
            return new SimpleAsyncTaskExecutor() {
                @Override
                protected void doExecute(final Runnable task) {
                    // gets the jobExecution of the configuration thread
                    final JobExecution jobExecution = JobSynchronizationManager
                            .getContext().getJobExecution();
                    super.doExecute(new Runnable() {
                        public void run() {
                            // expose that job execution (and job-scoped beans) on the worker thread
                            JobSynchronizationManager.register(jobExecution);
                            try {
                                task.run();
                            } finally {
                                JobSynchronizationManager.release();
                            }
                        }
                    });
                }
            };
        }
    }
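Per line, the reader built above does two things: the DelimitedLineTokenizer splits the line on "," and labels the tokens with the column names, and the FieldSetMapper turns those labelled tokens into a T. The sketch below imitates that two-stage pipeline in plain Java, assuming a simple delimiter with no quoted fields (the real tokenizer also handles quotes). LineMappingSketch is a hypothetical name, and a Map<String, String> stands in for Spring Batch's FieldSet; rejecting a line with the wrong number of tokens is similar in spirit to the tokenizer's strict mode.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

// Hypothetical, Spring-free sketch of "tokenize, then map" for one delimited line.
final class LineMappingSketch {

    // Splits one line and binds each token to its column name (a stand-in for a FieldSet).
    static Map<String, String> tokenize(String line, String delimiter, String[] names) {
        // Pattern.quote treats the delimiter literally; limit -1 keeps trailing empty tokens
        String[] tokens = line.split(Pattern.quote(delimiter), -1);
        if (tokens.length != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " tokens but found " + tokens.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens[i].trim());
        }
        return fields;
    }

    // Tokenize with ",", then apply the mapper (the role a FieldSetMapper plays).
    static <T> T mapLine(String line, String[] names, Function<Map<String, String>, T> mapper) {
        return mapper.apply(tokenize(line, ",", names));
    }
}
```

With the InvoiceLoader column names { "INVOICE_NO", "DATE" }, the line "42, 2016-02-24" becomes {INVOICE_NO=42, DATE=2016-02-24} before it reaches the mapper.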


For the sake of discussion, suppose I have to read invoice data. I can then extend the class above to create an InvoiceLoader:

    @Configuration
    @EnableBatchProcessing
    public class InvoiceLoader extends AbstractFileLoader<Invoice> {

        private class InvoiceFieldSetMapper implements FieldSetMapper<Invoice> {

            @Override
            public Invoice mapFieldSet(FieldSet f) {
                Invoice invoice = new Invoice();
                // copy the columns onto the invoice, e.g. f.readString("INVOICE_NO"), f.readDate("DATE")
                return invoice;
            }
        }

        @Override
        public FieldSetMapper<Invoice> getFieldSetMapper() {
            return new InvoiceFieldSetMapper();
        }

        @Override
        public String getFilesPath() {
            return "I:/CK/invoices/partitions/";
        }

        @Override
        public String[] getColumnNames() {
            return new String[] { "INVOICE_NO", "DATE" };
        }

        @Override
        @Bean(name = "invoiceJob")
        public Job loaderJob(Step s1, JobExecutionListener listener) {
            return createJob(s1, listener);
        }

        @Override
        public int getChunkSize() {
            return 25254;
        }

        @Override
        public int getThrottleLimit() {
            return 8;
        }
    }
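InvoiceLoader's getThrottleLimit() returns 8. As far as I understand Spring Batch's multi-threaded steps, a step's throttle limit caps how many chunk tasks run at the same time on its TaskExecutor, even when that executor (SimpleAsyncTaskExecutor, for instance) would start a new thread per task. The sketch below models only that cap in plain Java, using a Semaphore in front of a thread-per-task executor; ThrottleSketch and runThrottled are hypothetical names, and Spring Batch's own implementation differs in detail.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a throttle limit: unbounded threads, bounded concurrency.
final class ThrottleSketch {

    // Runs each task on its own thread but lets at most throttleLimit execute at once.
    // Returns the peak number of tasks observed running concurrently.
    static int runThrottled(List<Runnable> tasks, int throttleLimit) throws InterruptedException {
        Semaphore permits = new Semaphore(throttleLimit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService executor = Executors.newCachedThreadPool(); // new thread when none is idle
        try {
            for (Runnable task : tasks) {
                permits.acquire(); // the submitter blocks once throttleLimit tasks are in flight
                executor.execute(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        task.run();
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // frees a slot for the next task
                    }
                });
            }
        } finally {
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
        return peak.get();
    }
}
```

Raising the limit trades memory and contention for throughput; with a limit of 1 the tasks effectively run one after another.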




I register both concrete configurations, InvoiceLoader and InventoryLoader, in one application context:

    AbstractApplicationContext context1 = new AnnotationConfigApplicationContext(InvoiceLoader.class, InventoryLoader.class);

and then launch each job from its own thread.

Thread 1:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("invoiceJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

Thread 2:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("inventoryJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);
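The two launches above are started on hand-made threads. As a hedged alternative for the threading part only, the plain-Java sketch below submits both launches to an ExecutorService and waits for both results. The Callables are hypothetical placeholders for jobLauncher1.run(job, params), and the returned strings stand in for each job's exit status; the sketch says nothing about whether the two Spring Batch jobs share state safely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: run several named launches in parallel and collect their results.
final class ParallelLaunchSketch {

    // Each Callable is a placeholder for jobLauncher.run(job, params); returns name -> result.
    static Map<String, String> launchAll(Map<String, Callable<String>> launches)
            throws InterruptedException, ExecutionException {
        List<String> names = new ArrayList<>();
        List<Callable<String>> tasks = new ArrayList<>();
        for (Map.Entry<String, Callable<String>> entry : launches.entrySet()) {
            names.add(entry.getKey());
            tasks.add(entry.getValue());
        }
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // invokeAll blocks until every task has finished; futures keep the input order
            List<Future<String>> futures = executor.invokeAll(tasks);
            Map<String, String> results = new LinkedHashMap<>();
            for (int i = 0; i < futures.size(); i++) {
                results.put(names.get(i), futures.get(i).get()); // rethrows a launch's failure
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```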



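Questions:

  1. I am new to Spring Batch, so I may have overlooked some less obvious problems with this approach, such as internal objects shared by Spring Batch that could make two jobs fail when they run together, or more obvious ones such as bean scope.
  2. Is there a better way to achieve my goal?
  3. The fileName value injected by @Value("#{stepExecutionContext['fileName']}") is always assigned I:/CK/invoices/partitions/, the value returned by the getFilesPath method in InvoiceLoader, even though the getFilesPath method in InventoryLoader returns a different value.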


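    @Bean
    Job job() {
        jobs.get('myJob').start(step1(null)).build()
    }
    @Bean @JobScope
    Step step1(@Value('#{jobParameters["commitInterval"]}') commitInterval) {
        steps.get('step1').chunk((int) commitInterval)
                .reader(new IterableItemReader(iterable: [1, 2, 3, 4], name: 'foo'))
                .writer(writer(null)).build()  // writer(null): the container supplies the job-scoped bean
    }
    @Bean @JobScope
    ItemWriter writer(@Value('#{jobParameters["writerClass"]}') writerClass) {
        getClass().classLoader.loadClass(writerClass).newInstance()  // writer picked by class name
    }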
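In this configuration the writer bean is chosen at launch time from the writerClass job parameter. The Java sketch below shows just that lookup step outside Spring: read a class name from a parameter map, check that it implements the writer type, and instantiate it through its no-arg constructor. SimpleWriter is a hypothetical stand-in for Spring Batch's ItemWriter (so the sketch compiles without Spring), and CollectingWriter is an illustrative writer, not part of the answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: pick a writer implementation from a "writerClass" parameter.
final class WriterFromParameterSketch {

    // Hypothetical stand-in for Spring Batch's ItemWriter, so the sketch compiles without Spring.
    interface SimpleWriter<T> {
        void write(List<? extends T> items);
    }

    // Illustrative writer that simply records each chunk it is given.
    static final class CollectingWriter implements SimpleWriter<Integer> {
        final List<List<Integer>> chunks = new ArrayList<>();

        @Override
        public void write(List<? extends Integer> items) {
            chunks.add(new ArrayList<>(items));
        }
    }

    // Reads the class name stored under "writerClass", checks that it is a writer,
    // and instantiates it through its no-arg constructor.
    static SimpleWriter<?> writerFrom(Map<String, ?> jobParameters) throws ReflectiveOperationException {
        String className = String.valueOf(jobParameters.get("writerClass"));
        Class<?> type = Class.forName(className);
        if (!SimpleWriter.class.isAssignableFrom(type)) {
            throw new IllegalArgumentException(className + " does not implement SimpleWriter");
        }
        return (SimpleWriter<?>) type.getDeclaredConstructor().newInstance();
    }
}
```

The type check matters because a job parameter is just a string: without it, a typo or an unrelated class name would only fail later, at the first cast or write.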

With MyWriter:

    class MyWriter implements ItemWriter<Integer> {

        @Override
        void write(List<? extends Integer> items) throws Exception {
            println "Write $items"
        }
    }


    def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([
            commitInterval: new JobParameter(3),
            writerClass: new JobParameter('MyWriter')]))


Output:

    INFO: Executing step: [step1]
    Write [1, 2, 3]
    Write [4]
    Feb 24, 2016 2:30:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
    INFO: Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{commitInterval=3, writerClass=MyWriter}] and the following status: [COMPLETED]
    Status is: COMPLETED, job execution id 0
      #1 step1 COMPLETED
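The output above is plain chunk-oriented processing with a commit interval of 3: items are read one by one, buffered, and passed to the writer three at a time, so four items give [1, 2, 3] followed by [4]. Below is a Spring-free sketch of that loop; ChunkSketch is a hypothetical name, a Consumer plays the writer, and the per-chunk transaction commit that Spring Batch performs is only noted in a comment, not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a chunk-oriented read/write loop driven by a commit interval.
final class ChunkSketch {

    // Reads items one at a time and hands them to the writer in lists of at most
    // commitInterval items; returns the number of chunks written.
    static <T> int process(Iterator<T> reader, int commitInterval, Consumer<List<T>> writer) {
        if (commitInterval < 1) {
            throw new IllegalArgumentException("commitInterval must be at least 1");
        }
        int chunks = 0;
        List<T> buffer = new ArrayList<>();
        while (reader.hasNext()) {
            buffer.add(reader.next());
            if (buffer.size() == commitInterval) {
                writer.accept(new ArrayList<>(buffer)); // in Spring Batch, one transaction per chunk
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) { // the last, partial chunk
            writer.accept(new ArrayList<>(buffer));
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Same input and interval as the run above: prints "Write [1, 2, 3]" then "Write [4]".
        process(Arrays.asList(1, 2, 3, 4).iterator(), 3, items -> System.out.println("Write " + items));
    }
}
```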

Full example here: https://gist.github.com/131635de24e21b1f905e.


