Infrastructure prerequisites

The FlowX Engine requires the following components to be set up before it can be started:

  • Docker Engine (version 17.06 or higher)
  • Kafka (version 2.8 or higher)
  • Elasticsearch (version 7.11.0 or higher)
  • Database instance

Dependencies

The FlowX Engine depends on the following components:

Database

In a microservices architecture, each microservice holds its data individually, using a separate database.

A basic PostgreSQL configuration can be set up using a Helm values.yaml file, as follows:

  • helm values.yaml:

      onboardingdb:
        existingSecret: {{secretName}}
        metrics:
          enabled: true
          service:
            annotations:
              prometheus.io/port: {{prometheus port}}
              prometheus.io/scrape: "true"
            type: ClusterIP
          serviceMonitor:
            additionalLabels:
              release: prometheus-operator
            enabled: true
            interval: 30s
            scrapeTimeout: 10s
        persistence:
          enabled: true
          size: 1Gi
        postgresqlDatabase: onboarding
        postgresqlExtendedConf:
          maxConnections: 200
          sharedBuffers: 128MB
        postgresqlUsername: postgres
        resources:
          limits:
            cpu: 6000m
            memory: 2048Mi
          requests:
            cpu: 200m
            memory: 512Mi
    

Redis server

A Redis cluster is required for caching process definitions, compiled scripts, and Kafka responses.
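
For reference, a minimal sketch of the Redis connection settings as container environment variables; the SPRING_REDIS_* property names are the standard Spring Boot ones and are an assumption here, as are the host and secret names:

    env:
      - name: SPRING_REDIS_HOST
        value: "redis-master" # illustrative Redis service name
      - name: SPRING_REDIS_PORT
        value: "6379"
      - name: SPRING_REDIS_PASSWORD
        valueFrom:
          secretKeyRef:
            name: redis-secret # hypothetical secret holding the Redis password
            key: redis-password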

Kafka cluster

Kafka serves as the backbone of the engine, facilitating communication with external plugins and integrations.

Details about setting up logging via Elasticsearch, as well as monitoring and tracing via OpenTelemetry, can be found here.

Configuration

FlowX.AI Engine utilizes environment variables for configuration. Below are the key environment variables you need to configure:

Authorization & access roles

This section outlines the OAuth2 configuration settings for securing the Spring application, including resource server settings, security type, and access authorizations.

Resource Server Settings (OAuth2 Configuration)

Old configuration from < v4.1 releases (will be deprecated in v4.5):

  • SECURITY_OAUTH2_BASE_SERVER_URL: The base URL of the OAuth 2.0 Authorization Server, which handles authentication and authorization for clients and users. It is used to authorize clients and to issue and validate access tokens.
  • SECURITY_OAUTH2_CLIENT_CLIENT_ID: A unique identifier for a client application registered with the OAuth 2.0 Authorization Server. It is used to authenticate the client application when it attempts to access resources on behalf of a user.
  • SECURITY_OAUTH2_CLIENT_CLIENT_SECRET: The secret key used to authenticate requests made by an authorization client.
  • SECURITY_OAUTH2_REALM: A Spring Security OAuth2 configuration variable that specifies the realm name used when authenticating with OAuth2 providers.

The new configuration, introduced in the v4.1 release, is described below.

| Environment variable | Description | Default value |
| --- | --- | --- |
| SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_INTROSPECTION_URI | URI for token introspection to validate opaque tokens | ${security.oauth2.base-server-url}/realms/${security.oauth2.realm}/protocol/openid-connect/token/introspect |
| SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_ID | Client ID for token introspection | ${security.oauth2.client.client-id} |
| SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRET | Client secret for token introspection | ${security.oauth2.client.client-secret} |
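
As an illustration, these settings could be supplied as container environment variables in a deployment manifest; the introspection URI below is a hypothetical Keycloak-style endpoint, and the client ID and secret names are placeholders:

    env:
      - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_INTROSPECTION_URI
        value: "https://idp.example.com/realms/flowx/protocol/openid-connect/token/introspect" # hypothetical IDP endpoint
      - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_ID
        value: "flowx-process-engine" # illustrative client ID
      - name: SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRET
        valueFrom:
          secretKeyRef:
            name: oauth2-client # hypothetical secret
            key: client-secret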

Service account settings

This section contains the environment variables for configuring the process engine's service account.

| Environment variable | Description | Default value |
| --- | --- | --- |
| SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_ID | Client ID for the service account | flowx-${SPRING_APPLICATION_NAME}-sa |
| SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_SECRET | Client secret for the service account | |

More details about the required service account can be found here:

Process engine service account

Security configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| SECURITY_TYPE | Type of security mechanism used | oauth2 |
| SECURITY_BASIC_ENABLED | Enable basic authentication | false |
| SECURITY_PUBLIC_PATHS_0 | List of public paths that do not require authentication | /api/platform/components-versions |
| SECURITY_PUBLIC_PATHS_1 | List of public paths that do not require authentication | /manage/actuator/health |
| SECURITY_PATH_AUTHORIZATIONS_0_PATH | Defines a security path or endpoint pattern; the settings apply to all paths under /api/ (the ** wildcard matches all subpaths) | "/api/**" |
| SECURITY_PATH_AUTHORIZATIONS_0_ROLES_ALLOWED | Specifies the roles allowed to access the path. The default, ANY_AUTHENTICATED_USER, grants access to any authenticated user without requiring a specific role | "ANY_AUTHENTICATED_USER" |
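
For example, the defaults above can be expressed as container environment variables (the values are taken from the table; the deployment layout is illustrative):

    env:
      - name: SECURITY_TYPE
        value: "oauth2"
      - name: SECURITY_PUBLIC_PATHS_0
        value: "/api/platform/components-versions"
      - name: SECURITY_PUBLIC_PATHS_1
        value: "/manage/actuator/health"
      - name: SECURITY_PATH_AUTHORIZATIONS_0_PATH
        value: "/api/**"
      - name: SECURITY_PATH_AUTHORIZATIONS_0_ROLES_ALLOWED
        value: "ANY_AUTHENTICATED_USER"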

Configuring Kafka

Kafka handles all communication between the FlowX.AI Engine and external plugins and integrations. It is also used for notifying running process instances when certain events occur.

Kafka connection settings

| Environment variable | Description | Default value |
| --- | --- | --- |
| SPRING_KAFKA_BOOTSTRAP_SERVERS | Kafka bootstrap servers | localhost:9092 |
| SPRING_KAFKA_SECURITY_PROTOCOL | Security protocol for Kafka | "PLAINTEXT" |
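
A minimal sketch of these settings as container environment variables, assuming an unauthenticated broker; the broker list is illustrative, and the protocol can be any of the standard Kafka values (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL):

    env:
      - name: SPRING_KAFKA_BOOTSTRAP_SERVERS
        value: "kafka-broker-1:9092,kafka-broker-2:9092" # illustrative broker list
      - name: SPRING_KAFKA_SECURITY_PROTOCOL
        value: "PLAINTEXT" # use SASL_SSL or similar where the cluster requires authentication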

Kafka consumer retry settings

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_AUTH_EXCEPTION_RETRY_INTERVAL | Interval between retries after an AuthorizationException is thrown. | 10 |

Consumer groups & consumer threads configuration

Both a producer and a consumer must be configured.

In Kafka, a consumer group is a set of consumers that jointly consume and process messages from one or more Kafka topics. Each consumer group has a unique identifier called a group ID, which Kafka uses to manage message consumption and distribution among the members of the group.

Thread numbers, on the other hand, refer to the number of threads that a consumer application uses to process messages from Kafka. By default, each consumer instance runs in a single thread, which can limit the throughput of message processing. Increasing the number of consumer threads can improve the parallelism and efficiency of message consumption, especially when dealing with high message volumes.

Both group IDs and thread numbers can be configured to optimize message processing for specific requirements, such as message volume, message type, and processing latency.

Group IDs and thread counts can be configured separately for each message type, as follows:

Consumer Group Configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE | Group ID for notifying advance actions. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT | Group ID for notifying when a subprocess is blocked. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_ADAPTERS | Group ID for messages related to adapters. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION | Group ID for running scheduled actions. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING | Group ID for messages indicating continuing advancement. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_START | Group ID for starting processes. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT | Group ID for starting processes for an event. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE | Group ID for expiring processes. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS | Group ID for processing operations from task management. | notif123-preview |
| KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING | Group ID for processing bulk operations from task management. | notif123-preview |

Consumer Thread Configuration

| Environment variable | Description | Default value |
| --- | --- | --- |
| KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE | Number of threads for notifying advance actions. | 6 |
| KAFKA_CONSUMER_THREADS_NOTIFY_PARENT | Number of threads for notifying when a subprocess is blocked. | 6 |
| KAFKA_CONSUMER_THREADS_ADAPTERS | Number of threads for processing messages related to adapters. | 6 |
| KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING | Number of threads for continuing advancement. | 6 |
| KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION | Number of threads for running scheduled actions. | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_START | Number of threads for starting processes. | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT | Number of threads for starting processes for an event. | 2 |
| KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE | Number of threads for expiring processes. | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS | Number of threads for processing operations from task management. | 6 |
| KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING | Number of threads for processing bulk operations from task management. | 6 |
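
For instance, to give process-start messages their own group ID and a higher thread count (the variable names are taken from the tables above; the values are illustrative):

    env:
      - name: KAFKA_CONSUMER_GROUP_ID_PROCESS_START
        value: "engine-process-start" # illustrative group ID replacing the notif123-preview default
      - name: KAFKA_CONSUMER_THREADS_PROCESS_START
        value: "12" # raised from the default of 6 to increase parallelism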

Note that all events on topics matching the configured pattern will be consumed by the engine. This makes it possible to create a new integration and connect it to the engine without changing the engine's configuration.

Configuring Kafka topics

The suggested topic pattern naming convention is the following:

    kafka:
      topic:
        naming:
          package: "ai.flowx."
          environment: "dev."
          version: ".v1"
          prefix: ${kafka.topic.naming.package}${kafka.topic.naming.environment}
          suffix: ${kafka.topic.naming.version}
          engineReceivePattern: engine.receive.

        pattern: ${kafka.topic.naming.prefix}${kafka.topic.naming.engineReceivePattern}*
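
With the defaults above, the prefix resolves to ai.flowx.dev. and the engine listens on topics matching ai.flowx.dev.engine.receive.*; a new integration could therefore publish to a topic such as ai.flowx.dev.engine.receive.myintegration.v1 (a hypothetical name) without any engine configuration change.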
| Environment variable | Description | Default FlowX value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE | Kafka topic used internally by the engine for advancing processes. | ai.flowx.dev.core.notify.advance.process.v1 |
| KAFKA_TOPIC_PROCESS_NOTIFY_PARENT | Kafka topic used for sub-processes to notify the parent process. | ai.flowx.dev.core.notify.parent.process.v1 |
| KAFKA_TOPIC_PATTERN | The topic name pattern that the Engine listens on for incoming events. | ai.flowx.dev.engine.receive.* |
| KAFKA_TOPIC_LICENSE_OUT | The topic name used by the Engine to generate licensing-related details. | ai.flowx.dev.core.trigger.save.license.v1 |

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_TASK_OUT | Kafka topic used for sending notifications to the plugin. | ai.flowx.dev.plugin.tasks.trigger.save.task.v1 |
| KAFKA_TOPIC_PROCESS_OPERATIONS_IN | Kafka topic used for receiving calls from the task management plugin with information regarding operations performed. | ai.flowx.dev.core.trigger.operations.v1 |
| KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN | Kafka topic where operations can be performed in bulk, allowing multiple operations to be sent at once. | ai.flowx.core.trigger.operations.bulk.v1 |

OPERATIONS_IN request example

{
  "operationType": "UNASSIGN", //type of operation performed in Task Management plugin
  "taskId": "some task id",
  "processInstanceUuid": "1cff0b7d-966b-4b35-9e9b-63b1d6757ec6",
  "swimlaneName": "Default",
  "swimlaneId": "51ec1241-fe06-4576-9c84-31598c05c527",
  "owner": {
    "firstName": null,
    "lastName": null,
    "username": "service-account-flowx-process-engine-account",
    "enabled": false
  },
  "author": "admin@flowx.ai"
}

BULK_IN request example

{
  "operations": [
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223", // the id of the process instance
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    },
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223",
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    }
  ]
}

If you need additional keys to be included in the response, attach them as message headers, as in the following example, which uses a requestID key.

A response is sent on the callbackTopic if one is specified in the headers, as in the following example:

{"processInstanceId": ${processInstanceId}, "callbackTopic": "test.operations.out", "requestID":"1234567890"}

Task manager operations include the following: assignment, unassignment, hold, unhold, and terminate. Each is matched with the ...operations.out topic on the engine side. For more information, check the Task Management plugin documentation:

📄 Task management plugin

Scheduler

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_EXPIRE_IN | Topic name for requests to expire processes. | ai.flowx.dev.core.trigger.expire.process.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET | Topic name used for scheduling process expiration. | ai.flowx.dev.core.trigger.set.schedule.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP | Topic name used for stopping process expiration. | ai.flowx.dev.core.trigger.stop.schedule.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION | Topic name for requests to run scheduled actions. | ai.flowx.dev.core.trigger.run.action.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE | Topic name for events related to advancing through a database sent by the scheduler. | ai.flowx.dev.core.trigger.advance.process.v1 |

Using the scheduler

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET | Used to communicate with the Scheduler microservice | ai.flowx.dev.core.trigger.set.timer-event-schedule.v1 |
| KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP | Used to communicate with the Scheduler microservice | ai.flowx.dev.core.trigger.stop.timer-event-schedule.v1 |

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_DATA_SEARCH_IN | The topic name that the Engine listens on for requests to search for processes | ai.flowx.dev.core.trigger.search.data.v1 |
| KAFKA_TOPIC_DATA_SEARCH_OUT | The topic name used by the Engine to reply after finding a process | ai.flowx.dev.engine.receive.core.search.data.results.v1 |

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_AUDIT_OUT | Topic key for sending audit logs | ai.flowx.dev.core.save.audit.v1 |

| Environment variable | Default FLOWX.AI value (can be overwritten) |
| --- | --- |
| KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |

Processes that can be started by sending messages to a Kafka topic

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_PROCESS_START_IN | The Engine listens on this topic for requests to start a new process instance | ai.flowx.dev.core.trigger.start.process.v1 |
| KAFKA_TOPIC_PROCESS_START_OUT | Used for sending out the reply after starting a new process instance | ai.flowx.dev.core.confirm.start.process.v1 |

| Environment variable | Default FLOWX.AI value (can be overwritten) |
| --- | --- |
| KAFKA_TOPIC_PROCESS_EVENT_MESSAGE | ai.flowx.dev.core.message.event.process.v1 |
| KAFKA_TOPIC_PROCESS_START_FOR_EVENT | ai.flowx.dev.core.trigger.start-for-event.process.v1 |

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE | Outgoing messages from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.message.v1 |
| KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT | Disconnect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.disconnect.v1 |
| KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT | Connect commands from process-engine to events-gateway | ai.flowx.eventsgateway.engine.commands.connect.v1 |

Configuring file upload size

The maximum file size allowed for uploads can be set by using the following environment variables:

| Environment variable | Description | Default FLOWX.AI value (can be overwritten) |
| --- | --- | --- |
| SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE | Maximum file size allowed for uploads | 50MB |
| SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE | Maximum request size allowed for uploads | 50MB |
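
For example, to raise both limits to a hypothetical 100MB:

    env:
      - name: SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE
        value: "100MB"
      - name: SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE
        value: "100MB" # keep the request limit at or above the file limit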

Configuring Advancing controller

To use the advancing controller, the following environment variables are needed for the process engine to connect to the Advancing Postgres database:

| Environment variable | Description |
| --- | --- |
| ADVANCING_DATASOURCE_JDBC_URL | Specifies the connection URL for a JDBC data source, including the server, port, database name, and other parameters |
| ADVANCING_DATASOURCE_USERNAME | Used to authenticate the user's access to the data source |
| ADVANCING_DATASOURCE_PASSWORD | Sets the password for the data source connection |
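
A minimal sketch of these variables, with a hypothetical host, database name, and credentials secret:

    env:
      - name: ADVANCING_DATASOURCE_JDBC_URL
        value: "jdbc:postgresql://advancing-db:5432/advancing" # hypothetical server and database
      - name: ADVANCING_DATASOURCE_USERNAME
        value: "flowx" # illustrative username
      - name: ADVANCING_DATASOURCE_PASSWORD
        valueFrom:
          secretKeyRef:
            name: advancing-db-secret # hypothetical secret
            key: password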

Advancing controller setup

Configuring cleanup mechanism

This section contains environment variables that configure the scheduler’s behavior, including thread count, cron jobs for data partitioning, process cleanup, and master election.

| Environment variable | Description | Default value | Possible values |
| --- | --- | --- | --- |
| SCHEDULER_THREADS | Number of threads for the scheduler. | 10 | Integer values (e.g., 10, 20) |
| SCHEDULER_PROCESS_CLEANUP_ENABLED | Activates the cron job for process cleanup. | false | true, false |
| SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSION | Cron expression for the process cleanup scheduler. | 0 */5 0-5 * * ? (every day during the night, every 5 minutes, at the start of the minute) | Cron expression (e.g., 0 0 1 * * ?) |
| SCHEDULER_PROCESS_CLEANUP_BATCH_SIZE | Number of processes to be cleaned up in one batch. | 1000 | Integer values (e.g., 100, 1000) |
| SCHEDULER_MASTER_ELECTION_CRON_EXPRESSION | Cron expression for the master election process. | 30 */3 * * * ? (master election every 3 minutes) | Cron expression (e.g., 0 0/3 * * * ?) |
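
Putting it together, a sketch that enables nightly cleanup using the defaults from the table (the batch size override is illustrative):

    env:
      - name: SCHEDULER_PROCESS_CLEANUP_ENABLED
        value: "true"
      - name: SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSION
        value: "0 */5 0-5 * * ?" # every 5 minutes during the night, at the start of the minute
      - name: SCHEDULER_PROCESS_CLEANUP_BATCH_SIZE
        value: "500" # illustrative override of the 1000 default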

Managing Subprocesses Expiration

This section details the environment variable that controls the expiration of subprocesses within a parent process. It determines whether subprocesses should terminate when the parent process expires or follow their own expiration settings.

| Environment variable | Description | Default value | Possible values |
| --- | --- | --- | --- |
| FLOWX_PROCESS_EXPIRE_SUBPROCESSES | Governs subprocess expiration in a parent process. When true, terminates all associated subprocesses upon parent process expiration. When false, subprocesses follow their individual expiration settings or persist indefinitely if not configured. | true | true, false |
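
For example, to let subprocesses follow their own expiration settings instead of terminating with the parent:

    env:
      - name: FLOWX_PROCESS_EXPIRE_SUBPROCESSES
        value: "false"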

Partitioning and archiving

The Partitioning and Archiving feature is optional and can be configured as needed.

For more details about process instance partitioning and archiving, including configuration of the feature, see:

Process instance partitioning and archiving