The following tips, required configuration, and code samples will help you implement a Kafka producer in Java.

Required dependencies

Ensure that your project's Maven pom.xml declares the following dependencies:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
    <version>0.6.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.1</version>
</dependency>

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.1.13</version>
</dependency>

Configuration

Ensure that you have the following configuration in your application.yml file (or the equivalent dotted keys in application.properties):

spring:
  kafka:
    bootstrap-servers: localhost:9092
    security.protocol: "PLAINTEXT"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        message:
          max:
            bytes: ${KAFKA_MESSAGE_MAX_BYTES:52428800} #50MB
        max:
          request:
            size: ${KAFKA_MESSAGE_MAX_BYTES:52428800} #50MB
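
The nested keys under properties correspond to the dotted Kafka property names message.max.bytes and max.request.size. As a rough illustration only, the sketch below mimics that flattening with plain Java maps. YamlKeyFlattener and flatten are hypothetical names introduced here; Spring Boot performs its own property binding, and this is not its implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring or Kafka.
final class YamlKeyFlattener {

    private YamlKeyFlattener() {}

    // Recursively joins nested map keys with dots, so that
    // {max: {request: {size: v}}} becomes {"max.request.size": v}.
    public static Map<String, Object> flatten(String prefix, Map<String, ?> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : nested.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) entry.getValue();
                flat.putAll(flatten(key, child));
            } else {
                flat.put(key, entry.getValue());
            }
        }
        return flat;
    }
}
```

Calling flatten("", Map.of("max", Map.of("request", Map.of("size", 52428800)))) yields a single entry keyed max.request.size. The default value 52428800 is 50 × 1024 × 1024 bytes.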

Code sample for a Kafka producer

Ensure that you have the necessary KafkaTemplate bean autowired in your producer class. The sendMessage method demonstrates how to send a message to a Kafka topic with the specified headers and payload. Make sure to include all the received Kafka headers in the response that is sent back to the FlowX Engine.

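import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.core.KafkaTemplate;

// Injected by Spring, for example through constructor injection
private final KafkaTemplate<String, Object> kafkaTemplate;

public void sendMessage(String topic, Headers headers, Object payload) {
  ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, payload);
  // Copy every received header onto the outgoing record so it reaches the FlowX Engine
  headers.forEach(header -> producerRecord.headers().add(header));
  kafkaTemplate.send(producerRecord);
}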

Kafka headers

Understanding Kafka headers in FlowX Integration

When integrating with FlowX Engine via Kafka, headers play a crucial role in message routing and processing. It’s essential to preserve and include all received headers in responses back to the FlowX Engine.

Important headers

| Header | Purpose | Consequences if missing |
| --- | --- | --- |
| fxContext | Identifies the target process/subprocess in the hierarchy | Messages may be incorrectly routed or not processed |
| Fx-AppId | Identifies the application processing the message | Application context may be lost |
| Fx-RootAppId | Identifies the original application that initiated the process chain | Process origin tracking may be lost; important for complex process hierarchies |
| Fx-BuildId | Contains the build identifier for versioning and traceability | Version-specific behaviors may be affected; debugging becomes more difficult |
| processInstanceId/processInstanceUuid | Primary keys for message correlation | Message correlation may fail |
| Fx-ProcessName | Required for cross-process communication | “Start another process via Kafka” functionality may break |
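
As a diagnostic aid, a producer could log which of the header names listed above are absent before it sends a response. The sketch below is one possible way to do that with plain Java collections. FxHeaderCheck and missingHeaders are hypothetical names, not part of any FlowX library, and treating processInstanceId and processInstanceUuid as two separate names is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper for illustration; not part of any FlowX library.
final class FxHeaderCheck {

    // Header names from the "Important headers" table.
    // Assumption: processInstanceId and processInstanceUuid are checked separately.
    public static final List<String> IMPORTANT_HEADERS = List.of(
            "fxContext", "Fx-AppId", "Fx-RootAppId", "Fx-BuildId",
            "processInstanceId", "processInstanceUuid", "Fx-ProcessName");

    private FxHeaderCheck() {}

    // Returns the important header names that do not appear in presentNames,
    // in table order. Matching is case-sensitive.
    public static List<String> missingHeaders(Collection<String> presentNames) {
        List<String> missing = new ArrayList<>();
        for (String name : IMPORTANT_HEADERS) {
            if (!presentNames.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

In a real producer, the collection of names could be built by reading header.key() from each received Kafka header. Not every header applies to every message, so the result is better suited to logging than to rejecting messages.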

The fxContext header explained

The fxContext header is particularly important for routing messages in architectures with embedded processes and subprocesses:

  • For kafka-receive nodes in the root process: fxContext = "main"
  • For an embedded subprocess with nodeId=4: fxContext = "main:4"
  • For an embedded sub-subprocess with nodeId=12: fxContext = "main:4:12"

This hierarchical format ensures messages are delivered to their intended recipients within the process structure.
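
The colon-separated format described above can be sketched with a few string operations. This is a minimal illustration, assuming that node IDs are numeric and that segments never contain a colon. FxContextPath and its methods are hypothetical names, not part of any FlowX library.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of any FlowX library.
final class FxContextPath {

    // Context value used by kafka-receive nodes in the root process.
    public static final String ROOT = "main";

    private FxContextPath() {}

    // Appends an embedded subprocess nodeId to its parent context,
    // for example ("main", 4) gives "main:4".
    public static String child(String parentContext, long nodeId) {
        return parentContext + ":" + nodeId;
    }

    // Splits a context into its segments,
    // for example "main:4:12" gives ["main", "4", "12"].
    public static List<String> segments(String fxContext) {
        return Arrays.asList(fxContext.split(":"));
    }

    // Nesting depth below the root process: 0 for "main", 2 for "main:4:12".
    public static int depth(String fxContext) {
        return segments(fxContext).size() - 1;
    }
}
```

For example, child(child(ROOT, 4), 12) produces "main:4:12", matching the sub-subprocess case in the list above.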

Understanding Fx-RootAppId

The Fx-RootAppId header is used to track the originating application throughout the entire process chain. This is particularly important when:

  • Multiple applications are involved in a workflow
  • Processes spawn subprocesses across different components
  • You need to trace a complete transaction back to its originating application

Unlike Fx-AppId, which may change as a message passes through different components, Fx-RootAppId preserves the original initiator’s identity.

Best Practices

  1. Preserve All Headers: Always include all received Kafka headers in responses to the FlowX Engine.
    headers.forEach(header -> producerRecord.headers().add(header));