创建批处理服务

本指南将引导您完成创建基本批处理驱动解决方案的过程。

您将构建什么

您将构建一个服务,该服务从 CSV 电子表格导入数据,使用自定义代码对其进行转换,并将最终结果存储到数据库中。

您需要什么

如何完成本指南

与大多数 Spring 入门指南 一样,您可以从头开始并完成每个步骤,也可以跳过您已经熟悉的步骤。无论哪种方式,您最终都会获得可运行的代码。

从头开始,请继续进行 Spring Initializr 入门

跳过基础步骤,请执行以下操作

完成之后,您可以将您的结果与 gs-batch-processing/complete 中的代码进行比较。

Spring Initializr 入门

您可以使用此 预初始化项目 并单击“生成”以下载 ZIP 文件。此项目已配置为适合本教程中的示例。

手动初始化项目

  1. 导航到 https://start.spring.io。此服务将引入应用程序所需的所有依赖项,并为您完成大部分设置工作。

  2. 选择 Gradle 或 Maven 以及您要使用的语言。本指南假设您选择了 Java。

  3. 单击依赖项并选择Spring BatchHyperSQL 数据库

  4. 单击生成

  5. 下载生成的 ZIP 文件,这是一个使用您的选择配置的 Web 应用程序的存档。

如果您的 IDE 集成了 Spring Initializr,则可以在 IDE 中完成此过程。
您也可以从 Github 分叉项目并在您的 IDE 或其他编辑器中打开它。

业务数据

通常,您的客户或业务分析师会提供电子表格。在此简单示例中,您可以在 src/main/resources/sample-data.csv 中找到一些虚构的数据。

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

此电子表格每行包含一个名字和一个姓氏,用逗号分隔。这是一种相当常见的模式,Spring 可以无需自定义即可处理。

接下来,您需要编写一个 SQL 脚本以创建一个表来存储数据。您可以在 src/main/resources/schema-all.sql 中找到此类脚本。

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);
Spring Boot 在启动期间自动运行 schema-@@platform@@.sql-all 是所有平台的默认值。

创建业务类

现在您可以看到数据输入和输出的格式,您可以编写代码来表示一行数据,如下面的示例(来自 src/main/java/com/example/batchprocessing/Person.java)所示。

package com.example.batchprocessing;

public record Person(String firstName, String lastName) {

}

您可以通过构造函数使用名字和姓氏来实例化 Person 记录。

创建中间处理器

批处理中的一种常见范例是:摄取数据、转换数据,然后将其传递到其他地方。在这里,您需要编写一个简单的转换器,将名称转换为大写。下面的列表(来自 src/main/java/com/example/batchprocessing/PersonItemProcessor.java)显示了如何操作。

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) {
    final String firstName = person.firstName().toUpperCase();
    final String lastName = person.lastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }

}

PersonItemProcessor 实现 Spring Batch 的 ItemProcessor 接口。这使得可以轻松地将代码连接到您将在本指南后面定义的批处理作业中。根据接口,您将接收传入的 Person 对象,然后将其转换为大写的 Person

输入和输出类型不必相同。实际上,在读取一个数据源后,有时应用程序的数据流需要不同的数据类型。

组合批处理作业

现在您需要组合实际的批处理作业。Spring Batch 提供了许多实用程序类,从而减少了编写自定义代码的需求。相反,您可以专注于业务逻辑。

要配置您的作业,您必须首先创建一个 Spring @Configuration 类,例如 src/main/java/com/example/batchprocessing/BatchConfiguration.java 中的以下示例。此示例使用基于内存的数据库,这意味着完成后,数据将消失。现在,将以下 bean 添加到您的 BatchConfiguration 类中以定义读取器、处理器和写入器。

@Bean
public FlatFileItemReader<Person> reader() {
  return new FlatFileItemReaderBuilder<Person>()
    .name("personItemReader")
    .resource(new ClassPathResource("sample-data.csv"))
    .delimited()
    .names("firstName", "lastName")
    .targetType(Person.class)
    .build();
}

@Bean
public PersonItemProcessor processor() {
  return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  return new JdbcBatchItemWriterBuilder<Person>()
    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
    .dataSource(dataSource)
    .beanMapped()
    .build();
}

第一段代码定义了输入、处理器和输出。

  • reader() 创建一个 ItemReader。它查找名为 sample-data.csv 的文件,并使用足够的信息解析每个行项目以将其转换为 Person

  • processor() 创建您之前定义的 PersonItemProcessor 的实例,用于将数据转换为大写。

  • writer(DataSource) 创建一个 ItemWriter。此写入器面向 JDBC 目标,并自动获得 Spring Boot 创建的 dataSource 的副本。它包含插入单个 Person 所需的 SQL 语句,由 Java 记录组件驱动。

最后一段代码(来自 src/main/java/com/example/batchprocessing/BatchConfiguration.java)显示了实际的作业配置。

@Bean
public Job importUserJob(JobRepository jobRepository,Step step1, JobCompletionNotificationListener listener) {
  return new JobBuilder("importUserJob", jobRepository)
    .listener(listener)
    .start(step1)
    .build();
}

