Infrastructure prerequisites

Before installing the FlowX.AI Engine, verify that the following infrastructure components are installed and configured:

  • Kafka: Version 2.8 or higher
  • Elasticsearch: Version 7.11.0 or higher
  • PostgreSQL: Version 13 or higher for storing application data
  • MongoDB: Version 4.4 or higher for managing runtime builds

Dependencies

The FlowX Engine depends on several external services. In a microservices architecture, each service typically manages its data in a dedicated database.

Required external services

  • Redis Cluster: Caches process definitions, compiled scripts, and Kafka responses
  • Kafka Cluster: Enables communication with external plugins and integrations

Configuration setup

FlowX.AI Engine uses environment variables for configuration. This section covers key configuration areas:

Database configuration

PostgreSQL

| Environment variable | Description |
| --- | --- |
| SPRING_DATASOURCE_URL | Database URL for PostgreSQL |
| SPRING_DATASOURCE_USERNAME | Username for PostgreSQL |
| SPRING_DATASOURCE_PASSWORD | Password for PostgreSQL |
| SPRING_DATASOURCE_DRIVER_CLASS_NAME | Driver class for PostgreSQL |
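
A minimal sketch of these variables as a Kubernetes-style env list (the host, database, and secret names are placeholders, not part of the standard deployment):

```yaml
env:
  - name: SPRING_DATASOURCE_URL
    value: "jdbc:postgresql://postgresql:5432/process-engine"   # placeholder host/database
  - name: SPRING_DATASOURCE_USERNAME
    value: "flowx"
  - name: SPRING_DATASOURCE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: postgres-credentials   # hypothetical secret name
        key: password
  - name: SPRING_DATASOURCE_DRIVER_CLASS_NAME
    value: "org.postgresql.Driver"   # standard PostgreSQL JDBC driver class
```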

MongoDB configuration

Configure connection to the Runtime MongoDB instance:

| Environment variable | Description | Default value |
| --- | --- | --- |
| SPRING_DATA_MONGODB_RUNTIME_URI | URI for connecting to MongoDB | Format: `mongodb://${RUNTIME_DB_USERNAME}:${DB_PASSWORD}@<host1>,<host2>,<arbiter-host>:<port>/${DB_NAME}?retryWrites=false` |
| RUNTIME_DB_USERNAME | MongoDB username | app-runtime |
| DB_NAME | MongoDB database name | app-runtime |
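
A minimal sketch of a resulting connection string, following the documented format (the host names and port are placeholders):

```yaml
env:
  - name: SPRING_DATA_MONGODB_RUNTIME_URI
    value: "mongodb://app-runtime:${DB_PASSWORD}@mongodb-0:27017,mongodb-1:27017,mongodb-arbiter:27017/app-runtime?retryWrites=false"
```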

Configuring script engine

FlowX.AI Engine supports multiple scripting languages for business rules and actions.

If you have the AI Developer agent set up, you must also set these environment variables on it.

Python runtime configuration

By default, FlowX.AI 4.7.1 uses Python 2.7 (Jython) as the Python runtime. To enable Python 3 via GraalPy with its 3x performance improvements, you must explicitly set the feature toggle.

| Environment variable | Description | Default value | Possible values |
| --- | --- | --- | --- |
| FLOWX_SCRIPTENGINE_USEGRAALVM | Determines which Python runtime to use | false | true, false |

Python 2.7 support will be deprecated in FlowX.AI 5.0. We recommend migrating your Python scripts to Python 3 to take advantage of improved performance and modern language features.
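
A minimal sketch of enabling GraalPy in a Kubernetes-style deployment (the env-list format is an assumption; pass the variable however your deployment supplies configuration):

```yaml
env:
  - name: FLOWX_SCRIPTENGINE_USEGRAALVM
    value: "true"   # opt in to Python 3 (GraalPy); false keeps Jython / Python 2.7
```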

GraalVM cache configuration

When using GraalVM (FLOWX_SCRIPTENGINE_USEGRAALVM=true), ensure the engine has proper access to a cache directory within the container. By default, this is configured in the /tmp directory.

For environments with filesystem restrictions or custom configurations, you need to properly configure the GraalVM cache.

There are two methods to configure the GraalVM cache location:

Option 1: Using Java options (Preferred)

Add the following Java option to your deployment configuration:

```
-Dpolyglot.engine.userResourceCache=/tmp
```

This option is set by default in the standard Docker image but might need to be included if you’re overriding the Java options.
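
For instance, if your deployment overrides the image's Java options through a variable such as JAVA_OPTS (a hypothetical name — use whatever mechanism your image actually exposes), keep the cache option in the list:

```yaml
env:
  - name: JAVA_OPTS
    # Re-include the cache option so GraalVM keeps a writable resource cache
    value: "-Xmx2g -Dpolyglot.engine.userResourceCache=/tmp"
```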

Option 2: Using environment variables

Alternatively, set the following environment variable:

```
XDG_CACHE_HOME=/tmp
```

If you encounter errors related to Python script execution when using GraalVM, verify that the cache directory is properly configured and accessible.

For more details about supported scripting languages and their capabilities, see the Supported scripts documentation.

Authorization & access roles

This section covers OAuth2 configuration settings for securing the Spring application.

Resource server settings (OAuth2 configuration)

The following configuration from versions before 4.1 will be deprecated in version 5.0:

  • SECURITY_OAUTH2_BASE_SERVER_URL: Base URL for the OAuth 2.0 Authorization Server
  • SECURITY_OAUTH2_CLIENT_CLIENT_ID: Client identifier registered with the OAuth 2.0 Authorization Server
  • SECURITY_OAUTH2_CLIENT_CLIENT_SECRET: Secret key for authenticating client requests
  • SECURITY_OAUTH2_REALM: Realm name used for OAuth2 authentication

Starting from version 4.1, use the following configuration instead. This setup is backwards compatible until version 4.5.

| Environment variable | Description | Default value |
| --- | --- | --- |
| SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_INTROSPECTION_URI | URI for token introspection | ${security.oauth2.base-server-url}/realms/${security.oauth2.realm}/protocol/openid-connect/token/introspect |
| SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_ID | Client ID for token introspection | ${security.oauth2.client.client-id} |
| SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRET | Client secret for token introspection | ${security.oauth2.client.client-secret} |
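
A minimal sketch with Keycloak-style placeholder values (the host, realm, client ID, and secret reference are all assumptions for illustration):

```yaml
env:
  - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_INTROSPECTION_URI
    value: "https://keycloak.example.com/realms/flowx/protocol/openid-connect/token/introspect"
  - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_ID
    value: "flowx-platform"
  - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRET
    valueFrom:
      secretKeyRef:
        name: oauth-credentials   # hypothetical secret name
        key: client-secret
```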

Service account settings

Configure the process engine service account:

| Environment variable | Description | Default value |
| --- | --- | --- |
| SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_ID | Client ID for the service account | flowx-${SPRING_APPLICATION_NAME}-sa |
| SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_SECRET | Client secret for the service account | |

For more information about the necessary service account, see Process Engine Service Account.

Security configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| SECURITY_TYPE | Type of security mechanism | oauth2 |
| SECURITY_BASIC_ENABLED | Enable basic authentication | false |
| SECURITY_PUBLIC_PATHS_0 | Public path not requiring authentication | /api/platform/components-versions |
| SECURITY_PUBLIC_PATHS_1 | Public path not requiring authentication | /manage/actuator/health |
| SECURITY_PATH_AUTHORIZATIONS_0_PATH | Security path pattern | "/api/**" |
| SECURITY_PATH_AUTHORIZATIONS_0_ROLES_ALLOWED | Roles allowed for path access | "ANY_AUTHENTICATED_USER" |
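
Additional public paths can be appended by continuing the index, as a sketch (the path below is purely illustrative):

```yaml
env:
  - name: SECURITY_PUBLIC_PATHS_2
    value: "/manage/actuator/info"   # hypothetical extra unauthenticated path
```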

Configuring Kafka

Kafka handles all communication between the FlowX.AI Engine, external plugins, and integrations. It also notifies running process instances when certain events occur.

Kafka connection settings

| Environment variable | Description | Default value |
| --- | --- | --- |
| SPRING_KAFKA_BOOTSTRAP_SERVERS | Kafka bootstrap servers | localhost:9092 |
| SPRING_KAFKA_SECURITY_PROTOCOL | Security protocol for Kafka | "PLAINTEXT" |
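
A minimal sketch of the connection settings as a Kubernetes-style env list (the broker addresses are placeholders):

```yaml
env:
  - name: SPRING_KAFKA_BOOTSTRAP_SERVERS
    value: "kafka-0.kafka:9092,kafka-1.kafka:9092"   # placeholder broker addresses
  - name: SPRING_KAFKA_SECURITY_PROTOCOL
    value: "SASL_PLAINTEXT"   # e.g. when brokers require SASL; the default is PLAINTEXT
```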

Message routing configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_DEFAULT_FX_CONTEXT | Default context value for message routing when no context is provided | "" (empty string) |

When KAFKA_DEFAULT_FX_CONTEXT is set and an event is received on Kafka without an fxContext header, the system will automatically apply the default context value to the message.

Kafka consumer retry settings

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_AUTH_EXCEPTION_RETRY_INTERVAL | Interval between retries after AuthorizationException (seconds) | 10 |

Consumer groups & consumer threads configuration

Both a producer and a consumer must be configured.

About consumer groups and threads

A consumer group is a set of consumers that jointly consume messages from one or more Kafka topics. Each consumer group has a unique identifier (group ID) that Kafka uses to manage message distribution.

Thread numbers refer to the number of threads a consumer application uses to process messages. Increasing thread count can improve parallelism and efficiency, especially with high message volumes.

Consumer group configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE | Group ID for notifying advance actions | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT | Group ID for notifying when a subprocess is blocked | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_ADAPTERS | Group ID for messages related to adapters | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION | Group ID for running scheduled actions | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING | Group ID for messages indicating continuing advancement | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_MESSAGE_EVENTS | Group ID for message events | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_START | Group ID for starting processes | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT | Group ID for starting processes for an event | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE | Group ID for expiring processes | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS | Group ID for processing operations from the Task Management plugin | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING | Group ID for processing bulk operations from the Task Management plugin | notif123-preview |

Consumer thread configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE | Number of threads for notifying advance actions | 6 |
| KAFKA_CONSUMER_THREADS_NOTIFY_PARENT | Number of threads for notifying when a subprocess is blocked | 6 |
| KAFKA_CONSUMER_THREADS_ADAPTERS | Number of threads for processing messages related to adapters | 6 |
| KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING | Number of threads for continuing advancement | 6 |
| KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION | Number of threads for running scheduled actions | 6 |
| KAFKA_CONSUMER_THREADS_MESSAGE_EVENTS | Number of threads for message events | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_START | Number of threads for starting processes | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT | Number of threads for starting processes for an event | 2 |
| KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE | Number of threads for expiring processes | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS | Number of threads for processing operations from task management | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING | Number of threads for processing bulk operations from task management | 6 |
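
As a sketch, a deployment handling a high volume of process starts might give that consumer a dedicated group ID and more threads (the group name and thread count below are illustrative, not recommended values):

```yaml
env:
  - name: KAFKA_CONSUMER_GROUP_ID_PROCESS_START
    value: "engine-process-start"   # hypothetical dedicated group ID
  - name: KAFKA_CONSUMER_THREADS_PROCESS_START
    value: "12"                     # raised from the default of 6
```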

All events that start with a configured pattern will be consumed by the Engine. This enables you to create new integrations and connect them to the engine without changing the configuration.

Configuring Kafka topics

Recommended topic naming convention:

```yaml
kafka:
  topic:
    naming:
      package: "ai.flowx."
      environment: "dev."
      version: ".v1"
      prefix: ${kafka.topic.naming.package}${kafka.topic.naming.environment}
      suffix: ${kafka.topic.naming.version}
      engineReceivePattern: engine.receive.
      integrationReceivePattern: integration.receive.

    pattern: ${kafka.topic.naming.prefix}${kafka.topic.naming.engineReceivePattern}*
    integrationPattern: ${kafka.topic.naming.prefix}${kafka.topic.naming.integrationReceivePattern}*
```
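
With these defaults, the prefix resolves to ai.flowx.dev. and the suffix to .v1, so the Engine's listen pattern becomes ai.flowx.dev.engine.receive.*. A topic named, for example, ai.flowx.dev.engine.receive.plugin.results.v1 (a hypothetical integration reply topic) would therefore be consumed automatically.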

Core engine topics

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE | Topic used internally for advancing processes | ai.flowx.dev.core.notify.advance.process.v1 |
| KAFKA_TOPIC_PROCESS_NOTIFY_PARENT | Topic used for sub-processes to notify the parent process | ai.flowx.dev.core.notify.parent.process.v1 |
| KAFKA_TOPIC_PATTERN | Pattern the Engine listens on for incoming events | ai.flowx.dev.engine.receive.* |
| KAFKA_TOPIC_LICENSE_OUT | Topic used to generate licensing-related details | ai.flowx.dev.core.trigger.save.license.v1 |
| KAFKA_TOPIC_PROCESS_EVENT_MESSAGE | Topic for process message events | ai.flowx.dev.core.message.event.process.v1 |
Task management topics

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_TASK_OUT | Topic used for sending notifications to the plugin | ai.flowx.dev.plugin.tasks.trigger.save.task.v1 |
| KAFKA_TOPIC_PROCESS_OPERATIONS_IN | Topic for receiving information about operations performed | ai.flowx.dev.core.trigger.operations.v1 |
| KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN | Topic where operations can be performed in bulk | ai.flowx.dev.core.trigger.operations.bulk.v1 |
OPERATIONS_IN request example:

```json
{
  "operationType": "UNASSIGN", // type of operation performed in the Task Management plugin
  "taskId": "some task id",
  "processInstanceUuid": "1cff0b7d-966b-4b35-9e9b-63b1d6757ec6",
  "swimlaneName": "Default",
  "swimlaneId": "51ec1241-fe06-4576-9c84-31598c05c527",
  "owner": {
    "firstName": null,
    "lastName": null,
    "username": "service-account-flowx-process-engine-account",
    "enabled": false
  },
  "author": "admin@flowx.ai"
}
```
BULK_IN request example:

```json
{
  "operations": [
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223", // ID of the process instance
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    },
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223",
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    }
  ]
}
```

To send additional keys in the response, attach them in the header. For example, you can use a requestID key.

A response should be sent on a callbackTopic if it is mentioned in the headers:

{"processInstanceId": ${processInstanceId}, "callbackTopic": "test.operations.out", "requestID":"1234567890"}

Task manager operations include assignment, unassignment, hold, unhold, and terminate. These are matched with the ...operations.out topic on the engine side. For more information, see the Task Management plugin documentation.

Scheduler topics

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_EXPIRE_IN | Topic for requests to expire processes | ai.flowx.dev.core.trigger.expire.process.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET | Topic used for scheduling process expiration | ai.flowx.dev.core.trigger.set.schedule.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP | Topic used for stopping process expiration | ai.flowx.dev.core.trigger.stop.schedule.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION | Topic for requests to run scheduled actions | ai.flowx.dev.core.trigger.run.action.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE | Topic for events related to advancing through a database | ai.flowx.dev.core.trigger.advance.process.v1 |

Timer event topics

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET | Used to communicate with the Scheduler microservice | ai.flowx.dev.core.trigger.set.timer-event-schedule.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP | Used to communicate with the Scheduler microservice | ai.flowx.dev.core.trigger.stop.timer-event-schedule.v1 |

Search data topics

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_DATA_SEARCH_IN | Topic that the Engine listens on for search requests | ai.flowx.dev.core.trigger.search.data.v1 |
| KAFKA_TOPIC_DATA_SEARCH_OUT | Topic used by the Engine to reply after finding a process | ai.flowx.dev.engine.receive.core.search.data.results.v1 |

Audit topics

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_AUDIT_OUT | Topic for sending audit logs | ai.flowx.dev.core.trigger.save.audit.v1 |

Process index topics

| Environment variable | Default value |
| --- | --- |
| KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |

Processes that can be started by sending messages to a Kafka topic

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_START_IN | Topic for requests to start a new process instance | ai.flowx.dev.core.trigger.start.process.v1 |
| KAFKA_TOPIC_PROCESS_START_OUT | Topic for sending the reply after starting a new process instance | ai.flowx.dev.core.confirm.start.process.v1 |
| KAFKA_TOPIC_PROCESS_START_FOR_EVENT | Topic for requests to start a process instance for an event | ai.flowx.dev.core.trigger.start-for-event.process.v1 |

Events gateway topics

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE | Outgoing messages from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.message.v1 |
| KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT | Disconnect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.disconnect.v1 |
| KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT | Connect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.connect.v1 |

Platform component versions topic

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_TOPIC_PLATFORM_COMPONENTS_VERSIONS_OUT | Topic for platform version caching | ai.flowx.dev.core.trigger.platform.versions.caching.v1 |

Inter-service topic coordination

When configuring FlowX services, ensure the following:

  1. The Engine’s pattern must match the pattern used by services sending messages to the Engine
  2. The integrationPattern must match the pattern used by the Integration Designer
  3. Output topics from one service must match the expected input topics of another service

For example:

  • Services send to topics matching ai.flowx.dev.engine.receive.* → Engine listens
  • Engine sends to topics matching ai.flowx.dev.integration.receive.* → Integration Designer listens

Kafka message size configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_MESSAGE_MAX_BYTES | Maximum message size in bytes | 52428800 (50MB) |

This setting affects:

  • Producer message max bytes
  • Producer max request size
  • Consumer max partition fetch bytes
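
The brokers must also accept messages of this size, since Kafka rejects records larger than the broker-side limit. A minimal sketch, assuming a Strimzi-managed cluster (adjust to however your Kafka distribution exposes broker configuration):

```yaml
# Broker-side limits should be >= KAFKA_MESSAGE_MAX_BYTES
kafka:
  config:
    message.max.bytes: 52428800
    replica.fetch.max.bytes: 52428800
```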

Kafka authentication

For secure environments, you can enable OAuth authentication with the following configuration:

```yaml
spring.config.activate.on-profile: kafka-auth

spring:
  kafka:
    security.protocol: "SASL_PLAINTEXT"
    properties:
      sasl:
        mechanism: "OAUTHBEARER"
        jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.client.id=\"${KAFKA_OAUTH_CLIENT_ID:kafka}\" oauth.client.secret=\"${KAFKA_OAUTH_CLIENT_SECRET:kafka-secret}\" oauth.token.endpoint.uri=\"${KAFKA_OAUTH_TOKEN_ENDPOINT_URI:kafka.auth.localhost}\" ;"
        login.callback.handler.class: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
```

You need to set the following environment variables:

  • KAFKA_OAUTH_CLIENT_ID
  • KAFKA_OAUTH_CLIENT_SECRET
  • KAFKA_OAUTH_TOKEN_ENDPOINT_URI
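
A sketch of the corresponding variables (the client ID, secret reference, and token endpoint are placeholders):

```yaml
env:
  - name: KAFKA_OAUTH_CLIENT_ID
    value: "kafka"
  - name: KAFKA_OAUTH_CLIENT_SECRET
    valueFrom:
      secretKeyRef:
        name: kafka-oauth   # hypothetical secret name
        key: client-secret
  - name: KAFKA_OAUTH_TOKEN_ENDPOINT_URI
    value: "https://keycloak.example.com/realms/kafka-authz/protocol/openid-connect/token"
```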

Configuring file upload size

| Environment variable | Description | Default value |
| --- | --- | --- |
| SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE | Maximum file size allowed for uploads | 50MB |
| SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE | Maximum request size allowed for uploads | 50MB |
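
For example, to raise the limit to 100 MB (illustrative only; keep both values in sync):

```yaml
env:
  - name: SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE
    value: "100MB"
  - name: SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE
    value: "100MB"
```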

Connecting the Advancing controller

To use the advancing controller, configure the following variables:

| Environment variable | Description |
| --- | --- |
| ADVANCING_DATASOURCE_JDBC_URL | Connection URL for the Advancing Postgres DB |
| ADVANCING_DATASOURCE_USERNAME | Username for the Advancing DB connection |
| ADVANCING_DATASOURCE_PASSWORD | Password for the Advancing DB connection |
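
A minimal sketch (the host, database, and secret names are placeholders):

```yaml
env:
  - name: ADVANCING_DATASOURCE_JDBC_URL
    value: "jdbc:postgresql://postgresql:5432/advancing"   # placeholder host/database
  - name: ADVANCING_DATASOURCE_USERNAME
    value: "flowx"
  - name: ADVANCING_DATASOURCE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: advancing-credentials   # hypothetical secret name
        key: password
```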

Configuring the Advancing controller

| Environment variable | Description | Default value |
| --- | --- | --- |
| ADVANCING_TYPE | Type of advancing mechanism | PARALLEL (possible values: PARALLEL, KAFKA) |
| ADVANCING_THREADS | Number of parallel threads | 20 |
| ADVANCING_PICKING_BATCH_SIZE | Number of tasks to pick in each batch | 10 |
| ADVANCING_PICKING_PAUSE_MILLIS | Pause duration between batches (ms) | 100 |
| ADVANCING_COOLDOWN_AFTER_SECONDS | Cooldown period after processing a batch (seconds) | 120 |
| ADVANCING_SCHEDULER_HEARTBEAT_CRONEXPRESSION | Cron expression for the heartbeat | "*/2 * * * * ?" |

For deployment details, see the Advancing controller setup guide.

Configuring cleanup mechanism

| Environment variable | Description | Default value |
| --- | --- | --- |
| SCHEDULER_THREADS | Number of threads for the scheduler | 10 |
| SCHEDULER_PROCESS_CLEANUP_ENABLED | Activates the cron job for process cleanup | false |
| SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSION | Cron expression for the process cleanup scheduler | 0 */5 0-5 * * ? (every 5 minutes between 12 AM and 5 AM) |
| SCHEDULER_PROCESS_CLEANUP_BATCH_SIZE | Number of processes to clean up in one batch | 1000 |
| SCHEDULER_MASTER_ELECTION_CRON_EXPRESSION | Cron expression for the master election process | 30 */3 * * * ? (every 3 minutes) |
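
For example, enabling cleanup over a custom night window (illustrative values; the cron syntax is Quartz-style, as in the defaults above):

```yaml
env:
  - name: SCHEDULER_PROCESS_CLEANUP_ENABLED
    value: "true"
  - name: SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSION
    value: "0 */10 1-4 * * ?"   # every 10 minutes between 1 AM and 5 AM
```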

Managing subprocess expiration

| Environment variable | Description | Default value |
| --- | --- | --- |
| FLOWX_PROCESS_EXPIRE_SUBPROCESSES | When true, terminates all subprocesses upon parent process expiration. When false, subprocesses follow their individual expiration settings | true |

Configuring application management

The following configuration from versions before 4.1 will be deprecated in version 5.0:

  • MANAGEMENT_METRICS_EXPORT_PROMETHEUS_ENABLED: Enables or disables Prometheus metrics export.

Starting from version 4.1, use the following configuration instead. This setup is backwards compatible until version 5.0.

| Environment variable | Description | Default value |
| --- | --- | --- |
| MANAGEMENT_PROMETHEUS_METRICS_EXPORT_ENABLED | Enables Prometheus metrics export | false |

RBAC configuration

Process Engine requires specific RBAC permissions for proper access to Kubernetes resources:

```yaml
rbac:
  create: true
  rules:
    - apiGroups:
        - ""
      resources:
        - secrets
        - configmaps
        - pods
      verbs:
        - get
        - list
        - watch
```