Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.flowx.ai/llms.txt

Use this file to discover all available pages before exploring further.

Scope. This guide covers process-instance indexing only — the process_instance-* indices written by process-engine. Audit logs (audit-logs index, written directly by audit-core over HTTPS) and search workloads (data-search, read-only) do not flow through this pipeline.

Before you start

  • Read the prerequisites: Review the Intro to Elasticsearch section first
  • Choose your strategy: Decide between Kafka (recommended for production) or HTTP indexing based on your infrastructure
  • Check permissions: Ensure you have access to modify process-engine configurations

Quick decision: Kafka vs HTTP

StrategyBest forProsCons
Kafka (recommended)Production environments with high throughputFire-and-forget communication, time-based partitioning, better performance, decoupled architectureRequires Kafka Connect setup
HTTPDevelopment or simple setupsDirect connection, easier setupBlocking operations, no time-based partitioning, tighter coupling
Why Kafka is recommended: The Kafka strategy allows for fire-and-forget communication, eliminating the need for the process engine to wait for indexing requests to complete. This significantly improves performance in high-throughput scenarios.
Critical difference: Only the Kafka strategy provides out-of-the-box support for time-based partitioning through the transforms.routeTS.timestamp.format configuration (see later in this guide). The HTTP strategy does not support time-based partitioning as a built-in feature.

Configuration overview

All indexing is controlled by these core settings:

Global indexing control

FLOWX_INDEXING_ENABLED=false  # Set only when you want to disable indexing
FLOWX_INDEXING_ENABLED defaults to true. Only set this variable if you want to disable indexing by setting it to false.

Strategy selection

FLOWX_INDEXING_TYPE=kafka  # Options: kafka, http, no-indexing

Performance considerations (FlowX defaults)

Default configuration:
  • Monthly indices: yyyyMM format for time-based partitioning (Kafka only)
  • 2 shards + 0 replicas (code default): bump replicas based on your high-availability requirements
  • Primary shards per year: 24 (2 primary × 12 monthly indices) — well under Elasticsearch’s 1000 shard default limit; each replica adds another 24
Scaling guidelines:
  • If indexing becomes slow: Check physical resources and shard size
  • If monthly indices become too large: Switch to weekly indices (yyyyww)
  • For high parallel indexing load: Add more primary shards
  • High availability: Set FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS to at least 1 in production for resilience to a single node loss
Important: Each replica is applied per shard, so monitor resource usage when increasing replicas.

HTTP-only setting

FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES=3  # Only used with HTTP strategy

Step 1: Configure the process engine

Add these environment variables to your process-engine configuration:
# Elasticsearch connection settings
SPRING_ELASTICSEARCH_REST_PROTOCOL=https
SPRING_ELASTICSEARCH_REST_URIS=elasticsearch:9200
SPRING_ELASTICSEARCH_REST_DISABLESSL=false
SPRING_ELASTICSEARCH_REST_USERNAME=
SPRING_ELASTICSEARCH_REST_PASSWORD=

# Use Kafka strategy
FLOWX_INDEXING_TYPE=kafka

# Index settings (optional - these are the defaults)
FLOWX_ELASTICSEARCH_INDEXSETTINGS_NAME=process_instance
FLOWX_ELASTICSEARCH_INDEXSETTINGS_SHARDS=2
FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS=0
process-engine still needs HTTPS connectivity to Elasticsearch in Kafka mode. Even with FLOWX_INDEXING_TYPE=kafka, process-engine creates the process_instance_template index template directly against Elasticsearch at startup. If process-engine cannot reach Elasticsearch, the template silently fails to apply and indices fall back to default mappings — search and aggregations break in non-obvious ways.

Step 2: Deploy Kafka Connect

Compatibility matrix

ComponentSupported in 5.9.0
Kafka cluster3.9 – 4.2 (set KafkaConnect.spec.version to match)
Elasticsearch Sink Connector plugin15+
Elasticsearch (target cluster)8.x
Kafka Connect runtimematches Kafka cluster version, run by the Strimzi operator
Multiple sink connector implementations also work, as long as they are compatible with both the deployed Kafka and Elasticsearch version deployed. This guide uses the Confluent kafka-connect-elasticsearch plugin as the reference path.
Prerequisites:
  • Kafka cluster (installed with Strimzi operator)
  • Elasticsearch cluster (installed with eck-operator)
  • Convert ES certificates to JKS format (see commands below)
Certificate conversion commands:
# 1. Extract the CA certificate
kubectl get secret elasticsearch-es-http-certs-public -n flowx \
  -o jsonpath='{.data.ca\.crt}' | base64 --decode > es-ca.crt

# 2. Create JKS keystore
keytool -importcert -alias elasticsearch -file es-ca.crt \
  -keystore keystore.jks -storepass flowx123456 -noprompt

