> ## Documentation Index
> Fetch the complete documentation index at: https://docs.flowx.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Configuring Elasticsearch indexing

> This guide shows you how to enable process instance indexing using either Kafka or HTTP transport strategies.

<Info>
  **Scope.** This guide covers process-instance indexing only — the `process_instance-*` indices written by `process-engine`. Audit logs (`audit-logs` index, written directly by `audit-core` over HTTPS) and search workloads (`data-search`, read-only) do **not** flow through this pipeline.
</Info>

## Before you start

* **Read the prerequisites**: Review the [Intro to Elasticsearch](../../../docs/platform-overview/frameworks-and-standards/event-driven-architecture-frameworks/intro-to-elasticsearch) section first
* **Choose your strategy**: Decide between Kafka (recommended for production) or HTTP indexing based on your infrastructure
* **Check permissions**: Ensure you have access to modify process-engine configurations

## Quick decision: Kafka vs HTTP

| Strategy                | Best for                                     | Pros                                                                                               | Cons                                                              |
| ----------------------- | -------------------------------------------- | -------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------- |
| **Kafka** (recommended) | Production environments with high throughput | Fire-and-forget communication, time-based partitioning, better performance, decoupled architecture | Requires Kafka Connect setup                                      |
| **HTTP**                | Development or simple setups                 | Direct connection, easier setup                                                                    | Blocking operations, no time-based partitioning, tighter coupling |

<Tip>
  **Why Kafka is recommended**: The Kafka strategy allows for fire-and-forget communication, eliminating the need for the process engine to wait for indexing requests to complete. This significantly improves performance in high-throughput scenarios.
</Tip>

<Info>
  **Critical difference**: Only the Kafka strategy provides out-of-the-box support for time-based partitioning through the `transforms.routeTS.timestamp.format` configuration (see later in this guide). The HTTP strategy does not support time-based partitioning as a built-in feature.
</Info>

***

## Configuration overview

All indexing is controlled by these core settings:

### Global indexing control

```bash theme={"system"}
FLOWX_INDEXING_ENABLED=false  # Set only when you want to disable indexing
```

<Info>
  `FLOWX_INDEXING_ENABLED` defaults to true. Only set this variable if you want to disable indexing by setting it to false.
</Info>

### Strategy selection

```bash theme={"system"}
FLOWX_INDEXING_TYPE=kafka  # Options: kafka, http, no-indexing
```

### Performance considerations (FlowX defaults)

**Default configuration:**

* **Monthly indices**: `yyyyMM` format for time-based partitioning (Kafka only)
* **2 shards + 0 replicas** (code default): bump replicas based on your high-availability requirements
* **Primary shards per year**: 24 (2 primary × 12 monthly indices) — well under Elasticsearch's 1000 shard default limit; each replica adds another 24

**Scaling guidelines:**

* **If indexing becomes slow**: Check physical resources and shard size
* **If monthly indices become too large**: Switch to weekly indices (`yyyyww`)
* **For high parallel indexing load**: Add more primary shards
* **High availability**: Set `FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS` to at least `1` in production for resilience to a single node loss

<Warning>
  **Important**: Each replica is applied per shard, so monitor resource usage when increasing replicas.
</Warning>

### HTTP-only setting

```bash theme={"system"}
FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES=3  # Only used with HTTP strategy
```

***

## Setup: Kafka indexing (recommended)

### Step 1: Configure the process engine

Add these environment variables to your process-engine configuration:

```bash theme={"system"}
# Elasticsearch connection settings
SPRING_ELASTICSEARCH_REST_PROTOCOL=https
SPRING_ELASTICSEARCH_REST_URIS=elasticsearch:9200
SPRING_ELASTICSEARCH_REST_DISABLESSL=false
SPRING_ELASTICSEARCH_REST_USERNAME=
SPRING_ELASTICSEARCH_REST_PASSWORD=

# Use Kafka strategy
FLOWX_INDEXING_TYPE=kafka

# Index settings (optional - these are the defaults)
FLOWX_ELASTICSEARCH_INDEXSETTINGS_NAME=process_instance
FLOWX_ELASTICSEARCH_INDEXSETTINGS_SHARDS=2
FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS=0
```

<Warning>
  **`process-engine` still needs HTTPS connectivity to Elasticsearch in Kafka mode.** Even with `FLOWX_INDEXING_TYPE=kafka`, `process-engine` creates the `process_instance_template` index template directly against Elasticsearch at startup. If `process-engine` cannot reach Elasticsearch, the template silently fails to apply and indices fall back to default mappings — search and aggregations break in non-obvious ways.
