FlowX Engine setup
This guide provides instructions on how to set up and configure the FlowX.AI Engine.
Infrastructure prerequisites
Before installing the FlowX.AI Engine, verify that the following infrastructure components are installed and configured:
- Kafka: Version 2.8 or higher
- Elasticsearch: Version 7.11.0 or higher
- PostgreSQL: Version 13 or higher for storing application data
- MongoDB: Version 4.4 or higher for managing runtime builds
Dependencies
The FlowX Engine requires the following components:
- Database: Primary storage for the engine
- Redis Server: Used for caching. See Redis Configuration
- Kafka: Handles messaging and event-driven architecture. See Configuring Kafka
For a microservices architecture, services typically manage their data via dedicated databases.
Required external services
- Redis Cluster: Caches process definitions, compiled scripts, and Kafka responses
- Kafka Cluster: Enables communication with external plugins and integrations
Configuration setup
FlowX.AI Engine uses environment variables for configuration. This section covers key configuration areas:
- Database configuration
- Script engine configuration
- Authorization and access roles
- Kafka configuration
- Advancing controller configuration
- Cleanup mechanism configuration
- File upload configuration
- Application management
- RBAC configuration
Database configuration
PostgreSQL
Environment variable | Description |
---|---|
SPRING_DATASOURCE_URL | Database URL for PostgreSQL |
SPRING_DATASOURCE_USERNAME | Username for PostgreSQL |
SPRING_DATASOURCE_PASSWORD | Password for PostgreSQL |
SPRING_DATASOURCE_DRIVER_CLASS_NAME | Driver class for PostgreSQL |
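For illustration, a typical set of values might look like the following (host, database name, and credentials are placeholders, not product defaults):

```shell
SPRING_DATASOURCE_URL=jdbc:postgresql://postgres-host:5432/process-engine
SPRING_DATASOURCE_USERNAME=flowx          # placeholder
SPRING_DATASOURCE_PASSWORD=<secret>       # placeholder
SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.postgresql.Driver
```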
MongoDB configuration
Configure connection to the Runtime MongoDB instance:
Environment variable | Description | Default value |
---|---|---|
SPRING_DATA_MONGODB_RUNTIME_URI | URI for connecting to MongoDB | Format: mongodb://${RUNTIME_DB_USERNAME}:${DB_PASSWORD}@<host1>,<host2>,<arbiter-host>:<port>/${DB_NAME}?retryWrites=false |
RUNTIME_DB_USERNAME | MongoDB username | app-runtime |
DB_NAME | MongoDB database name | app-runtime |
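Putting the defaults together, a concrete runtime connection string could look like this (hosts, ports, and password are placeholders):

```shell
RUNTIME_DB_USERNAME=app-runtime
DB_NAME=app-runtime
SPRING_DATA_MONGODB_RUNTIME_URI=mongodb://app-runtime:<password>@mongo-0:27017,mongo-1:27017,mongo-arbiter:27017/app-runtime?retryWrites=false
```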
Configuring script engine
FlowX.AI Engine supports multiple scripting languages for business rules and actions.
If you have the AI Developer agent set up, you must also set these environment variables on it.
Python runtime configuration
By default, FlowX.AI 4.7.1 uses Python 2.7 (Jython) as the Python runtime. To enable Python 3 via GraalPy with its 3x performance improvements, you must explicitly set the feature toggle.
Environment variable | Description | Default value | Possible values |
---|---|---|---|
FLOWX_SCRIPTENGINE_USEGRAALVM | Determines which Python runtime to use | false | true, false |
Python 2.7 support will be deprecated in FlowX.AI 5.0. We recommend migrating your Python scripts to Python 3 to take advantage of improved performance and modern language features.
GraalVM cache configuration
When using GraalVM (`FLOWX_SCRIPTENGINE_USEGRAALVM=true`), ensure the engine has proper access to a cache directory within the container. By default, this is configured in the `/tmp` directory.
For environments with filesystem restrictions or custom configurations, you need to properly configure the GraalVM cache.
There are two methods to configure the GraalVM cache location:
Option 1: Using Java options (Preferred)
Add the following Java option to your deployment configuration:
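The snippet itself is missing from this copy; a minimal sketch, assuming the GraalVM/Truffle user resource cache property `polyglot.engine.userResourceCache` is the option in question (verify the exact property name against your FlowX release notes):

```shell
# Assumed property name; points the GraalVM resource cache at /tmp
JAVA_OPTS="${JAVA_OPTS} -Dpolyglot.engine.userResourceCache=/tmp"
```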
This option is set by default in the standard Docker image but might need to be included if you’re overriding the Java options.
Option 2: Using environment variables
Alternatively, set the following environment variable:
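The variable name is missing from this copy; one standard JVM mechanism that achieves the same effect is `JAVA_TOOL_OPTIONS`, which the JVM picks up automatically (the cache property name remains the assumption noted above):

```shell
JAVA_TOOL_OPTIONS="-Dpolyglot.engine.userResourceCache=/tmp"
```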
If you encounter errors related to Python script execution when using GraalVM, verify that the cache directory is properly configured and accessible.
For more details about supported scripting languages and their capabilities, see the Supported scripts documentation.
Authorization & access roles
This section covers OAuth2 configuration settings for securing the Spring application.
Resource server settings (OAuth2 configuration)
The following configuration from versions before 4.1 will be deprecated in version 5.0:
- `SECURITY_OAUTH2_BASE_SERVER_URL`: Base URL for the OAuth 2.0 Authorization Server
- `SECURITY_OAUTH2_CLIENT_CLIENT_ID`: Client identifier registered with the OAuth 2.0 Authorization Server
- `SECURITY_OAUTH2_CLIENT_CLIENT_SECRET`: Secret key for authenticating client requests
- `SECURITY_OAUTH2_REALM`: Realm name used for OAuth2 authentication
Starting from version 4.1, use the following configuration instead. This setup is backwards compatible until version 4.5.
Environment variable | Description | Default value |
---|---|---|
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_INTROSPECTION_URI | URI for token introspection | ${security.oauth2.base-server-url}/realms/${security.oauth2.realm}/protocol/openid-connect/token/introspect |
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_ID | Client ID for token introspection | ${security.oauth2.client.client-id} |
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRET | Client secret for token introspection | ${security.oauth2.client.client-secret} |
Service account settings
Configure the process engine service account:
Environment variable | Description | Default value |
---|---|---|
SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_ID | Client ID for the service account | flowx-${SPRING_APPLICATION_NAME}-sa |
SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_SECRET | Client secret for the service account |
For more information about the necessary service account, see Process Engine Service Account.
Security configuration
Environment variable | Description | Default value |
---|---|---|
SECURITY_TYPE | Type of security mechanism | oauth2 |
SECURITY_BASIC_ENABLED | Enable basic authentication | false |
SECURITY_PUBLIC_PATHS_0 | Public path not requiring authentication | /api/platform/components-versions |
SECURITY_PUBLIC_PATHS_1 | Public path not requiring authentication | /manage/actuator/health |
SECURITY_PATH_AUTHORIZATIONS_0_PATH | Security path pattern | "/api/**" |
SECURITY_PATH_AUTHORIZATIONS_0_ROLES_ALLOWED | Roles allowed for path access | "ANY_AUTHENTICATED_USER" |
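Because the path and authorization properties are indexed, additional entries follow the same numbering pattern; for example, a hypothetical extra public path would be added as:

```shell
SECURITY_PUBLIC_PATHS_2=/manage/actuator/info   # hypothetical example
```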
Configuring Kafka
Kafka handles all communication between the FlowX.AI Engine, external plugins, and integrations. It also notifies running process instances when certain events occur.
Kafka connection settings
Environment variable | Description | Default value |
---|---|---|
SPRING_KAFKA_BOOTSTRAP_SERVERS | Kafka bootstrap servers | localhost:9092 |
SPRING_KAFKA_SECURITY_PROTOCOL | Security protocol for Kafka | "PLAINTEXT" |
Message routing configuration
Environment variable | Description | Default value |
---|---|---|
KAFKA_DEFAULT_FX_CONTEXT | Default context value for message routing when no context is provided | "" (empty string) |
When `KAFKA_DEFAULT_FX_CONTEXT` is set and an event is received on Kafka without an `fxContext` header, the system automatically applies the default context value to the message.
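For instance, using the kcat CLI against a local broker, a producer can set the header explicitly (the topic name here is illustrative, following the default engine pattern):

```shell
kcat -P -b localhost:9092 \
  -t ai.flowx.dev.engine.receive.test.v1 \
  -H fxContext=my-context <<< '{"clientId": "1234"}'
```

Messages produced without the `-H fxContext=...` header would instead receive the configured default context.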
Kafka consumer retry settings
Environment variable | Description | Default value |
---|---|---|
KAFKA_AUTH_EXCEPTION_RETRY_INTERVAL | Interval between retries after AuthorizationException (seconds) | 10 |
Consumer groups & consumer threads configuration
Both a producer and a consumer must be configured.
About consumer groups and threads
A consumer group is a set of consumers that jointly consume messages from one or more Kafka topics. Each consumer group has a unique identifier (group ID) that Kafka uses to manage message distribution.
Thread numbers refer to the number of threads a consumer application uses to process messages. Increasing thread count can improve parallelism and efficiency, especially with high message volumes.
Consumer group configuration
Environment variable | Description | Default value |
---|---|---|
KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE | Group ID for notifying advance actions | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT | Group ID for notifying when a subprocess is blocked | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_ADAPTERS | Group ID for messages related to adapters | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION | Group ID for running scheduled actions | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING | Group ID for messages indicating continuing advancement | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_MESSAGE_EVENTS | Group ID for message events | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_START | Group ID for starting processes | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT | Group ID for starting processes for an event | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE | Group ID for expiring processes | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS | Group ID for processing operations from Task Management plugin | notif123-preview |
KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING | Group ID for processing bulk operations from Task Management plugin | notif123-preview |
Consumer thread configuration
Environment variable | Description | Default value |
---|---|---|
KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE | Number of threads for notifying advance actions | 6 |
KAFKA_CONSUMER_THREADS_NOTIFY_PARENT | Number of threads for notifying when a subprocess is blocked | 6 |
KAFKA_CONSUMER_THREADS_ADAPTERS | Number of threads for processing messages related to adapters | 6 |
KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING | Number of threads for continuing advancement | 6 |
KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION | Number of threads for running scheduled actions | 6 |
KAFKA_CONSUMER_THREADS_MESSAGE_EVENTS | Number of threads for message events | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_START | Number of threads for starting processes | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT | Number of threads for starting processes for an event | 2 |
KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE | Number of threads for expiring processes | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS | Number of threads for processing operations from task management | 6 |
KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING | Number of threads for processing bulk operations from task management | 6 |
All events that start with a configured pattern will be consumed by the Engine. This enables you to create new integrations and connect them to the engine without changing the configuration.
Configuring Kafka topics
Recommended topic naming convention:
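The convention is not spelled out in this copy, but the defaults below all share one recognizable shape; as a sketch (the segment names are descriptive labels, not official terms):

```shell
# ai.flowx.{environment}.{component}.{action}.{object}.{version}
# Example decomposition of a default topic:
#   ai.flowx . dev . core . notify.advance . process . v1
ai.flowx.dev.core.notify.advance.process.v1
```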
Core engine topics
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE | Topic used internally for advancing processes | ai.flowx.dev.core.notify.advance.process.v1 |
KAFKA_TOPIC_PROCESS_NOTIFY_PARENT | Topic used for sub-processes to notify the parent process | ai.flowx.dev.core.notify.parent.process.v1 |
KAFKA_TOPIC_PATTERN | Pattern the Engine listens on for incoming events | ai.flowx.dev.engine.receive.* |
KAFKA_TOPIC_LICENSE_OUT | Topic used to generate licensing-related details | ai.flowx.dev.core.trigger.save.license.v1 |
KAFKA_TOPIC_PROCESS_EVENT_MESSAGE | Topic for process message events | ai.flowx.dev.core.message.event.process.v1 |
Topics related to the Task Management plugin
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_TASK_OUT | Topic used for sending notifications to the plugin | ai.flowx.dev.plugin.tasks.trigger.save.task.v1 |
KAFKA_TOPIC_PROCESS_OPERATIONS_IN | Topic for receiving information about operations performed | ai.flowx.dev.core.trigger.operations.v1 |
KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN | Topic where operations can be performed in bulk | ai.flowx.core.trigger.operations.bulk.v1 |
OPERATIONS_IN request example
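The original payload is not reproduced in this copy; a hypothetical message for the operations topic, based on the operation types listed further below (field names are illustrative, not a documented schema):

```json
{
  "operationType": "UNASSIGN",
  "taskId": "some-task-id",
  "processInstanceUuid": "1cff0b7d-966b-4b35-9e9b-63b1d6757ec6"
}
```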
BULK_IN request example
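Likewise hypothetical, a bulk payload would wrap several such operations in a single message (field names are illustrative):

```json
{
  "operations": [
    { "operationType": "HOLD", "taskId": "task-1" },
    { "operationType": "TERMINATE", "taskId": "task-2" }
  ]
}
```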
To send additional keys in the response, attach them in the header. For example, you can use a `requestID` key.

A response should be sent on a `callbackTopic` if it is mentioned in the headers:
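A hypothetical header set illustrating both keys (values are placeholders):

```json
{
  "callbackTopic": "ai.flowx.dev.engine.receive.operations.results.v1",
  "requestID": "1234567890"
}
```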
Task manager operations include: assignment, unassignment, hold, unhold, terminate. These are matched with the `...operations.out` topic on the engine side. For more information, see the Task Management plugin documentation.
Topics related to the scheduler extension
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_PROCESS_EXPIRE_IN | Topic for requests to expire processes | ai.flowx.dev.core.trigger.expire.process.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET | Topic used for scheduling process expiration | ai.flowx.dev.core.trigger.set.schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP | Topic used for stopping process expiration | ai.flowx.dev.core.trigger.stop.schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION | Topic for requests to run scheduled actions | ai.flowx.dev.core.trigger.run.action.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE | Topic for events related to advancing through a database | ai.flowx.dev.core.trigger.advance.process.v1 |
Topics related to Timer Events
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET | Used to communicate with Scheduler microservice | ai.flowx.dev.core.trigger.set.timer-event-schedule.v1 |
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP | Used to communicate with Scheduler microservice | ai.flowx.dev.core.trigger.stop.timer-event-schedule.v1 |
Topics related to the Search Data service
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_DATA_SEARCH_IN | Topic that the Engine listens on for search requests | ai.flowx.dev.core.trigger.search.data.v1 |
KAFKA_TOPIC_DATA_SEARCH_OUT | Topic used by the Engine to reply after finding a process | ai.flowx.dev.engine.receive.core.search.data.results.v1 |
Topics related to the Audit service
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_AUDIT_OUT | Topic for sending audit logs | ai.flowx.dev.core.trigger.save.audit.v1 |
Topics related to ES indexing
Environment variable | Default value |
---|---|
KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |
Processes that can be started by sending messages to a Kafka topic
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_PROCESS_START_IN | Topic for requests to start a new process instance | ai.flowx.dev.core.trigger.start.process.v1 |
KAFKA_TOPIC_PROCESS_START_OUT | Topic for sending the reply after starting a new process instance | ai.flowx.dev.core.confirm.start.process.v1 |
Topics related to Message Events
Environment variable | Default value |
---|---|
KAFKA_TOPIC_PROCESS_START_FOR_EVENT | ai.flowx.dev.core.trigger.start-for-event.process.v1 |
Topics related to Events-gateway microservice
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE | Outgoing messages from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.message.v1 |
KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT | Disconnect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.disconnect.v1 |
KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT | Connect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.connect.v1 |
Topics related to platform components
Environment variable | Description | Default value |
---|---|---|
KAFKA_TOPIC_PLATFORM_COMPONENTS_VERSIONS_OUT | Topic for platform version caching | ai.flowx.dev.core.trigger.platform.versions.caching.v1 |
Inter-service topic coordination
When configuring FlowX services, ensure the following:
- The Engine's `pattern` must match the pattern used by services sending messages to the Engine
- The `integrationPattern` must match the pattern used by the Integration Designer
- Output topics from one service must match the expected input topics of another service
For example:
- Services send to topics matching `ai.flowx.dev.engine.receive.*` → the Engine listens
- The Engine sends to topics matching `ai.flowx.dev.integration.receive.*` → the Integration Designer listens
Kafka message size configuration
Environment variable | Description | Default value |
---|---|---|
KAFKA_MESSAGE_MAX_BYTES | Maximum message size in bytes | 52428800 (50MB) |
This setting affects:
- Producer message max bytes
- Producer max request size
- Consumer max partition fetch bytes
Kafka authentication
For secure environments, you can enable OAuth authentication with the following configuration:
You need to set the following environment variables:
- `KAFKA_OAUTH_CLIENT_ID`
- `KAFKA_OAUTH_CLIENT_SECRET`
- `KAFKA_OAUTH_TOKEN_ENDPOINT_URI`
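The configuration block itself is not reproduced in this copy; a minimal sketch using Kafka's standard OAUTHBEARER client properties (the property names are standard Apache Kafka; how they map onto FlowX deployment charts is an assumption to verify):

```properties
# Standard Kafka OAUTHBEARER client properties (KIP-768); the login
# callback handler package differs across Kafka client versions.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=${KAFKA_OAUTH_TOKEN_ENDPOINT_URI}
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="${KAFKA_OAUTH_CLIENT_ID}" \
  clientSecret="${KAFKA_OAUTH_CLIENT_SECRET}";
```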
Configuring file upload size
Environment variable | Description | Default value |
---|---|---|
SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE | Maximum file size allowed for uploads | 50MB |
SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE | Maximum request size allowed for uploads | 50MB |
Connecting the Advancing controller
To use the advancing controller, configure the following variables:
Environment variable | Description |
---|---|
ADVANCING_DATASOURCE_JDBC_URL | Connection URL for Advancing Postgres DB |
ADVANCING_DATASOURCE_USERNAME | Username for Advancing DB connection |
ADVANCING_DATASOURCE_PASSWORD | Password for Advancing DB connection |
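For illustration (host and database name are placeholders; the URL format is standard PostgreSQL JDBC):

```shell
ADVANCING_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres-host:5432/advancing
ADVANCING_DATASOURCE_USERNAME=flowx      # placeholder
ADVANCING_DATASOURCE_PASSWORD=<secret>   # placeholder
```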
Configuring the Advancing controller
Environment variable | Description | Default value |
---|---|---|
ADVANCING_TYPE | Type of advancing mechanism | PARALLEL (possible values: KAFKA, PARALLEL) |
ADVANCING_THREADS | Number of parallel threads | 20 |
ADVANCING_PICKING_BATCH_SIZE | Number of tasks to pick in each batch | 10 |
ADVANCING_PICKING_PAUSE_MILLIS | Pause duration between batches (ms) | 100 |
ADVANCING_COOLDOWN_AFTER_SECONDS | Cooldown period after processing a batch (seconds) | 120 |
ADVANCING_SCHEDULER_HEARTBEAT_CRONEXPRESSION | Cron expression for the heartbeat | "*/2 * * * * ?" |
For deployment details, see Advancing controller setup.
Configuring cleanup mechanism
Environment variable | Description | Default value |
---|---|---|
SCHEDULER_THREADS | Number of threads for the scheduler | 10 |
SCHEDULER_PROCESS_CLEANUP_ENABLED | Activates the cron job for process cleanup | false |
SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSION | Cron expression for the process cleanup scheduler | 0 */5 0-5 * * ? (every 5 minutes between 12 AM and 5 AM) |
SCHEDULER_PROCESS_CLEANUP_BATCH_SIZE | Number of processes to clean up in one batch | 1000 |
SCHEDULER_MASTER_ELECTION_CRON_EXPRESSION | Cron expression for the master election process | 30 */3 * * * ? (every 3 minutes) |
Managing subprocesses expiration
Environment variable | Description | Default value |
---|---|---|
FLOWX_PROCESS_EXPIRE_SUBPROCESSES | When true, terminates all subprocesses upon parent process expiration. When false, subprocesses follow their individual expiration settings | true |
Configuring application management
The following configuration from versions before 4.1 will be deprecated in version 5.0:
MANAGEMENT_METRICS_EXPORT_PROMETHEUS_ENABLED
: Enables or disables Prometheus metrics export.
Starting from version 4.1, use the following configuration instead. This setup is backwards compatible until version 5.0.
Environment variable | Description | Default value |
---|---|---|
MANAGEMENT_PROMETHEUS_METRICS_EXPORT_ENABLED | Enables Prometheus metrics export | false |
RBAC configuration
Process Engine requires specific RBAC permissions for proper access to Kubernetes resources:
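The permission list is not reproduced in this copy; a hedged sketch of a typical Helm RBAC values block for a service that reads cluster resources (the exact resources and verbs are assumptions to validate against your deployment charts):

```yaml
rbac:
  create: true
  rules:
    - apiGroups: [""]
      # Assumed resource/verb set; confirm against the official Helm chart
      resources: ["secrets", "services", "pods", "configmaps"]
      verbs: ["get", "list", "watch"]
```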