# 3. Create Kubernetes secret
kubectl -n flowx create secret generic kafka-connect-elastic-jks --from-file=keystore.jks
Deploy KafkaConnect:
FlowX does not publish a Kafka Connect image. You build and host the container image yourself. Strimzi’s spec.build block below downloads the connector plugin, bakes it into a new image, and pushes it to your registry. Use an immutable tag that ties the image to the plugin version so upgrades are auditable.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: flowx-elasticsearch-kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0  # FlowX 5.9 supports Kafka 3.9 – 4.2; set this to your deployed cluster version
  replicas: 1
  bootstrapServers: flowx-kafka-bootstrap:9092

  # OAuth configuration (if using Keycloak)
  authentication:
    type: oauth
    clientId: flowx-service-client
    clientSecret:
      secretName: keycloak-kafka-cluster-client
      key: KEYCLOAK_KAFKA_CLUSTER_CLIENT_SECRET
    tokenEndpointUri: https://YOUR_KEYCLOAK_URL/auth/realms/YOUR_REALM/protocol/openid-connect/token
    tlsTrustedCertificates:
      - secretName: self-signed-certificate
        certificate: tls.crt
    disableTlsHostnameVerification: true

  config:
    group.id: flowx-kafka-connect-es-plugin
    offset.storage.topic: ai.flowx.kafka-connect-cluster-offsets
    config.storage.topic: ai.flowx.kafka-connect-cluster-configs
    status.storage.topic: ai.flowx.kafka-connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    # Auto-creates Kafka Connect's internal bookkeeping topics (offsets / configs / status).
    # The SOURCE topic (ai.flowx.<env>.core.index.process.v1) is NOT covered here — you must
    # declare it via your Strimzi KafkaTopic resources before starting the connector.
    topic.creation.enable: true
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider

  build:
    output:
      type: docker
      # YOU build and host this image. FlowX does not publish a Kafka Connect image.
      # Strimzi pulls the upstream Connect base, downloads the plugin below, bakes a new
      # container, and pushes it here. Tag it with the plugin version for traceability.
      image: <YOUR_REGISTRY>/<your-connect-image-name>:<your-tag>
      pushSecret: registry-credentials  # K8s secret of type kubernetes.io/dockerconfigjson
    plugins:
      - name: kafka-connect-elasticsearch
        artifacts:
          - type: zip
            url: https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.1.2/confluentinc-kafka-connect-elasticsearch-15.1.2.zip
            # sha512sum: <recommended for supply-chain integrity — fill in the published checksum>

  template:
    pod:
      volumes:
        - name: elasticsearch-keystore
          secret:
            secretName: kafka-connect-elastic-jks
    connectContainer:
      env:
        - name: ELASTIC_PASSWORD
          valueFrom:
            secretKeyRef:
              name: elasticsearch-es-elastic-user
              key: elastic
      volumeMounts:
        - name: elasticsearch-keystore
          mountPath: /mnt/elasticsearch-keystore
          readOnly: true

Step 3: Configure the Elasticsearch Sink Connector

Indices are partitioned by process start month, not update month. The TimestampRouter uses the Kafka record timestamp, which process-engine populates with the process instance’s dateStarted. Late updates to old process instances land in the original start-month index, not the index for the current month. Plan archiving alignment accordingly, and use the body field indexLastUpdatedTime (not the index name) for “activity in last X days” queries.
The source topic must be pre-declared. topic.creation.enable: true on the KafkaConnect cluster only creates Kafka Connect’s internal offsets/configs/status topics — it does not create the source topic the connector consumes. Add ai.flowx.<env>.core.index.process.v1 to your Strimzi KafkaTopic resources before the connector starts.
Key settings explained:
  • transforms.routeTS.timestamp.format: Controls index partitioning (monthly=yyyyMM, daily=yyyyMMdd)
  • transforms.routeTS.topic.format: Must start with your configured index name
  • batch.size: Adjust based on throughput needs (1000 is good default)
Process instance data archiving integration: If you’re using FlowX’s process instance data archiving feature, ensure your Elasticsearch partitioning configuration aligns with your database partitioning strategy. This is essential because:
  • When archiving process instances, data must be deleted from both the database and Elasticsearch
  • The partitioning intervals should be similar between database and Elasticsearch for consistent data lifecycle management
  • Elasticsearch indexing strategy must be enabled when partitioning is configured
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: flowx-elasticsearch-sink-connector
  labels:
    strimzi.io/cluster: flowx-elasticsearch-kafka-connect
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    # Connection settings - UPDATE THESE
    connection.url: https://elasticsearch-es-http.flowx.svc:9200
    connection.username: elastic
    connection.password: ${env:ELASTIC_PASSWORD}
    
    # SSL settings
    elastic.security.protocol: SSL
    elastic.https.ssl.truststore.type: JKS
    elastic.https.ssl.truststore.location: /mnt/elasticsearch-keystore/keystore.jks
    elastic.https.ssl.truststore.password: "flowx123456"
    
    # Source topic - must match your Kafka topic naming
    topics: ai.flowx.core.index.process.v1
    
    # Time-based routing - IMPORTANT: Choose your partitioning strategy
    transforms: routeTS
    transforms.routeTS.type: org.apache.kafka.connect.transforms.TimestampRouter
    transforms.routeTS.timestamp.format: yyyyMM  # Monthly indices (change as needed)
    transforms.routeTS.topic.format: process_instance-${timestamp}  # Must start with your index name
    
    # Performance settings
    batch.size: 1000
    read.timeout.ms: 30000
    flush.synchronously: "true"
    
    # Data handling
    behavior.on.malformed.documents: IGNORE
    behavior.on.null.values: IGNORE
    drop.invalid.message: "true"
    schema.ignore: "true"
    write.method: UPSERT
    type.name: _doc
    
    # Converters
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: "false"
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"

