Kafka connection examples
AVRO Input
Here is an example of uploading the data from a Kafka topic with the message value stored as Avro. The key converter is set to StringConverter, but the key information is not being used.
The connector uploads the file as soon as any of the following is reached:
10 MB file size, or
100000 records, or
30 seconds since the last write, which covers the case where no more records are available for the time being.
The connector also flushes the Parquet file every 1000 records. The file size check is only updated when a flush happens.
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
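The upload rules above can be sketched in a few lines of Python. This is an illustrative model only, not the connector's implementation: the class and function names are hypothetical, "last write" is assumed to mean the last record written to the file, and an empty file is assumed never to be uploaded.

```python
from dataclasses import dataclass


@dataclass
class CommitPolicy:
    """Thresholds taken from the example configuration."""
    max_bytes: int = 10_000_000     # connect.ems.commit.size.bytes
    max_records: int = 100_000      # connect.ems.commit.records
    max_interval_ms: int = 30_000   # connect.ems.commit.interval.ms
    flush_records: int = 1_000      # connect.ems.parquet.write.flush.records


@dataclass
class FileState:
    """Hypothetical bookkeeping for the file currently being written."""
    records: int = 0
    buffered_bytes: int = 0   # written since the last flush, not yet counted
    known_bytes: int = 0      # file size as of the last flush
    last_write_ms: int = 0


def add_record(state, policy, size_bytes, now_ms):
    """Account for one record; the known size only changes on a flush."""
    state.records += 1
    state.buffered_bytes += size_bytes
    state.last_write_ms = now_ms
    if state.records % policy.flush_records == 0:
        state.known_bytes += state.buffered_bytes
        state.buffered_bytes = 0


def should_upload(state, policy, now_ms):
    """True when any one of the three rules is satisfied."""
    if state.records == 0:
        return False  # assumption: nothing to upload
    return (
        state.known_bytes >= policy.max_bytes
        or state.records >= policy.max_records
        or now_ms - state.last_write_ms >= policy.max_interval_ms
    )
```

In this model, records of 20 KB each push the file past 10 MB after about 500 records, but the size rule only triggers at the 1000-record flush, because that is when the known size is refreshed.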
JSON Input
Here is an example of uploading the data from a Kafka topic with the message value stored as JSON. The key converter is set to StringConverter, but the key information is not being used.
The connector uploads the file as soon as any of the following is reached:
10 MB file size, or
100000 records, or
30 seconds since the last write, which covers the case where no more records are available for the time being.
The connector also flushes the Parquet file every 1000 records. The file size check is only updated when a flush happens.
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
Primary key(s)
Specifies the set of fields from the incoming payload to use as primary keys in Celonis. If this is not provided, all the fields are used.
// single field
...
connect.ems.data.primary.key=customer_id
...
// Composite PK
...
connect.ems.data.primary.key=name,address
...
Please refer to the primary keys documentation to learn about the best practices.
Overwrite the order field when using primary key(s)
If your data already contains a field that orders the records, use it: doing so improves performance and reduces the disk space required in Celonis Platform.
Here is an example configuration in which a timestamp field, processed_ts, guarantees that two records with the same primary key never share the same value:
...
connect.ems.data.primary.key=customer_id
connect.ems.order.field.name=processed_ts
...
// Composite PK
...
connect.ems.data.primary.key=name,address
connect.ems.order.field.name=processed_ts
...
Please refer to the primary keys documentation to learn about the best practices.
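To illustrate why the order field must never repeat for the same primary key, here is a small Python sketch. It assumes the order field is used to pick the most recent record per primary key, which this page does not spell out. The function name and sample records are hypothetical.

```python
def latest_per_key(records, key_fields, order_field):
    """Keep, for each primary key, the record with the highest order value."""
    latest = {}
    for record in records:
        key = tuple(record[field] for field in key_fields)
        current = latest.get(key)
        # A tie on the order field would make the winner ambiguous, which is
        # why the field has to be unique per primary key.
        if current is None or record[order_field] > current[order_field]:
            latest[key] = record
    return list(latest.values())


records = [
    {"customer_id": 1, "processed_ts": 10, "status": "new"},
    {"customer_id": 1, "processed_ts": 20, "status": "paid"},
    {"customer_id": 2, "processed_ts": 5, "status": "new"},
]
result = latest_per_key(records, ["customer_id"], "processed_ts")
```

Here `result` holds one record per `customer_id`, and for customer 1 it is the record with `processed_ts` 20.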
Fix obfuscation
All the obfuscated fields are uploaded to Celonis Platform as *****. In this example, the credit_card and ssn fields are obfuscated.
...
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.obfuscation.method="fix"
connect.ems.obfuscation.fields="credit_card, ssn"
...
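The effect of the fix method on a record can be sketched in Python as follows. This is an illustration under assumptions, not the connector's code: the helper names are hypothetical, the field list is assumed to be split on commas with surrounding whitespace ignored, and only top-level fields are handled.

```python
def parse_fields(setting):
    """Split a value such as "credit_card, ssn" into field names."""
    return [name.strip() for name in setting.split(",") if name.strip()]


def obfuscate_fix(record, fields):
    """Return a copy of the record with the listed fields replaced by *****."""
    return {
        name: ("*****" if name in fields else value)
        for name, value in record.items()
    }


fields = parse_fields("credit_card, ssn")
masked = obfuscate_fix(
    {"name": "Ann", "credit_card": "4111-1111", "ssn": "123-45-6789"},
    fields,
)
```

After this runs, `masked` keeps `name` unchanged and carries `*****` for both `credit_card` and `ssn`.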
SHA1 obfuscation
All the configured fields are hashed with SHA1, and the result is converted to a hex string. For example the text "this is a test" will end up translated to "9938a75e6d10a74d6b2e9bc204177de5b95f28fe".
In this example, the credit_card and ssn fields are obfuscated.
...
connect.ems.obfuscation.method="sha1"
connect.ems.obfuscation.fields="credit_card, ssn"
...
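As a rough sketch of the SHA1 method, the following Python uses the standard `hashlib` module to hash a value and render it as a hex string. The function name is hypothetical and the UTF-8 encoding is an assumption. The exact bytes the connector feeds to the hash are not documented here, so a digest computed this way need not match the connector's output.

```python
import hashlib


def obfuscate_sha1(value):
    """Hash a string with SHA1 and return the 40-character hex digest."""
    # Assumption: the value is encoded as UTF-8 before hashing.
    return hashlib.sha1(value.encode("utf-8")).hexdigest()


digest = obfuscate_sha1("4111-1111")
```

The same input always produces the same digest, so obfuscated values can still be grouped or joined on, while the original text is not uploaded.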
SHA512 obfuscation
All the configured fields are hashed with SHA512, and the result is converted to a hex string. In this example, the credit_card and ssn fields are obfuscated.
...
connect.ems.obfuscation.method="sha512"
connect.ems.obfuscation.sha512.salt="customerdefined salt"
connect.ems.obfuscation.fields="credit_card, ssn"
...
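A salted SHA512 hash can be sketched in Python as shown below. How the connector actually combines the salt with the field value is not described on this page. Prepending the salt to the UTF-8 encoded value is purely an assumption made for illustration, and the function name is hypothetical.

```python
import hashlib


def obfuscate_sha512(value, salt):
    """Hash salt + value with SHA512 and return the 128-character hex digest."""
    # Assumption: the salt is prepended and both parts are UTF-8 encoded.
    hasher = hashlib.sha512()
    hasher.update(salt.encode("utf-8"))
    hasher.update(value.encode("utf-8"))
    return hasher.hexdigest()


salted = obfuscate_sha512("123-45-6789", "customerdefined salt")
other = obfuscate_sha512("123-45-6789", "another salt")
```

Because the salt is part of the hashed input, `salted` and `other` differ even though the field value is the same. This makes precomputed lookup tables of common values much less useful against the uploaded data.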