FlowX Engine setup
This guide will provide instructions on how to set up and configure the FlowX Engine to meet your specific requirements.
Infrastructure prerequisites
The FlowX Engine requires the following components to be set up before it can be started:
- Docker engine - version 17.06 or higher
- Kafka - version 2.8 or higher
- Elasticsearch - version 7.11.0 or higher
- DB instance
Dependencies
For Microservices architecture, some Microservices holds their data individually using separate Databases.
A basic Postgres configuration can be set up using a helm values.yaml file as it follows:
-
helm values.yaml:
onboardingdb: existingSecret: {{secretName}} metrics: enabled: true service: annotations: prometheus.io/port: {{prometheus port}} prometheus.io/scrape: "true" type: ClusterIP serviceMonitor: additionalLabels: release: prometheus-operator enabled: true interval: 30s scrapeTimeout: 10s persistence: enabled: true size: 1Gi postgresqlDatabase: onboarding postgresqlExtendedConf: maxConnections: 200 sharedBuffers: 128MB postgresqlUsername: postgres resources: limits: cpu: 6000m memory: 2048Mi requests: cpu: 200m memory: 512Mi
Dependencies
- Redis server - a Redis cluster is required for the engine to cache process definitions, compiled scripts, and Kafka responses
- Kafka cluster - Kafka is the backbone of the engine and all plugins and integrations are accessed via the Kafka broker
- Additional dependencies - details about how to set up logging via Elasticsearch, monitoring, and tracing via Jaeger, can be found here
Configuration
- Datasource configuration
- Redis configuration
- Logging
- Configuring access roles for processes
- Kafka configuration
Authorization & access roles
SECURITY_TYPE
: Indicates that OAuth 2.0 is the chosen security type, default value:oauth2
.
security:
type: oauth2
SECURITY_PATHAUTHORIZATIONS_0_PATH
: Defines a security path or endpoint pattern. It specifies that the security settings apply to all paths under the “/api/” path. The**
is a wildcard that means it includes all subpaths under “/api/**“.SECURITY_PATHAUTHORIZATIONS_0_ROLESALLOWED
: Specifies the roles allowed for accessing the specified path. In this case, the roles allowed are empty (""). This might imply that access to the “/api/**” paths is open to all users or that no specific roles are required for authorization.
pathAuthorizations:
- path: "/api/**"
rolesAllowed: "ANY_AUTHENTICATED_USER"
SECURITY_OAUTH2_BASE_SERVER_URL
: The base URL for the OAuth 2.0 Authorization Server, which is responsible for authentication and authorization for clients and users, it is used to authorize clients, as well as to issue and validate access tokens.SECURITY_OAUTH2_CLIENT_CLIENT_ID
: A unique identifier for a client application that is registered with the OAuth 2.0 Authorization Server, this is used to authenticate the client application when it attempts to access resources on behalf of a user.SECURITY_OAUTH2_CLIENT_CLIENT_SECRET
: Secret Key that is used to authenticate requests made by an authorization client.SECURITY_OAUTH2_REALM
: Security configuration env var in the Spring Security OAuth2 framework, it is used to specify the realm name used when authenticating with OAuth2 providers.SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_ID
: The process engine requires a process engine service account to make direct calls to the Keycloak API. This setting specifies the service account that is essential so the of Start Catch Event node is possible. Ensure that you provide the correct client ID for this service account.SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_SECRET
: Along with the client ID, you must also specify the client secret associated with the service account for proper authentication.
oauth2:
base-server-url: http://localhost:8080/auth
realm: flowx
client:
client-id: flowx-platform-authorize
client-secret: somesecret
service-account:
admin:
client-id: flowx-process-engine-sa
client-secret: somesecret
More details about the necessary service account, here:
Process engine service account
Configuring Kafka
Kafka handles all communication between the FLOWX.AI Engine and external plugins and integrations. It is also used for notifying running process instances when certain events occur.
Both a producer and a consumer must be configured. The following Kafka-related configurations can be set by using environment variables:
-
SPRING_KAFKA_BOOTSTRAP_SERVERS
- the address of the Kafka server, it should be in the format “host:port” -
KAFKA_AUTH_EXCEPTION_RETRY_INTERVAL
- the interval between retries after AuthorizationException is thrown by Kafka consumer -
KAFKA_MESSAGE_MAX_BYTES
- this is the largest size of the message that can be received by the broker from a producer.
Consumer groups & consumer threads
In Kafka a consumer group is a group of consumers that jointly consume and process messages from one or more Kafka topics. Each consumer group has a unique identifier called a group ID, which is used by Kafka to manage message consumption and distribution among the members of the group.
Thread numbers, on the other hand, refer to the number of threads that a consumer application uses to process messages from Kafka. By default, each consumer instance runs in a single thread, which can limit the throughput of message processing. Increasing the number of consumer threads can help to improve the parallelism and efficiency of message consumption, especially when dealing with high message volumes.
Both group IDs and thread numbers can be configured in Kafka to optimize the processing of messages according to specific requirements, such as message volume, message type, and processing latency.
The configuration related to consumers (group ids and thread numbers) can be configured separately for each message type as it follows:
-
KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE
- related to a Kafka consumer group that receives messages related to notifying advance actions, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT
- related to a Kafka consumer group that receives messages related to notifying when a subprocess is blocked and it sends a notification to the parent process, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_ADAPTERS
- related to a Kafka consumer group that receives messages related to adapters, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION
- related to a Kafka consumer group that receives messages related to requests to run scheduled actions, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING
- related to a Kafka consumer group that receives messages related to messages sent by the scheduler when a timeout expires, indicating that the advancing should continue (parallel advancing), it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_MESSAGE_EVENTS
- -
KAFKA_CONSUMER_GROUP_ID_PROCESS_START
- related to a Kafka consumer group that receives messages related to starting processes, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT
- -
KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE
- related to expiring processes, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS
- related to a Kafka consumer group that receives messages related to processing operations from task management, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING
- related to a Kafka consumer group that receives messages related to processing bulk operations from task management, it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE
- the number of threads used by a Kafka consumer application to notify the Kafka broker about the progress of -
KAFKA_CONSUMER_THREADS_NOTIFY_PARENT
- the number of threads used by a Kafka consumer application, related to a process when it is blocked -
KAFKA_CONSUMER_THREADS_ADAPTERS
- the number of threads used by a Kafka consumer application, related to adapters -
KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION
- the number of threads used by a Kafka consumer application, related to requests to run scheduled actions -
KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING
- the number of threads used by a Kafka consumer application, related to messages sent by the scheduler when a timeout expires, indicating that the advancing should continue (parallel advancing), it is used to configure the group ID for this consumer group -
KAFKA_CONSUMER_THREADS_PROCESS_START
- the number of threads used by a Kafka consumer application, related to starting processes -
KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT
- -
KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE
- the number of threads used by a Kafka consumer application, related to expiring processes -
KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS
- the number of threads used by a Kafka consumer application, related to processing operations from task management -
KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING
- the number of threads used by a Kafka consumer application, related to processing bulk operations from task management
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_ADAPTERS | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_START | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING | notif123-preview |
KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE | 6 |
KAFKA_CONSUMER_THREADS_NOTIFY_PARENT | 6 |
KAFKA_CONSUMER_THREADS_ADAPTERS | 6 |
KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING | 6 |
KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_START | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT | 2 |
KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING | 6 |
It is important to know that all the events that start with a configured pattern will be consumed by the engine. This makes it possible to create a new integration and connect it to the engine without changing the configuration of the engine.
The suggested topic pattern naming convention is the following:
topic:
naming:
package: "ai.flowx."
environment: "dev."
version: ".v1"
prefix: ${kafka.topic.naming.package}${kafka.topic.naming.environment}
suffix: ${kafka.topic.naming.version}
engineReceivePattern: engine.receive.
pattern: ${kafka.topic.naming.prefix}${kafka.topic.naming.engineReceivePattern}*
-
KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE
- Kafka topic used internally by the engine -
KAFKA_TOPIC_PROCESS_NOTIFY_PARENT
- topic used for sub-processes to notify parent process when finished -
KAFKA_TOPIC_PATTERN
- the topic name pattern that the Engine listens on for incoming Kafka events -
KAFKA_TOPIC_LICENSE_OUT
- the topic name used by the Engine to generate licensing-related details
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE | ai.flowx.dev.core.notify.advance.process.v1 |
KAFKA_TOPIC_PROCESS_NOTIFY_PARENT | ai.flowx.dev.core.notify.parent.process.v1 |
KAFKA_TOPIC_PATTERN | ai.flowx.dev.engine.receive.* |
KAFKA_TOPIC_LICENSE_OUT | ai.flowx.dev.core.trigger.save.license.v1 |
Topics related to the Task Management plugin
-
KAFKA_TOPIC_TASK_OUT
- used for sending notifications to the plugin -
KAFKA_TOPIC_PROCESS_OPERATIONS_IN
- used for receiving calls from the task management plugin with information regarding operations performed
Request example
{
"operationType": "UNASSIGN", //type of operation performed in Task Management plugin
"taskId": "some task id",
"processInstanceUuid": "1cff0b7d-966b-4b35-9e9b-63b1d6757ec6",
"swimlaneName": "Default",
"swimlaneId": "51ec1241-fe06-4576-9c84-31598c05c527",
"owner": {
"firstName": null,
"lastName": null,
"username": "service-account-flowx-process-engine-account",
"enabled": false
},
"author": "admin@flowx.ai"
}
KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN
- on this topic, you can perform operations from the “KAFKA_TOPIC_PROCESS_OPERATIONS_IN” topic and send them as an array, allowing you to send multiple operations at once
Request example
{
"operations": [
{
"operationType": "HOLD",
"taskId": "some task id",
"processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223", // the id of the process instance
"swimlaneName": "Default", //name of the swimlane
"owner": null,
"author": "john.doe@flowx.ai",
},
{
"operationType": "HOLD",
"taskId": "some task id",
"processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223",
"swimlaneName": "Default", //name of the swimlane
"owner": null,
"author": "john.doe@flowx.ai",
}
]
}
If you need to send additional keys on the response, attach them in the header, as in the following example, where we used requestID
key.
A response should be sent on a callbackTopic
if it is mentioned in the headers, as in the following example:
{"processInstanceId": ${processInstanceId}, "callbackTopic": "test.operations.out", "requestID":"1234567890"}
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_TASK_OUT | ai.flowx.dev.plugin.tasks.trigger.save.task.v1 |
KAFKA_TOPIC_PROCESS_OPERATIONS_IN | ai.flowx.dev.core.trigger.operations.v1 |
KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN | ai.flowx.core.trigger.operations.bulk.v1 |
Task manager operations could be the following: assignment, unassignment, hold, unhold, terminate and it is matched with the ...operations.out
topic on the engine side. For more information check the Task Management plugin documentation:
Topics related to the scheduler extension
-
KAFKA_TOPIC_PROCESS_EXPIRE_IN
- the topic name that the Engine listens on for requests to expire processes -
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET
- the topic name used by the Engine to schedule a process expiration -
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP
- the topic name used by the Engine to stop a process expiration -
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION
- the topic name that the Engine listens on for requests to run scheduled actions -
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE
- the topic name where Engine listens for events, where scheduler sends messages related to advancing through a database
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_EXPIRE_IN | ai.flowx.dev.core.trigger.expire.process.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET | ai.flowx.dev.core.trigger.set.schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP | ai.flowx.dev.core.trigger.stop.schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION | ai.flowx.dev.core.trigger.run.action.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE | ai.flowx.dev.core.trigger.advance.process.v1 |
Using the scheduler
Topics related to Timer Events
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET | ai.flowx.dev.core.trigger.set.timer-event-schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP | ai.flowx.dev.core.trigger.stop.timer-event-schedule.v1 |
-
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET
- used to communicate with Scheduler microservice -
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP
- used to communicate with Scheduler microservice
Topics related to the Search Data service
-
KAFKA_TOPIC_DATA_SEARCH_IN
- the topic name that the Engine listens on for requests to search for processes -
KAFKA_TOPIC_DATA_SEARCH_OUT
- the topic name used by the Engine to reply after finding a process
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_DATA_SEARCH_IN | ai.flowx.dev.core.trigger.search.data.v1 |
KAFKA_TOPIC_DATA_SEARCH_OUT | ai.flowx.dev.engine.receive.core.search.data.results.v1 |
Topics related to the Audit service
KAFKA_TOPIC_AUDIT_OUT
- topic key for sending audit logs
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_AUDIT_OUT | ai.flowx.dev.core.save.audit.v1 |
Topics related to ES indexing
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |
Processes that can be started by sending messages to a Kafka topic
-
KAFKA_TOPIC_PROCESS_START_IN
- the Engine listens on this topic for requests to start a new process instance -
KAFKA_TOPIC_PROCESS_START_OUT
- used for sending out the reply after starting a new process instance
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_START_IN | ai.flowx.dev.core.trigger.start.process.v1 |
KAFKA_TOPIC_PROCESS_START_OUT | ai.flowx.dev.core.confirm.start.process.v1 |
Topics related to Message Events
Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_EVENT_MESSAGE | ai.flowx.dev.core.message.event.process.v1 |
KAFKA_TOPIC_PROCESS_START_FOR_EVENT | ai.flowx.dev.core.trigger.start-for-event.process.v1 |
Configuring events-gateway
-
KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE
- outgoing messages from process-engine to events-gateway -
KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT
- disconnect commands from process-engine to events-gateway -
KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT
- connect commands from process-engine to events-gateway
Topic Name | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE | ai.flowx.eventsgateway.engine.commands.message.v1 |
KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT | ai.flowx.eventsgateway.engine.commands.disconnect.v1 |
KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT | ai.flowx.eventsgateway.engine.commands.connect.v1 |
Configuring file upload size
The maximum file size allowed for uploads can be set by using the following environment variables:
-
SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE
- maximum file size allowed for uploads -
SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE
- maximum request size allowed for uploads
Configuring Advancing controller
To use advancing controller, the following env vars are needed for process-engine
to connect to Advancing Postgres DB:
-
ADVANCING_DATASOURCE_JDBC_URL
- environment variable used to configure a JDBC (Java database connectivity) data source, it specifies the connection URL for a particular database, including the server, port, database name, and any other connection parameters necessary -
ADVANCING_DATASOURCE_USERNAME
- environment variable used to authenticate the user access to the data source -
ADVANCING_DATASOURCE_PASSWORD
- environment variable used to set the password for a data source connection
Advancing controller setup
Configuring cleanup mechanism
Below you can find a configuration for cleaning up expired processes.
scheduler:
threads: 10
processCleanup:
enabled: false
cronExpression: 0 */5 0-5 * * ? #every day during the night, every 5 minutes, at the start of the minute.
batchSize: 1000
masterElection:
cronExpression: 30 */3 * * * ? #master election every 3 minutes
- processCleanup: A configuration for cleaning up processes.
- enabled: Specifies whether this feature is turned on or off.
- cronExpression: Is a schedule expression that determines when the cleanup process runs. In this case, it runs every day during the night (between 12:00 AM and 5:59 AM) and every 5 minutes, at the start of the minute.
- *batchSize: Specifies the number of processes to be cleaned up in one batch.
- masterElection: A configuration for electing a master.
Managing Subprocesses Expiration
The FLOWX_PROCESS_EXPIRE_SUBPROCESSES
environment variable governs subprocess expiration in a parent process. When set to true, the parent process’s expiration prompts simultaneous termination of all associated subprocesses. Conversely, when set to false, each subprocess follows its individual expiration configuration or persists indefinitely if not configured.
process:
expire-subprocesses: true
FLOWX_PROCESS_EXPIRE_SUBPROCESS
- default:true
Was this page helpful?