Features
Delivery semantics
The sink connector makes use of Kafka Connect's offset commit functionality and is therefore able to guarantee at-least-once delivery semantics.
The connector first collects consumed records in a task cache and delivers them to the target service at intervals defined by the Connect worker configuration property offset.flush.interval.ms. In addition, a delivery can be triggered between intervals once the number of records accumulated in the task cache reaches the threshold set by the configuration property sap.odata.flush.trigger.size.
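As an illustration, the following sketch (values are examples only) would flush the task cache at least every 30 seconds, or earlier once 500 records have accumulated; offset.flush.interval.ms belongs to the Connect worker configuration, while sap.odata.flush.trigger.size is set on the connector:

```properties
# Connect worker configuration: offsets are flushed (and cached records
# delivered) at this interval, in milliseconds.
offset.flush.interval.ms=30000

# Sink connector configuration: request an early delivery once the task
# cache has accumulated this many records (illustrative value).
sap.odata.flush.trigger.size=500
```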
Dead Letter Queue
The sink connector supports the Dead Letter Queue (DLQ) functionality.
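The DLQ is configured through the standard Kafka Connect error-handling properties rather than connector-specific ones; a minimal sketch (the topic name and values are illustrative):

```properties
# Tolerate record-level failures instead of stopping the task.
errors.tolerance=all

# Topic to which failed records are routed (illustrative name).
errors.deadletterqueue.topic.name=dlq-odatav4-sink
errors.deadletterqueue.topic.replication.factor=3

# Attach headers with failure context to each DLQ record.
errors.deadletterqueue.context.headers.enable=true
```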
Graceful backoff
In case of connection or communication issues with the configured OData v4 service endpoint, the sink connector applies a retry backoff strategy similar to that of the source connector.
- In single record mode, the sink connector retries a request if an HttpClient transport exception occurs or if a response with the client error code 408 or a server error code greater than 500 is returned.
- In batch mode, the same rule applies to the $batch request itself. Errors contained in the batch response cause the connector to stop and are not retried.
Operation mode
The sink connector currently offers the operation modes Insert, Update, and Dynamic, which can be set through the configuration property sap.odata.operation.mode; a configuration sketch follows the list below. The sink connector is limited to standard OData v4 CRUD operations, so an Upsert mode is not supported; however, upsert behavior can be offered by the OData service provider as part of its insert or update implementation.
- In Insert mode the sink connector uses the HTTP POST method to export records. Depending on the OData v4 service implementation, this may fail if the entry to be inserted already exists in the target system. The OData standard supports deep inserts, which allow nested business objects to be inserted by an OData service as multiple entities along with their associations to each other. See OData deep inserts for more details.
- In Update mode the sink connector uses the HTTP PATCH method to partially update OData entities. As a result, only the fields present in the record are updated in the target system, while missing record fields are ignored. Depending on the OData v4 service implementation, this may fail if the entry to be updated does not exist in the target system.
- In Dynamic mode the sink connector uses either HTTP POST or HTTP PATCH to export records. The value of the record field named by the configuration property sap.odata.operation.mode.property decides, per record, which HTTP method is used: a value of ‘I’ results in HTTP POST, while ‘U’ results in HTTP PATCH.
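A minimal configuration sketch for Dynamic mode; only the two property names are taken from this section, while the mode value spelling and the field name "operation" are illustrative assumptions:

```properties
# Select the HTTP method per record instead of globally.
sap.odata.operation.mode=Dynamic

# Record field whose value selects the method: 'I' -> POST, 'U' -> PATCH.
# The field name "operation" is an illustrative assumption.
sap.odata.operation.mode.property=operation
```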
Delete operation
The sink connector can delete entries from the target system when a tombstone record is consumed, controlled by the configuration property sap.odata.enable.delete. A tombstone record is a Kafka record that contains a non-null key and a null value. With this feature enabled, the sink connector converts tombstone records into delete requests that use the HTTP DELETE method; with it disabled, all tombstone records are discarded.
For this feature to work correctly, the record key must be of type struct and must contain all key properties of the entry.
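A sketch of the relevant setting; the boolean value is an assumption implied, but not spelled out, by this section:

```properties
# Convert tombstone records (non-null key, null value) into HTTP DELETE
# requests; when disabled, tombstone records are discarded.
sap.odata.enable.delete=true
```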
Batching requests
By default, the sink connector groups multiple operations into a single HTTP request payload to reduce HTTP transmission overhead and improve overall performance. The maximum batch size is configurable with sap.odata.max.batch.size. For general guidance on tuning the batch size for sink connectors, see Batch size for sink connector.
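For example (the value is illustrative; sap.odata.max.batch.size is the only name taken from this section):

```properties
# Group up to 100 operations into a single $batch request payload.
sap.odata.max.batch.size=100
```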
If request batching is disabled, every record read from Kafka is sent to the target system as a separate request, meaning an HTTP connection is opened and closed for each individual record.
JMX metrics
The OData v4 sink connector supports all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, the OData v4 sink connector provides extra JMX metrics for accessing state managed by the connector.
MBean: org.init.ohja.kafka.connect:type=odatav4-sink-task-metrics,connector=([-.\w]+),task=([\d]+)
Metric | Explanation |
---|---|
retries | Count of retries performed by a connector task in retrying state. |
next-retry | Timestamp of the next retry for a connector task in retrying state. |
${configGroup}-service | Service path (root) for OData v4 service configured in the configuration group. |
${configGroup}-entityset | Name of the OData entity set for OData v4 service configured in the configuration group. |
${configGroup}-topic | Name of target topic for OData v4 service configured in the configuration group. |
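For instance, for a connector named odatav4-sink with task 0 (both illustrative), the task metrics would be published under the following object name, with the ${configGroup}-prefixed metrics appearing once per configured service:

```
org.init.ohja.kafka.connect:type=odatav4-sink-task-metrics,connector=odatav4-sink,task=0
```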
Data type mapping
The following table gives an overview of the data type mapping from Kafka Connect schema types to OData v4 data types applied by the sink connector. The default conversion type is the first one listed in the Kafka Connect Schema Types column.
OData v4 | Kafka Connect Schema Types |
---|---|
EdmBinary | BYTES |
EdmBoolean | BOOLEAN |
EdmByte | INT8, INT16, INT32, INT64 |
EdmDate | DATE, TIMESTAMP, INT64 |
EdmDateTimeOffset | DATE, TIMESTAMP, INT64 |
EdmDecimal | DECIMAL, FLOAT32, FLOAT64, INT8, INT16, INT32, INT64 |
EdmDouble | INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, DECIMAL |
EdmDuration | DECIMAL, INT8, INT16, INT32, INT64 |
EdmGuid | STRING |
EdmInt16 | INT8, INT16, INT32, INT64 |
EdmInt32 | INT8, INT16, INT32, INT64 |
EdmInt64 | INT8, INT16, INT32, INT64 |
EdmSByte | INT8, INT16, INT32, INT64 |
EdmSingle | INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, DECIMAL |
EdmStream | STRING |
EdmString | STRING |
EdmTimeOfDay | TIME, DATE, TIMESTAMP, INT64 |
EdmEnumType | STRING |
EdmStructuredType | STRUCT |
Hints
- Type conversions into OData types from Kafka Connect schema types other than the default conversion type might result in inaccurate data or loss of information. For example, an INT32 value of 70000 exceeds the EdmInt16 range (-32768 to 32767) and cannot be represented accurately.
- The sink connector does not actively use ETags for concurrency control. When issuing an update or DELETE request, the sink connector sends the value ‘*’ in the If-Match HTTP request header.