Infrastructure prerequisites

Before initiating the FlowX.AI Engine, ensure the following infrastructure components are properly installed and configured:

  • Docker Engine: Version 17.06 or higher.
  • Kafka: Version 2.8 or higher.
  • Elasticsearch: Version 7.11.0 or higher.
  • Database Instance: Properly configured and accessible.

Dependencies

The FlowX Engine interacts with various components critical for its operation:

For a microservices architecture, it’s common for services to manage their data via dedicated databases.

Example: Basic PostgreSQL Configuration

Configure PostgreSQL using the Helm chart as demonstrated below:

onboardingdb:
  existingSecret: {{secretName}}
  metrics:
    enabled: true
    service:
      annotations:
        prometheus.io/port: {{prometheus port}}
        prometheus.io/scrape: "true"
      type: ClusterIP
    serviceMonitor:
      additionalLabels:
        release: prometheus-operator
      enabled: true
      interval: 30s
      scrapeTimeout: 10s
  persistence:
    enabled: true
    size: 1Gi
  postgresqlDatabase: onboarding
  postgresqlExtendedConf:
    maxConnections: 200
    sharedBuffers: 128MB
  postgresqlUsername: postgres
  resources:
    limits:
      cpu: 6000m
      memory: 2048Mi
    requests:
      cpu: 200m
      memory: 512Mi

Required External Services

  • Redis Cluster: Essential for caching process definitions, compiled scripts, and Kafka responses.
  • Kafka Cluster: Serves as the communication backbone with external plugins and integration.

Configuration Setup

FlowX.AI Engine utilizes environment variables for configuration. Below are the key environment variables you need to configure:

Authorization & access roles

This section outlines the OAuth2 configuration settings for securing the Spring application, including resource server settings, security type, and access authorizations.

Resource server settings (OAuth2 configuration)

Old configuration from < v4.1 releases (will be deprecated in v4.5):

  • SECURITY_OAUTH2_BASE_SERVER_URL: The base URL for the OAuth 2.0 Authorization Server, which is responsible for authentication and authorization for clients and users, it is used to authorize clients, as well as to issue and validate access tokens.
  • SECURITY_OAUTH2_CLIENT_CLIENT_ID: A unique identifier for a client application that is registered with the OAuth 2.0 Authorization Server, this is used to authenticate the client application when it attempts to access resources on behalf of a user.
  • SECURITY_OAUTH2_CLIENT_CLIENT_SECRET: Secret Key that is used to authenticate requests made by an authorization client.
  • SECURITY_OAUTH2_REALM: Security configuration env var in the Spring Security OAuth2 framework, it is used to specify the realm name used when authenticating with OAuth2 providers.

New configuration, starting from v4.1 release, available below.

Environment variableDescriptionDefault Value
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_INTROSPECTION_URIURI for token introspection to validate opaque tokens${security.oauth2.base-server-url}/realms/${security.oauth2.realm}/protocol/openid-connect/token/introspect
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_IDClient ID for token introspection${security.oauth2.client.client-id}
SPRING_SECURITY_OAUTH2_RESOURCE_SERVER_OPAQUE_TOKEN_CLIENT_SECRETClient secret for token introspection${security.oauth2.client.client-secret}

Service account settings

This section contains the environment variables for configuring process engine service account.

Environment VariableDescriptionDefault Value
SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_IDClient ID for the service accountflowx-${SPRING_APPLICATION_NAME}-sa
SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_SECRETClient secret for the service account

More details about the necessary service account can be found here.

Security configuration

Environment variableDescriptionDefault Value
SECURITY_TYPEType of security mechanism usedoauth2
SECURITY_BASIC_ENABLEDEnable basic authenticationfalse
SECURITY_PUBLIC_PATHS_0List of public paths that do not require authentication/api/platform/components-versions
SECURITY_PUBLIC_PATHS_1List of public paths that do not require authentication/manage/actuator/health
SECURITY_PATH_AUTHORIZATIONS_0_PATHDefines a security path or endpoint pattern. It specifies that the security settings apply to all paths under the "/api/" path. The ** is a wildcard that means it includes all subpaths under "/api/**"."/api/**"
SECURITY_PATH_AUTHORIZATIONS_0_ROLES_ALLOWEDSpecifies the roles allowed for accessing the specified path. In this case, the roles allowed are empty (""). This might imply that access to the "/api/**" paths is open to all users or that no specific roles are required for authorization."ANY_AUTHENTICATED_USER"