Step 4: Verify the setup

Check Kafka Connect status:
kubectl get kafkaconnect flowx-elasticsearch-kafka-connect -o yaml
Check connector status:
kubectl get kafkaconnector flowx-elasticsearch-sink-connector -o yaml
Verify indices are being created:
# Port-forward to Elasticsearch
kubectl port-forward svc/elasticsearch-es-http 9200:9200

# Check indices (should see process_instance-YYYYMM pattern)
curl -k -u elastic:$ELASTIC_PASSWORD https://localhost:9200/_cat/indices?v

Setup: HTTP indexing (simple)

Configure the process engine

For HTTP indexing, update your process-engine configuration:
# Elasticsearch connection settings
SPRING_ELASTICSEARCH_REST_PROTOCOL=https
SPRING_ELASTICSEARCH_REST_URIS=elasticsearch:9200
SPRING_ELASTICSEARCH_REST_DISABLESSL=false
SPRING_ELASTICSEARCH_REST_USERNAME=
SPRING_ELASTICSEARCH_REST_PASSWORD=

# Use HTTP strategy
FLOWX_INDEXING_TYPE=http

# Index settings (optional - these are the defaults)
FLOWX_ELASTICSEARCH_INDEXSETTINGS_NAME=process_instance
FLOWX_ELASTICSEARCH_INDEXSETTINGS_SHARDS=2
FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS=0

# HTTP-specific setting
FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES=3

Kafka topics

The process engine publishes indexing records to a topic whose name is composed from the standard FlowX Kafka naming triple:
<package><environment>core.index.process<version>
Each segment is set independently via environment variables on the process-engine deployment:
Environment VariableTypical valueNotes
KAFKA_TOPIC_NAMING_PACKAGEai.flowx.Vendor prefix. Same for all FlowX topics.
KAFKA_TOPIC_NAMING_ENVIRONMENTdev. or “ (empty)Environment segment. Empty in production.
KAFKA_TOPIC_NAMING_VERSION.v1Schema version suffix.
Worked examples:
EnvironmentComposed source topic
Dev (ENVIRONMENT=dev.)ai.flowx.dev.core.index.process.v1
Production (ENVIRONMENT= empty)ai.flowx.core.index.process.v1
Important: The topics: value in your KafkaConnector configuration must match the composed topic for your environment. You must also declare that topic explicitly via your Strimzi KafkaTopic resources — see the warning under Step 3.

Index management

Automatic template creation

The process engine automatically creates Elasticsearch index templates during startup:
  • HTTP strategy: Creates the index directly with configured shards/replicas
  • Kafka strategy: Creates an index template that applies to dynamically created indices

Time-based partitioning (Kafka only)

Choose your partitioning strategy based on data volume and retention needs:
Database PartitioningElasticsearch FormatIndex PatternBest For
MonthlyyyyyMMprocess_instance-202406Medium volume
Weeklyyyyywwprocess_instance-202426High volume
DailyyyyyMMddprocess_instance-20240615Very high volume

Efficient data deletion

Best practice: Delete entire indices rather than individual documents for better performance. With time-based partitioning, you can:
# Delete old monthly index
curl -X DELETE "https://localhost:9200/process_instance-202401"

# Delete multiple old indices
curl -X DELETE "https://localhost:9200/process_instance-2024*"

Troubleshooting

Common issues

Indexing not working:
  1. Check if indexing is disabled (only if you explicitly set FLOWX_INDEXING_ENABLED=false)
  2. Verify Elasticsearch connectivity
  3. Check process-engine logs for errors
Kafka Connect issues:
# Check connect cluster status
kubectl describe kafkaconnect flowx-elasticsearch-kafka-connect

# Check connector logs
kubectl logs -l strimzi.io/kind=KafkaConnect -f
Certificate issues:
# Verify JKS keystore
keytool -list -keystore keystore.jks -storepass flowx123456

# Check secret exists
kubectl get secret kafka-connect-elastic-jks -o yaml
Performance issues:
  • Increase batch.size in connector config
  • Adjust number of shards based on cluster size
  • Monitor Elasticsearch cluster health

Verification queries

Check index pattern:
# List all process instance indices
curl -k -u elastic:$PASSWORD "https://localhost:9200/_cat/indices/process_instance*?v"

# Check template exists
curl -k -u elastic:$PASSWORD "https://localhost:9200/_template/process_instance*"
Query across time-based indices:
# Search across all process instance indices
curl -k -u elastic:$PASSWORD -X GET "https://localhost:9200/process_instance-*/_search" \
  -H "Content-Type: application/json" \
  -d '{"query": {"match_all": {}}}'
Last modified on June 2, 2026