Celonis Kafka Connector FAQ
I have data in the message key. How can I make sure that data is uploaded?
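Kafka Connect supports a type of plugin called Single Message Transforms (SMTs), which modify each record as it passes through the connector, for example by adding or removing fields or by moving data between the key and the value. Use an SMT to copy the fields you need from the key into the value before the records reach the sink.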
Information and examples can be found here:
I want to replay my data
At times, you may need to reprocess the data in the Kafka topic(s). The connector reads through a Kafka consumer group, so replaying data means moving that consumer group's offsets back to the desired positions. To do so, follow these steps:
1. Stop the connector. Offsets can only be reset while the consumer group has no active members.
2. Reset the consumer group offsets. The connector instance name is part of the consumer group name: for example, if the connector instance name is my_ems_sink, the consumer group name is connect-my_ems_sink.
3. Restart the connector.
# Reset the offset of topic foo, partition 0, to 100
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --topic foo:0 --to-offset 100 --execute
# Reset all topics of the connector's consumer group to the earliest offset
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --to-earliest --all-topics --execute
# Reset topic foo to a specific point in time
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --to-datetime 2022-06-01T00:00:00Z --topic foo --execute
# Move the current offsets of topic foo back by 10
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --shift-by -10 --topic foo --execute
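Because an offset reset is hard to undo, it can help to preview it with --dry-run before running it with --execute. The sketch below assumes the group naming described above (connect- followed by the connector instance name). The helper build_reset_command is hypothetical, not part of Kafka or the connector, and it only prints the command so you can review it before running it yourself.

```shell
#!/usr/bin/env bash
# Sketch only: build_reset_command is a hypothetical helper. It prints, but
# does not run, a kafka-consumer-groups.sh reset command for a connector.
set -euo pipefail

build_reset_command() {
  local bootstrap="$1"  # Kafka bootstrap server, e.g. kafka-host:9092
  local connector="$2"  # connector instance name, e.g. my_ems_sink
  local topic="$3"      # topic whose offsets should move, e.g. foo
  local strategy="$4"   # e.g. --to-earliest or "--shift-by -10"
  local mode="${5:-}"   # --dry-run (default) or --execute
  if [ -z "$mode" ]; then
    mode="--dry-run"    # preview the new offsets without committing them
  fi
  # Assumed naming convention: consumer group = "connect-" + connector name
  local group="connect-${connector}"
  printf '%s\n' "kafka-consumer-groups.sh --bootstrap-server ${bootstrap} --group ${group} --reset-offsets ${strategy} --topic ${topic} ${mode}"
}

# Print the preview command first; run the --execute variant once the plan looks right.
build_reset_command kafka-host:9092 my_ems_sink foo --to-earliest
build_reset_command kafka-host:9092 my_ems_sink foo --to-earliest --execute
```

Printing rather than executing keeps the helper safe to experiment with; remember that the connector must still be stopped before the --execute variant will succeed.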
I want to sink more than one topic into Celonis Platform
Currently, each connector instance writes to a single Celonis Platform target table (see the connect.ems.target.table configuration). To sink several topics, create one connector instance per topic.
# first connector instance configuration
name=ems-first_topic
topics=first_topic
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=first_topic
...

# second connector instance configuration
name=ems-second_topic
topics=second_topic
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=second_topic
...
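With many topics, the per-topic files can be generated instead of written by hand. The sketch below follows the one-instance-per-topic pattern above; the ems-TOPIC.properties file naming, the OUT_DIR and EMS_ENDPOINT variables and the placeholder endpoint are all assumptions, and the remaining required settings (shown as ... in the examples) still have to be appended to each file.

```shell
#!/usr/bin/env bash
# Sketch only: writes one sink configuration file per topic.
# Assumptions: the ems-<topic>.properties file naming, the OUT_DIR and
# EMS_ENDPOINT variables, and the placeholder endpoint value.
set -euo pipefail

OUT_DIR="${OUT_DIR:-.}"
# Placeholder: replace with your own continuous-batch-processing endpoint.
EMS_ENDPOINT="${EMS_ENDPOINT:-https://CHANGE-ME/items}"
topics=(first_topic second_topic)

write_sink_config() {
  local topic="$1"
  # One connector instance per topic, writing to a table named after the topic.
  # The other required settings (the "..." in the examples) are not included.
  cat > "${OUT_DIR}/ems-${topic}.properties" <<EOF
name=ems-${topic}
topics=${topic}
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=${EMS_ENDPOINT}
connect.ems.target.table=${topic}
EOF
}

for topic in "${topics[@]}"; do
  write_sink_config "$topic"
  echo "wrote ${OUT_DIR}/ems-${topic}.properties"
done
```

With a standalone worker, the generated files can be passed together after your worker configuration, for example connect-standalone.sh followed by the worker file and each ems-TOPIC.properties file; a distributed cluster takes the same settings as JSON through the Kafka Connect REST API.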