Configuring Kafka

Kafka handles all communication between the FlowX.AI Engine and external plugins and integrations. It is also used for notifying running process instances when certain events occur.

Kafka connection settings

Environment VariableDescriptionDefault Value
SPRING_KAFKA_BOOTSTRAP_SERVERSKafka bootstrap serverslocalhost:9092
SPRING_KAFKA_SECURITY_PROTOCOLSecurity protocol for Kafka"PLAINTEXT"

Kafka consumer retry settings

Environment VariableDescriptionDefault Value
KAFKA_AUTH_EXCEPTION_RETRY_INTERVALInterval between retries after AuthorizationException is thrown.10

Consumer groups & consumer threads configuration

Both a producer and a consumer must be configured:

Consumer groups & consumer threads

In Kafka a consumer group is a group of consumers that jointly consume and process messages from one or more Kafka topics. Each consumer group has a unique identifier called a group ID, which is used by Kafka to manage message consumption and distribution among the members of the group.

Thread numbers, on the other hand, refer to the number of threads that a consumer application uses to process messages from Kafka. By default, each consumer instance runs in a single thread, which can limit the throughput of message processing. Increasing the number of consumer threads can help to improve the parallelism and efficiency of message consumption, especially when dealing with high message volumes.

Both group IDs and thread numbers can be configured in Kafka to optimize the processing of messages according to specific requirements, such as message volume, message type, and processing latency.

The configuration related to consumers (group ids and thread numbers) can be configured separately for each message type as it follows:

Consumer group configuration

Environment VariableDescriptionDefault Value
KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCEGroup ID for notifying advance actionsnotif123-preview
KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENTGroup ID for notifying when a subprocess is blockednotif123-preview
KAFKA_CONSUMER_GROUP_ID_ADAPTERSGroup ID for messages related to adaptersnotif123-preview
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTIONGroup ID for running scheduled actionsnotif123-preview
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCINGGroup ID for messages indicating continuing advancementnotif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_STARTGroup ID for starting processesnotif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENTGroup ID for starting processes for an eventnotif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIREGroup ID for expiring processesnotif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONSGroup ID for processing operations from Task Management pluginnotif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSINGGroup ID for processing bulk operations from Task Management pluginnotif123-preview

Consumer thread configuration

Environment VariableDescriptionDefault Value
KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCENumber of threads for notifying advance actions6
KAFKA_CONSUMER_THREADS_NOTIFY_PARENTNumber of threads for notifying when a subprocess is blocked6
KAFKA_CONSUMER_THREADS_ADAPTERSNumber of threads for processing messages related to adapters6
KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCINGNumber of threads for continuing advancement6
KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTIONNumber of threads for running scheduled actions6
KAFKA_CONSUMER_THREADS_PROCESS_STARTNumber of threads for starting processes6
KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENTNumber of threads for starting processes for an event2
KAFKA_CONSUMER_THREADS_PROCESS_EXPIRENumber of threads for expiring processes6
KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONSNumber of threads for processing operations from task management6
KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSINGNumber of threads for processing bulk operations from task management6

It is important to know that all the events that start with a configured pattern will be consumed by the Engine. This makes it possible to create a new integration and connect it to the engine without changing the configuration.

Configuring Kafka topics

The suggested topic pattern naming convention is the following:

 topic:
    naming:
      package: "ai.flowx."
      environment: "dev."
      version: ".v1"
      prefix: ${kafka.topic.naming.package}${kafka.topic.naming.environment}
      suffix: ${kafka.topic.naming.version}
      engineReceivePattern: engine.receive.

    pattern: ${kafka.topic.naming.prefix}${kafka.topic.naming.engineReceivePattern}*