</Warning>

### Step 2: Deploy Kafka Connect

#### Compatibility matrix

| Component                           | Supported in 5.9.0                                         |
| ----------------------------------- | ---------------------------------------------------------- |
| Kafka cluster                       | 3.9 – 4.2 (set `KafkaConnect.spec.version` to match)       |
| Elasticsearch Sink Connector plugin | 15+                                                        |
| Elasticsearch (target cluster)      | 8.x                                                        |
| Kafka Connect runtime               | matches Kafka cluster version, run by the Strimzi operator |

<Note>
  Multiple sink connector implementations also work, as long as they are compatible with both the deployed Kafka and Elasticsearch version deployed. This guide uses the Confluent `kafka-connect-elasticsearch` plugin as the reference path.
</Note>

**Prerequisites:**

* Kafka cluster (installed with Strimzi operator)
* Elasticsearch cluster (installed with eck-operator)
* Convert ES certificates to JKS format (see commands below)

**Certificate conversion commands:**

```bash theme={"system"}
# 1. Extract the CA certificate
kubectl get secret elasticsearch-es-http-certs-public -n flowx \
  -o jsonpath='{.data.ca\.crt}' | base64 --decode > es-ca.crt

# 2. Create JKS keystore
keytool -importcert -alias elasticsearch -file es-ca.crt \
  -keystore keystore.jks -storepass flowx123456 -noprompt

# 3. Create Kubernetes secret
kubectl -n flowx create secret generic kafka-connect-elastic-jks --from-file=keystore.jks
```

**Deploy KafkaConnect:**

<Warning>
  **FlowX does not publish a Kafka Connect image.** You build and host the container image yourself. Strimzi's `spec.build` block below downloads the connector plugin, bakes it into a new image, and pushes it to your registry. Use an immutable tag that ties the image to the plugin version so upgrades are auditable.
</Warning>

```yaml theme={"system"}
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: flowx-elasticsearch-kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0  # FlowX 5.9 supports Kafka 3.9 – 4.2; set this to your deployed cluster version
  replicas: 1
  bootstrapServers: flowx-kafka-bootstrap:9092

  # OAuth configuration (if using Keycloak)
  authentication:
    type: oauth
    clientId: flowx-service-client
    clientSecret:
      secretName: keycloak-kafka-cluster-client
      key: KEYCLOAK_KAFKA_CLUSTER_CLIENT_SECRET
    tokenEndpointUri: https://YOUR_KEYCLOAK_URL/auth/realms/YOUR_REALM/protocol/openid-connect/token
    tlsTrustedCertificates:
      - secretName: self-signed-certificate
        certificate: tls.crt
    disableTlsHostnameVerification: true

  config:
    group.id: flowx-kafka-connect-es-plugin
    offset.storage.topic: ai.flowx.kafka-connect-cluster-offsets
    config.storage.topic: ai.flowx.kafka-connect-cluster-configs
    status.storage.topic: ai.flowx.kafka-connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    # Auto-creates Kafka Connect's internal bookkeeping topics (offsets / configs / status).
    # The SOURCE topic (ai.flowx.<env>.core.index.process.v1) is NOT covered here — you must
    # declare it via your Strimzi KafkaTopic resources before starting the connector.
    topic.creation.enable: true
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider

  build:
    output:
      type: docker
      # YOU build and host this image. FlowX does not publish a Kafka Connect image.
      # Strimzi pulls the upstream Connect base, downloads the plugin below, bakes a new
      # container, and pushes it here. Tag it with the plugin version for traceability.
      image: <YOUR_REGISTRY>/<your-connect-image-name>:<your-tag>
      pushSecret: registry-credentials  # K8s secret of type kubernetes.io/dockerconfigjson
    plugins:
      - name: kafka-connect-elasticsearch
        artifacts:
          - type: zip
            url: https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.1.2/confluentinc-kafka-connect-elasticsearch-15.1.2.zip
            # sha512sum: <recommended for supply-chain integrity — fill in the published checksum>

  template:
    pod:
      volumes:
        - name: elasticsearch-keystore
          secret:
            secretName: kafka-connect-elastic-jks
    connectContainer:
      env:
        - name: ELASTIC_PASSWORD
          valueFrom:
            secretKeyRef:
              name: elasticsearch-es-elastic-user
              key: elastic
      volumeMounts:
        - name: elasticsearch-keystore
          mountPath: /mnt/elasticsearch-keystore
          readOnly: true
```

