Scope. This guide covers process-instance indexing only — the process_instance-* indices written by process-engine. Audit logs (audit-logs index, written directly by audit-core over HTTPS) and search workloads (data-search, read-only) do not flow through this pipeline.
Before you start
- Read the prerequisites: Review the Intro to Elasticsearch section first
- Choose your strategy: Decide between Kafka (recommended for production) or HTTP indexing based on your infrastructure
- Check permissions: Ensure you have access to modify process-engine configurations
Quick decision: Kafka vs HTTP
| Strategy | Best for | Pros | Cons |
|---|---|---|---|
| Kafka (recommended) | Production environments with high throughput | Fire-and-forget communication, time-based partitioning, better performance, decoupled architecture | Requires Kafka Connect setup |
| HTTP | Development or simple setups | Direct connection, easier setup | Blocking operations, no time-based partitioning, tighter coupling |
Why Kafka is recommended: The Kafka strategy allows for fire-and-forget communication, eliminating the need for the process engine to wait for indexing requests to complete. This significantly improves performance in high-throughput scenarios.
Critical difference: Only the Kafka strategy provides out-of-the-box support for time-based partitioning through the transforms.routeTS.timestamp.format configuration (see later in this guide). The HTTP strategy does not support time-based partitioning as a built-in feature.
Configuration overview
All indexing is controlled by these core settings:
Global indexing control
FLOWX_INDEXING_ENABLED=false # Set only when you want to disable indexing
FLOWX_INDEXING_ENABLED defaults to true. Only set this variable if you want to disable indexing by setting it to false.
Strategy selection
FLOWX_INDEXING_TYPE=kafka # Options: kafka, http, no-indexing
Default configuration:
- Monthly indices: yyyyMM format for time-based partitioning (Kafka only)
- 2 shards + 0 replicas (code default): raise replicas based on your high-availability requirements
- Primary shards per year: 24 (2 primary × 12 monthly indices), well under Elasticsearch's default limit of 1,000 shards per data node; each replica adds another 24
Scaling guidelines:
- If indexing becomes slow: Check physical resources and shard size
- If monthly indices become too large: Switch to weekly indices (yyyyww)
- For high parallel indexing load: Add more primary shards
- High availability: Set FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS to at least 1 in production for resilience to a single node loss
Important: Each replica is a full copy of every primary shard, so the total shard count per index is primaries × (1 + replicas). Monitor disk, memory, and indexing load when increasing replicas.
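The shard arithmetic above can be checked with a short calculation. This is an illustrative sketch only: the function name `shards_per_year` is hypothetical, and it assumes every index created during the year uses the same shard and replica settings.

```python
def shards_per_year(primaries: int, replicas: int, indices_per_year: int) -> int:
    """Total shards (primaries plus replica copies) created over one year."""
    if primaries < 1 or indices_per_year < 1 or replicas < 0:
        raise ValueError("primaries and indices_per_year must be >= 1, replicas >= 0")
    # Each replica is one full copy of every primary shard.
    return primaries * (1 + replicas) * indices_per_year


# Defaults from this guide: 2 primaries, 0 replicas, monthly indices.
print(shards_per_year(2, 0, 12))  # 24
# One replica for high availability doubles the count.
print(shards_per_year(2, 1, 12))  # 48
# Weekly indices with one replica (52 indices per year assumed).
print(shards_per_year(2, 1, 52))  # 208
```

Comparing the result against the shard limit of your cluster gives a rough idea of how many years of indices can be retained before older ones need to be deleted.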
HTTP-only setting
FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES=3 # Only used with HTTP strategy
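Before deploying, the core settings above can be sanity-checked with a small script. The sketch below is not part of FlowX: `check_indexing_env` is a hypothetical helper, it only knows the variables and values listed in this section, and normalizing the boolean to lower case is an assumption about how the value is written.

```python
ALLOWED_TYPES = {"kafka", "http", "no-indexing"}


def check_indexing_env(env: dict) -> list:
    """Return human-readable warnings for the indexing variables in `env`."""
    warnings = []

    # The guide states that FLOWX_INDEXING_ENABLED defaults to true.
    enabled = env.get("FLOWX_INDEXING_ENABLED", "true").strip().lower()
    if enabled not in ("true", "false"):
        warnings.append(f"FLOWX_INDEXING_ENABLED should be true or false, got {enabled!r}")

    indexing_type = env.get("FLOWX_INDEXING_TYPE")
    if indexing_type is not None and indexing_type not in ALLOWED_TYPES:
        warnings.append(f"FLOWX_INDEXING_TYPE has unknown value {indexing_type!r}")

    # The retries setting is documented as HTTP-only.
    if (
        "FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES" in env
        and indexing_type in ALLOWED_TYPES
        and indexing_type != "http"
    ):
        warnings.append("FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES is only used with the HTTP strategy")

    return warnings


example = {
    "FLOWX_INDEXING_TYPE": "kafka",
    "FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES": "3",
}
for warning in check_indexing_env(example):
    print(warning)
```

In practice you would pass `dict(os.environ)` or the parsed values of your deployment manifest instead of the inline example.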
Setup: Kafka indexing (recommended)
Step 1: Configure process-engine
Add these environment variables to your process-engine configuration:
# Elasticsearch connection settings
SPRING_ELASTICSEARCH_REST_PROTOCOL=https
SPRING_ELASTICSEARCH_REST_URIS=elasticsearch:9200
SPRING_ELASTICSEARCH_REST_DISABLESSL=false
SPRING_ELASTICSEARCH_REST_USERNAME=
SPRING_ELASTICSEARCH_REST_PASSWORD=
# Use Kafka strategy
FLOWX_INDEXING_TYPE=kafka
# Index settings (optional - these are the defaults)
FLOWX_ELASTICSEARCH_INDEXSETTINGS_NAME=process_instance
FLOWX_ELASTICSEARCH_INDEXSETTINGS_SHARDS=2
FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS=0
process-engine still needs HTTPS connectivity to Elasticsearch in Kafka mode. Even with FLOWX_INDEXING_TYPE=kafka, process-engine creates the process_instance_template index template directly against Elasticsearch at startup. If process-engine cannot reach Elasticsearch, the template silently fails to apply and indices fall back to default mappings — search and aggregations break in non-obvious ways.
Step 2: Deploy Kafka Connect
Compatibility matrix
| Component | Supported in 5.9.0 |
|---|---|
| Kafka cluster | 3.9 – 4.2 (set KafkaConnect.spec.version to match) |
| Elasticsearch Sink Connector plugin | 15+ |
| Elasticsearch (target cluster) | 8.x |
| Kafka Connect runtime | matches Kafka cluster version, run by the Strimzi operator |
Other sink connector implementations also work, as long as they are compatible with both the deployed Kafka version and the deployed Elasticsearch version. This guide uses the Confluent kafka-connect-elasticsearch plugin as the reference path.
Prerequisites:
- Kafka cluster (installed with Strimzi operator)
- Elasticsearch cluster (installed with eck-operator)
- Convert ES certificates to JKS format (see commands below)
Certificate conversion commands:
# 1. Extract the CA certificate
kubectl get secret elasticsearch-es-http-certs-public -n flowx \
-o jsonpath='{.data.ca\.crt}' | base64 --decode > es-ca.crt
# 2. Create JKS keystore
keytool -importcert -alias elasticsearch -file es-ca.crt \
-keystore keystore.jks -storepass flowx123456 -noprompt
# 3. Create Kubernetes secret
kubectl -n flowx create secret generic kafka-connect-elastic-jks --from-file=keystore.jks
Deploy KafkaConnect:
FlowX does not publish a Kafka Connect image. You build and host the container image yourself. Strimzi’s spec.build block below downloads the connector plugin, bakes it into a new image, and pushes it to your registry. Use an immutable tag that ties the image to the plugin version so upgrades are auditable.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: flowx-elasticsearch-kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0 # FlowX 5.9 supports Kafka 3.9 – 4.2; set this to your deployed cluster version
  replicas: 1
  bootstrapServers: flowx-kafka-bootstrap:9092
  # OAuth configuration (if using Keycloak)
  authentication:
    type: oauth
    clientId: flowx-service-client
    clientSecret:
      secretName: keycloak-kafka-cluster-client
      key: KEYCLOAK_KAFKA_CLUSTER_CLIENT_SECRET
    tokenEndpointUri: https://YOUR_KEYCLOAK_URL/auth/realms/YOUR_REALM/protocol/openid-connect/token
    tlsTrustedCertificates:
      - secretName: self-signed-certificate
        certificate: tls.crt
    disableTlsHostnameVerification: true
  config:
    group.id: flowx-kafka-connect-es-plugin
    offset.storage.topic: ai.flowx.kafka-connect-cluster-offsets
    config.storage.topic: ai.flowx.kafka-connect-cluster-configs
    status.storage.topic: ai.flowx.kafka-connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    # Kafka Connect creates its internal bookkeeping topics (offsets / configs / status) itself.
    # topic.creation.enable only lets source connectors create the topics they write to. The SOURCE
    # topic of this sink (ai.flowx.<env>.core.index.process.v1) is NOT covered: declare it via your
    # Strimzi KafkaTopic resources before starting the connector.
    topic.creation.enable: true
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
  build:
    output:
      type: docker
      # YOU build and host this image. FlowX does not publish a Kafka Connect image.
      # Strimzi pulls the upstream Connect base, downloads the plugin below, bakes a new
      # container, and pushes it here. Tag it with the plugin version for traceability.
      image: <YOUR_REGISTRY>/<your-connect-image-name>:<your-tag>
      pushSecret: registry-credentials # K8s secret of type kubernetes.io/dockerconfigjson
    plugins:
      - name: kafka-connect-elasticsearch
        artifacts:
          - type: zip
            url: https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.1.2/confluentinc-kafka-connect-elasticsearch-15.1.2.zip
            # sha512sum: <recommended for supply-chain integrity; fill in the published checksum>
  template:
    pod:
      volumes:
        - name: elasticsearch-keystore
          secret:
            secretName: kafka-connect-elastic-jks
    connectContainer:
      env:
        - name: ELASTIC_PASSWORD
          valueFrom:
            secretKeyRef:
              name: elasticsearch-es-elastic-user
              key: elastic
      volumeMounts:
        - name: elasticsearch-keystore
          mountPath: /mnt/elasticsearch-keystore
          readOnly: true
Step 3: Configure the Elasticsearch sink connector
Indices are partitioned by process start month, not update month. The TimestampRouter uses the Kafka record timestamp, which process-engine populates with the process instance’s dateStarted. Late updates to old process instances land in the original start-month index, not the index for the current month. Plan archiving alignment accordingly, and use the body field indexLastUpdatedTime (not the index name) for “activity in last X days” queries.
The source topic must be pre-declared. Kafka Connect creates its own internal offsets/configs/status topics, and topic.creation.enable: true on the KafkaConnect cluster applies only to topics written by source connectors. Neither mechanism creates the source topic that this sink connector consumes. Add ai.flowx.<env>.core.index.process.v1 to your Strimzi KafkaTopic resources before the connector starts.
Key settings explained:
- transforms.routeTS.timestamp.format: Controls index partitioning (monthly = yyyyMM, daily = yyyyMMdd)
- transforms.routeTS.topic.format: Must start with your configured index name
- batch.size: Adjust based on throughput needs (1000 is a good default)
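To make the routing concrete, the sketch below mimics how an index name is derived from a record timestamp and the two formats listed above. It is a Python approximation, not the connector's code: `routed_index` and the pattern mapping are hypothetical, and it assumes the timestamp is formatted in UTC.

```python
from datetime import datetime, timezone

# Only the two patterns explained above are mapped; anything else is out of scope here.
JAVA_TO_STRFTIME = {"yyyyMM": "%Y%m", "yyyyMMdd": "%Y%m%d"}


def routed_index(index_name: str, record_timestamp_ms: int, java_format: str) -> str:
    """Approximate the `<index name>-${timestamp}` routing for one record."""
    strftime_format = JAVA_TO_STRFTIME[java_format]
    started = datetime.fromtimestamp(record_timestamp_ms / 1000, tz=timezone.utc)
    return f"{index_name}-{started.strftime(strftime_format)}"


# A process instance started on 15 June 2024 (record timestamp = dateStarted).
started_ms = int(datetime(2024, 6, 15, 10, 30, tzinfo=timezone.utc).timestamp() * 1000)

print(routed_index("process_instance", started_ms, "yyyyMM"))    # process_instance-202406
print(routed_index("process_instance", started_ms, "yyyyMMdd"))  # process_instance-20240615
```

Because the record timestamp carries the start date, an update sent months later for the same instance resolves to the same index name.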
Process instance data archiving integration: If you’re using FlowX’s process instance data archiving feature, ensure your Elasticsearch partitioning configuration aligns with your database partitioning strategy. This is essential because:
- When archiving process instances, data must be deleted from both the database and Elasticsearch
- The partitioning intervals should be similar between database and Elasticsearch for consistent data lifecycle management
- Elasticsearch indexing strategy must be enabled when partitioning is configured
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: flowx-elasticsearch-sink-connector
  labels:
    strimzi.io/cluster: flowx-elasticsearch-kafka-connect
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    # Connection settings - UPDATE THESE
    connection.url: https://elasticsearch-es-http.flowx.svc:9200
    connection.username: elastic
    connection.password: ${env:ELASTIC_PASSWORD}
    # SSL settings
    elastic.security.protocol: SSL
    elastic.https.ssl.truststore.type: JKS
    elastic.https.ssl.truststore.location: /mnt/elasticsearch-keystore/keystore.jks
    elastic.https.ssl.truststore.password: "flowx123456"
    # Source topic - must match your Kafka topic naming
    topics: ai.flowx.core.index.process.v1
    # Time-based routing - IMPORTANT: Choose your partitioning strategy
    transforms: routeTS
    transforms.routeTS.type: org.apache.kafka.connect.transforms.TimestampRouter
    transforms.routeTS.timestamp.format: yyyyMM # Monthly indices (change as needed)
    transforms.routeTS.topic.format: process_instance-${timestamp} # Must start with your index name
    # Performance settings
    batch.size: 1000
    read.timeout.ms: 30000
    flush.synchronously: "true"
    # Data handling
    behavior.on.malformed.documents: IGNORE
    behavior.on.null.values: IGNORE
    drop.invalid.message: "true"
    schema.ignore: "true"
    write.method: UPSERT
    type.name: _doc
    # Converters
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: "false"
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"
Step 4: Verify the setup
Check Kafka Connect status:
kubectl get kafkaconnect flowx-elasticsearch-kafka-connect -o yaml
Check connector status:
kubectl get kafkaconnector flowx-elasticsearch-sink-connector -o yaml
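When the same command is run with `-o json`, the status block can be checked programmatically. The sketch below is a hedged example: `connector_problems` is a hypothetical helper, the inline sample is made up for illustration, and the field names (`conditions`, `connectorStatus.connector.state`, `connectorStatus.tasks`) reflect the status layout Strimzi typically reports, so confirm them against your own output.

```python
import json


def connector_problems(resource: dict) -> list:
    """List reasons why a KafkaConnector resource does not look healthy."""
    status = resource.get("status", {})
    problems = []

    ready = [c for c in status.get("conditions", []) if c.get("type") == "Ready"]
    if not ready or ready[0].get("status") != "True":
        problems.append("Ready condition is not True")

    connector_status = status.get("connectorStatus", {})
    state = connector_status.get("connector", {}).get("state")
    if state != "RUNNING":
        problems.append(f"connector state is {state}")

    for task in connector_status.get("tasks", []):
        if task.get("state") != "RUNNING":
            problems.append(f"task {task.get('id')} state is {task.get('state')}")

    return problems


# Illustrative sample only; real output contains more fields.
sample = json.loads("""
{
  "status": {
    "conditions": [{"type": "Ready", "status": "True"}],
    "connectorStatus": {
      "connector": {"state": "RUNNING"},
      "tasks": [{"id": 0, "state": "FAILED"}]
    }
  }
}
""")

print(connector_problems(sample))  # ['task 0 state is FAILED']
```

An empty list means the connector and all of its tasks report RUNNING; anything else is a pointer to the Kafka Connect logs.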
Verify indices are being created:
# Port-forward to Elasticsearch
kubectl -n flowx port-forward svc/elasticsearch-es-http 9200:9200
# Check indices (should see process_instance-YYYYMM pattern)
curl -k -u "elastic:$ELASTIC_PASSWORD" "https://localhost:9200/_cat/indices?v"
Setup: HTTP indexing (simple)
For HTTP indexing, update your process-engine configuration:
# Elasticsearch connection settings
SPRING_ELASTICSEARCH_REST_PROTOCOL=https
SPRING_ELASTICSEARCH_REST_URIS=elasticsearch:9200
SPRING_ELASTICSEARCH_REST_DISABLESSL=false
SPRING_ELASTICSEARCH_REST_USERNAME=
SPRING_ELASTICSEARCH_REST_PASSWORD=
# Use HTTP strategy
FLOWX_INDEXING_TYPE=http
# Index settings (optional - these are the defaults)
FLOWX_ELASTICSEARCH_INDEXSETTINGS_NAME=process_instance
FLOWX_ELASTICSEARCH_INDEXSETTINGS_SHARDS=2
FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS=0
# HTTP-specific setting
FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES=3
Kafka topics
The process engine publishes indexing records to a topic whose name is composed from the standard FlowX Kafka naming triple:
<package><environment>core.index.process<version>
Each segment is set independently via environment variables on the process-engine deployment:
| Environment Variable | Typical value | Notes |
|---|---|---|
| KAFKA_TOPIC_NAMING_PACKAGE | ai.flowx. | Vendor prefix. Same for all FlowX topics. |
| KAFKA_TOPIC_NAMING_ENVIRONMENT | dev. or "" (empty) | Environment segment. Empty in production. |
| KAFKA_TOPIC_NAMING_VERSION | .v1 | Schema version suffix. |
Worked examples:
| Environment | Composed source topic |
|---|---|
| Dev (ENVIRONMENT=dev.) | ai.flowx.dev.core.index.process.v1 |
| Production (ENVIRONMENT empty) | ai.flowx.core.index.process.v1 |
Important: The topics: value in your KafkaConnector configuration must match the composed topic for your environment. You must also declare that topic explicitly via your Strimzi KafkaTopic resources — see the warning under Step 3.
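The composition rule is plain string concatenation, which the following sketch reproduces so the result can be compared with the `topics:` value of the connector. The function name `compose_index_topic` is hypothetical; the segment values are the typical ones from the table above.

```python
def compose_index_topic(package: str, environment: str, version: str) -> str:
    """Join the naming segments exactly as given; separators are part of each segment."""
    return f"{package}{environment}core.index.process{version}"


dev_topic = compose_index_topic("ai.flowx.", "dev.", ".v1")
prod_topic = compose_index_topic("ai.flowx.", "", ".v1")

print(dev_topic)   # ai.flowx.dev.core.index.process.v1
print(prod_topic)  # ai.flowx.core.index.process.v1

# The connector's `topics:` value must equal the composed name for the environment.
connector_topics = "ai.flowx.core.index.process.v1"
print(connector_topics == prod_topic)  # True
```

Note that the dots belong to the segments themselves (`ai.flowx.`, `dev.`, `.v1`), so a missing trailing or leading dot in an environment variable yields a different topic name.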
Index management
Automatic template creation
The process engine automatically creates Elasticsearch index templates during startup:
- HTTP strategy: Creates the index directly with configured shards/replicas
- Kafka strategy: Creates an index template that applies to dynamically created indices
Time-based partitioning (Kafka only)
Choose your partitioning strategy based on data volume and retention needs:
| Database Partitioning | Elasticsearch Format | Index Pattern | Best For |
|---|---|---|---|
| Monthly | yyyyMM | process_instance-202406 | Medium volume |
| Weekly | yyyyww | process_instance-202426 | High volume |
| Daily | yyyyMMdd | process_instance-20240615 | Very high volume |
Efficient data deletion
Best practice: Delete entire indices rather than individual documents for better performance.
With time-based partitioning, you can:
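# Delete one old monthly index
curl -k -u "elastic:$ELASTIC_PASSWORD" -X DELETE "https://localhost:9200/process_instance-202401"
# Delete several old indices by naming them explicitly
# (Elasticsearch 8.x rejects wildcard deletes unless action.destructive_requires_name is set to false)
curl -k -u "elastic:$ELASTIC_PASSWORD" -X DELETE "https://localhost:9200/process_instance-202401,process_instance-202402"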
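Choosing which monthly indices to drop can be scripted. The sketch below is one possible approach, not a FlowX tool: `expired_monthly_indices` and the `keep_months` retention parameter are hypothetical, and it assumes index names follow the `process_instance-yyyyMM` pattern.

```python
from datetime import date


def expired_monthly_indices(index_names, today: date, keep_months: int,
                            prefix: str = "process_instance-") -> list:
    """Return monthly indices older than the `keep_months` most recent months."""
    # Oldest month (as a running month number) that is still retained.
    cutoff = today.year * 12 + (today.month - 1) - keep_months + 1
    expired = []
    for name in index_names:
        if not name.startswith(prefix):
            continue
        suffix = name[len(prefix):]
        if len(suffix) != 6 or not (suffix.isascii() and suffix.isdigit()):
            continue  # not a yyyyMM index, leave it alone
        year, month = int(suffix[:4]), int(suffix[4:])
        if 1 <= month <= 12 and year * 12 + (month - 1) < cutoff:
            expired.append(name)
    return sorted(expired)


indices = [
    "process_instance-202401",
    "process_instance-202403",
    "process_instance-202404",
    "process_instance-202406",
    "audit-logs",
]
to_delete = expired_monthly_indices(indices, date(2024, 6, 15), keep_months=3)
print(",".join(to_delete))  # process_instance-202401,process_instance-202403
```

The comma-separated output can be used as the index list in a DELETE request. Because indices are partitioned by process start month, dropping an index also removes instances that started in that month but were updated later.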
Troubleshooting
Common issues
Indexing not working:
- Check if indexing is disabled (only if you explicitly set FLOWX_INDEXING_ENABLED=false)
- Verify Elasticsearch connectivity
- Check process-engine logs for errors
Kafka Connect issues:
# Check connect cluster status
kubectl describe kafkaconnect flowx-elasticsearch-kafka-connect
# Check connector logs
kubectl logs -l strimzi.io/kind=KafkaConnect -f
Certificate issues:
# Verify JKS keystore
keytool -list -keystore keystore.jks -storepass flowx123456
# Check secret exists
kubectl -n flowx get secret kafka-connect-elastic-jks -o yaml
Performance issues:
- Increase batch.size in connector config
- Adjust number of shards based on cluster size
- Monitor Elasticsearch cluster health
Verification queries
Check index pattern:
# List all process instance indices
curl -k -u "elastic:$ELASTIC_PASSWORD" "https://localhost:9200/_cat/indices/process_instance*?v"
# Check template exists
curl -k -u "elastic:$ELASTIC_PASSWORD" "https://localhost:9200/_template/process_instance*"
Query across time-based indices:
# Search across all process instance indices
curl -k -u "elastic:$ELASTIC_PASSWORD" -X GET "https://localhost:9200/process_instance-*/_search" \
-H "Content-Type: application/json" \
-d '{"query": {"match_all": {}}}'
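This guide recommends the body field indexLastUpdatedTime, rather than the index name, for "activity in last X days" queries. The sketch below builds such a request body for the search endpoint shown above. It is an assumption-laden example: `recent_activity_query` is a hypothetical helper, and the date-math expression only works if indexLastUpdatedTime is mapped as a date field in your template.

```python
import json


def recent_activity_query(days: int) -> dict:
    """Build a search body matching documents updated within the last `days` days."""
    if days < 1:
        raise ValueError("days must be >= 1")
    return {
        "query": {
            "range": {
                # Elasticsearch date math: now minus N days, rounded down to the day.
                "indexLastUpdatedTime": {"gte": f"now-{days}d/d"}
            }
        }
    }


print(json.dumps(recent_activity_query(7)))
# {"query": {"range": {"indexLastUpdatedTime": {"gte": "now-7d/d"}}}}
```

The printed JSON can be passed as the `-d` payload of the search request against `process_instance-*`, so that all time-based indices are covered regardless of each instance's start month.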