这个版本仍在开发中,尚未考虑稳定。请使用最新的稳定版本:spring-cloud-task 5.0.1spring-doc.cadn.net.cn

批处理

本部分更详细地介绍了 Spring Cloud Task 与 Spring Batch 的集成。包括将作业执行与执行该作业的任务之间的关联跟踪,以及通过 Spring Cloud Deployer 远程分区均在本部分中介绍。spring-doc.cadn.net.cn

关联执行任务与其执行的任务

Spring Boot 提供了在 Spring Boot Uber-jar 中执行批处理作业的设施。 Spring Boot 对此功能的支持让开发者可以在同一执行中运行多个批处理作业。 Spring Cloud Task 提供了将作业的执行(即作业执行)与任务的执行关联起来的能力,从而可以将两者追溯到同一任务。spring-doc.cadn.net.cn

Spring Cloud Task 通过使用 TaskBatchExecutionListener 来实现此功能。 默认情况下, 此监听器会在任何同时具有配置了 Spring Batch 作业(通过在上下文中定义了类型为 Job 的 bean)且在类路径上有 spring-cloud-task-batch jar 的上下文中自动配置。该监听器会被注入到所有符合条件的作业中。spring-doc.cadn.net.cn

覆盖任务批处理执行监听器

为了防止侦听器被注入到当前上下文内的任何批处理作业中,您可以使用标准的 Spring Boot 机制来禁用自动配置。spring-doc.cadn.net.cn

仅在特定作业的上下文中将监听器注入到特定作业中,可重写 batchTaskExecutionListenerBeanPostProcessor 并提供作业bean ID列表,如下例所示:spring-doc.cadn.net.cn

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
	TaskBatchExecutionListenerBeanPostProcessor postProcessor =
		new TaskBatchExecutionListenerBeanPostProcessor();

	postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

	return postProcessor;
}
您可以在Spring Cloud Task项目samples模块中找到一个示例批处理应用程序, 在这里

远程分区

Spring Cloud Deployer 提供了在 大多数云基础设施上启动基于 Spring Boot 的应用程序的设施。The 0 和 The 1 将工作步骤执行的启动委托给 Spring Cloud Deployer。spring-doc.cadn.net.cn

要配置 DeployerStepExecutionHandler,必须提供一个表示要执行的 Spring Boot Uber-jar 的 Resource,以及一个 TaskLauncherHandler。 你还可以配置任何环境属性、同时执行的最大工作者数量、轮询结果的间隔(默认为 10 秒),以及超时(默认为 -1 或无限期)。以下示例展示了如何配置这个 PartitionHandlerspring-doc.cadn.net.cn

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
		JobExplorer jobExplorer) throws Exception {

	MavenProperties mavenProperties = new MavenProperties();
	mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
		new MavenProperties.RemoteRepository(repository))));

 	Resource resource =
		MavenResource.parse(String.format("%s:%s:%s",
				"io.spring.cloud",
				"partitioned-batch-job",
				"1.1.0.RELEASE"), mavenProperties);

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

	List<String> commandLineArgs = new ArrayList<>(3);
	commandLineArgs.add("--spring.profiles.active=worker");
	commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
	commandLineArgs.add("--spring.batch.initializer.enabled=false");

	partitionHandler.setCommandLineArgsProvider(
		new PassThroughCommandLineArgsProvider(commandLineArgs));
	partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
	partitionHandler.setMaxWorkers(2);
	partitionHandler.setApplicationName("PartitionedBatchJobTask");

	return partitionHandler;
}
当将环境变量传递给分区时,每个分区可能位于不同的机器上,且环境设置不同。 因此,您应该只传递所需的环境变量。

注意在上面的例子中,我们设置了工作者的最大数量为 2。 设置工作者的最大数量确定了在一次运行中应同时运行的分区的最大数量。spring-doc.cadn.net.cn

待执行的 Resource 应该是一个带有配置的 Spring Boot Uber-jar,其中 DeployerStepExecutionHandler 被配置为在当前上下文中作为 CommandLineRunner。 前述示例中列举的仓库应该是包含 Spring Boot Uber-jar 的远程仓库。管理器和工作者都应该能够访问作为作业仓库和任务仓库所使用的相同数据存储。一旦底层基础设施启动了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler,步骤处理程序就会执行所请求的 Step。以下示例展示了如何配置 DeployerStepExecutionHandler:spring-doc.cadn.net.cn

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
	DeployerStepExecutionHandler handler =
		new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

	return handler;
}
您可以在 Spring Cloud Task 项目中的 samples 模块找到一个远程分区应用程序的示例, 在这里

异步启动远程批处理分区

默认情况下,批处理分区是按顺序启动的。然而,在某些情况下这可能会影响性能,因为每个启动都会阻塞,直到资源(例如:在 Kubernetes 中为 Pod 进行配置)被提供。 在这些情况下,您可以将 ThreadPoolTaskExecutor 提供给 DeployerPartitionHandler。这将根据 ThreadPoolTaskExecutor 的配置启动远程批处理分区。 例如:spring-doc.cadn.net.cn

	@Bean
	public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(4);
		executor.setThreadNamePrefix("default_task_executor_thread");
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.initialize();
		return executor;
	}

	@Bean
	public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
		TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
		Resource resource = this.resourceLoader
			.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

		DeployerPartitionHandler partitionHandler =
			new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
				"workerStep", taskRepository, executor);
	...
	}
我们需要关闭上下文,因为使用 ThreadPoolTaskExecutor 会使线程保持活动状态,从而导致应用程序不会终止。为了适当关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true

关于在Kubernetes平台开发分批分区应用程序的说明

  • 在部署分区应用程序到 Kubernetes 平台时,必须使用以下依赖项:Spring Cloud Kubernetes Deployerspring-doc.cadn.net.cn

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 该任务应用程序的应用程序名称及其分区需要遵循 以下正则表达式模式: [a-z0-9]([-a-z0-9]*[a-z0-9])。 否则,会抛出异常。spring-doc.cadn.net.cn

批量信息提示消息

Spring Cloud Task 提供了批处理作业发送信息性消息的能力。 The “Spring Batch 事件”部分对此功能有详细说明。spring-doc.cadn.net.cn

批处理作业退出代码

正如之前讨论的,Spring Cloud Task应用程序支持记录任务执行的退出码。然而,在运行嵌入的Spring Batch作业时,无论Batch作业执行如何完成,当使用默认的Batch/Boot行为时,任务的结果总是零。请记住,任务就是一个boot应用程序,其返回的退出码与boot应用程序相同。 要覆盖这种行为,允许任务在Batch作业返回 BatchStatusFAILED 时返回除零以外的退出码,将 spring.cloud.task.batch.fail-on-job-failure 设置为 true。然后退出码可以是1(默认)或基于 指定的 ExitCodeGeneratorspring-doc.cadn.net.cn

此功能使用一个新的ApplicationRunner,它替代了由Spring Boot提供的那个。默认情况下,它配置为相同的顺序。但是,如果你想自定义ApplicationRunner的运行顺序,可以通过设置spring.cloud.task.batch.applicationRunnerOrder属性来设置。要使你的任务根据批处理作业执行结果返回退出代码,你需要编写自己的CommandLineRunnerspring-doc.cadn.net.cn