### Step 3: Configure the Elasticsearch Sink Connector

<Warning>
  **Indices are partitioned by process start month, not update month.** The `TimestampRouter` uses the Kafka record timestamp, which `process-engine` populates with the process instance's `dateStarted`. Late updates to old process instances land in the *original* start-month index, not the index for the current month. Plan archiving alignment accordingly, and use the body field `indexLastUpdatedTime` (not the index name) for "activity in last X days" queries.
</Warning>

<Warning>
  **The source topic must be pre-declared.** `topic.creation.enable: true` on the `KafkaConnect` cluster only creates Kafka Connect's internal offsets/configs/status topics — it does **not** create the source topic the connector consumes. Add `ai.flowx.<env>.core.index.process.v1` to your Strimzi `KafkaTopic` resources before the connector starts.
</Warning>

**Key settings explained:**

* `transforms.routeTS.timestamp.format`: Controls index partitioning (monthly=`yyyyMM`, daily=`yyyyMMdd`)
* `transforms.routeTS.topic.format`: Must start with your configured index name
* `batch.size`: Adjust based on throughput needs (1000 is good default)

<Warning>
  **Process instance data archiving integration**: If you're using FlowX's process instance data archiving feature, ensure your Elasticsearch partitioning configuration aligns with your database partitioning strategy. This is essential because:

  * When archiving process instances, data must be deleted from both the database and Elasticsearch
  * The partitioning intervals should be similar between database and Elasticsearch for consistent data lifecycle management
  * Elasticsearch indexing strategy must be enabled when partitioning is configured
</Warning>

```yaml theme={"system"}
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: flowx-elasticsearch-sink-connector
  labels:
    strimzi.io/cluster: flowx-elasticsearch-kafka-connect
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    # Connection settings - UPDATE THESE
    connection.url: https://elasticsearch-es-http.flowx.svc:9200
    connection.username: elastic
    connection.password: ${env:ELASTIC_PASSWORD}
    
    # SSL settings
    elastic.security.protocol: SSL
    elastic.https.ssl.truststore.type: JKS
    elastic.https.ssl.truststore.location: /mnt/elasticsearch-keystore/keystore.jks
    elastic.https.ssl.truststore.password: "flowx123456"
    
    # Source topic - must match your Kafka topic naming
    topics: ai.flowx.core.index.process.v1
    
    # Time-based routing - IMPORTANT: Choose your partitioning strategy
    transforms: routeTS
    transforms.routeTS.type: org.apache.kafka.connect.transforms.TimestampRouter
    transforms.routeTS.timestamp.format: yyyyMM  # Monthly indices (change as needed)
    transforms.routeTS.topic.format: process_instance-${timestamp}  # Must start with your index name
    
    # Performance settings
    batch.size: 1000
    read.timeout.ms: 30000
    flush.synchronously: "true"
    
    # Data handling
    behavior.on.malformed.documents: IGNORE
    behavior.on.null.values: IGNORE
    drop.invalid.message: "true"
    schema.ignore: "true"
    write.method: UPSERT
    type.name: _doc
    
    # Converters
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: "false"
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"
```

### Step 4: Verify the setup

**Check Kafka Connect status:**

```bash theme={"system"}
kubectl get kafkaconnect flowx-elasticsearch-kafka-connect -o yaml
```

**Check connector status:**

```bash theme={"system"}
kubectl get kafkaconnector flowx-elasticsearch-sink-connector -o yaml
```

**Verify indices are being created:**

```bash theme={"system"}
# Port-forward to Elasticsearch
kubectl port-forward svc/elasticsearch-es-http 9200:9200

# Check indices (should see process_instance-YYYYMM pattern)
curl -k -u elastic:$ELASTIC_PASSWORD https://localhost:9200/_cat/indices?v
```

***

## Setup: HTTP indexing (simple)

### Configure the process engine

For HTTP indexing, update your process-engine configuration:

