|
这个版本仍在开发中,尚未考虑稳定。请使用最新的稳定版本:spring-cloud-task 5.0.1! |
Spring Cloud Stream 集成
一个单独的任务本身是有用的,但将其集成到更大的生态系统中可以让其为更复杂的处理和编排变得有用。本节 涵盖了 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。
发起一个 Spring Cloud Stream 任务
你可以从一个流中启动任务。要做到这一点,创建一个监听消息负载中包含一个 TaskLaunchRequest 的 sink。其中 TaskLaunchRequest 包含的内容为:
-
uri: 为要执行的任务 artifact。 -
applicationName: 与任务关联的名称。如果没有设置applicationName,TaskLaunchRequest会生成一个由以下内容组成的任务名称:Task-<UUID>。 -
commandLineArguments: 一个包含该任务命令行参数的列表。 -
environmentProperties: 一个包含任务将使用的环境变量的映射。 -
deploymentProperties: 一个包含部署器用于部署任务的属性的映射。
| 如果负载的类型不同,接收器会抛出异常。 |
例如,可以创建一个流,该流包含一个处理器,它从
HTTP 源接收数据,并创建一个包含 TaskLaunchRequest 的 GenericMessage,并将消息发送到其输出通道。任务接收器然后会从其输入通道接收消息,并随后启动任务。
要创建一个taskSink,你只需要创建一个包含EnableTaskLauncher注解的Spring Boot应用程序,如下例所示:
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
The samples
module of the Spring Cloud Task project contains a 样本 Sink and Processor. To install
these samples into your local maven repository, run a maven build from the
spring-cloud-task-samples directory with the skipInstall property set to false, as
shown in the following example:
mvn clean install
maven.remoteRepositories.springRepo.url 属性必须设置为远程仓库的位置,该远程仓库包含 Spring Boot Uber-jar。如果未设置,则没有远程仓库,只依赖于本地仓库。 |
Spring Cloud 数据流
To create a 流 in Spring Cloud Data Flow,you must first register the Task Sink Application we created. In the following example, we are registering the Processor and Sink sample applications by using the Spring Cloud Data Flow shell:
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例显示了如何在 Spring Cloud Data Flow shell 中创建一个流:
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud 任务 事件
Spring Cloud Task 提供了通过 Spring Cloud Stream 频道在任务运行时发送事件的能力。使用任务监听器将 TaskExecution 发布到名为 task-events 的消息频道。该功能会自动注入到任何具有 spring-cloud-stream、spring-cloud-stream-<binder>,并且在类路径上定义了任务的 task 中。
要禁用事件发射监听器,请将spring.cloud.task.events.enabled
属性设置为false。 |
在适当配置了类路径的情况下,以下任务会在task-events通道上发出TaskExecution作为事件(在任务开始和结束时都会发出):
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
| 一个绑定器实现也需要包含在类路径中。 |
| 一个示例任务事件应用程序可以在 Spring Cloud Task 项目中的 samples 模块找到, 这里。 |
Spring 批处理 事件
当通过任务执行一个 Spring Batch 作业时,Spring Cloud Task 可以根据 Spring Batch 中可用的监听器配置为基于信息性消息发出。具体而言,以下 Spring Batch 监听器会自动配置到每个作业中,并在通过 Spring Cloud Task 运行时,根据运行情况在关联的 Spring Cloud Stream 通道上发送消息:
-
JobExecutionListener监听job-execution-events -
StepExecutionListener监听step-execution-events -
ChunkListener监听chunk-events -
ItemReadListener监听item-read-events -
ItemProcessListener监听item-process-events -
ItemWriteListener监听item-write-events -
SkipListener监听skip-events
这些 listeners 是 autoconfigured 到任何 AbstractJob 当适当的 beans(一个 Job 和一个 TaskLifecycleListener)存在于上下文时。监听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的那个任务)充当一个 Source,而监听应用程序则充当 Processor 或 Sink。
一个示例是让一个应用程序监听作业的启动和停止的job-execution-events通道。要配置监听应用程序,您可以将输入配置为job-execution-events,如下所示:
spring.cloud.stream.bindings.input.destination=job-execution-events
| 一个绑定器实现也需要包含在类路径中。 |
| 一个示例批处理事件应用程序可以在 Spring Cloud Task 项目中的 samples 模块找到, 在这里。 |
发送批处理事件到不同频道
Spring Cloud Task 提供的批处理事件选项之一是能够更改特定监听器发送消息的通道。要做到这一点,请使用以下配置:
spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,
如果 StepExecutionListener 需要将其消息发送到另一个名为
my-step-execution-events 的通道而不是默认的 step-execution-events,可以添加以下配置:
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
禁用批处理事件
禁用所有批处理事件的侦听器功能,请使用以下 配置:
spring.cloud.task.batch.events.enabled=false
禁用特定批处理事件,请使用以下配置:
spring.cloud.task.batch.events.<batch event listener>.enabled=false:
以下列表显示了您可以禁用的各个监听器:
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
发出批处理事件的订单
默认情况下,批处理事件的值为Ordered.LOWEST_PRECEDENCE。要更改此值(例如,设置为5),请使用以下配置:
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5