Before you start
- Read the prerequisites: Review the Intro to Elasticsearch section first
- Choose your strategy: Decide between Kafka (recommended for production) or HTTP indexing based on your infrastructure
- Check permissions: Ensure you have access to modify process-engine configurations
Quick decision: Kafka vs HTTP
| Strategy | Best for | Pros | Cons |
|---|---|---|---|
| Kafka (recommended) | Production environments with high throughput | Fire-and-forget communication, time-based partitioning, better performance, decoupled architecture | Requires Kafka Connect setup |
| HTTP | Development or simple setups | Direct connection, easier setup | Blocking operations, no time-based partitioning, tighter coupling |
Why Kafka is recommended: The Kafka strategy allows for fire-and-forget communication, eliminating the need for the process engine to wait for indexing requests to complete. This significantly improves performance in high-throughput scenarios.
Critical difference: Only the Kafka strategy provides out-of-the-box support for time-based partitioning, through the transforms.routeTS.timestamp.format configuration (see later in this guide). The HTTP strategy does not support time-based partitioning as a built-in feature.
Configuration overview
All indexing is controlled by these core settings:
Global indexing control
FLOWX_INDEXING_ENABLED=false # Set only when you want to disable indexing
FLOWX_INDEXING_ENABLED defaults to true. Only set this variable if you want to disable indexing by setting it to false.
Strategy selection
FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE=kafka # Options: kafka, http, no-indexing
Default configuration:
- Monthly indices: yyyyMM format for time-based partitioning (Kafka only)
- 2 shards + 2 replicas: Handles up to 200k process instances per day
- Total shards per month: 6 (2 primary + 4 replica shards)
- Annual impact: 72 shards per year (well under Elasticsearch's default limit of 1,000 shards per node)
Scaling guidelines:
- If indexing becomes slow: Check physical resources and shard size
- If monthly indices become too large: Switch to weekly indices (yyyyww)
- For high parallel indexing load: Add more primary shards
- High availability: The default 2 replicas provide good redundancy
Important: Each replica is applied per shard, so monitor resource usage when increasing replicas.
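To see how these defaults translate into actual shards, you can list the shards behind the process instance indices. This is a sketch that assumes the default index name and the port-forward/credentials used in the verification steps later in this guide:
# List primary (p) and replica (r) shards per process_instance index
curl -k -u elastic:$ELASTIC_PASSWORD \
  "https://localhost:9200/_cat/shards/process_instance-*?v&h=index,shard,prirep,state,docs,store"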
HTTP-only setting
FLOWX_INDEXING_OPTIMISTIC_LOCKING_RETRIES=3 # Only used with HTTP strategy
Setup: Kafka indexing (recommended)
Step 1: Configure the process engine
Add these environment variables to your process-engine configuration:
# Use Kafka strategy
FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE=kafka
# Index settings (optional - these are the defaults)
FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME=process_instance
FLOWX_INDEXING_PROCESSINSTANCE_SHARDS=2
FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS=2
Step 2: Deploy Kafka Connect
Prerequisites:
- Kafka cluster (installed with Strimzi operator)
- Elasticsearch cluster (installed with eck-operator)
- Convert ES certificates to JKS format (see commands below)
Certificate conversion commands:
# 1. Extract the CA certificate
kubectl get secret elasticsearch-es-http-certs-public -n flowx \
-o jsonpath='{.data.ca\.crt}' | base64 --decode > es-ca.crt
# 2. Create JKS keystore
keytool -importcert -alias elasticsearch -file es-ca.crt \
-keystore keystore.jks -storepass flowx123456 -noprompt
# 3. Create Kubernetes secret
kubectl -n flowx create secret generic kafka-connect-elastic-jks --from-file=keystore.jks
Deploy KafkaConnect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: flowx-elasticsearch-kafka-connect
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.9.0 # Match your Kafka cluster version
replicas: 1
bootstrapServers: flowx-kafka-bootstrap:9092
# OAuth configuration (if using Keycloak)
authentication:
type: oauth
clientId: flowx-service-client
clientSecret:
secretName: keycloak-kafka-cluster-client
key: KEYCLOAK_KAFKA_CLUSTER_CLIENT_SECRET
tokenEndpointUri: https://YOUR_KEYCLOAK_URL/auth/realms/YOUR_REALM/protocol/openid-connect/token
tlsTrustedCertificates:
- secretName: self-signed-certificate
certificate: tls.crt
disableTlsHostnameVerification: true
config:
group.id: flowx-kafka-connect-es-plugin
offset.storage.topic: ai.flowx.kafka-connect-cluster-offsets
config.storage.topic: ai.flowx.kafka-connect-cluster-configs
status.storage.topic: ai.flowx.kafka-connect-cluster-status
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
topic.creation.enable: true
config.providers: env
config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
build:
output:
type: docker
image: ttl.sh/strimzi-connect-flowx-$(date +%s):24h # Change this for production
plugins:
- name: kafka-connect-elasticsearch
artifacts:
- type: zip
url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.0.0/confluentinc-kafka-connect-elasticsearch-15.0.0.zip
template:
pod:
volumes:
- name: elasticsearch-keystore
secret:
secretName: kafka-connect-elastic-jks
connectContainer:
env:
- name: ELASTIC_PASSWORD
valueFrom:
secretKeyRef:
name: elasticsearch-es-elastic-user
key: elastic
volumeMounts:
- name: elasticsearch-keystore
mountPath: /mnt/elasticsearch-keystore
readOnly: true
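Once the manifest is ready, apply it and wait for Strimzi to report the Connect cluster as ready. The file name below is only an example of wherever you saved the resource:
# Apply the KafkaConnect resource (file name is an example)
kubectl apply -n flowx -f kafka-connect.yaml
# Wait for the Connect cluster to become Ready
kubectl wait kafkaconnect/flowx-elasticsearch-kafka-connect -n flowx --for=condition=Ready --timeout=300s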
Key settings explained (these appear in the sink connector configuration in Step 3 below):
- transforms.routeTS.timestamp.format: Controls index partitioning (monthly = yyyyMM, daily = yyyyMMdd)
- transforms.routeTS.topic.format: Must start with your configured index name
- batch.size: Adjust based on throughput needs (1000 is a good default)
Process instance data archiving integration: If you're using FlowX's process instance data archiving feature, ensure your Elasticsearch partitioning configuration aligns with your database partitioning strategy. This is essential because:
- When archiving process instances, data must be deleted from both the database and Elasticsearch
- The partitioning intervals should be similar between database and Elasticsearch for consistent data lifecycle management
- Elasticsearch indexing strategy must be enabled when partitioning is configured
Step 3: Deploy the Elasticsearch sink connector
Create a KafkaConnector resource that reads from the indexing topic and writes to Elasticsearch:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: flowx-elasticsearch-sink-connector
labels:
strimzi.io/cluster: flowx-elasticsearch-kafka-connect
spec:
class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
config:
# Connection settings - UPDATE THESE
connection.url: https://elasticsearch-es-http.flowx.svc:9200
connection.username: elastic
connection.password: ${env:ELASTIC_PASSWORD}
# SSL settings
elastic.security.protocol: SSL
elastic.https.ssl.truststore.type: JKS
elastic.https.ssl.truststore.location: /mnt/elasticsearch-keystore/keystore.jks
elastic.https.ssl.truststore.password: "flowx123456"
# Source topic - must match your Kafka topic naming
topics: ai.flowx.core.index.process.v1
# Time-based routing - IMPORTANT: Choose your partitioning strategy
transforms: routeTS
transforms.routeTS.type: org.apache.kafka.connect.transforms.TimestampRouter
transforms.routeTS.timestamp.format: yyyyMM # Monthly indices (change as needed)
transforms.routeTS.topic.format: process_instance-${timestamp} # Must start with your index name
# Performance settings
batch.size: 1000
read.timeout.ms: 30000
flush.synchronously: "true"
# Data handling
behavior.on.malformed.documents: IGNORE
behavior.on.null.values: IGNORE
drop.invalid.message: "true"
schema.ignore: "true"
write.method: UPSERT
type.name: _doc
# Converters
key.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: "false"
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: "false"
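Apply the connector the same way (again, the file name is only an example):
kubectl apply -n flowx -f elasticsearch-sink-connector.yaml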
Step 4: Verify the setup
Check Kafka Connect status:
kubectl get kafkaconnect flowx-elasticsearch-kafka-connect -o yaml
Check connector status:
kubectl get kafkaconnector flowx-elasticsearch-sink-connector -o yaml
Verify indices are being created:
# Port-forward to Elasticsearch
kubectl port-forward svc/elasticsearch-es-http 9200:9200
# Check indices (should see process_instance-YYYYMM pattern)
curl -k -u elastic:$ELASTIC_PASSWORD https://localhost:9200/_cat/indices?v
Setup: HTTP indexing (simple)
For HTTP indexing, update your process-engine configuration:
# Use HTTP strategy
FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE=http
# Index settings (optional - these are the defaults)
FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME=process_instance
FLOWX_INDEXING_PROCESSINSTANCE_SHARDS=2
FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS=2
# HTTP-specific setting
FLOWX_INDEXING_OPTIMISTIC_LOCKING_RETRIES=3
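There is no connector to inspect with the HTTP strategy, so a quick sanity check is to confirm that the configured index was created directly by the process engine (assuming the same port-forward and credentials as in the Kafka verification step):
# The process engine creates this index itself with the configured shards/replicas
curl -k -u elastic:$ELASTIC_PASSWORD "https://localhost:9200/_cat/indices/process_instance?v"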
Kafka topics
The process engine publishes indexing data to this topic:
| Environment Variable | Default Value |
|---|---|
| KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |
Important: The topic name in your Kafka sink connector configuration (the topics setting) must match this value. The example connector above uses ai.flowx.core.index.process.v1, so adjust one or the other to your environment's topic naming.
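If you want to double-check that the topic exists and matches the connector's topics setting, you can describe it from inside a broker pod. The pod name and whether extra client authentication properties are needed depend on your Strimzi setup, so treat this as a sketch:
# Pod name is an example for a Strimzi-managed cluster named "flowx"
kubectl -n flowx exec -it flowx-kafka-0 -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic ai.flowx.dev.core.index.process.v1
# If the internal listener requires authentication, pass --command-config with matching client properties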
Index management
Automatic template creation
The process engine automatically creates Elasticsearch index templates during startup:
- HTTP strategy: Creates the index directly with configured shards/replicas
- Kafka strategy: Creates an index template that applies to dynamically created indices
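For the Kafka strategy, you can confirm the template was registered. Depending on your Elasticsearch and process-engine versions it may show up under the composable or the legacy template API, so check both:
# Composable index templates (Elasticsearch 7.8+)
curl -k -u elastic:$ELASTIC_PASSWORD "https://localhost:9200/_index_template/process_instance*"
# Legacy index templates
curl -k -u elastic:$ELASTIC_PASSWORD "https://localhost:9200/_template/process_instance*"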
Time-based partitioning (Kafka only)
Choose your partitioning strategy based on data volume and retention needs:
| Database Partitioning | Elasticsearch Format | Index Pattern | Best For |
|---|---|---|---|
| Monthly | yyyyMM | process_instance-202406 | Medium volume |
| Weekly | yyyyww | process_instance-202426 | High volume |
| Daily | yyyyMMdd | process_instance-20240615 | Very high volume |
Efficient data deletion
Best practice: Delete entire indices rather than individual documents for better performance.
With time-based partitioning, you can delete whole indices as they age out of your retention window:
# Delete an old monthly index
curl -k -u elastic:$ELASTIC_PASSWORD -X DELETE "https://localhost:9200/process_instance-202401"
# Delete multiple old indices
curl -k -u elastic:$ELASTIC_PASSWORD -X DELETE "https://localhost:9200/process_instance-2024*"
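To decide which indices are old enough to drop, you can list them oldest-first with their creation dates and sizes (standard _cat/indices columns):
# Process instance indices sorted by creation date, oldest first
curl -k -u elastic:$ELASTIC_PASSWORD \
  "https://localhost:9200/_cat/indices/process_instance-*?v&h=index,creation.date.string,store.size&s=creation.date"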
Troubleshooting
Common issues
Indexing not working:
- Check if indexing is disabled (only if you explicitly set FLOWX_INDEXING_ENABLED=false)
- Verify Elasticsearch connectivity
- Check process-engine logs for errors
Kafka Connect issues:
# Check connect cluster status
kubectl describe kafkaconnect flowx-elasticsearch-kafka-connect
# Check connector logs
kubectl logs -l strimzi.io/kind=KafkaConnect -f
Certificate issues:
# Verify JKS keystore
keytool -list -keystore keystore.jks -storepass flowx123456
# Check secret exists
kubectl get secret kafka-connect-elastic-jks -o yaml
Performance issues:
- Increase batch.size in the connector config
- Adjust number of shards based on cluster size
- Monitor Elasticsearch cluster health
Verification queries
Check index pattern:
# List all process instance indices
curl -k -u elastic:$ELASTIC_PASSWORD "https://localhost:9200/_cat/indices/process_instance*?v"
# Check template exists
curl -k -u elastic:$ELASTIC_PASSWORD "https://localhost:9200/_template/process_instance*"
Query across time-based indices:
# Search across all process instance indices
curl -k -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/process_instance-*/_search" \
-H "Content-Type: application/json" \
-d '{"query": {"match_all": {}}}'