# Creating a Kafka producer

> This guide focuses on creating a **Kafka** producer using Spring Boot.

The sections below list the required dependencies and configuration, and provide code samples for implementing a Kafka producer in Java.

## Required dependencies

Ensure that you have the following dependencies in your project:

```xml theme={"system"}
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
    <version>0.6.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.1</version>
</dependency>

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.1.13</version>
</dependency>
```

## Configuration

Ensure that you have the following configuration in your `application.yml` or `application.properties` file:

```yaml theme={"system"}
spring:
  kafka:
    bootstrap-servers: localhost:9092
    security.protocol: "PLAINTEXT"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        message:
          max:
            bytes: ${KAFKA_MESSAGE_MAX_BYTES:52428800} #50MB
        max:
          request:
            size: ${KAFKA_MESSAGE_MAX_BYTES:52428800} #50MB
```

## Code sample for a Kafka producer

<Warning>
  Ensure that a `KafkaTemplate` bean is injected into your producer class. The `sendMessage` method shows how to send a message to a Kafka topic with the given headers and payload. Include all the received Kafka headers in the response that is sent back to the **FlowX Engine**.
</Warning>

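```java theme={"system"}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.core.KafkaTemplate;

// injected by Spring, for example through the constructor of the producer class
private final KafkaTemplate<String, Object> kafkaTemplate;

public void sendMessage(String topic, Headers headers, Object payload) {
  ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, payload);
  // make sure to send all the received headers back to the FlowX Engine
  headers.forEach(header -> producerRecord.headers().add(header));
  kafkaTemplate.send(producerRecord);
}
```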

## Kafka headers

### Understanding Kafka headers in FlowX Integration

When integrating with FlowX Engine via Kafka, headers play a crucial role in message routing and processing. It's essential to preserve and include all received headers in responses back to the FlowX Engine.

<Info>
  **Need help finding identifier values?** Check out the [Finding Identifiers and Parameters](./finding-identifiers-and-parameters) guide to learn where to locate workspace IDs, application IDs, build IDs, process instance UUIDs, and other identifiers in FlowX Designer.
</Info>

### Important headers

| Header                                    | Purpose                                                              | Consequences If Missing                                                              |
| ----------------------------------------- | -------------------------------------------------------------------- | ------------------------------------------------------------------------------------ |
| `Fx-Organization-Id`                      | Organization UUID for the target deployment                          | **Message rejected** — consumers throw `MissingKafkaHeaderException` (non-retryable) |
| `Fx-Workspace-Id`                         | Identifies the workspace context for multi-tenant environments       | Workspace isolation may be compromised; data queries may fail                        |
| `jwt`                                     | JWT token for authenticated operations (process start, task actions) | Operations requiring user context will fail authorization                            |
| `fxContext`                               | Identifies the target process/subprocess in the hierarchy            | Messages may be incorrectly routed or not processed                                  |
| `Fx-AppId`                                | Identifies the application processing the message                    | Application context may be lost                                                      |
| `Fx-RootAppId`                            | Identifies the original application that initiated the process chain | Process origin tracking may be lost; important for complex process hierarchies       |
| `Fx-Build-App-Version-Id`                 | Contains the application version identifier for data querying        | Version-specific behaviors may be affected; data queries may fail                    |
| `processInstanceId`/`processInstanceUuid` | Primary keys for message correlation                                 | Message correlation may fail                                                         |
| `Fx-ProcessName`                          | Required for cross-process communication                             | "Start another process via Kafka" functionality may break                            |

### The Fx-Organization-Id header

All FlowX Kafka consumers validate the `Fx-Organization-Id` header on every incoming message. Messages without it are **rejected immediately** with a non-retryable `MissingKafkaHeaderException`.

Set this header to the Organization UUID for your deployment. In self-hosted deployments, this is the same `ORGANIZATION_ID` value configured on all services.

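```java theme={"system"}
// requires: import java.nio.charset.StandardCharsets;
// replace the placeholder value with the Organization UUID of your deployment
producerRecord.headers().add("Fx-Organization-Id",
    "26ef7b5f-b463-4375-88c8-xxxxxxxxxxxx".getBytes(StandardCharsets.UTF_8));
```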
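
Because a missing `Fx-Organization-Id` header is non-retryable on the consumer side, it can be useful to fail fast in the producer before sending. The following is a minimal sketch of such a check, not a FlowX API. It assumes the outgoing headers have been collected into a plain `Map<String, byte[]>` (a stand-in for Kafka's `Headers` so the example runs without dependencies) and that the value is a UTF-8 encoded UUID string, as in the snippet above. The names `OrganizationHeaderCheck` and `requireOrganizationId` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

final class OrganizationHeaderCheck {

    static final String ORGANIZATION_HEADER = "Fx-Organization-Id";

    // Hypothetical helper: returns the organization UUID found in the headers,
    // or throws if the header is absent, empty, or not UUID-shaped.
    static UUID requireOrganizationId(Map<String, byte[]> headers) {
        byte[] raw = headers.get(ORGANIZATION_HEADER);
        if (raw == null || raw.length == 0) {
            throw new IllegalStateException("Missing required header: " + ORGANIZATION_HEADER);
        }
        String value = new String(raw, StandardCharsets.UTF_8).trim();
        // UUID.fromString throws IllegalArgumentException for malformed input
        return UUID.fromString(value);
    }
}
```

Calling `OrganizationHeaderCheck.requireOrganizationId(headers)` just before the send turns a message that would be rejected downstream into an immediate, local error.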

### The fxContext header explained

The `fxContext` header is particularly important for routing messages in architectures with embedded processes and subprocesses:

* For kafka-receive nodes in the root process: `fxContext = "main"`
* For an embedded subprocess with nodeId=4: `fxContext = "main:4"`
* For an embedded sub-subprocess with nodeId=12: `fxContext = "main:4:12"`

This hierarchical format ensures messages are delivered to their intended recipients within the process structure.
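
The hierarchical format above can be sketched as a small helper that joins the root marker and the embedded node IDs with colons. This is an illustration only: the class `FxContext` and the method `forPath` are hypothetical names, and the sketch assumes node IDs are integers listed from the outermost embedded subprocess to the innermost.

```java
import java.util.StringJoiner;

final class FxContext {

    static final String ROOT = "main";

    // Hypothetical helper: builds an fxContext value from the node IDs of the
    // embedded subprocesses, outermost first. No IDs means the root process.
    static String forPath(int... nodeIds) {
        StringJoiner joiner = new StringJoiner(":");
        joiner.add(ROOT);
        for (int nodeId : nodeIds) {
            joiner.add(Integer.toString(nodeId));
        }
        return joiner.toString();
    }
}
```

With this sketch, `FxContext.forPath()` yields `main`, `FxContext.forPath(4)` yields `main:4`, and `FxContext.forPath(4, 12)` yields `main:4:12`, matching the three cases listed above.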

### Understanding Fx-RootAppId

The `Fx-RootAppId` header is used to track the originating application throughout the entire process chain. This is particularly important when:

* Multiple applications are involved in a workflow
* Processes spawn subprocesses across different components
* You need to trace a complete transaction back to its originating application

Unlike `Fx-AppId`, which may change as a message passes through different components, `Fx-RootAppId` preserves the original initiator's identity.

### Best practices

1. **Preserve All Headers**: Always include all received Kafka headers in responses to the FlowX Engine.
   ```java theme={"system"}
   headers.forEach(header -> producerRecord.headers().add(header));
   ```
