FlowX Engine setup
This guide provides instructions on how to set up and configure the FlowX.AI Engine to meet specific requirements.
Infrastructure prerequisites
Before initiating the FlowX.AI Engine, ensure the following infrastructure components are properly installed and configured:
- Docker Engine: Version 17.06 or higher.
- Kafka: Version 2.8 or higher.
- Elasticsearch: Version 7.11.0 or higher.
- Database Instance: Properly configured and accessible.
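For a quick local evaluation, these prerequisites can be stood up with a container runtime. The compose file below is a minimal sketch only, not production guidance: image tags and credentials are illustrative choices that satisfy the minimum versions above.

```yaml
# docker-compose.yml — minimal local sketch of the prerequisites.
# Image tags and credentials are illustrative, not FlowX.AI recommendations.
services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.8.1        # >= 2.8 as required above
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    environment:
      - discovery.type=single-node
  postgres:
    image: postgres:14
    environment:
      - POSTGRES_PASSWORD=postgres    # placeholder credential
```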
Dependencies
The FlowX Engine interacts with various components critical for its operation:
- Database: Primary storage for the engine. See Database Configuration.
- Redis Server: Used for caching purposes. Refer to Redis Configuration.
- Kafka: Facilitates messaging and event-driven architecture. Details on Configuring Kafka.
For a microservices architecture, it’s common for services to manage their data via dedicated databases.
Example: Basic PostgreSQL Configuration
Configure PostgreSQL using the Helm chart as demonstrated below:
```yaml
onboardingdb:
  existingSecret: {{secretName}}
  metrics:
    enabled: true
    service:
      annotations:
        prometheus.io/port: {{prometheus port}}
        prometheus.io/scrape: "true"
      type: ClusterIP
    serviceMonitor:
      additionalLabels:
        release: prometheus-operator
      enabled: true
      interval: 30s
      scrapeTimeout: 10s
  persistence:
    enabled: true
    size: 1Gi
  postgresqlDatabase: onboarding
  postgresqlExtendedConf:
    maxConnections: 200
    sharedBuffers: 128MB
  postgresqlUsername: postgres
  resources:
    limits:
      cpu: 6000m
      memory: 2048Mi
    requests:
      cpu: 200m
      memory: 512Mi
```
Required External Services
- Redis Cluster: Essential for caching process definitions, compiled scripts, and Kafka responses.
- Kafka Cluster: Serves as the communication backbone with external plugins and integrations.
Configuration Setup
FlowX.AI Engine utilizes environment variables for configuration. Below are the key environment variables you need to configure:
- Configuring authorization and access roles
- Configuring the data source
- Configuring Redis
- Configuring logging
- Configuring Kafka
- Configuring access roles for processes
Authorization & access roles
This section outlines the OAuth2 configuration settings for securing the Spring application, including resource server settings, security type, and access authorizations.
Resource server settings (OAuth2 configuration)
Old configuration from < v4.1 releases (will be deprecated in v4.5):
- `SECURITY_OAUTH2_BASE_SERVER_URL`: The base URL of the OAuth 2.0 Authorization Server, which handles authentication and authorization for clients and users; it is used to authorize clients and to issue and validate access tokens.
- `SECURITY_OAUTH2_CLIENT_CLIENT_ID`: A unique identifier for a client application registered with the OAuth 2.0 Authorization Server; it is used to authenticate the client application when it accesses resources on behalf of a user.
- `SECURITY_OAUTH2_CLIENT_CLIENT_SECRET`: The secret key used to authenticate requests made by the authorization client.
- `SECURITY_OAUTH2_REALM`: Specifies the realm name used when authenticating with OAuth2 providers (Spring Security OAuth2 framework).
The new configuration, available starting with the v4.1 release, is shown below.
Environment variable | Description | Default Value |
---|---|---|
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_INTROSPECTION_URI | URI for token introspection to validate opaque tokens | ${security.oauth2.base-server-url}/realms/${security.oauth2.realm}/protocol/openid-connect/token/introspect |
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_ID | Client ID for token introspection | ${security.oauth2.client.client-id} |
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRET | Client secret for token introspection | ${security.oauth2.client.client-secret} |
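As an illustration, these variables might be wired into the engine's container spec as follows; the identity provider URL, realm, and secret names here are hypothetical placeholders:

```yaml
# Hypothetical Kubernetes env fragment for the v4.1+ opaque-token settings.
env:
  - name: SECURITY_OAUTH2_BASE_SERVER_URL
    value: "https://keycloak.example.com/auth"   # placeholder IdP URL
  - name: SECURITY_OAUTH2_REALM
    value: "flowx"                               # placeholder realm
  - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_ID
    value: "flowx-process-engine"                # placeholder client ID
  - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRET
    valueFrom:
      secretKeyRef:
        name: process-engine-oauth               # placeholder secret name
        key: client-secret
```

With these set, the introspection URI default resolves against the base server URL and realm, so it rarely needs to be overridden.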
Service account settings
This section contains the environment variables for configuring process engine service account.
Environment Variable | Description | Default Value |
---|---|---|
SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_ID | Client ID for the service account | flowx-${SPRING_APPLICATION_NAME}-sa |
SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_SECRET | Client secret for the service account | |
More details about the necessary service account can be found here.
Security configuration
Environment variable | Description | Default Value |
---|---|---|
SECURITY_TYPE | Type of security mechanism used | oauth2 |
SECURITY_BASIC_ENABLED | Enable basic authentication | false |
SECURITY_PUBLIC_PATHS_0 | List of public paths that do not require authentication | /api/platform/components-versions |
SECURITY_PUBLIC_PATHS_1 | List of public paths that do not require authentication | /manage/actuator/health |
SECURITY_PATH_AUTHORIZATIONS_0_PATH | Defines a security path or endpoint pattern. The settings apply to all paths under "/api/"; the ** wildcard matches all subpaths. | "/api/**" |
SECURITY_PATH_AUTHORIZATIONS_0_ROLES_ALLOWED | Specifies the roles allowed to access the specified path. The default, ANY_AUTHENTICATED_USER, grants access to any authenticated user without requiring a specific role. | "ANY_AUTHENTICATED_USER" |
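Putting the defaults together, a deployment fragment might look like the following sketch (values taken from the defaults above; only an illustration):

```yaml
# Sketch: health endpoints stay public, everything under /api/** requires
# an authenticated user.
env:
  - name: SECURITY_TYPE
    value: "oauth2"
  - name: SECURITY_PUBLIC_PATHS_0
    value: "/api/platform/components-versions"
  - name: SECURITY_PUBLIC_PATHS_1
    value: "/manage/actuator/health"
  - name: SECURITY_PATH_AUTHORIZATIONS_0_PATH
    value: "/api/**"
  - name: SECURITY_PATH_AUTHORIZATIONS_0_ROLES_ALLOWED
    value: "ANY_AUTHENTICATED_USER"
```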
Configuring Kafka
Kafka handles all communication between the FlowX.AI Engine and external plugins and integrations. It is also used for notifying running process instances when certain events occur.
Kafka connection settings
Environment Variable | Description | Default Value |
---|---|---|
SPRING_KAFKA_BOOTSTRAP_SERVERS | Kafka bootstrap servers | localhost:9092 |
SPRING_KAFKA_SECURITY_PROTOCOL | Security protocol for Kafka | "PLAINTEXT" |
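For example, pointing the engine at an external cluster might look like this (broker hostnames are placeholders):

```yaml
env:
  - name: SPRING_KAFKA_BOOTSTRAP_SERVERS
    value: "kafka-0.kafka-headless:9092,kafka-1.kafka-headless:9092"  # placeholder hosts
  - name: SPRING_KAFKA_SECURITY_PROTOCOL
    value: "PLAINTEXT"
```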
Kafka consumer retry settings
Environment Variable | Description | Default Value |
---|---|---|
KAFKA_AUTH_EXCEPTION_RETRY_INTERVAL | Interval between retries after AuthorizationException is thrown. | 10 |
Consumer groups & consumer threads configuration
Both a producer and a consumer must be configured:
In Kafka a consumer group is a group of consumers that jointly consume and process messages from one or more Kafka topics. Each consumer group has a unique identifier called a group ID, which is used by Kafka to manage message consumption and distribution among the members of the group.
Thread numbers, on the other hand, refer to the number of threads that a consumer application uses to process messages from Kafka. By default, each consumer instance runs in a single thread, which can limit the throughput of message processing. Increasing the number of consumer threads can help to improve the parallelism and efficiency of message consumption, especially when dealing with high message volumes.
Both group IDs and thread numbers can be configured in Kafka to optimize the processing of messages according to specific requirements, such as message volume, message type, and processing latency.
Group IDs and thread numbers can be configured separately for each message type, as follows:
Consumer group configuration
Environment Variable | Description | Default Value |
---|---|---|
KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE | Group ID for notifying advance actions | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT | Group ID for notifying when a subprocess is blocked | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_ADAPTERS | Group ID for messages related to adapters | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION | Group ID for running scheduled actions | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING | Group ID for messages indicating continuing advancement | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_START | Group ID for starting processes | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT | Group ID for starting processes for an event | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE | Group ID for expiring processes | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS | Group ID for processing operations from Task Management plugin | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING | Group ID for processing bulk operations from Task Management plugin | notif123-preview |
Consumer thread configuration
Environment Variable | Description | Default Value |
---|---|---|
KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE | Number of threads for notifying advance actions | 6 |
KAFKA_CONSUMER_THREADS_NOTIFY_PARENT | Number of threads for notifying when a subprocess is blocked | 6 |
KAFKA_CONSUMER_THREADS_ADAPTERS | Number of threads for processing messages related to adapters | 6 |
KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING | Number of threads for continuing advancement | 6 |
KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION | Number of threads for running scheduled actions | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_START | Number of threads for starting processes | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT | Number of threads for starting processes for an event | 2 |
KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE | Number of threads for expiring processes | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS | Number of threads for processing operations from task management | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING | Number of threads for processing bulk operations from task management | 6 |
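As a tuning sketch, a high-volume deployment might dedicate a named group and more threads to process starts. The group name below is hypothetical; the thread count applies per engine replica, while the group ID is shared across replicas so that Kafka balances partitions among them:

```yaml
env:
  - name: KAFKA_CONSUMER_GROUP_ID_PROCESS_START
    value: "engine-process-start"   # hypothetical group name, shared by all replicas
  - name: KAFKA_CONSUMER_THREADS_PROCESS_START
    value: "12"                     # threads per replica
```

Keep in mind that consumers in a group beyond the topic's partition count sit idle, so partition count caps effective parallelism.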
Note that the Engine consumes all events on topics that match the configured pattern. This makes it possible to create a new integration and connect it to the Engine without changing the engine configuration.
Configuring Kafka topics
The suggested topic pattern naming convention is the following:
```yaml
topic:
  naming:
    package: "ai.flowx."
    environment: "dev."
    version: ".v1"
    prefix: ${kafka.topic.naming.package}${kafka.topic.naming.environment}
    suffix: ${kafka.topic.naming.version}
    engineReceivePattern: engine.receive.
  pattern: ${kafka.topic.naming.prefix}${kafka.topic.naming.engineReceivePattern}*
```
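With these defaults, the placeholders resolve as sketched below, which is how the engine arrives at the listening pattern shown in the table that follows:

```yaml
# Resolution of the naming placeholders with the defaults above:
#   prefix  = "ai.flowx." + "dev."             -> "ai.flowx.dev."
#   pattern = prefix + "engine.receive." + "*" -> "ai.flowx.dev.engine.receive.*"
```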
Environment Variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE | Kafka topic used internally by the Engine for advancing processes | ai.flowx.dev.core.notify.advance.process.v1 |
KAFKA_TOPIC_PROCESS_NOTIFY_PARENT | Kafka topic used for sub-processes to notify the parent process | ai.flowx.dev.core.notify.parent.process.v1 |
KAFKA_TOPIC_PATTERN | The topic name pattern that the Engine listens on for incoming events | ai.flowx.dev.engine.receive.* |
KAFKA_TOPIC_LICENSE_OUT | The topic name used by the Engine to generate licensing-related details | ai.flowx.dev.core.trigger.save.license.v1 |
Topics related to the Task Management plugin
Default parameter (env var) | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_TASK_OUT | Kafka topic used for sending notifications to the plugin | ai.flowx.dev.plugin.tasks.trigger.save.task.v1 |
KAFKA_TOPIC_PROCESS_OPERATIONS_IN | Kafka topic used for receiving calls from the task management plugin with information regarding operations performed | ai.flowx.dev.core.trigger.operations.v1 |
KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN | Kafka topic where operations can be performed in bulk, allowing multiple operations to be sent at once | ai.flowx.dev.core.trigger.operations.bulk.v1 |
OPERATIONS_IN request example
```json
{
  "operationType": "UNASSIGN", // type of operation performed in the Task Management plugin
  "taskId": "some task id",
  "processInstanceUuid": "1cff0b7d-966b-4b35-9e9b-63b1d6757ec6",
  "swimlaneName": "Default",
  "swimlaneId": "51ec1241-fe06-4576-9c84-31598c05c527",
  "owner": {
    "firstName": null,
    "lastName": null,
    "username": "service-account-flowx-process-engine-account",
    "enabled": false
  },
  "author": "admin@flowx.ai"
}
```
BULK_IN request example
```json
{
  "operations": [
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223", // the id of the process instance
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    },
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223",
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    }
  ]
}
```
If you need to send additional keys in the response, attach them to the message headers, as in the following example, which uses a `requestID` key. A response is sent on a `callbackTopic` if one is specified in the headers:

```json
{"processInstanceId": ${processInstanceId}, "callbackTopic": "test.operations.out", "requestID": "1234567890"}
```
Task manager operations include: assignment, unassignment, hold, unhold, and terminate. Each operation is matched with the `...operations.out` topic on the engine side. For more information, check the Task Management plugin documentation.
Topics related to the scheduler extension
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_PROCESS_EXPIRE_IN | Topic name for requests to expire processes | ai.flowx.dev.core.trigger.expire.process.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET | Topic name used for scheduling process expiration | ai.flowx.dev.core.trigger.set.schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP | Topic name used for stopping process expiration | ai.flowx.dev.core.trigger.stop.schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION | Topic name for requests to run scheduled actions | ai.flowx.dev.core.trigger.run.action.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE | Topic name for advancing events sent by the scheduler when advancing is done through the database | ai.flowx.dev.core.trigger.advance.process.v1 |
Using the scheduler
Topics related to Timer Events
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET | Used to communicate with Scheduler microservice | ai.flowx.dev.core.trigger.set.timer-event-schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP | Used to communicate with Scheduler microservice | ai.flowx.dev.core.trigger.stop.timer-event-schedule.v1 |
Topics related to the Search Data service
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_DATA_SEARCH_IN | The topic name that the Engine listens on for requests to search for processes | ai.flowx.dev.core.trigger.search.data.v1 |
KAFKA_TOPIC_DATA_SEARCH_OUT | The topic name used by the Engine to reply after finding a process | ai.flowx.dev.engine.receive.core.search.data.results.v1 |
Topics related to the Audit service
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_AUDIT_OUT | Topic key for sending audit logs | ai.flowx.dev.core.save.audit.v1 |
Topics related to ES indexing
Environment variable | Default FlowX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |
Processes that can be started by sending messages to a Kafka topic
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_PROCESS_START_IN | The Engine listens on this topic for requests to start a new process instance | ai.flowx.dev.core.trigger.start.process.v1 |
KAFKA_TOPIC_PROCESS_START_OUT | Used for sending out the reply after starting a new process instance | ai.flowx.dev.core.confirm.start.process.v1 |
Topics related to Message Events
Environment variable | Default FLOWX.AI value (can be overwritten) |
---|---|
KAFKA_TOPIC_PROCESS_EVENT_MESSAGE | ai.flowx.dev.core.message.event.process.v1 |
KAFKA_TOPIC_PROCESS_START_FOR_EVENT | ai.flowx.dev.core.trigger.start-for-event.process.v1 |
Topics related to Events-gateway microservice
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE | Outgoing messages from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.message.v1 |
KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT | Disconnect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.disconnect.v1 |
KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT | Connect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.connect.v1 |
Configuring file upload size
The maximum file size allowed for uploads can be set by using the following environment variables:
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE | Maximum file size allowed for uploads | 50MB |
SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE | Maximum request size allowed for uploads | 50MB |
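For example, to raise the limit to 100MB (an illustrative value; the two variables should usually move together, since a multipart request must be at least as large as the file it carries):

```yaml
env:
  - name: SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE
    value: "100MB"
  - name: SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE
    value: "100MB"
```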
Connecting the Advancing controller
To use the Advancing controller, the following environment variables are needed for the `process-engine` to connect to the Advancing Postgres database:
Environment variable | Description |
---|---|
ADVANCING_DATASOURCE_JDBC_URL | Specifies the connection URL for a JDBC data source, including the server, port, database name, and other parameters |
ADVANCING_DATASOURCE_USERNAME | Used to authenticate the user’s access to the data source |
ADVANCING_DATASOURCE_PASSWORD | Sets the password for a data source connection |
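A hypothetical fragment (host, database, and secret names are placeholders):

```yaml
env:
  - name: ADVANCING_DATASOURCE_JDBC_URL
    value: "jdbc:postgresql://advancing-postgres:5432/advancing"  # placeholder host/db
  - name: ADVANCING_DATASOURCE_USERNAME
    value: "flowx"                                                # placeholder user
  - name: ADVANCING_DATASOURCE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: advancing-db   # placeholder secret name
        key: password
```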
Configuring the Advancing controller
Environment variable | Description | Default FlowX.AI value (can be overwritten) |
---|---|---|
ADVANCING_TYPE | Specifies the type of advancing mechanism to be used. The advancing can be done either through Kafka or through the database (parallel) | PARALLEL (possible values: KAFKA, PARALLEL) |
ADVANCING_THREADS | Number of parallel threads to be used | 20 |
ADVANCING_PICKING_BATCH_SIZE | Number of tasks to pick in each batch | 10 |
ADVANCING_PICKING_PAUSE_MILLIS | Pause duration between picking batches, in milliseconds. After picking a batch of tasks, the system will wait for 100 milliseconds before picking the next batch. This can help in controlling the rate of task intake and processing | 100 |
ADVANCING_COOLDOWN_AFTER_SECONDS | Cooldown period after processing a batch, in seconds. The system will wait for 120 seconds after completing a batch before starting the next cycle. This can be useful for preventing system overload and managing resource usage | 120 |
ADVANCING_SCHEDULER_HEARTBEAT_CRONEXPRESSION | A cron expression that defines the schedule for the heartbeat. The scheduler’s heartbeat will trigger every 2 seconds. This frequent heartbeat can be used to ensure the system is functioning correctly and tasks are being processed as expected | "*/2 * * * * ?" |
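For instance, switching to Kafka-based advancing with more worker threads could look like this sketch (values are illustrative, not recommendations):

```yaml
env:
  - name: ADVANCING_TYPE
    value: "KAFKA"      # or PARALLEL (the default)
  - name: ADVANCING_THREADS
    value: "40"         # illustrative; size to your workload
```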
Advancing controller setup
Configuring cleanup mechanism
This section contains environment variables that configure the scheduler’s behavior, including thread count, cron jobs for data partitioning, process cleanup, and master election.
Environment Variable | Description | Default Value | Possible Values |
---|---|---|---|
SCHEDULER_THREADS | Number of threads for the scheduler | 10 | Integer values (e.g., 10 , 20 ) |
SCHEDULER_PROCESS_CLEANUP_ENABLED | Activates the cron job for process cleanup | false | true , false |
SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSION | Cron expression for the process cleanup scheduler | 0 */5 0-5 * * ? -> every day during the night, every 5 minutes, at the start of the minute | Cron expression (e.g., 0 0 1 * * ? ) |
SCHEDULER_PROCESS_CLEANUP_BATCH_SIZE | Number of processes to be cleaned up in one batch | 1000 | Integer values (e.g., 100 , 1000 ) |
SCHEDULER_MASTER_ELECTION_CRON_EXPRESSION | Cron expression for the master election process | 30 */3 * * * ? -> master election every 3 minutes | Cron expression (e.g., 0 0/3 * * * ? ) |
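For example, enabling nightly cleanup with a smaller batch size might look like this (the cron window and batch size below are illustrative choices):

```yaml
env:
  - name: SCHEDULER_PROCESS_CLEANUP_ENABLED
    value: "true"
  - name: SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSION
    value: "0 */10 1-4 * * ?"   # every 10 minutes between 01:00 and 04:59
  - name: SCHEDULER_PROCESS_CLEANUP_BATCH_SIZE
    value: "500"
```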
Managing subprocesses expiration
This section details the environment variable that controls the expiration of subprocesses within a parent process. It determines whether subprocesses should terminate when the parent process expires or follow their own expiration settings.
Environment Variable | Description | Default Value | Possible Values |
---|---|---|---|
FLOWX_PROCESS_EXPIRE_SUBPROCESSES | Governs subprocess expiration in a parent process. When true, terminates all associated subprocesses upon parent process expiration. When false, subprocesses follow their individual expiration settings or persist indefinitely if not configured | true | true , false |
Configuring application management
The FlowX helm chart provides a management service with the necessary parameters to integrate with the Prometheus operator. However, this integration is disabled by default.
Old configuration from < v4.1 releases (will be deprecated in v4.5):
- `MANAGEMENT_METRICS_EXPORT_PROMETHEUS_ENABLED`: Enables or disables Prometheus metrics export.
The new configuration, available starting with the v4.1 release, is shown below. Note that this setup is backwards compatible; it does not affect the configuration from v3.4.x, and the old configuration files will continue to work until the v4.5 release.
To configure Prometheus metrics export for the FlowX.AI Engine, the following environment variable is required:
Environment Variable | Description | Default Value | Possible Values |
---|---|---|---|
MANAGEMENT_PROMETHEUS_METRICS_EXPORT_ENABLED | Enables or disables Prometheus metrics export. | false | true , false |
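For example, to turn the export on in a deployment manifest:

```yaml
env:
  - name: MANAGEMENT_PROMETHEUS_METRICS_EXPORT_ENABLED
    value: "true"
```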