Before you start

  • Read the prerequisites: Review the Intro to Elasticsearch section first
  • Choose your strategy: Decide between Kafka (recommended for production) or HTTP indexing based on your infrastructure
  • Check permissions: Ensure you have access to modify process-engine configurations

Quick decision: Kafka vs HTTP

| Strategy | Best for | Pros | Cons |
|---|---|---|---|
| Kafka (recommended) | Production environments with high throughput | Fire-and-forget communication, time-based partitioning, better performance, decoupled architecture | Requires Kafka Connect setup |
| HTTP | Development or simple setups | Direct connection, easier setup | Blocking operations, no time-based partitioning, tighter coupling |
Why Kafka is recommended: The Kafka strategy allows for fire-and-forget communication, eliminating the need for the process engine to wait for indexing requests to complete. This significantly improves performance in high-throughput scenarios.
Critical difference: Only the Kafka strategy provides out-of-the-box support for time-based partitioning through the transforms.routeTS.timestamp.format configuration (see later in this guide). The HTTP strategy does not support time-based partitioning as a built-in feature.

Configuration overview

All indexing is controlled by these core settings:

Global indexing control

FLOWX_INDEXING_ENABLED=false  # Set only when you want to disable indexing
FLOWX_INDEXING_ENABLED defaults to true; set it to false only when you want to disable indexing.

Strategy selection

FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE=kafka  # Options: kafka, http, no-indexing

Performance considerations (FlowX defaults)

Default configuration:
  • Monthly indices: yyyyMM format for time-based partitioning (Kafka only)
  • 2 shards + 2 replicas: Handles up to 200k process instances per day
  • Total shards per month: 6 (2 primary + 4 replica shards)
  • Annual impact: 72 shards per year (well under Elasticsearch’s 1000 shard default limit)
Scaling guidelines:
  • If indexing becomes slow: Check physical resources and shard size
  • If monthly indices become too large: Switch to weekly indices (yyyyww)
  • For high parallel indexing load: Add more primary shards
  • High availability: The default 2 replicas provide good redundancy
Important: Each replica is applied per shard, so monitor resource usage when increasing replicas.
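The shard arithmetic behind these defaults can be sketched as a quick calculation (an illustrative helper mirroring the FlowX defaults, not an official sizing tool):

```python
def shards_per_index(primaries: int, replicas: int) -> int:
    # Each replica is a full copy of every primary shard.
    return primaries + primaries * replicas

def shards_per_year(primaries: int, replicas: int, indices_per_year: int = 12) -> int:
    # Monthly indices -> 12 indices per year.
    return shards_per_index(primaries, replicas) * indices_per_year

# FlowX defaults: 2 primary shards, 2 replicas, monthly indices.
print(shards_per_index(2, 2))  # 6 shards per monthly index
print(shards_per_year(2, 2))   # 72 shards per year
```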

HTTP-only setting

FLOWX_INDEXING_OPTIMISTIC_LOCKING_RETRIES=3  # Only used with HTTP strategy

Step 1: Configure the process engine

Add these environment variables to your process-engine configuration:
# Use Kafka strategy
FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE=kafka

# Index settings (optional - these are the defaults)
FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME=process_instance
FLOWX_INDEXING_PROCESSINSTANCE_SHARDS=2
FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS=2
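In a Kubernetes deployment, these variables are typically injected through the container spec. A minimal fragment for illustration (the Deployment shape and container name are assumptions; adapt to your own manifests):

```yaml
# Illustrative fragment of a process-engine Deployment (names are assumed)
spec:
  template:
    spec:
      containers:
        - name: process-engine
          env:
            - name: FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE
              value: "kafka"
            - name: FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME
              value: "process_instance"
            - name: FLOWX_INDEXING_PROCESSINSTANCE_SHARDS
              value: "2"
            - name: FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS
              value: "2"
```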

Step 2: Deploy Kafka Connect

Prerequisites:
  • Kafka cluster (installed with Strimzi operator)
  • Elasticsearch cluster (installed with eck-operator)
  • Convert ES certificates to JKS format (see commands below)
Certificate conversion commands:
# 1. Extract the CA certificate
kubectl get secret elasticsearch-es-http-certs-public -n flowx \
  -o jsonpath='{.data.ca\.crt}' | base64 --decode > es-ca.crt

# 2. Create JKS keystore
keytool -importcert -alias elasticsearch -file es-ca.crt \
  -keystore keystore.jks -storepass flowx123456 -noprompt

# 3. Create Kubernetes secret
kubectl -n flowx create secret generic kafka-connect-elastic-jks --from-file=keystore.jks
Deploy KafkaConnect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: flowx-elasticsearch-kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0  # Match your Kafka cluster version
  replicas: 1
  bootstrapServers: flowx-kafka-bootstrap:9092
  
  # OAuth configuration (if using Keycloak)
  authentication:
    type: oauth
    clientId: flowx-service-client
    clientSecret:
      secretName: keycloak-kafka-cluster-client
      key: KEYCLOAK_KAFKA_CLUSTER_CLIENT_SECRET
    tokenEndpointUri: https://YOUR_KEYCLOAK_URL/auth/realms/YOUR_REALM/protocol/openid-connect/token
    tlsTrustedCertificates:
      - secretName: self-signed-certificate
        certificate: tls.crt
    disableTlsHostnameVerification: true

  config:
    group.id: flowx-kafka-connect-es-plugin
    offset.storage.topic: ai.flowx.kafka-connect-cluster-offsets
    config.storage.topic: ai.flowx.kafka-connect-cluster-configs
    status.storage.topic: ai.flowx.kafka-connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    topic.creation.enable: true
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider

  build:
    output:
      type: docker
      image: ttl.sh/strimzi-connect-flowx-$(date +%s):24h  # Change this for production
    plugins:
      - name: kafka-connect-elasticsearch
        artifacts:
          - type: zip
            url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.0.0/confluentinc-kafka-connect-elasticsearch-15.0.0.zip

  template:
    pod:
      volumes:
        - name: elasticsearch-keystore
          secret:
            secretName: kafka-connect-elastic-jks
    connectContainer:
      env:
        - name: ELASTIC_PASSWORD
          valueFrom:
            secretKeyRef:
              name: elasticsearch-es-elastic-user
              key: elastic
      volumeMounts:
        - name: elasticsearch-keystore
          mountPath: /mnt/elasticsearch-keystore
          readOnly: true

Step 3: Configure the Elasticsearch Sink Connector

Key settings explained:
  • transforms.routeTS.timestamp.format: Controls index partitioning (monthly=yyyyMM, daily=yyyyMMdd)
  • transforms.routeTS.topic.format: Must start with your configured index name
  • batch.size: Adjust based on throughput needs (1000 is a good default)
Process instance data archiving integration: If you’re using FlowX’s process instance data archiving feature, ensure your Elasticsearch partitioning configuration aligns with your database partitioning strategy. This is essential because:
  • When archiving process instances, data must be deleted from both the database and Elasticsearch
  • The partitioning intervals should be similar between database and Elasticsearch for consistent data lifecycle management
  • Elasticsearch indexing strategy must be enabled when partitioning is configured
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: flowx-elasticsearch-sink-connector
  labels:
    strimzi.io/cluster: flowx-elasticsearch-kafka-connect
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    # Connection settings - UPDATE THESE
    connection.url: https://elasticsearch-es-http.flowx.svc:9200
    connection.username: elastic
    connection.password: ${env:ELASTIC_PASSWORD}
    
    # SSL settings
    elastic.security.protocol: SSL
    elastic.https.ssl.truststore.type: JKS
    elastic.https.ssl.truststore.location: /mnt/elasticsearch-keystore/keystore.jks
    elastic.https.ssl.truststore.password: "flowx123456"
    
    # Source topic - must match your Kafka topic naming
    topics: ai.flowx.core.index.process.v1
    
    # Time-based routing - IMPORTANT: Choose your partitioning strategy
    transforms: routeTS
    transforms.routeTS.type: org.apache.kafka.connect.transforms.TimestampRouter
    transforms.routeTS.timestamp.format: yyyyMM  # Monthly indices (change as needed)
    transforms.routeTS.topic.format: process_instance-${timestamp}  # Must start with your index name
    
    # Performance settings
    batch.size: 1000
    read.timeout.ms: 30000
    flush.synchronously: "true"
    
    # Data handling
    behavior.on.malformed.documents: IGNORE
    behavior.on.null.values: IGNORE
    drop.invalid.message: "true"
    schema.ignore: "true"
    write.method: UPSERT
    type.name: _doc
    
    # Converters
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: "false"
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"

Step 4: Verify the setup

Check Kafka Connect status:
kubectl get kafkaconnect flowx-elasticsearch-kafka-connect -o yaml
Check connector status:
kubectl get kafkaconnector flowx-elasticsearch-sink-connector -o yaml
Verify indices are being created:
# Port-forward to Elasticsearch
kubectl port-forward svc/elasticsearch-es-http 9200:9200

# Check indices (you should see the process_instance-yyyyMM pattern)
curl -k -u elastic:$ELASTIC_PASSWORD https://localhost:9200/_cat/indices?v

Setup: HTTP indexing (simple)

Configure the process engine

For HTTP indexing, update your process-engine configuration:
# Use HTTP strategy
FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE=http

# Index settings (optional - these are the defaults)
FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME=process_instance
FLOWX_INDEXING_PROCESSINSTANCE_SHARDS=2
FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS=2

# HTTP-specific setting
FLOWX_INDEXING_OPTIMISTIC_LOCKING_RETRIES=3
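The retry setting can be illustrated with a generic optimistic-locking loop. This is a conceptual sketch, not FlowX's actual implementation: VersionConflict stands in for Elasticsearch's 409 version-conflict response, and the names are invented for the example.

```python
class VersionConflict(Exception):
    """Stands in for Elasticsearch's 409 version-conflict response."""

def update_with_retries(update_fn, max_retries: int = 3):
    # Re-apply the update when a concurrent writer won the race,
    # giving up after max_retries additional attempts.
    for attempt in range(max_retries + 1):
        try:
            return update_fn()
        except VersionConflict:
            if attempt == max_retries:
                raise

# Usage: an update that conflicts twice before succeeding.
attempts = {"n": 0}
def flaky_update():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise VersionConflict()
    return "indexed"

print(update_with_retries(flaky_update, max_retries=3))  # indexed
```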

Kafka topics

The process engine publishes indexing data to this topic:
| Environment Variable | Default Value |
|---|---|
| KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |
Important: The topic name in your Kafka Sink Connector configuration must match this value.

Index management

Automatic template creation

The process engine automatically creates Elasticsearch index templates during startup:
  • HTTP strategy: Creates the index directly with configured shards/replicas
  • Kafka strategy: Creates an index template that applies to dynamically created indices
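You normally don't need to create templates yourself. For reference only, an equivalent composable index template with the default settings could be registered by hand like this (an illustrative sketch; the template name and connection details are assumptions):

```shell
curl -k -u elastic:$ELASTIC_PASSWORD -X PUT \
  "https://localhost:9200/_index_template/process_instance" \
  -H "Content-Type: application/json" \
  -d '{
    "index_patterns": ["process_instance-*"],
    "template": {
      "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 2
      }
    }
  }'
```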

Time-based partitioning (Kafka only)

Choose your partitioning strategy based on data volume and retention needs:
| Database Partitioning | Elasticsearch Format | Index Pattern | Best For |
|---|---|---|---|
| Monthly | yyyyMM | process_instance-202406 | Medium volume |
| Weekly | yyyyww | process_instance-202426 | High volume |
| Daily | yyyyMMdd | process_instance-20240615 | Very high volume |
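The mapping from format string to index name can be sketched in Python (approximate equivalents of the Java date patterns used by TimestampRouter; week numbering can differ between Java's ww and Python's %W):

```python
from datetime import datetime

# Approximate Python equivalents of the Java patterns (assumption for illustration).
FORMATS = {
    "yyyyMM": "%Y%m",
    "yyyyww": "%Y%W",
    "yyyyMMdd": "%Y%m%d",
}

def index_name(base: str, pattern: str, ts: datetime) -> str:
    # Mirrors TimestampRouter's topic.format: <base>-<formatted timestamp>.
    return f"{base}-{ts.strftime(FORMATS[pattern])}"

ts = datetime(2024, 6, 15)
print(index_name("process_instance", "yyyyMM", ts))    # process_instance-202406
print(index_name("process_instance", "yyyyMMdd", ts))  # process_instance-20240615
print(index_name("process_instance", "yyyyww", ts))    # week-numbered index (exact week may differ from Java)
```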

Efficient data deletion

Best practice: Delete entire indices rather than individual documents for better performance. With time-based partitioning, you can:
# Delete old monthly index
curl -X DELETE "https://localhost:9200/process_instance-202401"

# Delete multiple old indices
curl -X DELETE "https://localhost:9200/process_instance-2024*"
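To work out which monthly indices fall outside a retention window, the month-suffixed names can be generated programmatically. This is a hedged sketch; in practice you would list the actual indices via the _cat/indices API rather than generating names:

```python
from datetime import date

def monthly_indices_older_than(base: str, today: date, keep_months: int) -> list[str]:
    # Walk backwards month by month, then return everything past the
    # retention window (looking back 12 months beyond it for this sketch).
    names = []
    year, month = today.year, today.month
    for _ in range(keep_months + 12):
        month -= 1
        if month == 0:
            year, month = year - 1, 12
        names.append(f"{base}-{year}{month:02d}")
    return names[keep_months:]

# Indices older than a 3-month retention window as of June 2024.
print(monthly_indices_older_than("process_instance", date(2024, 6, 15), keep_months=3))
```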

Troubleshooting

Common issues

Indexing not working:
  1. Confirm indexing has not been disabled (an explicit FLOWX_INDEXING_ENABLED=false overrides the default of true)
  2. Verify Elasticsearch connectivity
  3. Check process-engine logs for errors
Kafka Connect issues:
# Check connect cluster status
kubectl describe kafkaconnect flowx-elasticsearch-kafka-connect

# Check connector logs
kubectl logs -l strimzi.io/kind=KafkaConnect -f
Certificate issues:
# Verify JKS keystore
keytool -list -keystore keystore.jks -storepass flowx123456

# Check secret exists
kubectl get secret kafka-connect-elastic-jks -o yaml
Performance issues:
  • Increase batch.size in connector config
  • Adjust number of shards based on cluster size
  • Monitor Elasticsearch cluster health

Verification queries

Check index pattern:
# List all process instance indices
curl -k -u elastic:$PASSWORD "https://localhost:9200/_cat/indices/process_instance*?v"

# Check template exists
curl -k -u elastic:$PASSWORD "https://localhost:9200/_template/process_instance*"
Query across time-based indices:
# Search across all process instance indices
curl -k -u elastic:$PASSWORD -X GET "https://localhost:9200/process_instance-*/_search" \
  -H "Content-Type: application/json" \
  -d '{"query": {"match_all": {}}}'