Using the Celonis Kafka connector

Important

Any references to third-party products or services do not constitute Celonis Product Documentation nor do they create any contractual obligations. This material is for informational purposes only and is subject to change without notice.

Celonis does not warrant the availability, accuracy, reliability, completeness, or usefulness of any information regarding the subject of third-party services or systems.

The Celonis Kafka connector is a sink plugin, designed specifically to execute through the Kafka Connect runtime. The Celonis Kafka connector's purpose is to continuously read records from one or multiple source topics, transform them into a canonical format, and then load them into the Celonis Platform via the Standard Data Ingestion API.

Tip

Refer to the official Kafka Connect documentation for instructions on how to install Kafka Connect plugins.

Using the Celonis Kafka connector requires a Celonis Platform data pool. For more information on data pools, see Creating and managing data pools.

Download the latest version of the Celonis Kafka connector from the Download Portal. This is listed as Kafka EMS sink connector:

  1. Follow the instructions on the Download Portal.

  2. Find the Kafka EMS sink connector section, and select the JAR link to start the download.

    A screenshot showing how to access the Kafka EMS connector in the download portal.

To set up the Celonis Kafka connector connection:

  1. In the Celonis Platform, go to Data > Data Integrations, and select your data pool.

  2. In your data pool diagram, select Data Connections > Add Data Connection.

    A screenshot showing how to add a data connection within a data pool.
  3. Select Push data into Celonis.

    A screenshot showing Add Data Connection window highlighting the location of the Push data into Celonis option.
  4. In the Configure your Data Ingestion application window, define the following configuration details:

    • Name: Provide a name for the ingestion application.

    • Source Table Name: Provide the Kafka topic name.

    • Target Table Name: Provide the name you want to use for this table in the data pool. By default, this is the same value as the Source Table Name.

      Tip

      If your data uses nested tables, we recommend using a table schema to define the remaining fields, as described in the next step.

    • Primary Key: Provide the column(s) that uniquely identify each row of a record. Press Space after an entry to add additional ones.

      Note

      If you do not assign a primary key to a table, the Data Ingestion pipeline automatically creates a synthetic column called _celonis_id and uses it as the primary key for unnested tables.

    • Age Columns: Provide the name of the column(s) that specifies the version or timestamp of the row. Press Space after an entry to add additional ones.

      Note

      Incoming records are processed only if their Age Column value is greater than or equal to (≥) the existing record's value for the same Primary Key.

  5. If your data uses nested tables, define a table schema:

    Note

    If your data is flat/normalized, you do not need to define a table schema.

    If your data is nested, we recommend using one representative record from the Kafka topic to configure the table schema, which is used as the underlying ingestion logic.

    1. Select Options (⋮) > Configure Table Schema.

      A new window opens.

    2. In the left-side code block, paste your JSON schema. If it is valid, the target table structure on the right side is updated with the corresponding column names.

      Note

      If you get the error message:

      An unexpected error occurred. Please send this error code 'undefined' to Celonis Customer Support.

      it typically means the JSON payload is invalid. Verify that the JSON follows a valid structure and matches the schema defined in your configuration.

    3. Select the checkboxes (or combinations) in Primary Key and Age Column to define each.

    4. Select Finish. The Primary Key and Age column values defined as part of the schema configuration are automatically applied.

  6. As needed, you can define multiple table configurations by selecting + Add and repeating the previous instructions.

  7. In the Configure your Data Ingestion application window, verify all values are correct, and select Save. This opens the Ingestion application credentials window.

    Important

    Before saving your configuration, verify that all Primary Key values for your tables are correct. Once the configuration is saved, the Primary Key values of an existing table cannot be changed.

  8. In the Ingestion application credentials window, copy the Access Key and Access Secret to a safe location. These are the required credentials your Kafka connector will use to authenticate with the Celonis Platform when pushing data.

    Important

    This is the only time the Access Key and Access Secret will be accessible. If you do not retain them, you must create a new configuration.

  9. Retrieve and update the Celonis Kafka connector configuration from the UI:

    1. Select Options (⋮) > S3 Clients configuration:

      A screenshot showing how to access the Kafka connector S3 client configuration.
    2. In the S3 clients configuration window, select the Kafka Connector–JSON tab.

    3. Select Copy to clipboard, and edit it on your local device.

    4. Update connect.ems.uploader.s3.access.key with your Access Key and connect.ems.uploader.s3.access.secret with your Access Secret.

    5. Add the following values:

      connect.ems.commit.interval.ms=600000
      connect.ems.commit.records=10000
      connect.ems.commit.size.bytes=25000000

      Important

      These parameters act as a "flush trigger" that ensures data is committed to S3 whenever a specific time, record count, or file size threshold is reached. Without them, the connection may use default settings that could lead to significant data delays, inefficiently sized files, or system crashes during high-traffic spikes.

    6. Update the S3 client configuration JSON as needed based on your use case. Refer to Additional Celonis Kafka connector configuration details and Configuration reference for the Kafka EMS sink Connector for specific configuration options.

    7. Save your S3 client configuration.

    You can now use this configuration for the Celonis Kafka connector.

    Note

    If you make changes in the Data Connection (for example, adding a new table), the configuration is only updated within the Celonis Platform.

    To make changes effective, you need to replace your existing S3 client configuration with the new configuration (ensuring you follow the same steps above), and restart your Celonis Kafka connector.

  10. Start the connector with your S3 client configuration.
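Putting the steps above together, a standalone connector properties file might look like the following sketch. The topic name, Access Key, and Access Secret are placeholders, and the connector class name should be confirmed against the Configuration reference for the Kafka EMS sink Connector; always start from the S3 client configuration copied from the UI rather than writing the file by hand.

```properties
# Sketch only. Confirm the connector class and remaining keys against the
# Configuration reference for the Kafka EMS sink Connector; <...> are placeholders.
name=celonis-ems-sink
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
topics=orders
connect.ems.uploader.s3.access.key=<YOUR-ACCESS-KEY>
connect.ems.uploader.s3.access.secret=<YOUR-ACCESS-SECRET>
connect.ems.commit.interval.ms=600000
connect.ems.commit.records=10000
connect.ems.commit.size.bytes=25000000
```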

You can now use the Standard Data Ingestion API with the Celonis Kafka connector.
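The Primary Key and Age Column settings described in step 4 together define an upsert rule: an incoming record replaces the stored row with the same primary key only when its age value is greater than or equal to the stored one. A minimal sketch of that rule (the field names are hypothetical; the real deduplication happens inside the Data Ingestion pipeline):

```python
# Illustration of the Primary Key / Age Column upsert rule. The data is
# hypothetical; the actual deduplication is performed by the pipeline.
table = {}  # primary key -> stored record

def upsert(record, primary_key="orderId", age_column="updatedAt"):
    """Apply the record only if its age value is >= the stored row's value."""
    key = record[primary_key]
    existing = table.get(key)
    if existing is None or record[age_column] >= existing[age_column]:
        table[key] = record

upsert({"orderId": "42", "status": "NEW", "updatedAt": 1})
upsert({"orderId": "42", "status": "SHIPPED", "updatedAt": 2})  # newer: applied
upsert({"orderId": "42", "status": "NEW", "updatedAt": 1})      # older: ignored
print(table["42"]["status"])  # SHIPPED
```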

Note

For additional configuration references, see Configuration reference for the Kafka EMS sink Connector.

Using Kafka from a managed service provider

If you are using Kafka or Kafka Connect from a managed service provider, follow the instructions from your service:

  • AWS MSK Connect: To create the connector, follow the steps provided here. These steps require installing a custom connector; to do so, follow this link and use the connector release JAR.

  • Confluent: Install the connector using Confluent Hub. Follow the instructions here to enable the Kafka Connect sink.

  • Azure Event Hubs: If you're running Event Hubs, you can leverage Kafka Connect and the Celonis Kafka connector to load data into the Celonis Platform. The instructions to enable Kafka Connect for Event Hubs can be found here. Once the installation is done, follow the manual steps to enable the connector before creating an instance of it.

Reading unstructured JSON data

The Celonis Kafka connector ships with a special serializer that dynamically infers a schema based on the structure of the incoming JSON records and converts them into the canonical representation used internally by the Celonis Kafka connector.

value.converter = cloud.celonis.kafka.connect.json.SchemaInferringJsonConverter
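As an illustration of the idea behind schema inference (a simplified sketch, not the connector's actual algorithm), a flat JSON record maps field by field to typed columns:

```python
import json

def infer_schema(doc):
    """Map each JSON field to a coarse column type. Illustration only;
    the connector's real inference is more sophisticated."""
    type_names = {str: "STRING", bool: "BOOLEAN", int: "INTEGER", float: "FLOAT"}
    return {field: type_names[type(value)] for field, value in doc.items()}

record = json.loads('{"orderId": "42", "amount": 19.99, "paid": true}')
print(infer_schema(record))
# {'orderId': 'STRING', 'amount': 'FLOAT', 'paid': 'BOOLEAN'}
```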

Reading data in AVRO format

If your data is in AVRO format, or another format supported by Confluent Schema Registry (e.g. Protobuf or JSON Schema), set the value.converter key to the appropriate Confluent converter and point it to a reachable Schema Registry instance.

value.converter = io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url = http://<SCHEMA-REGISTRY-HOST>:<PORT>

Reading from multiple source topics

A single instance of the Celonis Kafka connector can read from multiple source topics, provided that the data they hold is in the same format (e.g. all AVRO).

tasks.max = 3
topics=sourceA,sourceB,sourceC
# or alternatively topics.regex=source.*

By default, the Celonis Kafka connector persists records for each source topic in a Celonis Platform table with the same name.

Tip

When reading data from multiple topics, or from a single topic that has multiple partitions, you may improve throughput by setting tasks.max to a value greater than one, allowing the Kafka Connect runtime to allocate up to that many tasks to the connector instance.

Ensuring partition affinity

By default, the Kafka producer determines the target partition from the raw bytes of the message Key: it hashes the Key (the Java client's default partitioner uses murmur2) and then takes the result modulo the number of partitions.

Therefore, to ensure that records with the same primary key always end up in the same partition, the Kafka message Key should contain only the primary key value(s).
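For illustration, here is a pure-Python sketch of the Java client's default partitioning scheme: a murmur2 hash of the Key bytes, forced positive, then taken modulo the partition count.

```python
def murmur2(data: bytes) -> int:
    """Pure-Python port of the Kafka Java client's 32-bit murmur2 hash."""
    length = len(data)
    seed = 0x9747B28C
    m = 0x5BD1E995
    h = (seed ^ length) & 0xFFFFFFFF
    # Body: process the Key four bytes at a time, little-endian.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & 0xFFFFFFFF
        k ^= k >> 24
        k = (k * m) & 0xFFFFFFFF
        h = (h * m) & 0xFFFFFFFF
        h ^= k
    # Tail: fold in the remaining 1-3 bytes.
    rem = length % 4
    tail = data[length - rem:]
    if rem == 3:
        h ^= tail[2] << 16
    if rem >= 2:
        h ^= tail[1] << 8
    if rem >= 1:
        h ^= tail[0]
        h = (h * m) & 0xFFFFFFFF
    # Final mixing.
    h ^= h >> 13
    h = (h * m) & 0xFFFFFFFF
    h ^= h >> 15
    return h

def partition_for(key: bytes, num_partitions: int) -> int:
    """Positive murmur2 of the Key bytes, modulo the partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions

# Records sharing the same Key always map to the same partition.
print(partition_for(b"order-42", 6) == partition_for(b"order-42", 6))  # True
```

Because the hash covers the Key's raw bytes, any extra field in the Key changes the target partition, which is why the Key should carry only the primary key values.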

Note

The Celonis Kafka connector does not consider the Key fields.

You must add the primary key(s) to the record Value, either at produce time (duplicating the data between the Key and the Value) or at the Celonis Kafka connector level by using a Kafka Connect Single Message Transform that copies fields from the Key into the Value (the reverse of this).
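A sketch of the produce-time option, duplicating the primary key between the Key and the Value (the `orders` topic and field names are hypothetical, and the commented-out `produce` call assumes a confluent-kafka-style producer):

```python
import json

# Hypothetical order event. The primary key appears in the Value as well,
# because the Celonis Kafka connector only reads fields from the Value.
order = {"orderId": "42", "status": "SHIPPED"}

key = order["orderId"].encode("utf-8")     # drives partition assignment
value = json.dumps(order).encode("utf-8")  # carries the primary key for the connector

# With a real producer (e.g. confluent-kafka), this would be sent as:
# producer.produce("orders", key=key, value=value)
print(key, json.loads(value)["orderId"])
```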