```bash theme={"system"}
# Elasticsearch connection settings
SPRING_ELASTICSEARCH_REST_PROTOCOL=https
SPRING_ELASTICSEARCH_REST_URIS=elasticsearch:9200
SPRING_ELASTICSEARCH_REST_DISABLESSL=false
SPRING_ELASTICSEARCH_REST_USERNAME=
SPRING_ELASTICSEARCH_REST_PASSWORD=

# Use HTTP strategy
FLOWX_INDEXING_TYPE=http

# Index settings (optional - these are the defaults)
FLOWX_ELASTICSEARCH_INDEXSETTINGS_NAME=process_instance
FLOWX_ELASTICSEARCH_INDEXSETTINGS_SHARDS=2
FLOWX_ELASTICSEARCH_INDEXSETTINGS_REPLICAS=0

# HTTP-specific setting
FLOWX_INDEXING_OPTIMISTICLOCKINGRETRIES=3
```

***

## Kafka topics

The process engine publishes indexing records to a topic whose name is composed from the standard FlowX Kafka naming triple:

```
<package><environment>core.index.process<version>
```

Each segment is set independently via environment variables on the `process-engine` deployment:

| Environment Variable             | Typical value          | Notes                                     |
| -------------------------------- | ---------------------- | ----------------------------------------- |
| `KAFKA_TOPIC_NAMING_PACKAGE`     | `ai.flowx.`            | Vendor prefix. Same for all FlowX topics. |
| `KAFKA_TOPIC_NAMING_ENVIRONMENT` | `dev.` or \`\` (empty) | Environment segment. Empty in production. |
| `KAFKA_TOPIC_NAMING_VERSION`     | `.v1`                  | Schema version suffix.                    |

**Worked examples:**

| Environment                       | Composed source topic                |
| --------------------------------- | ------------------------------------ |
| Dev (`ENVIRONMENT=dev.`)          | `ai.flowx.dev.core.index.process.v1` |
| Production (`ENVIRONMENT=` empty) | `ai.flowx.core.index.process.v1`     |

**Important:** The `topics:` value in your `KafkaConnector` configuration must match the composed topic for your environment. You must also declare that topic explicitly via your Strimzi `KafkaTopic` resources — see the warning under Step 3.

***

## Index management

### Automatic template creation

The process engine automatically creates Elasticsearch index templates during startup:

* **HTTP strategy**: Creates the index directly with configured shards/replicas
* **Kafka strategy**: Creates an index template that applies to dynamically created indices

### Time-based partitioning (Kafka only)

Choose your partitioning strategy based on data volume and retention needs:

| Database Partitioning | Elasticsearch Format | Index Pattern               | Best For         |
| --------------------- | -------------------- | --------------------------- | ---------------- |
| Monthly               | `yyyyMM`             | `process_instance-202406`   | Medium volume    |
| Weekly                | `yyyyww`             | `process_instance-202426`   | High volume      |
| Daily                 | `yyyyMMdd`           | `process_instance-20240615` | Very high volume |

### Efficient data deletion

**Best practice:** Delete entire indices rather than individual documents for better performance.

With time-based partitioning, you can:

```bash theme={"system"}
# Delete old monthly index
curl -X DELETE "https://localhost:9200/process_instance-202401"

# Delete multiple old indices
curl -X DELETE "https://localhost:9200/process_instance-2024*"
```

***

## Troubleshooting

### Common issues

**Indexing not working:**

1. Check if indexing is disabled (only if you explicitly set `FLOWX_INDEXING_ENABLED=false`)
2. Verify Elasticsearch connectivity
3. Check process-engine logs for errors

**Kafka Connect issues:**

```bash theme={"system"}
# Check connect cluster status
kubectl describe kafkaconnect flowx-elasticsearch-kafka-connect

# Check connector logs
kubectl logs -l strimzi.io/kind=KafkaConnect -f
```

**Certificate issues:**

```bash theme={"system"}
# Verify JKS keystore
keytool -list -keystore keystore.jks -storepass flowx123456

# Check secret exists
kubectl get secret kafka-connect-elastic-jks -o yaml
```

**Performance issues:**

* Increase `batch.size` in connector config
* Adjust number of shards based on cluster size
* Monitor Elasticsearch cluster health

### Verification queries

**Check index pattern:**

```bash theme={"system"}
# List all process instance indices
curl -k -u elastic:$PASSWORD "https://localhost:9200/_cat/indices/process_instance*?v"

# Check template exists
curl -k -u elastic:$PASSWORD "https://localhost:9200/_template/process_instance*"
```

**Query across time-based indices:**

```bash theme={"system"}
# Search across all process instance indices
curl -k -u elastic:$PASSWORD -X GET "https://localhost:9200/process_instance-*/_search" \
  -H "Content-Type: application/json" \
  -d '{"query": {"match_all": {}}}'
```
