Version: 3.4.x

FLOWX.AI Engine setup guide

Introduction

This guide explains how to set up and configure the FLOWX.AI Engine to meet your specific requirements.

Infrastructure prerequisites

The FLOWX.AI Engine requires the following components to be set up before it can be started:

  • Docker engine - version 17.06 or higher
  • Kafka - version 2.8 or higher
  • Elasticsearch - version 7.11.0 or higher
  • DB instance

Database dependencies

In a microservices architecture, some microservices hold their data individually, using separate databases.

A basic Postgres configuration can be set up using a Helm values.yaml file, as follows:

  • helm values.yaml:

    onboardingdb:
      existingSecret: {{secretName}}
      metrics:
        enabled: true
        service:
          annotations:
            prometheus.io/port: {{prometheus port}}
            prometheus.io/scrape: "true"
          type: ClusterIP
        serviceMonitor:
          additionalLabels:
            release: prometheus-operator
          enabled: true
          interval: 30s
          scrapeTimeout: 10s
      persistence:
        enabled: true
        size: 1Gi
      postgresqlDatabase: onboarding
      postgresqlExtendedConf:
        maxConnections: 200
        sharedBuffers: 128MB
      postgresqlUsername: postgres
      resources:
        limits:
          cpu: 6000m
          memory: 2048Mi
        requests:
          cpu: 200m
          memory: 512Mi

Dependencies

  • Redis server - a Redis cluster is required for the engine to cache process definitions, compiled scripts, and Kafka responses
  • Kafka cluster - Kafka is the backbone of the engine and all plugins and integrations are accessed via the Kafka broker
  • Additional dependencies - details about how to set up logging via Elasticsearch, monitoring, and tracing via Jaeger, can be found here
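As a sketch, the Redis connection for the engine can be supplied through standard Spring data properties; the host, port, and password below are illustrative placeholders, and the exact property names may differ for your FLOWX.AI version:

```yaml
spring:
  redis:
    host: redis-master    # illustrative hostname of the Redis cluster endpoint
    port: 6379
    password: somesecret  # illustrative; supply via a secret in real deployments
```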

Configuration

Authorization & access roles

  • SECURITY_TYPE: Indicates that OAuth 2.0 is the chosen security type, default value: oauth2.

    security:
      type: oauth2
  • SECURITY_PATHAUTHORIZATIONS_0_PATH: Defines a security path or endpoint pattern. It specifies that the security settings apply to all paths under the "/api/" path; the ** wildcard matches all subpaths under "/api/**".
  • SECURITY_PATHAUTHORIZATIONS_0_ROLESALLOWED: Specifies the roles allowed to access the specified path. In the default configuration below, it is set to "ANY_AUTHENTICATED_USER", meaning any authenticated user may access the "/api/**" paths without needing a specific role.

    pathAuthorizations:
      - path: "/api/**"
        rolesAllowed: "ANY_AUTHENTICATED_USER"
  • SECURITY_OAUTH2_BASE_SERVER_URL: The base URL of the OAuth 2.0 Authorization Server, which handles authentication and authorization for clients and users. It is used to authorize clients and to issue and validate access tokens.
  • SECURITY_OAUTH2_CLIENT_CLIENT_ID: A unique identifier for a client application registered with the OAuth 2.0 Authorization Server. It is used to authenticate the client application when it attempts to access resources on behalf of a user.
  • SECURITY_OAUTH2_CLIENT_CLIENT_SECRET: The secret key used to authenticate requests made by an authorization client.
  • SECURITY_OAUTH2_REALM: Security configuration env var in the Spring Security OAuth2 framework; it specifies the realm name used when authenticating with OAuth2 providers.
  • SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_ID: The process engine requires a process engine service account to make direct calls to the Keycloak API. This setting specifies the service account, which is essential for the Start Catch Event node to work. Ensure that you provide the correct client ID for this service account.
  • SECURITY_OAUTH2_SERVICE_ACCOUNT_ADMIN_CLIENT_SECRET: Along with the client ID, you must also specify the client secret associated with the service account for proper authentication.
    oauth2:
      base-server-url: http://localhost:8080/auth
      realm: flowx
      client:
        client-id: flowx-platform-authorize
        client-secret: somesecret
      service-account:
        admin:
          client-id: flowx-process-engine-sa
          client-secret: somesecret

More details about the necessary service account can be found here:

»Process engine service account

Configuring Kafka

Kafka handles all communication between the FLOWX.AI Engine and external plugins and integrations. It is also used for notifying running process instances when certain events occur.

Both a producer and a consumer must be configured. The following Kafka-related configurations can be set by using environment variables:

  • SPRING_KAFKA_BOOTSTRAP_SERVERS - the address of the Kafka server, in the format "host:port"

  • KAFKA_AUTH_EXCEPTION_RETRY_INTERVAL - the interval between retries after an AuthorizationException is thrown by the Kafka consumer

  • KAFKA_MESSAGE_MAX_BYTES - the largest message size that the broker can receive from a producer
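The three variables above can be set directly on the engine's container; a minimal Kubernetes-style sketch, with illustrative values:

```yaml
env:
  - name: SPRING_KAFKA_BOOTSTRAP_SERVERS
    value: "localhost:9092"   # illustrative broker address, format "host:port"
  - name: KAFKA_AUTH_EXCEPTION_RETRY_INTERVAL
    value: "10"               # illustrative retry interval
  - name: KAFKA_MESSAGE_MAX_BYTES
    value: "52428800"         # illustrative 50 MB limit
```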

Consumer groups & consumer threads

In Kafka, a consumer group is a group of consumers that jointly consume and process messages from one or more Kafka topics. Each consumer group has a unique identifier called a group ID, which Kafka uses to manage message consumption and distribution among the members of the group.

Thread numbers, on the other hand, refer to the number of threads that a consumer application uses to process messages from Kafka. By default, each consumer instance runs in a single thread, which can limit the throughput of message processing. Increasing the number of consumer threads can help to improve the parallelism and efficiency of message consumption, especially when dealing with high message volumes.

Both group IDs and thread numbers can be configured in Kafka to optimize the processing of messages according to specific requirements, such as message volume, message type, and processing latency.

The configuration related to consumers (group IDs and thread numbers) can be set separately for each message type, as follows:

  • KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE - related to a Kafka consumer group that receives messages related to notifying advance actions, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT - related to a Kafka consumer group that receives messages related to notifying when a subprocess is blocked and it sends a notification to the parent process, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_ADAPTERS - related to a Kafka consumer group that receives messages related to adapters, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION - related to a Kafka consumer group that receives messages related to requests to run scheduled actions, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING - related to a Kafka consumer group that receives messages sent by the scheduler when a timeout expires, indicating that the advancing should continue (parallel advancing), it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_MESSAGE_EVENTS - related to a Kafka consumer group that receives process message events, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_PROCESS_START - related to a Kafka consumer group that receives messages related to starting processes, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT - related to a Kafka consumer group that receives requests to start processes triggered by message events, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE - related to expiring processes, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS - related to a Kafka consumer group that receives messages related to processing operations from task management, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING - related to a Kafka consumer group that receives messages related to processing bulk operations from task management, it is used to configure the group ID for this consumer group

  • KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE - the number of threads used by a Kafka consumer application, related to notifying advance actions

  • KAFKA_CONSUMER_THREADS_NOTIFY_PARENT - the number of threads used by a Kafka consumer application, related to a process when it is blocked

  • KAFKA_CONSUMER_THREADS_ADAPTERS - the number of threads used by a Kafka consumer application, related to adapters

  • KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION - the number of threads used by a Kafka consumer application, related to requests to run scheduled actions

  • KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING - the number of threads used by a Kafka consumer application, related to messages sent by the scheduler when a timeout expires, indicating that the advancing should continue (parallel advancing)

  • KAFKA_CONSUMER_THREADS_PROCESS_START - the number of threads used by a Kafka consumer application, related to starting processes

  • KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT - the number of threads used by a Kafka consumer application, related to starting processes triggered by message events

  • KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE - the number of threads used by a Kafka consumer application, related to expiring processes

  • KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS - the number of threads used by a Kafka consumer application, related to processing operations from task management

  • KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING - the number of threads used by a Kafka consumer application, related to processing bulk operations from task management

Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE | notif123-preview
KAFKA_CONSUMER_GROUP_ID_NOTIFY_PARENT | notif123-preview
KAFKA_CONSUMER_GROUP_ID_ADAPTERS | notif123-preview
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_RUN_ACTION | notif123-preview
KAFKA_CONSUMER_GROUP_ID_SCHEDULER_ADVANCING | notif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_START | notif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_START_FOR_EVENT | notif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_EXPIRE | notif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_OPERATIONS | notif123-preview
KAFKA_CONSUMER_GROUP_ID_PROCESS_BATCH_PROCESSING | notif123-preview
KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE | 6
KAFKA_CONSUMER_THREADS_NOTIFY_PARENT | 6
KAFKA_CONSUMER_THREADS_ADAPTERS | 6
KAFKA_CONSUMER_THREADS_SCHEDULER_ADVANCING | 6
KAFKA_CONSUMER_THREADS_SCHEDULER_RUN_ACTION | 6
KAFKA_CONSUMER_THREADS_PROCESS_START | 6
KAFKA_CONSUMER_THREADS_PROCESS_START_FOR_EVENT | 2
KAFKA_CONSUMER_THREADS_PROCESS_EXPIRE | 6
KAFKA_CONSUMER_THREADS_PROCESS_OPERATIONS | 6
KAFKA_CONSUMER_THREADS_PROCESS_BATCH_PROCESSING | 6
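As a sketch, overriding a default group ID and thread count from the table above could look like this in a deployment manifest (the variable names come from the table; the values are illustrative):

```yaml
env:
  - name: KAFKA_CONSUMER_GROUP_ID_NOTIFY_ADVANCE
    value: "engine-notify-advance"   # overrides the default notif123-preview
  - name: KAFKA_CONSUMER_THREADS_NOTIFY_ADVANCE
    value: "12"                      # overrides the default of 6 threads
```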

It is important to know that all events on topics matching the configured pattern will be consumed by the engine. This makes it possible to create a new integration and connect it to the engine without changing the engine's configuration.

info

The suggested topic pattern naming convention is the following:

    topic:
      naming:
        package: "ai.flowx."
        environment: "dev."
        version: ".v1"
        prefix: ${kafka.topic.naming.package}${kafka.topic.naming.environment}
        suffix: ${kafka.topic.naming.version}
        engineReceivePattern: engine.receive.

      pattern: ${kafka.topic.naming.prefix}${kafka.topic.naming.engineReceivePattern}*
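With the default values above, the placeholders resolve as shown in the comments of this sketch; the result matches the default KAFKA_TOPIC_PATTERN value:

```yaml
# prefix  = package + environment              -> "ai.flowx.dev."
# pattern = prefix + engineReceivePattern + "*"
#                                              -> "ai.flowx.dev.engine.receive.*"
```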

  • KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE - Kafka topic used internally by the engine

  • KAFKA_TOPIC_PROCESS_NOTIFY_PARENT - topic used for sub-processes to notify parent process when finished

  • KAFKA_TOPIC_PATTERN - the topic name pattern that the Engine listens on for incoming Kafka events

  • KAFKA_TOPIC_LICENSE_OUT - the topic name used by the Engine to generate licensing-related details

Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_NOTIFY_ADVANCE | ai.flowx.dev.core.notify.advance.process.v1
KAFKA_TOPIC_PROCESS_NOTIFY_PARENT | ai.flowx.dev.core.notify.parent.process.v1
KAFKA_TOPIC_PATTERN | ai.flowx.dev.engine.receive.*
KAFKA_TOPIC_LICENSE_OUT | ai.flowx.dev.core.trigger.save.license.v1
  • KAFKA_TOPIC_TASK_OUT - used for sending notifications to the plugin

  • KAFKA_TOPIC_PROCESS_OPERATIONS_IN - used for receiving calls from the task management plugin with information regarding operations performed

Request example

{
  "operationType": "UNASSIGN", // type of operation performed in the Task Management plugin
  "taskId": "some task id",
  "processInstanceUuid": "1cff0b7d-966b-4b35-9e9b-63b1d6757ec6",
  "swimlaneName": "Default",
  "swimlaneId": "51ec1241-fe06-4576-9c84-31598c05c527",
  "owner": {
    "firstName": null,
    "lastName": null,
    "username": "service-account-flowx-process-engine-account",
    "enabled": false
  },
  "author": "admin@flowx.ai"
}
  • KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN - this topic accepts the same operations as the "KAFKA_TOPIC_PROCESS_OPERATIONS_IN" topic, but sent as an array, allowing you to send multiple operations at once
Request example

{
  "operations": [
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223", // the id of the process instance
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    },
    {
      "operationType": "HOLD",
      "taskId": "some task id",
      "processInstanceUuid": "d3aabfd8-d041-4c62-892f-22d17923b223",
      "swimlaneName": "Default", // name of the swimlane
      "owner": null,
      "author": "john.doe@flowx.ai"
    }
  ]
}
info

If you need to send additional keys in the response, attach them in the header, as in the following example, where we used the requestID key.

info

A response should be sent on a callbackTopic if it is mentioned in the headers, as in the following example:

{"processInstanceId": ${processInstanceId}, "callbackTopic": "test.operations.out", "requestID":"1234567890"}
Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_TASK_OUT | ai.flowx.dev.plugin.tasks.trigger.save.task.v1
KAFKA_TOPIC_PROCESS_OPERATIONS_IN | ai.flowx.dev.core.trigger.operations.v1
KAFKA_TOPIC_PROCESS_OPERATIONS_BULK_IN | ai.flowx.core.trigger.operations.bulk.v1
info

Task manager operations can be the following: assignment, unassignment, hold, unhold, terminate. They are matched with the ...operations.out topic on the engine side. For more information, check the Task Management plugin documentation:

📄 Task management plugin

»Scheduler
  • KAFKA_TOPIC_PROCESS_EXPIRE_IN - the topic name that the Engine listens on for requests to expire processes

  • KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET - the topic name used by the Engine to schedule a process expiration

  • KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP - the topic name used by the Engine to stop a process expiration

  • KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION - the topic name that the Engine listens on for requests to run scheduled actions

  • KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE - the topic name that the Engine listens on for messages sent by the scheduler, related to advancing through the database

Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_EXPIRE_IN | ai.flowx.dev.core.trigger.expire.process.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_SET | ai.flowx.dev.core.trigger.set.schedule.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_OUT_STOP | ai.flowx.dev.core.trigger.stop.schedule.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_RUN_ACTION | ai.flowx.dev.core.trigger.run.action.v1
KAFKA_TOPIC_PROCESS_SCHEDULE_IN_ADVANCE | ai.flowx.dev.core.trigger.advance.process.v1
»Using the scheduler
Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET | ai.flowx.dev.core.trigger.set.timer-event-schedule.v1
KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP | ai.flowx.dev.core.trigger.stop.timer-event-schedule.v1
  • KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_SET - used to communicate with the Scheduler microservice

  • KAFKA_TOPIC_PROCESS_SCHEDULED_TIMER_EVENTS_OUT_STOP - used to communicate with the Scheduler microservice

  • KAFKA_TOPIC_DATA_SEARCH_IN - the topic name that the Engine listens on for requests to search for processes

  • KAFKA_TOPIC_DATA_SEARCH_OUT - the topic name used by the Engine to reply after finding a process

Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_DATA_SEARCH_IN | ai.flowx.dev.core.trigger.search.data.v1
KAFKA_TOPIC_DATA_SEARCH_OUT | ai.flowx.dev.engine.receive.core.search.data.results.v1
  • KAFKA_TOPIC_AUDIT_OUT - topic key for sending audit logs
Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_AUDIT_OUT | ai.flowx.dev.core.save.audit.v1
Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1

Processes that can be started by sending messages to a Kafka topic

  • KAFKA_TOPIC_PROCESS_START_IN - the Engine listens on this topic for requests to start a new process instance

  • KAFKA_TOPIC_PROCESS_START_OUT - used for sending out the reply after starting a new process instance

Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_START_IN | ai.flowx.dev.core.trigger.start.process.v1
KAFKA_TOPIC_PROCESS_START_OUT | ai.flowx.dev.core.confirm.start.process.v1

Default parameter (env var) | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_PROCESS_EVENT_MESSAGE | ai.flowx.dev.core.message.event.process.v1
KAFKA_TOPIC_PROCESS_START_FOR_EVENT | ai.flowx.dev.core.trigger.start-for-event.process.v1

Configuring events-gateway

  • KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE - outgoing messages from process-engine to events-gateway

  • KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT - disconnect commands from process-engine to events-gateway

  • KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT - connect commands from process-engine to events-gateway

Topic Name | Default FLOWX.AI value (can be overwritten)
KAFKA_TOPIC_EVENTSGATEWAY_OUT_MESSAGE | ai.flowx.eventsgateway.engine.commands.message.v1
KAFKA_TOPIC_EVENTSGATEWAY_OUT_DISCONNECT | ai.flowx.eventsgateway.engine.commands.disconnect.v1
KAFKA_TOPIC_EVENTSGATEWAY_OUT_CONNECT | ai.flowx.eventsgateway.engine.commands.connect.v1

Configuring file upload size

The maximum file size allowed for uploads can be set by using the following environment variables:

  • SPRING_SERVLET_MULTIPART_MAX_FILE_SIZE - maximum file size allowed for uploads

  • SPRING_SERVLET_MULTIPART_MAX_REQUEST_SIZE - maximum request size allowed for uploads
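For example, a 50 MB limit for both can be expressed in Spring YAML (the size values are illustrative; tune them to your use case):

```yaml
spring:
  servlet:
    multipart:
      max-file-size: 50MB      # illustrative maximum size of a single uploaded file
      max-request-size: 50MB   # illustrative maximum size of the whole multipart request
```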

Configuring Advancing controller

To use the Advancing controller, the following env vars are needed for the process-engine to connect to the Advancing Postgres DB:

  • ADVANCING_DATASOURCE_JDBC_URL - environment variable used to configure a JDBC (Java Database Connectivity) data source; it specifies the connection URL for the database, including the server, port, database name, and any other necessary connection parameters

  • ADVANCING_DATASOURCE_USERNAME - environment variable used to authenticate the user access to the data source

  • ADVANCING_DATASOURCE_PASSWORD - environment variable used to set the password for a data source connection
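A sketch of these variables in a deployment manifest; the host, port, database name, and credentials below are illustrative placeholders:

```yaml
env:
  - name: ADVANCING_DATASOURCE_JDBC_URL
    value: "jdbc:postgresql://advancing-db:5432/advancing"  # illustrative host/port/db
  - name: ADVANCING_DATASOURCE_USERNAME
    value: "postgres"      # illustrative
  - name: ADVANCING_DATASOURCE_PASSWORD
    value: "somesecret"    # illustrative; use a secret in real deployments
```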

»Advancing controller setup

Configuring Scheduler

Below you can find a configuration .yaml to use the scheduler service together with the FLOWX.AI Engine:

scheduler:
  threads: 10
  processCleanup:
    enabled: false
    cronExpression: 0 */5 0-5 * * ? # every day during the night, every 5 minutes, at the start of the minute
    batchSize: 1000
  masterElection:
    cronExpression: 30 */3 * * * ? # master election every 3 minutes

  • processCleanup: A configuration for cleaning up processes.
      • enabled - specifies whether this feature is turned on or off.
      • cronExpression - a schedule expression that determines when the cleanup process runs. In this case, it runs every day during the night (between 12:00 AM and 5:59 AM), every 5 minutes, at the start of the minute.
      • batchSize - specifies the number of processes to be cleaned up in one batch.
  • masterElection: A configuration for electing a master.
»Scheduler setup guide

Managing Subprocesses Expiration

The FLOWX_PROCESS_EXPIRE_SUBPROCESSES environment variable governs subprocess expiration in a parent process. When set to true, the parent process's expiration prompts simultaneous termination of all associated subprocesses. Conversely, when set to false, each subprocess follows its individual expiration configuration or persists indefinitely if not configured.

  • FLOWX_PROCESS_EXPIRE_SUBPROCESSES - default: true
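As a sketch, enabling simultaneous subprocess expiration in a deployment manifest looks like this:

```yaml
env:
  - name: FLOWX_PROCESS_EXPIRE_SUBPROCESSES
    value: "true"   # subprocesses expire together with the parent process
```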