@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
          FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
  return new StepBuilder("step1", jobRepository)
    .<Person, Person> chunk(3, transactionManager)
    .reader(reader)
    .processor(processor)
    .writer(writer)
    .build();
}

第一个方法定义了作业,第二个方法定义了一个步骤。作业由步骤构建而成,其中每个步骤都可以包含读取器、处理器和写入器。

然后列出每个步骤(尽管此作业只有一个步骤)。作业结束,Java API 生成一个完美配置的作业。

在步骤定义中,您定义一次写入多少数据。在本例中,它一次写入最多三个记录。接下来,您可以使用前面注入的 bean 配置读取器、处理器和写入器。

chunk() 前缀为 <Person,Person>,因为它是一个泛型方法。这表示每个处理“块”的输入和输出类型,并与 ItemReader<Person>ItemWriter<Person> 对齐。

批处理配置的最后一点是,当作业完成时获得通知的一种方法。下面的示例(来自 src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java)显示了这样一个类。

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate
          .query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))
          .forEach(person -> log.info("Found <{{}}> in the database.", person));
    }
  }
}

JobCompletionNotificationListener 侦听作业何时 BatchStatus.COMPLETED,然后使用 JdbcTemplate 检查结果。

使应用程序可执行

尽管批处理可以嵌入到 Web 应用程序和 WAR 文件中,但下面演示的更简单的方法是创建一个独立应用程序。您将所有内容打包到单个可执行 JAR 文件中,由一个普通的 Java main() 方法驱动。

Spring Initializr 为您创建了一个应用程序类。对于此简单示例,它无需进一步修改即可运行。下面的列表(来自 src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)显示了应用程序类。

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
  }
}

@SpringBootApplication 是一个便捷注释,它添加了以下所有内容:

  • @Configuration:将类标记为应用程序上下文的 bean 定义的来源。

  • @EnableAutoConfiguration:告诉 Spring Boot 基于类路径设置、其他 bean 和各种属性设置开始添加 bean。例如,如果 spring-webmvc 位于类路径中,此注释会将应用程序标记为 Web 应用程序并激活关键行为,例如设置 DispatcherServlet

  • @ComponentScan:告诉 Spring 在 com/example 包中查找其他组件、配置和服务,使其能够找到控制器。

main() 方法使用 Spring Boot 的 SpringApplication.run() 方法启动应用程序。您是否注意到没有一行 XML?也没有 web.xml 文件。此 Web 应用程序是 100% 纯 Java,您不必处理任何管道或基础设施的配置。

请注意,SpringApplication.exit()System.exit() 确保在作业完成后 JVM 退出。有关详细信息,请参阅Spring Boot 参考文档中的应用程序退出部分

出于演示目的,这里有代码可以创建 JdbcTemplate、查询数据库并打印出批处理作业插入的人员姓名。

请注意,应用程序不使用 @EnableBatchProcessing 注释。以前,可以使用 @EnableBatchProcessing 来启用 Spring Boot 对 Spring Batch 的自动配置。从 Spring Boot v3.0 开始,不再需要此注释,应将其从想要使用 Spring Boot 自动配置的应用程序中删除。现在可以定义一个用 @EnableBatchProcessing 注释或扩展 Spring Batch 的 DefaultBatchConfiguration 的 bean 来告诉自动配置后退,允许应用程序完全控制 Spring Batch 的配置方式。

构建可执行 JAR

您可以使用 Gradle 或 Maven 从命令行运行应用程序。您还可以构建一个包含所有必要依赖项、类和资源的单个可执行 JAR 文件,然后运行该文件。构建可执行 jar 文件可以轻松地将服务作为应用程序在整个开发生命周期中、跨不同环境等进行交付、版本控制和部署。

如果您使用 Gradle,可以使用./gradlew bootRun运行应用程序。或者,您可以使用./gradlew build构建 JAR 文件,然后运行 JAR 文件,如下所示

java -jar build/libs/gs-batch-processing-0.1.0.jar

如果您使用 Maven,可以使用./mvnw spring-boot:run运行应用程序。或者,您可以使用./mvnw clean package构建 JAR 文件,然后运行 JAR 文件,如下所示

java -jar target/gs-batch-processing-0.1.0.jar
此处描述的步骤会创建一个可运行的 JAR 文件。您也可以构建一个传统的 WAR 文件

该作业会为每个被转换的人打印一行。作业运行后,您还可以查看数据库查询的输出。它应该类似于以下输出

Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
Found <{Person[firstName=JILL, lastName=DOE]}> in the database.
Found <{Person[firstName=JOE, lastName=DOE]}> in the database.
Found <{Person[firstName=JUSTIN, lastName=DOE]}> in the database.
Found <{Person[firstName=JANE, lastName=DOE]}> in the database.
Found <{Person[firstName=JOHN, lastName=DOE]}> in the database.

摘要

恭喜!您构建了一个批处理作业,它可以从电子表格中提取数据,处理数据,并将其写入数据库。

另请参阅

以下指南可能也有帮助

想要编写新的指南或为现有指南做出贡献?请查看我们的贡献指南

所有指南均采用 ASLv2 许可证发布代码,并采用署名-非衍生作品创作共用许可证发布文本。

获取代码