领先一步
VMware 提供培训和认证,助您加速进步。
了解更多Spring Batch 5.2 的新版本带来了海量新功能!Spring Batch 是处理大量但有限的顺序数据访问的引人注目的方式。例如:从 SQL 数据库读取数据并写入 CSV,或者从 FTP 服务器读取数据并分析 MongoDB 数据——这就是批处理。你们都知道这是什么。这项工作的一半(恕我双关!)是集成各种数据源和多个数据接收器。另一方面的考虑,正如你们可能想象到的,对于耗时且可能失败的工作负载,是维护与每个批处理作业运行相关的持久且广泛的元数据。同样,在这个版本中我无法深入介绍所有新颖之处!所以,让我们从宏观角度看看一些新功能。
JobRepository。在不久的过去,它有两个 JobRepository 实现:一个支持 JDBC,另一个通过内存中的 Map 支持“持久化”。Map 选项对于测试或持久化结果不太重要的纯粹性能导向的工作负载来说很方便。我们移除了 Map 实现,建议人们使用像 H2 这样的内存 SQL 数据库配合 JDBC JobRepository。有些人追求纯粹的性能,而 H2 选项不够好。在这个版本中,我们引入了一个“无资源”的 JobRepository,它不保存任何状态,甚至不保存在内存中。我们还为基于 JDBC 的 JobRepository 添加了一个持久化的替代方案,即引入了一个基于 MongoDB 的 JobRepository 实现。ItemReader 的 Spring Data JPA 查询注册提示提供了新的支持。ItemReader 时,为数据类——Kotlin data class 或 Java record 实例——提供了新的支持。Function<I,O>——适配到 ItemReader、ItemWriter 和 ItemProcessor 类型。CompositeItemReader<T>,它可以按顺序从多个委托的 ItemReader<T> 中读取数据。RecursiveCollectionLineAggregator 中支持可配置的行分隔符CompositeItemReader<T>让我们来看看我最喜欢的两个新功能:CompositeItemReader<T> 和 SEDA 友好的 BlockingQueueItemWriter 和 BlockingQueueItemReader 实现。
这是这个 Spring Batch 应用中唯一 Job 的定义
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Configuration
class BatchConfiguration {
static final BlockingQueue<Customer> CUSTOMERS = new LinkedBlockingQueue<>();
@Bean
Job job(JobRepository repository, Step one, Step two) {
return new JobBuilder("job", repository)//
.incrementer(new RunIdIncrementer()) //
.start(one)//
.next(two)//
.build();
}
}
这是一个简单的作业,包含两个 Step 实例,一个接着另一个。快速回顾一下:在 Spring Batch 中,Step 是一个工作单元。它描述了四件事:
ItemReader<T> 的实例表示)ItemWriter<T> 的实例表示)每个 Step 使用 ItemReader<I> 读取一个块(chunk)数据量,将一个类似集合的“块”传递给 ItemProcessor<I,O> 进行任意的修改,然后将 ItemProcessor<I,O> 的输出发送给 ItemWriter<O>。I 和 O 可以代表相同的泛型类型,也可以代表不同的类型。然后,循环继续,直到 ItemReader 中的所有数据都被耗尽。该步骤被认为是完成的,执行将继续到下一个步骤。
在这个示例应用程序中,我们将从 customer 表读取数据,读取 id、name、os 和 language 记录。我们还将从一个 .csv 文件读取类似的数据。我们将使用方便的新 CompositeItemReader<Customer> 来轻松完成这项工作,避免我们进行单独的规范化步骤。
package com.example.bootiful_34.batch;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.batch.item.support.CompositeItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.List;
@Configuration
@ImportRuntimeHints(StepOneConfiguration.CustomersCsvRuntimeHintsRegistrar.class)
class StepOneConfiguration {
private static final Resource CSV = new ClassPathResource("/customers.csv");
@Bean
FlatFileItemReader<Customer> customerCsvItemReader() {
return new FlatFileItemReaderBuilder<Customer>()//
.resource(CSV)
.delimited()
.names("id", "name", "language", "os")
.name("customerCsvItemReader")
.fieldSetMapper(fs -> new Customer(fs.readInt(0), fs.readString(1), fs.readString(2), fs.readString(3)))
.build();
}
@Bean
JdbcCursorItemReader<Customer> customerJdbcItemReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Customer>()//
.name("customerJdbcItemReader")//
.dataSource(dataSource)//
.sql("select id, name, language, os from customer")//
.rowMapper((rs, rowNum) -> new Customer(rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))//
.build();
}
@Bean
CompositeItemReader<Customer> customerCompositeItemReader(JdbcCursorItemReader<Customer> customerJdbcItemReader,
FlatFileItemReader<Customer> customerCsvItemReader) {
return new CompositeItemReader<>(List.of(customerJdbcItemReader, customerCsvItemReader));
}
@Bean
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter() {
return new BlockingQueueItemWriter<>(BatchConfiguration.CUSTOMERS);
}
@Bean
Step one(JobRepository repository, PlatformTransactionManager txm,
CompositeItemReader<Customer> customerCompositeItemReader,
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter) {
return new StepBuilder("one", repository)//
.<Customer, Customer>chunk(10, txm)//
.reader(customerCompositeItemReader)//
.writer(customerBlockingQueueItemWriter)//
.build();
}
static class CustomersCsvRuntimeHintsRegistrar implements RuntimeHintsRegistrar {
@Override
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
hints.resources().registerResource(CSV);
}
}
}
在这个例子中,我们有三个 ItemReader bean,但该步骤只消耗一个 CompositeItemReader<T> bean。它又会按顺序读取 FlatFileItemReader<Customer> 和 JdbcCursorItemReader<Customer> bean 中的任何数据。
在这个例子中,我们没有配置 ItemProcessor<Customer,Customer>。
对于 ItemWriter<Customer>,我们使用了框架中另一个新颖的功能:BlockingQueueItemWriter<Customer>!这个想法很简单:写入器将数据写入 Java 的 java.util.concurrent.BlockingQueue。BlockingQueue 变量是 BatchConfiguration 类中定义的 static final 变量,名为 CUSTOMERS。下一个步骤将有一个配置的 BlockingQueueItemReader<T>,它将从同一个 java.util.concurrent.BlockingQueue 中读取。超级简单,对吧?是的!但这将节省大量时间。
传统上,Spring Batch 应用只具有与当前步骤相关的上下文。当数据通过作业流动时,Spring Batch Step 只提供了三次机会:从 ItemReader<I>、ItemProcessor<I,O> 和 ItemWriter<O>。想在数据写入后进行更多处理?那得等到下一步!你已经把它写到磁盘或其他持久化介质上了,然后你必须重新读取它。Spring Batch 会跟踪你的读写进度,那么为什么我们非要如此小心谨慎呢?为什么我们必须如此频繁地将所有内容持久化呢?
现在不再需要这样了,因为 Spring Batch 支持将给定的 Step 输出写入 BlockingQueue。值得注意的是,BlockingQueue 实例还具有支持限制写入数据量的额外好处。这与阶段式事件驱动架构(SEDA)的风格非常契合。SEDA 背后的思想是根据数据通过的不同阶段来定义工作。当数据从一个阶段移动到另一个阶段时,它会流入(有界)队列。这些有界队列提供反压。如果工作被拒绝,或者在容量超出时被简单地写入磁盘,你就不会压垮某个阶段的处理器。这就是所谓的反压,它对于可伸缩性至关重要。
每个阶段只从队列中获取其工作。这提供了一种自然的负载平衡方式:启动更多给定阶段的处理器实例,工作就会在它们之间平均分配。你可以通过 Spring Batch 的远程分区和分块范式将这种架构推向更远,让你能够跨集群分割工作。
这种架构通常与消息系统相关——队列通常被假定为消息总线中的队列(或主题);然而,这种架构背后的原则在批处理系统中同样适用。
让我们看看第二步!
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
class StepTwoConfiguration {
@Bean
Step two(JobRepository repository, PlatformTransactionManager transactionManager,
BlockingQueueItemReader<Customer> blockingQueueItemReader, ItemWriter<Customer> customerItemWriter) {
return new StepBuilder("two", repository)//
.<Customer, Customer>chunk(10, transactionManager)//
.reader(blockingQueueItemReader)//
.writer(customerItemWriter)//
.build();
}
@Bean
BlockingQueueItemReader<Customer> blockingQueueItemReader() {
return new BlockingQueueItemReader<>(BatchConfiguration.CUSTOMERS);
}
@Bean
ItemWriter<Customer> customerItemWriter() {
return chunk -> chunk.forEach(System.out::println);
}
}
在这里,我们定义了另一个 Step,它从同一个 BlockingQueue 读取,然后简单地打印出所有内容。
健壮、简单、可扩展的批处理?你还能要求什么?顺便说一句,请记住 Spring Batch 的大部分输入输出功能,都从 Java 21 的虚拟线程中获益良多,Spring Boot 已经支持了三年了!如果你使用 Java 21+,别忘了设置 spring.threads.virtual.enabled=true。(你至少在使用 Java 21,对吧?)