Kafka operation
Kafka operations define how to interact with the Kafka server. Each operation represents a specific action, such as Commit Offset, Consume, or Produce, used to publish messages to, or subscribe to messages from, an Apache Kafka message broker.
You must create a separate component for each action and object combination that your integration requires. The Kafka connector operations support the following actions:
- Commit Offset
- Consume
- Produce
- Listen
Commit Offset
In the Kafka application, the offset is a type of metadata representing the position of a message received in a particular partition. Each message within a given partition has a unique Offset value represented by an integer.
The Commit Offset operation in the Kafka connector commits an offset value to denote the position of the next message to consume within the partition.
Use this operation only when retrieving messages with the Consume operation where the Autocommit option is false.
The Object Type for the Commit Offset operation represents a Kafka partition in which to commit the message. Upon browsing for an object, you can choose to select an Object Type from the list of all topics provided by the Kafka service, or to select the Allow Dynamic Topic option.
The Allow Dynamic Topic selection limits the list of browsable options to display Dynamic Topic as the only option. If you select this option, you must specify the topic for each message in the topic field of the JSON request, in a Dynamic Operation Property, or in the Topic Name Operation Property. If the topic is not specified, the document results in an error.
If you select a topic as an Object Type rather than the Dynamic Topic option, the connector ignores any topic value within the topic field in the JSON request or Dynamic Operation Property or Topic Name Operation Property.
To select the dynamic topic name, the connector follows a specific hierarchy:
- JSON Request: The connector first checks the JSON request for the topic name. This has the highest priority.
- Dynamic Operation Property Value: If the topic name isn't found in the JSON payload, the connector checks the Dynamic Operation Property value.
- Topic Name Operation Property Value: If neither of the previous options provides the topic name, the connector uses the Topic Name Operation Property value.
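The lookup order above can be sketched as a small function. This is an illustration only, not the connector's implementation: the function name, parameter names, and the `DYNAMIC_TOPIC` constant are hypothetical, and empty values are assumed to count as "not specified".

```python
from typing import Optional

# Hypothetical label standing in for the Dynamic Topic object type.
DYNAMIC_TOPIC = "Dynamic Topic"


def resolve_commit_topic(
    object_type: str,
    json_topic: Optional[str] = None,
    dynamic_property: Optional[str] = None,
    topic_name_property: Optional[str] = None,
) -> str:
    """Pick the topic for a Commit Offset document."""
    # A concrete topic selected as the Object Type wins; other values are ignored.
    if object_type != DYNAMIC_TOPIC:
        return object_type
    # Dynamic topic: JSON request first, then the Dynamic Operation Property,
    # then the Topic Name Operation Property.
    for candidate in (json_topic, dynamic_property, topic_name_property):
        if candidate:
            return candidate
    # No source supplied a topic, so the document results in an error.
    raise ValueError("No topic specified for a Dynamic Topic operation")
```

For Consume and Produce the same idea applies with only the last two sources, because those operations do not read the topic from a JSON request.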
Operation properties
To successfully perform a Commit Offset operation, specify the following properties on the Options tab:
Tracking Direction - Select the document tracking direction for the operation, either Input Documents or Output Documents. This setting enables you to choose which document appears in Process Reporting. Start steps always track output documents regardless of your selection.
If the tracking direction is read-only, the feature to change the direction is either unavailable or the developer set the configuration to read-only. The default value you see shows you which document appears in Process Reporting.
Client ID - The unique identifier used to identify the client application in Kafka. In this case, enter the ID associated with the Kafka connector.
Consumer Group - Enter the unique string that identifies the consumer group to which the connector belongs. This property is useful for Kafka rebalancing. See the Kafka documentation for information about rebalancing. This property must be defined in the Connection tab, within the individual operations, or as a Dynamic Operation Property to avoid an error. If it is specified in two or more places, the order of precedence is Dynamic Operation Property > Operation Property > Connection Property.
When importing a JSON profile using the Commit Offset operation, the Kafka connector expects the document to contain the following fields:
Topic Name - The name of the topic from which the message was consumed.
The Topic Name can now be imported dynamically across all operations. This is to allow for importing topics that you may not have a connection to browse.
Partition ID - The unique identifier of the partition in which to commit the offset. Note that offsets are per partition.
Partition IDs - An operation text field that is only available when Manually assign partitions is selected. User input to this field must be a whole number greater than or equal to zero.
Offset ID - The unique identifier that the cluster assigns to the consumed message. The offset ID also represents the position of the message within the partition.
Metadata - Specifies the Metadata that the Kafka connector sends for the commit.
The Commit Offset operation returns an empty success for each document processed successfully. Likewise, the operation returns an error for each document for which it cannot commit the offset.
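Because offsets are tracked per partition, a process that consumed several messages typically needs one commit document for each topic and partition pair. The sketch below shows one way to derive those documents from consumed messages. The JSON key names (`topic`, `partition`, `offset`, `metadata`) are placeholders, not the connector's actual profile field names, and the sketch passes the highest consumed offset through unchanged. Check the imported JSON profile for the real field names and for whether the connector expects the last consumed offset or the next one.

```python
import json
from typing import Dict, List, Tuple


def build_commit_documents(consumed: List[dict]) -> List[str]:
    """Return one JSON commit document per (topic, partition) pair."""
    latest: Dict[Tuple[str, int], int] = {}
    for message in consumed:
        key = (message["topic"], message["partition"])
        # Keep only the highest offset seen for each partition.
        if key not in latest or message["offset"] > latest[key]:
            latest[key] = message["offset"]
    documents = []
    for (topic, partition), offset in sorted(latest.items()):
        # Key names are hypothetical; use the names from the imported profile.
        documents.append(json.dumps({
            "topic": topic,
            "partition": partition,
            "offset": offset,
            "metadata": "",
        }))
    return documents
```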
Consume
In the Kafka connector, the Consume operation is an inbound action used to retrieve messages from a Kafka topic. The Object Type for the Consume operation represents all topics provided by the Kafka service from which it consumes messages.
Upon browsing for an object, you can choose to select an Object Type from the list of all topics provided by the Kafka service or select the Allow Dynamic Topic option. If the topic is not specified then the document results in an error.
If you select a topic as an Object Type rather than the Dynamic Topic option, the connector ignores any topic value within the Dynamic Operation Property or Topic Name Operation Property.
The Allow Dynamic Topic selection limits the list of browsable options to display Dynamic Topic as the only option. If you select this option, you must specify the topic name in the Dynamic Operation property or Topic Name Operation Property value.
To select the dynamic topic name, the connector follows a specific hierarchy:
- Dynamic Operation Property Value: This is the highest priority.
- Topic Name Operation Property Value: If no value is provided in the Dynamic Operation Property for Topic Name, the connector uses the value set in the Topic Name Operation Property.
To successfully perform a Consume operation, specify the following properties on the Options tab:
Tracking Direction - Select the document tracking direction for the operation, either Input Documents or Output Documents. This setting enables you to choose which document appears in Process Reporting. Start steps always track output documents regardless of your selection.
If the tracking direction is read-only, the feature to change the direction is either unavailable or the developer set the configuration to read-only. The default value you see shows you which document appears in Process Reporting.
Return Application Error Responses - This setting controls whether an application error prevents an operation from completing:
- If cleared, the process aborts and reports the error on the Process Reporting page.
- If selected, processing continues and passes the error response to the next component to be processed as the connection output.
Client ID - The unique identifier used to identify the client application in Kafka. In this case, enter the ID associated with the Kafka connector.
Consumer Group - Enter the unique string that identifies the consumer group to which the connector belongs. This property is useful for Kafka rebalancing. See the Kafka documentation for information about rebalancing. This property must be defined in the Connection tab, within the individual operations, or as a Dynamic Operation Property to avoid an error. If it is specified in two or more places, the order of precedence is Dynamic Operation Property > Operation Property > Connection Property.
Receive Message Timeout - The length of time in milliseconds (the default is 30000) that the operation waits to receive messages.
If the operation receives all messages from the topic and does not reach the minimum number of messages, the operation waits until the time-out is exhausted to continue to get newly posted messages.
Manually assign partitions - Select this option to assign the specific partitions from which messages are read. Selecting this field displays the conditional text field Partition IDs.
Message Headers - A tracked property group that returns message headers.
Minimum Number of Messages - (Required) The number of messages to consume from the Kafka topic. Enter a value of zero (0) or greater.
- A value of 0 gets all messages from the Kafka topic.
- A value other than 0 gets only the specified number of messages.
Consume polls for messages in a loop until reaching at least the value specified for Minimum Number of Messages. By default, each poll returns 1 message. When there are many messages to be returned, you can increase the container property value com.boomi.connector.kafka.max.poll.records and improve performance. The container property takes precedence over the operation value.
Changing the container property can lead to situations where the operation returns a number of records greater than the value set in the operation. For example, setting the container property to 3 and the Minimum Number of Messages to 5 causes the operation to poll the topic twice, resulting in returning 6 messages. If you set the container property to a value greater than Minimum Number of Messages, the operation polls only once, effectively returning more messages than the value configured in the operation.
Autocommit - Select to have the connector automatically commit all retrieved messages. If this field is clear, use the Commit Offset operation to commit messages.
Auto Offset Reset - Indicates whether message consumption starts at the earliest offset (default) or the latest offset when you do not indicate an initial offset in Kafka.
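The interaction between Minimum Number of Messages and the com.boomi.connector.kafka.max.poll.records container property can be checked with a small simulation. This is a simplified model, not the connector's code: it assumes every poll returns a full batch while messages remain, and it ignores the Receive Message Timeout wait. The function and parameter names are hypothetical.

```python
from typing import Tuple


def simulate_consume(available: int, minimum: int, max_poll_records: int = 1) -> Tuple[int, int]:
    """Return (number of polls, number of messages returned)."""
    if minimum < 0 or max_poll_records < 1:
        raise ValueError("minimum must be >= 0 and max_poll_records >= 1")
    polls = 0
    returned = 0
    remaining = available
    # A minimum of 0 means "get all messages"; otherwise poll until the
    # minimum is reached. Each poll returns up to max_poll_records messages.
    while remaining > 0 and (minimum == 0 or returned < minimum):
        batch = min(max_poll_records, remaining)
        returned += batch
        remaining -= batch
        polls += 1
    return polls, returned


print(simulate_consume(available=100, minimum=5, max_poll_records=3))   # (2, 6)
print(simulate_consume(available=100, minimum=5, max_poll_records=10))  # (1, 10)
```

The first call reproduces the example in the text: two polls of 3 records return 6 messages for a minimum of 5. The second shows a single poll returning more messages than the configured minimum.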
If the operation is successful, it returns messages from the specified Kafka topic. The Consume operation produces the following tracked properties:
Offset ID - The unique identifier that the cluster assigns to the consumed message. The offset ID also represents the position of the message within the partition. The default value is 1.
Partition ID - The unique identifier of the partition that stores the message.
Topic Name - The name of the topic from which the message was consumed.
Message Key - The key associated with the given document. Kafka uses the key to determine where to store the message in a specific partition.
Message Timestamp - The timestamp value associated with the message.
Custom Client Properties - This section allows you to add, edit, delete, and encrypt key–value pairs to configure the Kafka connector. By default, no properties are preconfigured, so you need to define them during setup as required.
Produce
The Kafka Produce operation sends streams of data records to topics in the Kafka cluster. The Kafka connector sends the Message Key document property to the broker, where the Kafka server indicates what to do with the message next. If the indicated topic in Topic Name does not exist, it returns an Application Error for that document.
Upon browsing for an object, you can choose to select an Object Type from the list of all topics provided by the Kafka service, or select the Allow Dynamic Topic option. The Allow Dynamic Topic selection limits the list of browsable options to display Dynamic Topic as the only option. If you select this option, you must specify the topic for each message using a Dynamic Operation property or Topic Name Operation Property or the document results in an error.
If you select a topic as an Object Type rather than the Dynamic Topic option, the connector ignores any Dynamic Operation property or Topic Name Operation Property value.
To select the dynamic topic name, the connector follows a specific hierarchy:
- Dynamic Operation Property Value: This is the highest priority.
- Topic Name Operation Property Value: If no value is provided in the Dynamic Operation Property for Topic Name, the connector uses the value set in the Topic Name Operation Property.
Operation properties
To successfully perform a Produce operation, specify the following properties on the Options tab:
Tracking Direction - Select the document tracking direction for the operation, either Input Documents or Output Documents. This setting enables you to choose which document appears in Process Reporting. Start steps always track output documents regardless of your selection.
If the tracking direction is read-only, the feature to change the direction is either unavailable or the developer set the configuration to read-only. The default value you see shows you which document appears in Process Reporting.
Return Application Error Responses - This setting controls whether an application error prevents an operation from completing:
- If cleared, the process aborts and reports the error on the Process Reporting page.
- If selected, processing continues and passes the error response to the next component that needs processing as the connection output.
Client ID - The unique identifier used to identify the client application in Kafka. In this case, enter the ID associated with the Kafka connector.
Producer Mode - This property defines how messages are delivered between the producer and the Kafka user. It provides a drop-down option in the connector configuration. Select Synchronous (default) for standard delivery, or choose Transactional to group multiple messages into a single atomic transaction.
Transactional ID - This property is required when Producer Mode is set to Transactional. It specifies the Transaction ID that groups the current batch of messages into a single Kafka transaction, ensuring atomic delivery across multiple records. The property is dynamic and must be configured at design time for transactional workflows.
Acknowledgments - Indicates the number of acknowledgments the Kafka server must receive before considering the request complete. The default value is 0, and indicates that it does not need acknowledgments to confirm posting the message to the Kafka server. Selecting the value 1 indicates that a single leader acknowledgment is sufficient to mark the request as complete, while selecting ALL indicates the operation needs acknowledgment from all brokers to consider the request complete.
Compression Type - The format used to compress and send messages to the Kafka server. The Kafka connector supports the GZIP, Snappy, and LZ4 compression protocols.
Maximum Time to Wait - The length of time in milliseconds to wait for publishing the message before failing with a time-out error. The default value is 30,000. Set the value to 0 to have the connector wait until it receives a confirmation or reaches the process time-out.
Headers Properties - A custom properties panel with the option to add key/value pairs as message headers.
Partition ID - A positive integer used to identify a specific partition within a topic. There is no default value. If no input is provided, the connector lets the broker decide which partition stores each message. If a value is set, messages are stored in the defined partition.
Custom Client Properties - This section allows you to add, edit, delete, and encrypt key–value pairs to configure the Kafka connector. By default, no properties are preconfigured, so you need to define them during setup as required.
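Several of the Produce properties above correspond by name to standard Kafka producer client settings (`client.id`, `acks`, `compression.type`, `transactional.id`). The sketch below shows how such a translation might look, including the rule that Transactional mode requires a Transactional ID. That the connector maps its properties this way is an assumption made for illustration, and the function name is hypothetical.

```python
from typing import Dict, Optional

# Connector option labels (left) mapped to Kafka client values (right).
ACKS = {"0": "0", "1": "1", "ALL": "all"}
COMPRESSION = {"GZIP": "gzip", "Snappy": "snappy", "LZ4": "lz4"}


def producer_client_config(
    client_id: str,
    acknowledgments: str = "0",
    compression_type: Optional[str] = None,
    producer_mode: str = "Synchronous",
    transactional_id: Optional[str] = None,
) -> Dict[str, str]:
    """Build a dictionary of Kafka producer settings from operation values."""
    if acknowledgments not in ACKS:
        raise ValueError(f"Unsupported Acknowledgments value: {acknowledgments}")
    config = {"client.id": client_id, "acks": ACKS[acknowledgments]}
    if compression_type is not None:
        if compression_type not in COMPRESSION:
            raise ValueError(f"Unsupported Compression Type: {compression_type}")
        config["compression.type"] = COMPRESSION[compression_type]
    if producer_mode == "Transactional":
        # Transactional mode needs an ID to group messages into one transaction.
        if not transactional_id:
            raise ValueError("Transactional ID is required in Transactional mode")
        config["transactional.id"] = transactional_id
    elif producer_mode != "Synchronous":
        raise ValueError(f"Unsupported Producer Mode: {producer_mode}")
    return config
```

For example, `producer_client_config("my-client", "ALL", "GZIP", "Transactional", "tx-1")` yields a configuration with `acks` set to `all` and a `transactional.id`, while calling it with only a client ID yields the default `acks` of `0`.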
Listen
The Kafka Listen operation starts a polling mechanism to retrieve messages from a specific topic in the Kafka cluster. After being initiated by its Listen Manager, the polling subscribes to a topic and polls it regularly based on the Polling Interval and Polling Delay set in the connection. To avoid any potentially extended blocking of its Listen Manager, every run of this operation invokes the poll exactly once, retrieves all available messages in the consumer buffer, and returns them in the payload as soon as possible.
The Listen operation automatically commits received messages upon successful execution of the submitted process. If you don't want a message to be committed, you will have to add an exception step or avoid error handling so the process cannot complete.
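The commit-on-success behavior can be modeled in a few lines. This is a toy simulation under stated assumptions, not the connector's implementation: messages are a Python list, the committed offset is a list index, and `process` is a hypothetical callback that raises an exception when the submitted process fails.

```python
from typing import Callable, List, Tuple


def run_listener(
    messages: List[str],
    process: Callable[[List[str]], None],
    committed: int = 0,
    max_runs: int = 10,
) -> Tuple[int, List[List[str]]]:
    """Return the final committed offset and every batch that was delivered."""
    deliveries: List[List[str]] = []
    for _ in range(max_runs):
        if committed >= len(messages):
            break
        # Each run polls once, starting from the last committed offset.
        batch = messages[committed:]
        deliveries.append(list(batch))
        try:
            process(batch)
        except Exception:
            # The process did not complete, so nothing is committed and the
            # same messages are delivered again on the next run.
            continue
        # Commit only after the process has finished successfully.
        committed += len(batch)
    return committed, deliveries
```

If `process` fails on its first call and succeeds on its second, the same batch appears twice in `deliveries`: no message is lost, but messages can be delivered more than once.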
Container properties
Listen uses the com.boomi.connector.kafka.max.request.size container property to determine the maximum amount of data the Kafka server returns for each request. For more information, see the topic Kafka connector.
Operation properties
To successfully perform a Listen operation, specify the following properties on the Options tab:
Tracking Direction - Select the document tracking direction for the operation, either Input Documents or Output Documents. This setting enables you to choose which document appears in Process Reporting. Start steps always track output documents regardless of your selection.
If the tracking direction is read-only, the feature to change the direction is either unavailable or the developer set the configuration to read-only. The default value you see shows you which document appears in Process Reporting.
Client ID - The unique identifier (ID) used to identify the client application in Kafka. In this case, enter the ID associated with the Kafka connector.
Consumer Group - Enter the unique string that identifies the consumer group to which the connector belongs. This property is useful for Kafka rebalancing. See the Kafka documentation for information about rebalancing. This property must be defined either in the Connection tab or within the individual operations to avoid an error.
Manually assign partitions - Select this option to assign the specific partitions from which messages are read. Selecting this field displays the conditional text field Partition IDs.
Partition IDs - A conditional text field that is only available when Manually assign partitions is selected. User input to this field must be a whole number greater than or equal to zero.
Maximum Number of Messages per poll - Specify the maximum number of messages to return from the Kafka message broker for a single poll. This property sets Kafka's max.poll.records to determine the maximum number of returned records in a single request.
Message Delivery Policy - Select the delivery policy to guarantee message delivery between the producer and consumer. At least once indicates that messages are never lost, but can be redelivered.
Message Headers - A tracked property group that returns message headers.
Auto Offset Reset - Select Earliest in the drop-down to start message consumption at the earliest offset (default) or Latest when no initial offset is indicated in Kafka. Verify that your selection matches the auto.offset.reset option on the Kafka server.
In the unlikely event of a process run not finishing successfully, the connector returns to the last committed offset to help ensure message delivery.
Singleton Listener - When selected, the listener starts on only a single node. By default, the listener starts on all container nodes.
Listen output documents
After you browse for an object and the Listen operation runs successfully, the operation creates an output document in raw binary format with the following tracked properties:
Offset ID - Assigned by the cluster, a unique identifier for the consumed message. The offset ID also represents the position of the message within the partition. The default value is 1.
Partition ID - The unique identifier of the partition that stores the message.
Topic Name - The name of the topic from which the message was consumed.
Message Key - The key associated with the given document. Kafka uses the key to determine where to store a message in a particular partition.
Message Timestamp - A tracked property to return the timestamp value of a message.
Custom Client Properties - This section allows you to add, edit, delete, and encrypt key–value pairs to configure the Kafka connector. By default, no properties are preconfigured, so you need to define them during setup as required.