Environment VariableDescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCEKafka topic used internally by the Engine for advancing processesai.flowx.dev.core.notify.advance.process.v1
KAFKA_TOPIC_PROCESS_NOTIFY_PARENTKafka topic used for sub-processes to notify the parent processai.flowx.dev.core.notify.parent.process.v1
KAFKA_TOPIC_PATTERNThe topic name pattern that the Engine listens on for incoming eventsai.flowx.dev.engine.receive.*
KAFKA_TOPIC_LICENSE_OUTThe topic name used by the Engine to generate licensing-related detailsai.flowx.dev.core.trigger.save.license.v1
Default parameter (env var)DescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_TASK_OUTKafka topic used for sending notifications to the pluginai.flowx.dev.plugin.tasks.trigger.save.task.v1
KAFKA_TOPIC_PROCESS_OPERATIONS_INKafka topic used for receiving calls from the task management plugin withai.flowx.dev.core.trigger.operations.v1
information regarding operations performed
KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_INKafka topic where operations can be performed in bulk, allowing multipleai.flowx.core.trigger.operations.bulk.v1
operations to be sent at once

OPERATIONS_IN request example

{
  "operationType": "UNASSIGN", //type of operation performed in Task Management plugin
  "taskId": "some task id",
  "processInstanceUuid": "1cff0b7d-966b-4b35-9e9b-63b1d6757ec6",
  "swimlaneName": "Default",
  "swimlaneId": "51ec1241-fe06-4576-9c84-31598c05c527",
  "owner": {
    "firstName": null,
    "lastName": null,
    "username": "service-account-flowx-process-engine-account",
    "enabled": false
  },
  "author": "admin@flowx.ai"
}

BULK_IN request example

{

  "operations": [
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223", // the id of the process instance
      "swimlaneName": "Default", //name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai",
    },

    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223",
      "swimlaneName": "Default", //name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai",
    }
  ]
}      

If you need to send additional keys on the response, attach them in the header, as in the following example, where we used requestID key.

A response should be sent on a callbackTopic if it is mentioned in the headers, as in the following example:

{"processInstanceId": ${processInstanceId}, "callbackTopic": "test.operations.out", "requestID":"1234567890"}

Task manager operations could be the following: assignment, unassignment, hold, unhold, terminate and it is matched with the ...operations.out topic on the engine side. For more information check the Task Management plugin documentation:

📄 Task management plugin

Scheduler

Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_EXPIRE_INTopic name for requests to expire processesai.flowx.dev.core.trigger.expire.process.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SETTopic name used for scheduling process expirationai.flowx.dev.core.trigger.set.schedule.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOPTopic name used for stopping process expirationai.flowx.dev.core.trigger.stop.schedule.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTIONTopic name for requests to run scheduled actionsai.flowx.dev.core.trigger.run.action.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCETopic name for events related to advancing through a database sent by the schedulerai.flowx.dev.core.trigger.advance.process.v1

Using the scheduler

Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SETUsed to communicate with Scheduler microserviceai.flowx.dev.core.trigger.set.timer-event-schedule.v1
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOPUsed to communicate with Scheduler microserviceai.flowx.dev.core.trigger.stop.timer-event-schedule.v1
Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_DATA_SEARCH_INThe topic name that the Engine listens on for requests to search for processesai.flowx.dev.core.trigger.search.data.v1
KAFKA_TOPIC_DATA_SEARCH_OUTThe topic name used by the Engine to reply after finding a processai.flowx.dev.engine.receive.core.search.data.results.v1
Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_AUDIT_OUTTopic key for sending audit logsai.flowx.dev.core.save.audit.v1
Environment variableDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_INDEX_OUTai.flowx.dev.core.index.process.v1

Processes that can be started by sending messages to a Kafka topic

Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_START_INThe Engine listens on this topic for requests to start a new process instanceai.flowx.dev.core.trigger.start.process.v1
KAFKA_TOPIC_PROCESS_START_OUTUsed for sending out the reply after starting a new process instanceai.flowx.dev.core.confirm.start.process.v1
Environment variableDefault FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_EVENT_MESSAGEai.flowx.dev.core.message.event.process.v1
KAFKA_TOPIC_PROCESS_START_FOR_EVENTai.flowx.dev.core.trigger.start-for-event.process.v1
Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGEOutgoing messages from process-engine to events-gatewayai.flowx.eventsgateway.engine.commands.message.v1
KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECTDisconnect commands from process-engine to events-gatewayai.flowx.eventsgateway.engine.commands.disconnect.v1
KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECTConnect commands from process-engine to events-gatewayai.flowx.eventsgateway.engine.commands.connect.v1

Configuring file upload size

