Before proceeding, it is recommended to familiarize yourself with Elasticsearch and its indexing process by referring to the Intro to Elasticsearch section.

Intro to Elasticsearch

Configuration updates

The Kafka indexing strategy replaces the previous configuration parameter flowx.use-elasticsearch. To ensure backward compatibility, the old parameter is still accepted in the configuration. The previous index settings were configured as follows:

spring:
  elasticsearch:
    index-settings:
      name: process_instance
      shards: 1
      replicas: 1

The previous settings are replaced by a new configuration section:

flowx:
  indexing:
    enabled: true  # true | false - specifies whether indexing with Elasticsearch is enabled or disabled for the whole app.
    processInstance:  # set of configurations for indexing process instances. Can be duplicated for other objects.
      indexing-type: kafka  # no-indexing | http | kafka - the chosen indexing strategy.
      index-name: process_instance  # the index name that is part of the search pattern.
      shards: 1
      replicas: 1

The flowx.indexing.enabled property determines whether indexing with Elasticsearch is enabled. When set to false or missing, no indexing will be performed for any entities defined below. When set to true, indexing with Elasticsearch is enabled.

If the FLOWX_INDEXING_ENABLED configuration is set to false, the following configuration information and guidelines are not applicable to your use case.

The flowx.indexing.processInstance.indexing-type property defines the indexing strategy for process instances. It can have one of the following values:

  • no-indexing: No indexing will be performed for process instances.
  • http: Direct connection from the process engine to Elasticsearch through HTTP calls.
  • kafka: Data will be sent to be indexed via a Kafka topic using the new strategy. To implement this strategy, the Kafka Connect with Elasticsearch Sink Connector must be deployed in the infrastructure.

Configuration steps

To enable indexing with Elasticsearch for the entire application, update the process-engine configuration with the following parameters:

  • FLOWX_INDEXING_ENABLED: Set this parameter to true to enable indexing with Elasticsearch for the entire application.

| Variable Name | Enabled | Description |
| --- | --- | --- |
| FLOWX_INDEXING_ENABLED | true | Indexing with Elasticsearch is enabled for the whole app |
| FLOWX_INDEXING_ENABLED | false | Indexing with Elasticsearch is disabled for the whole app |

  • FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE: Set this parameter to kafka to use the Kafka transport strategy for indexing process instances.

| Variable Name | Indexing Type - Values | Definition |
| --- | --- | --- |
| FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE | no-indexing | No indexing is performed for process instances |
| FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE | http | Process instances are indexed via HTTP (direct connection from process-engine to Elasticsearch through HTTP calls) |
| FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE | kafka | Process instances are indexed via Kafka (data to be indexed is sent through a Kafka topic; this is the new strategy) |

  • FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME: Specify the name of the index used for process instances.

| Variable Name | Values | Definition |
| --- | --- | --- |
| FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME | process_instance | The name of the index used for storing process instances. It is also part of the search pattern |

  • FLOWX_INDEXING_PROCESSINSTANCE_SHARDS: Set the number of shards for the index.

| Variable Name | Values | Definition |
| --- | --- | --- |
| FLOWX_INDEXING_PROCESSINSTANCE_SHARDS | 1 | The number of shards for the Elasticsearch index storing process instances |

  • FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS: Set the number of replicas for the index.

| Variable Name | Values | Definition |
| --- | --- | --- |
| FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS | 1 | The number of replicas for the Elasticsearch index storing process instances |
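
As an illustration, the variables listed above could be set on the process-engine container as shown here. This is a minimal sketch that assumes the process engine runs as a Kubernetes Deployment; only the variable names and values come from this page, and the surrounding structure depends on how you deploy the service.

```yaml
# Fragment of a container spec for the process-engine (structure assumed, not taken from this page)
env:
  - name: FLOWX_INDEXING_ENABLED
    value: "true"                 # enables indexing for the whole app
  - name: FLOWX_INDEXING_PROCESSINSTANCE_INDEXING_TYPE
    value: "kafka"                # no-indexing | http | kafka
  - name: FLOWX_INDEXING_PROCESSINSTANCE_INDEX_NAME
    value: "process_instance"     # also used as part of the search pattern
  - name: FLOWX_INDEXING_PROCESSINSTANCE_SHARDS
    value: "1"                    # env values must be strings in Kubernetes
  - name: FLOWX_INDEXING_PROCESSINSTANCE_REPLICAS
    value: "1"
```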

For Kafka indexing, the Kafka Connect with Elasticsearch Sink Connector must be deployed in the infrastructure.

Elasticsearch Service Sink Connector

Configuration examples

Kafka Connect

  • This example assumes a Kafka cluster installed with the Strimzi operator and Elasticsearch installed with the ECK operator.
  • You can save the image built by Kafka Connect to a local registry and then comment out the build section.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-kafka-flowx
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.0.0
  replicas: 1
  bootstrapServers: kafka-flowx-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: kafka-flowx-cluster-ca-cert
        certificate: ca.crt
  image: ttl.sh/strimzi-connect-ttlsh266-3.0.0:24h
  config:
    group.id: flowx-kafka-connect
    offset.storage.topic: kafka-connect-cluster-offsets
    config.storage.topic: kafka-connect-cluster-configs
    status.storage.topic: kafka-connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    topic.creation.enable: true
  build:
    output:
      type: docker
      # This image will last only for 24 hours and might be overwritten by other users
      # Strimzi will use this tag to push the image. But it will use the digest to pull
      # the container image to make sure it pulls exactly the image we just built. So
      # it should not happen that you pull someone else's container image. However, we
      # recommend changing this to your own container registry or using a different
      # image name for any other than demo purposes.
      image: ttl.sh/strimzi-connect-ttlsh266-3.0.0:24h
    plugins:
      - name: kafka-connect-elasticsearch
        artifacts:
          - type: zip
            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/14.0.6/confluentinc-kafka-connect-elasticsearch-14.0.6.zip
  externalConfiguration:
    volumes:
      - name: elasticsearch-keystore-volume
        secret:
          secretName: elasticsearch-keystore
    env:
      - name: SPRING_ELASTICSEARCH_REST_PASSWORD
        valueFrom:
          secretKeyRef:
            name: elasticsearch-es-elastic-user
            key: elastic
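
The second note above (reusing an already built image) might look like the following variant. This is a sketch only: the registry host and image tag are hypothetical placeholders, and it assumes the image was previously built with the Elasticsearch connector plugin and pushed to a registry your cluster can pull from.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-kafka-flowx
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.0.0
  replicas: 1
  bootstrapServers: kafka-flowx-kafka-bootstrap:9093
  # Hypothetical registry and tag: replace with the location of your saved image
  image: registry.example.com/flowx/strimzi-connect-elasticsearch:3.0.0
  # The build section is commented out because the image above already contains the plugin
  # build:
  #   output: ...
  #   plugins: ...
  # The tls, config, and externalConfiguration sections stay as in the example above
```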

Kafka Elasticsearch Connector

spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  tasksMax: 2
  config:
    tasks.max: "2" # The maximum number of tasks that can be run in parallel for this connector, which is 2 in this case. You can start with 2 and increase in case of huge load, but pay attention to the load that will be produced on Elasticsearch and also to the resources that you allocate to KafkaConnect so that it can support the threads.
    topics: "ai.flowx.core.index.process.v1" # Source Kafka topic. Must be the same as the one declared in the process defined as ${kafka.topic.naming.prefix}.core.index.process${kafka.topic.naming.suffix}
    key.ignore: "false" # Don't change this value! This tells Kafka Connect (KC) to process the key of the message - it will be used as the ID of the object in Elasticsearch.  
    schema.ignore: "true" # Don't change this value!!! This tells KC to ignore the mapping from the Kafka message. Elasticsearch will use internal mapping. See below. 
    connection.url: "https://elasticsearch-es-http:9200" # The URL to Elasticsearch. You should configure this.
    connection.username: "elastic" # The username to authenticate with Elasticsearch. You should configure this.
    connection.password: "Yyh03ZI66310Hyw59MXcR8xt" # The password to authenticate with Elasticsearch. You should configure this.
    elastic.security.protocol: "SSL" # The security protocol to use for connecting to Elasticsearch. You should use SSL if possible.
    elastic.https.ssl.keystore.location: "/opt/kafka/external-configuration/elasticsearch-keystore-volume/keystore.jks" # You should configure the path to the keystore where the Elasticsearch key is added.
    elastic.https.ssl.keystore.password: "MPx57vkACsRWKVap" # The password for the keystore file. You should configure this.
    elastic.https.ssl.key.password: "MPx57vkACsRWKVap" # The password for the key within the keystore file.
    elastic.https.ssl.keystore.type: "JKS" # The type of the keystore file. It is set to "JKS" (Java KeyStore).
    elastic.https.ssl.truststore.location: "/opt/kafka/external-configuration/elasticsearch-keystore-volume/keystore.jks" # You should configure the path to the truststore file. In this example it is the same file as the keystore where the Elasticsearch key is added.
    elastic.https.ssl.truststore.password: "MPx57vkACsRWKVap" # The password for the truststore file. You should configure this.
    elastic.https.ssl.truststore.type: "JKS" # The type of the truststore file. It is set to "JKS".
    elastic.https.ssl.protocol: "TLS" # The SSL/TLS protocol to use for communication. It is set to "TLS".
    batch.size: 1000 # The size of the message batch that KC will process. This should be fine as default value. If you experience slowness and want to increase the speed, changing this may help but it will be based on your scenario. Consult the documentation for more details.
    linger.ms: 1 # Start with this value and change it only if needed. Consult the connector documentation for more details.
    read.timeout.ms: 30000 # Increased to 30000 from the default 3000 due to flush.synchronously = true.
    flush.synchronously: "true" # Don't change this value! The way of writing to Elasticsearch. It must stay "true" for the router below to work.
    drop.invalid.message: "true" # Don't change this value! If set to false, the connector will wait for a configuration that allows processing the message. If set to true, the connector will drop the invalid message.
    behavior.on.null.values: "IGNORE" # Don't change this value! Must be set to IGNORE to avoid blocking the processing of null messages.
    behavior.on.malformed.documents: "IGNORE" # Don't change this value!  Must be IGNORE to avoid blocking the processing of invalid JSONs.
    write.method: "UPSERT" # Don't change this value!  UPSERT to create or update the index.
    type.name: "_doc" # Don't change this value! This is the name of the Elasticsearch type for indexing. 
    key.converter: "org.apache.kafka.connect.storage.StringConverter" # Don't change this value!
    key.converter.schemas.enable: "false" # Don't change this value! No schema defined for the key in the message.
    value.converter: "org.apache.kafka.connect.json.JsonConverter" # Don't change this value!
    value.converter.schemas.enable: "false" # Don't change this value! No schema defined for the value in the message body.
    transforms: "routeTS" # Don't change this value! This represents router that helps create indices dynamically based on the timestamp (process instance start date).
    transforms.routeTS.type: "org.apache.kafka.connect.transforms.TimestampRouter" # Don't change this value! It helps with routing the message to the correct index.
    transforms.routeTS.topic.format: "process_instance-${timestamp}" # You should configure this. The value must start with the value defined in the process-engine config flowx.indexing.processInstance.index-name. The index name consists of a prefix ("process_instance-" in this example) followed by the timestamp, so that indices are created dynamically. For backward compatibility (using the data in the existing index), the prefix must be "process_instance-". However, backward compatibility is not strictly required here.
    transforms.routeTS.timestamp.format: "yyyyMMdd" # This format ensures that the timestamp is represented consistently and can be easily parsed when creating or searching for indices based on the process instance start date. You can change it to the value you want. If you want monthly indices, set it to "yyyyMM". Be aware that once you change it, the existing indexed objects will no longer be updated. Update messages will be treated as new objects and indexed again because they are sent to new indices. This is important! Decide on your index size and stick with it.
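
The fragment above starts at spec. With the Strimzi operator, it is typically wrapped in a KafkaConnector resource, sketched below together with the monthly routing variant mentioned in the last comment. The connector name is a hypothetical placeholder, and the elided settings are assumed to be the same as in the example above.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: process-instance-elasticsearch-sink   # hypothetical name
  labels:
    # Must match metadata.name of the KafkaConnect resource that runs this connector
    strimzi.io/cluster: kafka-connect-kafka-flowx
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  tasksMax: 2
  config:
    # ... all other settings as in the example above ...
    transforms: "routeTS"
    transforms.routeTS.type: "org.apache.kafka.connect.transforms.TimestampRouter"
    transforms.routeTS.topic.format: "process_instance-${timestamp}"
    # Monthly indices: a record with a timestamp on 2024-01-15 is routed to process_instance-202401
    # (with the daily format "yyyyMMdd" it would be routed to process_instance-20240115)
    transforms.routeTS.timestamp.format: "yyyyMM"
```

Both resulting index names start with process_instance, so they remain covered by the process_instance* index pattern.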

HTTP indexing

flowx:
  indexing:
    enabled: true
    processInstance:
      indexing-type: http
      index-name: process_instance
      shards: 1
      replicas: 1

If you don’t want to remove the existing configuration parameters, you can use the following example:

spring:
  elasticsearch:
    index-settings:
      name: process_instance
      shards: 1
      replicas: 1
flowx.use-elasticsearch: true
flowx:
  indexing:
    enabled: ${flowx.use-elasticsearch}
    processInstance:
      indexing-type: http
      index-name: ${spring.elasticsearch.index-settings.name}
      shards: ${spring.elasticsearch.index-settings.shards}
      replicas: ${spring.elasticsearch.index-settings.replicas}

Querying Elasticsearch

To read from multiple indices, queries in Elasticsearch have been updated. The queries now run against an index pattern that identifies multiple indices instead of a single index. The index pattern is derived from the value defined in the configuration property:

flowx.indexing.processInstance.index-name

Kafka topics - process events messages

The process engine sends the data to be indexed to this topic. Kafka Connect then reads the data from it.

  • Key: ${kafka.topic.process.index.out}
  • Value: ${kafka.topic.naming.prefix}.core.index.process${kafka.topic.naming.suffix}

| Default parameter (env var) | Default FLOWX.AI value (can be overwritten) |
| --- | --- |
| KAFKA_TOPIC_PROCESS_INDEX_OUT | ai.flowx.dev.core.index.process.v1 |

Kafka Connect uses the topic name defined in the value as the source of the messages that are sent to Elasticsearch for indexing.

The attribute indexLastUpdatedTime is new and will be populated for the kafka-connect strategy. It holds the timestamp of the last operation performed on the object in the index.

Elasticsearch update (index template)

Since mappings between messages and Elasticsearch data types are unknown, a mapping needs to be specified. This is achieved through an index template created by the process engine during startup. The template applies to indices starting with the value defined in flowx.indexing.processInstance.index-name config. Here’s an example of the index template:

//process_instance_template
{
    "index_patterns": ["process_instance*"],
    "priority": 300,
    "template":  
    {
	  "mappings": {
	    "_doc": {
	      "properties": {
	        "_class": {
	          "type": "keyword",
	          "index": false,
	          "doc_values": false
	        },
	        "dateStarted": {
	          "type": "date",
	          "format": "date_optional_time||epoch_millis"
	        },
	        "id": {
	          "type": "text",
	          "fields": {
	            "keyword": {
	              "type": "keyword",
	              "ignore_above": 256
	            }
	          }
	        },
	        "indexLastUpdatedTime": {
	          "type": "date",
	          "format": "date_optional_time||epoch_millis"
	        },
	        "keyIdentifiers": {
	          "type": "nested",
	          "include_in_parent": true,
	          "properties": {
	            "_class": {
	              "type": "keyword",
	              "index": false,
	              "doc_values": false
	            },
	            "key": {
	              "type": "text",
	              "fields": {
	                "keyword": {
	                  "type": "keyword",
	                  "ignore_above": 256
	                }
	              }
	            },
	            "originalValue": {
	              "type": "text",
	              "fields": {
	                "keyword": {
	                  "type": "keyword",
	                  "ignore_above": 256
	                }
	              }
	            },
	            "path": {
	              "type": "text",
	              "fields": {
	                "keyword": {
	                  "type": "keyword",
	                  "ignore_above": 256
	                }
	              }
	            },
	            "value": {
	              "type": "text",
	              "fields": {
	                "keyword": {
	                  "type": "keyword",
	                  "ignore_above": 256
	                }
	              }
	            }
	          }
	        },
	        "processDefinitionName": {
	          "type": "text",
	          "fields": {
	            "keyword": {
	              "type": "keyword",
	              "ignore_above": 256
	            }
	          }
	        },
	        "state": {
	          "type": "text",
	          "fields": {
	            "keyword": {
	              "type": "keyword",
	              "ignore_above": 256
	            }
	          }
	        }
	      }
	    }
	  },
        "settings":{
        "number_of_shards":5, //This value will be overwritten by the value that you set at `FLOWX_INDEXING_PROCESSINSTANCE_SHARDS` environment variable.
        "number_of_replicas":1
        }
    }
}

Here are some guidelines to help you get started:

Configuration guidelines