Features
Parallelism
The parallelism of the OData V4 source connector is determined by assigning a single worker task to each OData service entity set. This limitation is primarily due to the Connect API: although REST calls and data loads could be handled package-wise across multiple tasks, the tasks have already been created by the time the poll() method is called to fetch new data from the source, and changing the task assignments would require a rebalance. If the number of configured OData service entity sets is greater than the maxTasks configuration or the number of available tasks, a single task will handle extractions for multiple entity sets. Scaling by adding tasks therefore only makes sense if the number of configured entity sets is greater than the number of available tasks.
In addition, scaling by adding topic partitions brings no benefit, as the connector uses only one partition to guarantee sequential order.
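The relationship between entity sets and tasks can be illustrated with a small sketch. The round-robin strategy and the function name assign_entity_sets are assumptions made for illustration; the connector's actual distribution logic is not described here.

```python
def assign_entity_sets(entity_sets, max_tasks):
    """Distribute entity sets over at most max_tasks tasks (round-robin, assumed)."""
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    # Creating more tasks than entity sets would leave tasks idle.
    task_count = min(max_tasks, len(entity_sets))
    tasks = [[] for _ in range(task_count)]
    for index, entity_set in enumerate(entity_sets):
        tasks[index % task_count].append(entity_set)
    return tasks


# Three entity sets but only two tasks: one task handles two entity sets.
print(assign_entity_sets(["Orders", "Customers", "Products"], max_tasks=2))
# One entity set and four allowed tasks: only one task is useful.
print(assign_entity_sets(["Orders"], max_tasks=4))
```

In this sketch, raising max_tasks beyond the number of entity sets does not change the result, which mirrors the scaling remark above.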
Delivery semantics
Change Data Capture was officially introduced with the OASIS OData v4 protocol specification. With the Apache Olingo v4 library, the source connector is capable of implementing delta mode based on the specification. This includes offset recovery and exactly-once delivery semantics.
In general, source and sink connectors provide at-least-once delivery guarantees. However, the OData source connector stands out by ensuring that data is processed exactly once when operating in delta mode, or in full mode with the execution period set to -1 for one-time polls.
To enable exactly-once support for source connectors in your Kafka cluster, you will need to update the worker-level configuration property exactly.once.source.support to either preparing or enabled. For further information, see KIP-618.
When deploying a source connector, you can configure the exactly.once.support property to make exactly-once delivery either requested or required (default is requested).
- required: the connector will undergo a preflight check to ensure it can provide exactly-once delivery. If the connector or worker cannot support exactly-once delivery, creation or validation requests will fail.
- requested: the connector will not undergo a preflight check for exactly-once delivery.
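The two configuration levels can be sketched as plain key/value maps. The property names exactly.once.source.support and exactly.once.support are taken from the text above; the connector name, the tasks.max value, and the helper build_connector_config are hypothetical, and the connector class is deliberately left out.

```python
import json

VALID_EXACTLY_ONCE_SUPPORT = {"requested", "required"}


def build_connector_config(base_config, exactly_once_support="requested"):
    """Return a copy of base_config with exactly.once.support set (hypothetical helper)."""
    if exactly_once_support not in VALID_EXACTLY_ONCE_SUPPORT:
        raise ValueError(
            "exactly.once.support must be one of %s" % sorted(VALID_EXACTLY_ONCE_SUPPORT)
        )
    config = dict(base_config)
    config["exactly.once.support"] = exactly_once_support
    return config


# Worker-level setting, placed in the Kafka Connect worker configuration.
worker_settings = {"exactly.once.source.support": "enabled"}

# Connector-level setting; name and tasks.max are placeholder values.
connector_config = build_connector_config(
    {"name": "odata-source-example", "tasks.max": "1"},
    exactly_once_support="required",
)
print(json.dumps(connector_config, indent=2))
```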
Operation mode
The source connector can be operated in either full or delta mode. You can choose the operation mode for each individual OData v4 service by setting the configuration property track-changes to either 0 (full mode) or 1 (delta mode).
Full mode sends a periodic data request to the corresponding source OData v4 service requesting data from an entity set.
Delta mode uses the change-tracking functionality supported by some OData v4 services to perform an initial data request for the corresponding entity set first. Subsequent requests extract data in delta mode (Change Data Capture) and contain only new, changed, or deleted records for the same set of entities as in the initial data request. For deleted entity records, only the key properties are filled in the value part of the message; all other properties are set to null. Furthermore, deleted entity records contain either a RECORDMODE or ODQ_CHANGEMODE field set to “D”, as well as a timestamp field DELETED_AT that indicates when the entity was removed from the source system.
Once all available entity records have been extracted, the source connector will pause for exec-period seconds before sending the next data request to an OData v4 service.
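On the consumer side, deleted records can be recognized by the marker fields described above. The following sketch assumes message values have already been deserialized into dictionaries; the fields OrderID and Status and the DELETED_AT value format are made up for illustration.

```python
def is_deleted(value):
    """Return True if the record value carries a deletion marker."""
    return value.get("RECORDMODE") == "D" or value.get("ODQ_CHANGEMODE") == "D"


def split_changes(values):
    """Separate new/changed records from deleted ones."""
    upserts, deletes = [], []
    for value in values:
        (deletes if is_deleted(value) else upserts).append(value)
    return upserts, deletes


# Hypothetical message values: the deleted record only has its key property
# filled, all other properties are null (None).
messages = [
    {"OrderID": 1, "Status": "OPEN", "RECORDMODE": None, "DELETED_AT": None},
    {"OrderID": 2, "Status": None, "RECORDMODE": "D", "DELETED_AT": "2024-01-01T00:00:00Z"},
]
upserts, deletes = split_changes(messages)
print(len(upserts), "upsert(s),", len(deletes), "delete(s)")
```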
Offset handling
The logical offset for each message produced by the source connector contains
- the URL from which the message was requested,
- a record number identifying the position of the message in the response, and
- optionally, a second URL for the succeeding request in case of server-side paging (next link) or change tracking (delta link).
If any problems or task rebalances occur, the connector will obtain the most recent offsets from Kafka Connect and will resume extracting the last processed requests from the OData v4 service to ensure that no data is lost. To prevent duplicate messages from being stored in the output topic, all messages up to the record number of the associated logical offset will be treated as duplicates and filtered out before the SourceRecords are passed on to the producer. This implies that if a restart occurs, the source connector will attempt to recover from the last request immediately, before going into a wait-loop for the next execution interval specified by exec-period.
When operating in delta/change tracking mode, this approach enables exactly-once semantics. However, in full mode, it is the responsibility of the source system to ensure that the subsequent call to the last request URL, after restarting the connector instance, returns identical data up to the record index stored in the offset before the connector stopped working.
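The duplicate filtering on recovery can be sketched as follows. The offset field names url and record_number, the 1-based counting of record numbers, and the function records_after_offset are assumptions for illustration; the connector's internal representation may differ.

```python
def records_after_offset(request_url, response_records, stored_offset):
    """Re-read a response and drop everything already covered by the stored offset.

    Returns (record, offset) pairs for the records that still need to be produced.
    """
    already_delivered = 0
    if stored_offset is not None and stored_offset["url"] == request_url:
        # Assumed: record_number is the 1-based position of the last delivered record.
        already_delivered = stored_offset["record_number"]

    pending = []
    for position, record in enumerate(response_records, start=1):
        if position <= already_delivered:
            continue  # treated as a duplicate and filtered out
        pending.append((record, {"url": request_url, "record_number": position}))
    return pending


# After a restart, the last request is repeated; two records were already delivered.
url = "https://example.invalid/odata/Orders"  # placeholder URL
stored = {"url": url, "record_number": 2}
for record, offset in records_after_offset(url, ["a", "b", "c", "d"], stored):
    print(record, offset["record_number"])
```

The sketch also shows why full mode depends on the source system: the filter only works if the repeated request returns the same records in the same order up to the stored record number.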
Graceful backoff
In case of connection or communication issues with the configured OData v4 service endpoint, the source connector applies a retry backoff strategy. The maximum number of retry attempts can be set using the property sap.odata.max.retries. After each unsuccessful connection attempt, the connector will pause execution for a random duration within the range specified by the configuration properties sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms.
For more precise control over the maximum wait time when establishing a connection, as well as the maximum wait time for the server to respond before attempting to retry an OData request, the configuration properties sap.odata.connection.connect.timeout.ms and sap.odata.connection.read.timeout.ms can be utilized.
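A retry loop with a randomized pause could look like the sketch below. The parameters mirror sap.odata.max.retries, sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms; the function call_with_retries, the use of ConnectionError, and the uniform distribution are assumptions, not the connector's actual implementation.

```python
import random
import time


def call_with_retries(request, max_retries, min_backoff_ms, max_backoff_ms,
                      sleep=time.sleep):
    """Call request(); on ConnectionError retry up to max_retries times."""
    retries = 0
    while True:
        try:
            return request()
        except ConnectionError:
            if retries >= max_retries:
                raise  # retries exhausted, give up
            retries += 1
            # Random pause within the configured range (uniform, assumed).
            pause_ms = random.uniform(min_backoff_ms, max_backoff_ms)
            sleep(pause_ms / 1000.0)


# Demo: a request that fails twice before succeeding. The pauses are recorded
# instead of actually sleeping.
attempts = {"count": 0}
pauses = []


def flaky_request():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("service unavailable")
    return "ok"


result = call_with_retries(flaky_request, max_retries=5,
                           min_backoff_ms=100, max_backoff_ms=500,
                           sleep=pauses.append)
print(result, "after", len(pauses), "retries")
```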
Partitioning
The partition provided with each SourceRecord consists of the service and entity set name, as this combination uniquely identifies an entity set as defined by the OData v4 protocol.
JMX metrics
The OData v4 connectors support all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, the OData v4 connectors provide extra JMX metrics for accessing state managed by the connector.
MBean: org.init.ohja.kafka.connect:type=odatav4-source-task-metrics,connector=([-.\w]+),task=([\d]+)
Metric | Explanation |
---|---|
retries | Count of retries performed in the connector task that is in retrying state. |
last-extraction | Timestamp for last execution time of poll function in connector task. |
${configGroup}-service | Service path (root) for OData v4 service configured in the configuration group. |
${configGroup}-entityset | Name of the OData entity set for OData v4 service configured in the configuration group. |
${configGroup}-topic | Name of target topic for OData v4 service configured in the configuration group. |
${configGroup}-service-url | Service url of current data extraction for OData v4 service configured in the configuration group. |
${configGroup}-uri-type | URI type of current data extraction for OData v4 service configured in the configuration group. |
${configGroup}-position | Record position of current data extraction for OData v4 service configured in the configuration group. |
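When filtering MBeans in a monitoring tool, the object name pattern can be used to extract the connector name and task id. The sketch below assumes the placeholders in the MBean name are regular expressions using the character classes \w and \d; the connector name odata-source is a made-up example.

```python
import re

# Pattern for the source task MBean; groups capture connector name and task id.
MBEAN_PATTERN = re.compile(
    r"org\.init\.ohja\.kafka\.connect:type=odatav4-source-task-metrics,"
    r"connector=([-.\w]+),task=([\d]+)"
)


def parse_mbean_name(name):
    """Return (connector, task_id) for a matching MBean name, else None."""
    match = MBEAN_PATTERN.fullmatch(name)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


example = ("org.init.ohja.kafka.connect:type=odatav4-source-task-metrics,"
           "connector=odata-source,task=0")
print(parse_mbean_name(example))
```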
Supported data types
The following table gives an overview of the mapping from OData v4 data types to Kafka Connect data types applied by the source connector:
OData v4 | Kafka Connect Schema Type | Java data type |
---|---|---|
EdmBinary | BYTES | java.nio.ByteBuffer |
EdmBoolean | BOOLEAN | java.lang.Boolean |
EdmByte | INT16 | java.lang.Short |
EdmDate | DATE | java.util.Date |
EdmDateTimeOffset | TIMESTAMP | java.util.Date |
EdmDecimal | DECIMAL | java.math.BigDecimal |
EdmDouble | FLOAT64 | java.lang.Double |
EdmDuration | DECIMAL | java.math.BigDecimal |
EdmGuid | STRING | java.lang.String |
EdmInt16 | INT16 | java.lang.Short |
EdmInt32 | INT32 | java.lang.Integer |
EdmInt64 | INT64 | java.lang.Long |
EdmSByte | INT8 | java.lang.Byte |
EdmSingle | FLOAT32 | java.lang.Float |
EdmStream | STRING | java.lang.String |
EdmString | STRING | java.lang.String |
EdmTimeOfDay | TIME | java.util.Date |
EdmEnumType | STRING | java.lang.String |
EdmStructuredType | STRUCT | java.util.Map[String, Object] |
Hints
- The scale value ‘floating’ for properties of type EdmDecimal is not supported.
- The scale value ‘variable’ for properties of type EdmDecimal is converted to the value of the precision facet. If the precision facet is unspecified, ‘6’ is used as a scale with rounding mode ‘half even’ for the conversion. The fallback scale and the rounding mode can be overridden using sap.odata.decimal.scale.fallback and sap.odata.decimal.rounding.mode.
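The effect of the fallback scale and the default rounding mode can be illustrated with Python's decimal module. This is only a sketch of the arithmetic (scale 6, rounding half even); the function apply_scale is hypothetical and does not reflect the connector's code.

```python
from decimal import Decimal, ROUND_HALF_EVEN


def apply_scale(value, scale=6, rounding=ROUND_HALF_EVEN):
    """Round a decimal value to a fixed scale (number of fractional digits)."""
    quantum = Decimal(10) ** -scale  # scale 6 -> 0.000001
    return Decimal(value).quantize(quantum, rounding=rounding)


# Ties are rounded towards the even neighbour.
print(apply_scale("1.2345675"))  # 1.234568
print(apply_scale("1.2345665"))  # 1.234566
print(apply_scale("2.5"))        # 2.500000
```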