The maximum file size allowed for uploads can be set by using the following environment variables:

Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
SPRING_SERVLET_MULTIPART_MAX_FILE_SIZEMaximum file size allowed for uploads50MB
SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZEMaximum request size allowed for uploads50MB

Connecting the Advancing controller

To use advancing controller, the following env vars are needed for process-engine to connect to Advancing Postgres DB:

Environment variableDescription
ADVANCING_DATASOURCE_JDBC_URLSpecifies the connection URL for a JDBC data source, including the server, port, database name, and other parameters
ADVANCING_DATASOURCE_USERNAMEUsed to authenticate the user’s access to the data source
ADVANCING_DATASOURCE_PASSWORDSets the password for a data source connection

Configuring the Advancing controller

Environment variableDescriptionDefault FlowX.AI value (can be overwritten)
ADVANCING_TYPESpecifies the type of advancing mechanism to be used. The advancing can be done either through Kafka or through the database (parallel)PARALLEL (possible values #enum: KAFKA, PARALLEL)
ADVANCING_THREADSNumber of parallel threads to be used20
ADVANCING_PICKING_BATCH_SIZENumber of tasks to pick in each batch10
ADVANCING_PICKING_PAUSE_MILLISPause duration between picking batches, in milliseconds. After picking a batch of tasks, the system will wait for 100 milliseconds before picking the next batch. This can help in controlling the rate of task intake and processing100
ADVANCING_COOLDOWN_AFTER_SECONDSCooldown period after processing a batch, in seconds. The system will wait for 120 seconds after completing a batch before starting the next cycle. This can be useful for preventing system overload and managing resource usage120
ADVANCING_SCHEDULER_HEARTBEAT_CRONEXPRESSIONA cron expression that defines the schedule for the heartbeat. The scheduler’s heartbeat will trigger every 2 seconds. This frequent heartbeat can be used to ensure the system is functioning correctly and tasks are being processed as expected"*/2 * * * * ?"

Advancing controller setup

Configuring cleanup mechanism

This section contains environment variables that configure the scheduler’s behavior, including thread count, cron jobs for data partitioning, process cleanup, and master election.

Environment VariableDescriptionDefault ValuePossible Values
SCHEDULER_THREADSNumber of threads for the scheduler10Integer values (e.g., 10, 20)
SCHEDULER_PROCESS_CLEANUP_ENABLEDActivates the cron job for process cleanupfalsetrue, false
SCHEDULER_PROCESS_CLEANUP_CRON_EXPRESSIONCron expression for the process cleanup scheduler0 */5 0-5 * * ? -> every day during the night, every 5 minutes, at the start of the minuteCron expression (e.g., 0 0 1 * * ?)
SCHEDULER_PROCESS_CLEANUP_BATCH_SIZENumber of processes to be cleaned up in one batch1000Integer values (e.g., 100, 1000)
SCHEDULER_MASTER_ELECTION_CRON_EXPRESSIONCron expression for the master election process30 */3 * * * ? -> master election every 3 minutesCron expression (e.g., 0 0/3 * * * ?)

Managing subprocesses expiration

This section details the environment variable that controls the expiration of subprocesses within a parent process. It determines whether subprocesses should terminate when the parent process expires or follow their own expiration settings.

Environment VariableDescriptionDefault ValuePossible Values
FLOWX_PROCESS_EXPIRE_SUBPROCESSESGoverns subprocess expiration in a parent process. When true, terminates all associated subprocesses upon parent process expiration. When false, subprocesses follow their individual expiration settings or persist indefinitely if not configuredtruetrue, false

Configuring application management

The FlowX helm chart provides a management service with the necessary parameters to integrate with the Prometheus operator. However, this integration is disabled by default.

Old configuration from < v4.1 releases (will be deprecated in v4.5):

  • MANAGEMENT_METRICS_EXPORT_PROMETHEUS_ENABLED: Enables or disables Prometheus metrics export.

New configuration, starting from v4.1 release, available below. Note that this setup is backwards compatible, it does not affect the configuration from v3.4.x. The configuration files will still work until v4.5 release.

To configure Prometheus metrics export for the FlowX.AI Engine, the following environment variable is required:

Environment VariableDescriptionDefault ValuePossible Values
MANAGEMENT_PROMETHEUS_METRICS_EXPORT_ENABLEDEnables or disables Prometheus metrics export.falsetrue, false