Features

Delivery semantics

The OData v2 protocol defines no generic way of identifying unique datasets, handling delta data or change data capture, or keeping track of successive data extractions. The sink connector makes use of Kafka's offset commit functionality and is therefore able to guarantee at-least-once semantics: offsets are only committed for records that have already been delivered, so records may be delivered more than once but will not be lost.

The connector first stores consumed records in a task cache and delivers them to their target service at time intervals specified by the Connect worker configuration property offset.flush.interval.ms. Additionally, a delivery can be triggered independently of this time interval once a certain number of records has accumulated in the task cache, configured with the property sap.odata.flush.trigger.size.
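
As a rough illustration (not a complete configuration), the relevant properties might look like the sketch below. Only the two flush-related property keys are taken from this section; the connector name and topic are placeholders.

```java
import java.util.Map;

public class FlushConfigSketch {
    public static void main(String[] args) {
        // Connect worker property (set in the worker configuration file):
        // commit offsets, and thereby trigger deliveries, every 30 seconds.
        Map<String, String> workerProps = Map.of(
                "offset.flush.interval.ms", "30000");

        // Connector properties: additionally deliver as soon as 500 records
        // have accumulated in the task cache. Name and topic are placeholders.
        Map<String, String> connectorProps = Map.of(
                "name", "odatav2-sink-example",
                "topics", "orders",
                "sap.odata.flush.trigger.size", "500");

        System.out.println(workerProps);
        System.out.println(connectorProps);
    }
}
```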

Dead letter queue

The sink connector supports the Dead Letter Queue (DLQ) functionality.
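
The DLQ is configured with Kafka Connect's standard error-handling properties rather than connector-specific ones. A minimal sketch, with an example DLQ topic name:

```java
import java.util.Map;

public class DlqConfigSketch {
    public static void main(String[] args) {
        // Standard Kafka Connect sink error-handling properties: tolerate
        // record-level failures and route failed records to a DLQ topic.
        Map<String, String> dlqProps = Map.of(
                "errors.tolerance", "all",
                "errors.deadletterqueue.topic.name", "odatav2-sink-dlq",
                "errors.deadletterqueue.topic.replication.factor", "1",
                "errors.deadletterqueue.context.headers.enable", "true");

        System.out.println(dlqProps);
    }
}
```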

Graceful backoff

In case of connection or communication issues with the configured OData v2 service endpoint, the sink connector applies a retry backoff strategy similar to that of the source connector.

Operation mode

The sink connector provides the operation modes Insert, Update, and Dynamic, set by the configuration property sap.odata.operation.mode. Because the connector is limited to standard OData v2 CRUD operations, an Upsert mode is not supported; upsert behavior can, however, be offered by the OData service provider as part of its insert or update implementation.

  • In Insert mode the sink connector uses the HTTP POST method to export entries. Depending on the OData v2 service implementation, this might lead to failures if the entry to be inserted already exists in the target system.
    The OData standard supports deep inserts, which allow an OData service to insert nested business objects as multiple entities together with their associations to each other. See OData deep inserts for more details.
  • In Update mode the sink connector uses the HTTP PUT method to export entities. Depending on the OData v2 service implementation, this might lead to failures if the entry to be updated is not available in the target system.
    The sink connector expects an entry containing all required fields. Partial updates for entries with the HTTP MERGE method are currently not supported.
    Deep updates for nested business objects are not part of the OData standard and will lead to exceptions on the service provider side.
  • In Dynamic mode the sink connector uses either HTTP POST or HTTP PUT to export entries. The value of the record field specified by the configuration property sap.odata.operation.mode.property decides at record level which HTTP method is used: if the value of the specified field is 'I', HTTP POST is used; if it is 'U', HTTP PUT is used. See the sketch after this list.
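
The following sketch illustrates Dynamic mode from the producing side. The decision field name operation_flag, the entity fields OrderID and Amount, and all values are hypothetical; only the 'I'/'U' semantics and the property keys in the comments come from this section.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class DynamicModeRecordSketch {
    public static void main(String[] args) {
        // Connector side (illustrative values):
        //   sap.odata.operation.mode          = Dynamic
        //   sap.odata.operation.mode.property = operation_flag   <- hypothetical field name

        // Record value schema containing the decision field plus payload fields.
        Schema valueSchema = SchemaBuilder.struct().name("OrderValue")
                .field("operation_flag", Schema.STRING_SCHEMA) // 'I' -> HTTP POST, 'U' -> HTTP PUT
                .field("OrderID", Schema.STRING_SCHEMA)        // hypothetical entity property
                .field("Amount", Schema.FLOAT64_SCHEMA)        // hypothetical entity property
                .build();

        // This record would be exported with HTTP PUT (update).
        Struct value = new Struct(valueSchema)
                .put("operation_flag", "U")
                .put("OrderID", "4711")
                .put("Amount", 99.95);

        System.out.println(value);
    }
}
```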

Delete operation

The sink connector can be configured with sap.odata.enable.delete to delete records in the target system when consuming a tombstone record, i.e. a Kafka record with a non-null key and a null value. If enabled, the sink connector converts tombstone records into HTTP DELETE requests; if not, all tombstone records are ignored.

To use this feature, all key properties of an entry must be included in the record key, which must be of type struct.
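
A minimal sketch of such a tombstone from the connector's perspective, assuming a hypothetical entity whose key properties are OrderID and ItemNo (how the Struct key is serialized onto the topic, e.g. by a converter, is out of scope here):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class TombstoneRecordSketch {
    public static void main(String[] args) {
        // The key schema must contain ALL key properties of the target entry.
        Schema keySchema = SchemaBuilder.struct().name("OrderItemKey")
                .field("OrderID", Schema.STRING_SCHEMA) // hypothetical key property
                .field("ItemNo", Schema.INT32_SCHEMA)   // hypothetical key property
                .build();

        Struct key = new Struct(keySchema)
                .put("OrderID", "4711")
                .put("ItemNo", 10);

        // Tombstone: non-null key, null value schema and null value.
        // With sap.odata.enable.delete enabled this becomes an HTTP DELETE.
        SinkRecord tombstone = new SinkRecord(
                "orders",        // hypothetical topic
                0,               // partition
                keySchema, key,
                null, null,      // null value schema and value -> tombstone
                42L);            // offset

        System.out.println(tombstone);
    }
}
```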

Batching requests

To reduce HTTP transmission overhead and improve performance, the sink connector by default groups multiple operations into a single HTTP request payload. The maximum batch size is configurable using the configuration property sap.odata.max.batch.size. For more information about general tuning of the batch size for connectors, see Batch size for sink connector.

If batching requests is disabled, each record read from Kafka will be sent to the target system individually. This means that an HTTP connection is opened and closed for each record, which may impact performance.
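
The sketch below only illustrates the effect of a maximum batch size conceptually; it is not the connector's actual implementation. Records accumulated between deliveries are split into chunks of at most the configured size, one chunk per HTTP request payload.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
    // Conceptual illustration: split accumulated records into chunks of at
    // most maxBatchSize, one chunk per batched HTTP request.
    static <T> List<List<T>> chunk(List<T> records, int maxBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxBatchSize) {
            batches.add(records.subList(i, Math.min(i + maxBatchSize, records.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7);
        // With sap.odata.max.batch.size = 3 this yields [[1, 2, 3], [4, 5, 6], [7]].
        System.out.println(chunk(records, 3));
    }
}
```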

Data type mapping

The following table gives an overview of the mapping from Kafka Connect schema types to OData v2 data types applied by the sink connector. The default conversion type is the first one listed in the Kafka Connect Schema Types column.

OData v2            Kafka Connect Schema Types
Bit                 INT8
EdmSByte            INT8, INT16, INT32, INT64
EdmByte             INT8, INT16, INT32, INT64
EdmGuid             STRING
EdmTime             TIME, TIMESTAMP, DATE, INT64
EdmInt16            INT16, INT8, INT32, INT64
EdmInt32            INT32, INT8, INT16, INT64
EdmInt64            INT64, INT8, INT16, INT32
EdmBinary           BYTES
EdmDouble           FLOAT64, FLOAT32, DECIMAL, INT8, INT16, INT32, INT64
EdmSingle           FLOAT32, FLOAT64, DECIMAL, INT8, INT16, INT32, INT64
EdmString           STRING
EdmBoolean          BOOL
EdmDecimal          DECIMAL, FLOAT32, FLOAT64, INT8, INT16, INT32, INT64
EdmDateTime         TIMESTAMP, DATE, INT64
EdmDateTimeOffset   TIMESTAMP, DATE, INT64
EdmStructuralType   STRUCT
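
To make the table concrete, the sketch below builds a Kafka Connect value schema whose field types match the default conversion type of several Edm types; the entity name, field names, and the assumed Edm property types in the comments are hypothetical.

```java
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class TypeMappingSchemaSketch {
    public static void main(String[] args) {
        // Hypothetical value schema; the comments indicate the Edm property
        // type each field could map to according to the table above.
        Schema valueSchema = SchemaBuilder.struct().name("ProductValue")
                .field("Id",        Schema.STRING_SCHEMA)   // EdmString or EdmGuid
                .field("Quantity",  Schema.INT16_SCHEMA)    // EdmInt16
                .field("Counter",   Schema.INT64_SCHEMA)    // EdmInt64
                .field("Price",     Decimal.schema(2))      // EdmDecimal
                .field("Weight",    Schema.FLOAT64_SCHEMA)  // EdmDouble
                .field("Active",    Schema.BOOLEAN_SCHEMA)  // EdmBoolean
                .field("CreatedAt", Timestamp.SCHEMA)       // EdmDateTime or EdmDateTimeOffset
                .field("ValidFrom", Date.SCHEMA)            // EdmDateTime (DATE)
                .field("Raw",       Schema.BYTES_SCHEMA)    // EdmBinary
                .build();

        System.out.println(valueSchema.fields());
    }
}
```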

Hints

  • Type conversions into OData types from Kafka Connect Schema Types other than the default conversion type might result in inaccurate data or loss of information. For example, an integer value that can be represented as INT32 may not be accurately represented as EdmInt16.
  • The sink connector does not actively use ETags for concurrency control. When issuing a PUT or DELETE request, the sink connector sets the If-Match HTTP request header to '*'.

JMX metrics

The OData v2 sink connector supports all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, the OData v2 sink connector provides extra JMX metrics for accessing state managed by the connector.

MBean: org.init.ohja.kafka.connect:type=odatav2-sink-task-metrics,connector=([-.\w]+),task=([\d]+)

Metric                     Explanation
retries                    Count of retries performed by a connector task that is in retrying state.
next-retry                 Timestamp of the next retry for a connector task that is in retrying state.
${configGroup}-service     Service path (root) of the OData v2 service configured in the configuration group.
${configGroup}-entityset   Name of the OData entity set of the OData v2 service configured in the configuration group.
${configGroup}-topic       Name of the source topic for the OData v2 service configured in the configuration group.
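
A minimal sketch of reading these metrics remotely over JMX, assuming the Connect worker exposes JMX on localhost:9999 and that the metric names from the table above are available as MBean attributes:

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ODataSinkJmxSketch {
    public static void main(String[] args) throws Exception {
        // JMX service URL of the Connect worker; host and port are examples.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");

        try (JMXConnector jmx = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = jmx.getMBeanServerConnection();

            // Query all task-level MBeans of the OData v2 sink connector.
            Set<ObjectName> names = mbsc.queryNames(new ObjectName(
                    "org.init.ohja.kafka.connect:type=odatav2-sink-task-metrics,*"), null);

            for (ObjectName name : names) {
                // "retries" is one of the metric names listed in the table above.
                System.out.println(name + " retries=" + mbsc.getAttribute(name, "retries"));
            }
        }
    }
}
```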