Using the scheduler
After deploying the scheduler service in your infrastructure, you can start using it to schedule messages that you need to process at a later time.
For example, you can use the scheduler service to expire processes that were started but never finished.
First, check that the topics configured in the scheduler match the ones configured in the engine.
For example, the engine topics KAFKA_TOPIC_SCHEDULE_MESSAGE and KAFKA_TOPIC_STOP_SCHEDULED_MESSAGE must have the same values as the corresponding scheduler topics (KAFKA_SCHEDULE_MESSAGE_IN_TOPIC_NAME and KAFKA_STOP_SCHEDULED_MESSAGE_IN_TOPIC_NAME).
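The topic check can be automated before deployment. The following is a minimal sketch, assuming the engine and scheduler settings have each been collected into a map of variable name to value; the class and method names (TopicConfigCheck, mismatches) are hypothetical and not part of the product.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TopicConfigCheck {
    // Engine variable -> scheduler variable that must hold the same topic name.
    static final Map<String, String> PAIRS = Map.of(
            "KAFKA_TOPIC_SCHEDULE_MESSAGE", "KAFKA_SCHEDULE_MESSAGE_IN_TOPIC_NAME",
            "KAFKA_TOPIC_STOP_SCHEDULED_MESSAGE", "KAFKA_STOP_SCHEDULED_MESSAGE_IN_TOPIC_NAME");

    /** Returns one description per pair whose values are missing or differ. */
    static List<String> mismatches(Map<String, String> engineEnv,
                                   Map<String, String> schedulerEnv) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> pair : PAIRS.entrySet()) {
            String engineTopic = engineEnv.get(pair.getKey());
            String schedulerTopic = schedulerEnv.get(pair.getValue());
            // A missing engine value counts as a mismatch too.
            if (engineTopic == null || !engineTopic.equals(schedulerTopic)) {
                problems.add(pair.getKey() + "=" + engineTopic
                        + " vs " + pair.getValue() + "=" + schedulerTopic);
            }
        }
        return problems;
    }
}
```

An empty result means both pairs of topics line up; any entry in the list names the pair that needs fixing.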
When a process is scheduled to expire, the engine sends the following message to the scheduler service (on the topic KAFKA_SCHEDULE_MESSAGE_IN_TOPIC_NAME):
    {
      "applicationName": "onboarding",
      "applicationId": "04f82408-ee66-4c68-8162-b693b06bba00",
      "payload": {
        "scheduledEventType": "EXPIRE_PROCESS",
        "processInstanceUuid": "04f82408-ee66-4c68-8162-b693b06bba00"
      },
      "scheduledTime": 1621412209.353327,
      "responseTopicName": "ai.flowx.process.expire.staging"
    }
The scheduledTime field should be defined as a java.time.Instant.
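As an illustration, the sketch below builds such a message from an Instant. It assumes that the wire format of scheduledTime is decimal epoch seconds, which is inferred from the sample value 1621412209.353327 rather than stated by the service. It also reuses the process instance UUID as applicationId, as the sample does. The class and method names are hypothetical, and the JSON is assembled by hand only to keep the example free of dependencies.

```java
import java.math.BigDecimal;
import java.time.Instant;

class ScheduleMessages {
    /** Converts an Instant to decimal epoch seconds, e.g. 1621412209.353327. */
    static BigDecimal toEpochSeconds(Instant instant) {
        BigDecimal seconds = BigDecimal.valueOf(instant.getEpochSecond());
        // Nanosecond part expressed as a fraction of a second (scale 9).
        BigDecimal fraction = BigDecimal.valueOf(instant.getNano(), 9);
        return seconds.add(fraction).stripTrailingZeros();
    }

    /** Builds a schedule request asking for a process instance to be expired. */
    static String expireProcessMessage(String applicationName, String processInstanceUuid,
                                       Instant scheduledTime, String responseTopicName) {
        // Values are inserted verbatim: this assumes they need no JSON escaping.
        return String.format(
                "{\"applicationName\":\"%s\",\"applicationId\":\"%s\","
                + "\"payload\":{\"scheduledEventType\":\"EXPIRE_PROCESS\","
                + "\"processInstanceUuid\":\"%s\"},"
                + "\"scheduledTime\":%s,\"responseTopicName\":\"%s\"}",
                applicationName, processInstanceUuid, processInstanceUuid,
                toEpochSeconds(scheduledTime).toPlainString(), responseTopicName);
    }
}
```

In a real service a JSON library would normally handle serialization and escaping; the point here is only the shape of the message and the conversion of the scheduled time.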
At the scheduled time, the payload will be sent back to the response topic defined in the message, like so:
    {
      "scheduledEventType": "EXPIRE_PROCESS",
      "processInstanceUuid": "04f82408-ee66-4c68-8162-b693b06bba00"
    }
If you no longer need the scheduled message, you can discard it by sending the following message (on the topic KAFKA_STOP_SCHEDULED_MESSAGE_IN_TOPIC_NAME):
    {
      "applicationName": "onboarding",
      "applicationId": "04f82408-ee66-4c68-8162-b693b06bba00"
    }
Together, the applicationName and applicationId fields uniquely identify a scheduled message.
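To make the schedule, deliver, and stop flow concrete, here is a toy in-memory model of the behavior described above. It is not the scheduler's implementation: all names are hypothetical, Kafka is left out entirely, and the choice to let a second schedule request with the same key replace the first is an assumption of the sketch.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class SchedulerModel {
    /** The pair of fields that identifies a scheduled message. */
    record Key(String applicationName, String applicationId) {}

    /** What is sent at the scheduled time: the payload, on the response topic. */
    record Delivery(String responseTopicName, String payload) {}

    private record Scheduled(Instant scheduledTime, Delivery delivery) {}

    private final Map<Key, Scheduled> pending = new HashMap<>();

    /** Registers a message; an existing entry with the same key is replaced (assumption). */
    void schedule(Key key, Instant scheduledTime, String responseTopicName, String payload) {
        pending.put(key, new Scheduled(scheduledTime, new Delivery(responseTopicName, payload)));
    }

    /** Discards the message for this key; returns false if none was pending. */
    boolean stop(Key key) {
        return pending.remove(key) != null;
    }

    /** Removes and returns every delivery whose scheduled time is not after now. */
    List<Delivery> due(Instant now) {
        List<Delivery> out = new ArrayList<>();
        Iterator<Scheduled> it = pending.values().iterator();
        while (it.hasNext()) {
            Scheduled s = it.next();
            if (!s.scheduledTime().isAfter(now)) {
                out.add(s.delivery());
                it.remove();
            }
        }
        return out;
    }
}
```

Because the key is the (applicationName, applicationId) pair, a stop request needs only those two fields to find the pending message, which matches the shape of the stop message shown earlier.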