Features
Parallelism
To parallelize the source connector, each configured OData service entity set is assigned to exactly one worker task. This restriction stems from the Connect API, which distributes REST calls and data loads across multiple tasks: by the time the poll() method is called to retrieve new data from the source, the tasks have already been created, and changing task assignments requires a rebalance. If the number of configured OData service entity sets exceeds the tasks.max configuration or the number of available tasks, a single task handles the extractions for multiple entity sets. Scaling by adding tasks is therefore only effective as long as the number of configured entity sets is greater than the number of available tasks. Scaling by adding topic partitions has no effect at all, as the connector uses only a single partition to guarantee sequential order.
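As a rough illustration of the assignment rule above, the following Java sketch distributes configured entity sets over at most tasks.max tasks in round-robin order. The class and method names are hypothetical and the connector's actual assignment order may differ; the sketch only shows that each entity set lands in exactly one task and that tasks beyond the number of entity sets stay unused.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assign each entity set to exactly one task group.
class EntitySetAssignment {

    static List<List<String>> assign(List<String> entitySets, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be >= 1");
        }
        // Never create more groups (tasks) than there are entity sets.
        int groups = Math.min(maxTasks, entitySets.size());
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        // Round-robin: when there are more entity sets than tasks,
        // a single task handles several entity sets.
        for (int i = 0; i < entitySets.size(); i++) {
            result.get(i % groups).add(entitySets.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sets = List.of("SRV_A/Orders", "SRV_A/Items", "SRV_B/Customers");
        System.out.println(assign(sets, 2)); // [[SRV_A/Orders, SRV_B/Customers], [SRV_A/Items]]
        System.out.println(assign(sets, 8)); // [[SRV_A/Orders], [SRV_A/Items], [SRV_B/Customers]]
    }
}
```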
Delivery semantics
The OData v2 protocol does not define a standard method for identifying unique datasets, handling delta data, capturing changes, or tracking successive data extractions; Change Data Capture was only officially introduced with the OASIS OData v4 protocol. Nevertheless, Apache Olingo v2 and some OData v2 services implement a lightweight change-tracking specification, which the source connector uses to implement a delta mode with offset recovery and exactly-once delivery semantics.
In general, the source and sink connectors offer at-least-once delivery semantics. The OData source connector, however, ensures that data is processed exactly once when operating in delta mode, or in full mode with the execution period set to -1 for one-time polls.
To enable exactly-once support for source connectors in your Kafka cluster, set the worker-level configuration property exactly.once.source.support to enabled. On an existing cluster, first set it to preparing on every worker and then to enabled; a rolling upgrade can be used for both steps. For further information, see KIP-618.
When deploying a source connector, you can configure the exactly.once.support property to make exactly-once delivery either requested (default) or required.
- required: the connector will undergo a preflight check to ensure it can provide exactly-once delivery. If the connector or worker cannot support exactly-once delivery, creation or validation requests will fail.
- requested: the connector will not undergo a preflight check for exactly-once delivery.
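A minimal Java sketch of the decision described above, assuming only the documented rules: with exactly.once.support=required, creation succeeds only if the worker runs with exactly.once.source.support=enabled and the connector reports exactly-once support (in Kafka Connect, a connector reports this through SourceConnector.exactlyOnceSupport); with requested, no preflight check happens. The class is hypothetical and is not Kafka Connect's validation code.

```java
import java.util.Map;

// Hypothetical model of the documented preflight rules; not Kafka Connect code.
class ExactlyOncePreflight {

    /** Returns true if a connector with this configuration may be created. */
    static boolean accept(Map<String, String> workerConfig,
                          Map<String, String> connectorConfig,
                          boolean connectorSupportsExactlyOnce) {
        String support = connectorConfig.getOrDefault("exactly.once.support", "requested");
        if (!"required".equals(support)) {
            // "requested" (the default): no preflight check is performed.
            return true;
        }
        // "required": both the worker and the connector must support exactly-once.
        // "preparing" is only an upgrade step and does not provide exactly-once yet.
        boolean workerEnabled = "enabled".equals(
                workerConfig.getOrDefault("exactly.once.source.support", "disabled"));
        return workerEnabled && connectorSupportsExactlyOnce;
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of("exactly.once.source.support", "enabled");
        Map<String, String> required = Map.of("exactly.once.support", "required");
        System.out.println(accept(worker, required, true));    // true
        System.out.println(accept(Map.of(), required, true));  // false
        System.out.println(accept(Map.of(), Map.of(), false)); // true
    }
}
```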
Operation mode
The source connector can operate in two modes: full mode or delta mode.
Each OData v2 service can be configured to use either mode by setting the track-changes configuration property to 0 for full mode or 1 for delta mode.
Full mode periodically requests all data of an entity set from the corresponding OData v2 service.
Delta mode uses the change-tracking functionality supported by some OData v2 services: it first performs an initial data request for the corresponding entity set, and subsequent requests (Change Data Capture) extract only new, changed, or deleted records for the set of entities requested in the initial data request. For a deleted record, only the key properties are included in the value part of the message, with all other properties set to null. In addition, deleted records include a RECORDMODE or ODQ_CHANGEMODE field set to “D” and a DELETED_AT timestamp indicating when the entity was deleted. After completing an extraction, the source connector waits exec-period seconds before starting the next extraction from the OData v2 service.
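For consumers of delta-mode topics, detecting deletions comes down to checking the change-mode field. The following hypothetical Java helper assumes the message value has been deserialized into a Map keyed by property name. The field names RECORDMODE and ODQ_CHANGEMODE come from the description above, while OrderID and Amount are made-up example properties.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side helper; not part of the connector.
class DeltaRecords {

    /** True if RECORDMODE or ODQ_CHANGEMODE marks the record as deleted ("D"). */
    static boolean isDeleted(Map<String, Object> value) {
        return "D".equals(value.get("RECORDMODE")) || "D".equals(value.get("ODQ_CHANGEMODE"));
    }

    public static void main(String[] args) {
        // Assumed shape of a deleted record: key property set, other properties null.
        Map<String, Object> deleted = new HashMap<>();
        deleted.put("OrderID", "4711");      // key property (hypothetical)
        deleted.put("Amount", null);         // non-key property set to null
        deleted.put("ODQ_CHANGEMODE", "D");
        System.out.println(isDeleted(deleted)); // true
    }
}
```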
Delta link recovery
A common pattern for extracting delta data from SAP is ODP-based data extraction via OData. This scenario follows a specific pattern that allows delta links to be recovered from the delta link history, which SAP exposes via an OData service. If the connector has used an invalidated delta link, enabling delta.link.recovery allows it to proceed with the oldest delta link provided by the source system. In case of the error “536 - Recovery of old requests not possible. Close delta
# Recovery mechanism used in the event of an invalidated delta link
# 0: off
# 1: Recovering from latest available delta link using entity "DeltaLinksOf...", e.g. for OData enabled ODP sources
sap.odata#00.delta.link.recovery = 0
With delta.link.regex and delta.link.replacement, you can use a regular expression to replace parts of the stored delta link when the connector starts, for example to modify the stored delta link manually. In the following example, the delta token is overwritten directly.
# Optional: manual delta link override using a regular expression match
sap.odata#00.delta.link.regex = deltatoken='(.*)'
# Optional: delta link replacement string
sap.odata#00.delta.link.replacement = deltatoken='DXXXXXXXXXXXXXX_XXXXXXXXX'
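The following sketch assumes the override behaves like java.lang.String#replaceAll applied to the stored delta link. That is an assumption about the connector's internals, and the example URL is hypothetical.

```java
// Hypothetical sketch of applying delta.link.regex / delta.link.replacement,
// assuming String#replaceAll semantics.
class DeltaLinkOverride {

    static String apply(String storedDeltaLink, String regex, String replacement) {
        if (regex == null || regex.isEmpty()) {
            return storedDeltaLink; // no override configured
        }
        return storedDeltaLink.replaceAll(regex, replacement);
    }

    public static void main(String[] args) {
        // Hypothetical stored delta link.
        String stored = "https://sap.example.com/sap/opu/odata/sap/ZSRV/EntitySet"
                + "?!deltatoken='D20240501120000_000001'";
        String updated = apply(stored, "deltatoken='(.*)'", "deltatoken='DXXXXXXXXXXXXXX_XXXXXXXXX'");
        System.out.println(updated);
        // https://sap.example.com/sap/opu/odata/sap/ZSRV/EntitySet?!deltatoken='DXXXXXXXXXXXXXX_XXXXXXXXX'
    }
}
```

If Java replacement semantics apply, `$` and `\` in delta.link.replacement are interpreted as group references and escapes; java.util.regex.Matcher.quoteReplacement turns a string into a literal replacement. Note also that the greedy `(.*)` matches up to the last single quote in the link.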
Push-based subscriptions and notifications
The SAP Gateway Foundation offers a Subscription and Notification Flow that supports push-based source connector scenarios. Once the source connector has subscribed to a particular OData v2 service and entity set, which it does during startup, SAP sends push notifications to the connector whenever subscribed entities are created, changed, or deleted. To receive these event notification callbacks from SAP, the source connector runs an HTTP listener.
The source connector offers configuration properties for subscriptions, including:
- sap.odata.subscription.dest: specifies the name of the type ‘G’ RFC destination customized in SAP that targets the connector task. The destination contains the IP/DNS and port for addressing a Connect node directly or an intermediate proxy node. To ensure each connector task instance has a unique port, it is recommended to set tasks.max to 1 for each connector instance.
- sap.odata.subscription.ports: defines a list of ports for the HTTP notification listener run by the connector tasks. When the HTTP listener starts, the connector searches for an available port in the order listed.
- subscription.enable: enables subscriptions and notifications for an individual service and entity set. The OData service must be implemented to support subscriptions; in SAP, subscription-enabled services expose the entity sets SubscriptionCollection and NotificationCollection.
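How the listener might pick its port from sap.odata.subscription.ports can be sketched by trying to bind each configured port in order and keeping the first one that succeeds. This hypothetical Java illustration uses java.net.ServerSocket; the port numbers are made up, and the connector's real listener implementation is not described here.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of "search for available ports in the order of listing".
class NotificationPortSelector {

    /** Binds the first free port from the list; the caller must close the socket. */
    static Optional<ServerSocket> bindFirstAvailable(List<Integer> ports) {
        for (int port : ports) {
            try {
                return Optional.of(new ServerSocket(port));
            } catch (IOException portUnavailable) {
                // Port in use or not permitted: try the next one in list order.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical value of sap.odata.subscription.ports.
        Optional<ServerSocket> listener = bindFirstAvailable(List.of(18080, 18081, 18082));
        if (listener.isEmpty()) {
            System.out.println("none of the configured ports is available");
            return;
        }
        try (ServerSocket socket = listener.get()) {
            System.out.println("notification listener bound to port " + socket.getLocalPort());
        }
    }
}
```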
Periodic extractions, scheduled via the exec-period configuration property, can be combined with push-based notifications. Notifications sent to the connector are always processed first, with the highest priority, during the next call to the poll() method by the Kafka Connect service. Periodic data extractions follow as soon as the next execution interval is reached.
If exec-period is set to -1, no periodic data extractions are executed. This mostly makes sense in combination with enabled subscriptions, since periodic data extractions are usually unnecessary if all change data capture events are handled by notifications. The behaviour differs slightly between full and delta mode:
- In delta mode, each notification triggers a new delta request to the OData service. Setting exec-period to -1 while subscriptions are disabled leads to validation errors.
- In full mode, the first request is always a regular OData request that fetches all pages. After that, if exec-period equals -1, only the entities changed according to notification events are requested from the OData service and processed.
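The ordering rules for notifications and periodic extractions can be sketched as a small scheduler. This hypothetical Java class assumes that every poll() call first drains queued notifications, that an initial extraction always runs once, and that periodic extractions only run when exec-period is not -1 and the interval has elapsed. Timing is simplified to the start of each call rather than the end of the previous extraction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the ordering rules; not the connector's implementation.
class PollScheduler {

    // Thread-safe, because a listener thread would enqueue while poll() drains.
    private final Queue<String> pendingNotifications = new ConcurrentLinkedQueue<>();
    private final long execPeriodSeconds; // -1 disables periodic extractions
    private boolean initialExtractionDone;
    private long nextRunMillis;

    PollScheduler(long execPeriodSeconds) {
        this.execPeriodSeconds = execPeriodSeconds;
    }

    /** Called when a push notification arrives. */
    void onNotification(String entityKey) {
        pendingNotifications.add(entityKey);
    }

    /** Work items for one poll() call, in processing order. */
    List<String> nextActions(long nowMillis) {
        List<String> actions = new ArrayList<>();
        // 1. Notifications are always processed first.
        String key;
        while ((key = pendingNotifications.poll()) != null) {
            actions.add("notification:" + key);
        }
        if (!initialExtractionDone) {
            // 2. The initial extraction runs once, whatever exec-period is.
            actions.add("initial-extraction");
            initialExtractionDone = true;
            nextRunMillis = nowMillis + execPeriodSeconds * 1000L;
        } else if (execPeriodSeconds != -1 && nowMillis >= nextRunMillis) {
            // 3. Periodic extraction once the execution interval is reached.
            actions.add("periodic-extraction");
            nextRunMillis = nowMillis + execPeriodSeconds * 1000L;
        }
        return actions;
    }

    public static void main(String[] args) {
        PollScheduler scheduler = new PollScheduler(60);
        System.out.println(scheduler.nextActions(0));      // [initial-extraction]
        scheduler.onNotification("Orders('4711')");
        System.out.println(scheduler.nextActions(1_000));  // [notification:Orders('4711')]
        System.out.println(scheduler.nextActions(60_000)); // [periodic-extraction]
    }
}
```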
Offset handling
The logical offset of each message produced by the source connector contains:
- the URL from which the message was requested,
- a record number identifying the position of the message in the response, and
- optionally, a second URL for the succeeding request in case of server-side paging (next link) or change tracking (delta link).
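Kafka Connect stores source offsets as Map<String, ?> values, so the three parts of the logical offset could be modelled as below. The key names url, record and next are assumptions chosen for illustration, not the connector's actual offset keys; reading the record number through Number guards against it coming back as a different numeric type after storage.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical offset model; key names are assumptions.
record ODataOffset(String requestUrl, long recordNumber, String nextUrl) {

    Map<String, Object> toMap() {
        Map<String, Object> map = new HashMap<>();
        map.put("url", requestUrl);
        map.put("record", recordNumber);
        if (nextUrl != null) {
            map.put("next", nextUrl); // next link (paging) or delta link (change tracking)
        }
        return map;
    }

    static ODataOffset fromMap(Map<String, ?> map) {
        return new ODataOffset(
                (String) map.get("url"),
                ((Number) map.get("record")).longValue(),
                (String) map.get("next"));
    }

    public static void main(String[] args) {
        ODataOffset offset = new ODataOffset(
                "https://sap.example.com/sap/opu/odata/sap/ZSRV/Orders?$skiptoken=100", 42,
                "https://sap.example.com/sap/opu/odata/sap/ZSRV/Orders?$skiptoken=200");
        System.out.println(ODataOffset.fromMap(offset.toMap()).equals(offset)); // true
    }
}
```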
In case of errors or task rebalances, the connector retrieves the latest offsets from Kafka Connect and repeats the last processed requests to the OData v2 service to ensure no data is lost. All messages up to the record number stored in the corresponding logical offset are regarded as duplicates and filtered out before the SourceRecords are handed over to the producer, so that no duplicate messages are written to the output topic. Consequently, after every restart the source connector immediately tries to recover from the last request before entering the wait loop for the next execution interval configured by exec-period.
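The duplicate filtering after a restart can be sketched as skipping the first n records of the repeated response, where n comes from the committed offset. Whether the connector counts positions from 0 or 1 is not documented; this hypothetical sketch treats the stored record number as the count of records already delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of position-based duplicate filtering after a restart.
class ReplayFilter {

    static <T> List<T> dropAlreadyDelivered(List<T> replayed, long deliveredCount) {
        // Clamp to [0, size] so that an out-of-range offset cannot throw.
        int skip = (int) Math.min(Math.max(deliveredCount, 0L), replayed.size());
        return new ArrayList<>(replayed.subList(skip, replayed.size()));
    }

    public static void main(String[] args) {
        List<String> response = List.of("r1", "r2", "r3", "r4", "r5");
        System.out.println(dropAlreadyDelivered(response, 3)); // [r4, r5]
    }
}
```

Because the filter looks only at positions, not content, it relies on the repeated request returning the same records in the same order, which is exactly the guarantee full mode needs from the source system.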
In delta (change tracking) mode, this mechanism provides at-least-once semantics. In full mode, however, the source system has to guarantee that calling the last request URL again after the connector instance restarts returns the same data, up to the record index stored in the offset, as before the connector went down.
Graceful backoff
In case of connection or communication issues with the configured OData v2 service endpoint, the source connector applies a retry backoff strategy. The maximum number of retry attempts can be configured using property sap.odata.max.retries. After each failed connection attempt, the connector blocks execution for a random number of milliseconds in the range between the values of configuration properties sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms.
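A minimal Java sketch of the retry loop described above, assuming any exception counts as a retriable failure (the real connector may be more selective). The parameters stand in for sap.odata.max.retries, sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical retry-with-random-backoff sketch.
class RetryWithBackoff {

    /** Uniformly random wait time in [minMs, maxMs], both inclusive. */
    static long randomBackoffMs(long minMs, long maxMs) {
        return ThreadLocalRandom.current().nextLong(minMs, maxMs + 1);
    }

    /** Runs the request, retrying up to maxRetries times after failures. */
    static <T> T callWithRetries(Callable<T> request, int maxRetries,
                                 long minBackoffMs, long maxBackoffMs) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return request.call();
            } catch (Exception e) {
                if (retries >= maxRetries) {
                    throw e; // retries exhausted: give up with the last failure
                }
                retries++;
                // Block the task for a random time between the configured bounds.
                Thread.sleep(randomBackoffMs(minBackoffMs, maxBackoffMs));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = callWithRetries(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new IOException("simulated connection failure");
            }
            return "ok";
        }, 5, 10, 50);
        System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```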
For more precise control over how long to wait while establishing a connection, and while waiting for the server to send a response, before an OData request is retried, use the configuration properties sap.odata.connection.connect.timeout.ms and sap.odata.connection.read.timeout.ms.
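To make the two timeouts concrete, the sketch below applies them to a JDK java.net.HttpURLConnection via setConnectTimeout and setReadTimeout. Whether the connector uses HttpURLConnection internally is an assumption, and the URL is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;

// Hypothetical sketch mapping the timeout properties onto HttpURLConnection.
class TimeoutConfig {

    static HttpURLConnection open(String url, int connectTimeoutMs, int readTimeoutMs) {
        try {
            HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            conn.setConnectTimeout(connectTimeoutMs); // sap.odata.connection.connect.timeout.ms
            conn.setReadTimeout(readTimeoutMs);       // sap.odata.connection.read.timeout.ms
            return conn; // no network I/O happens until connect() or getInputStream()
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HttpURLConnection conn = open("http://sap.example.com/sap/opu/odata/sap/ZSRV/", 10_000, 30_000);
        System.out.println(conn.getConnectTimeout() + " / " + conn.getReadTimeout()); // 10000 / 30000
    }
}
```

With HttpURLConnection, a timeout of 0 means waiting indefinitely, and expiry of either timeout surfaces as java.net.SocketTimeoutException.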
Partitioning
The source partition attached to each SourceRecord consists of the service and entity set name, as this combination uniquely identifies an entity set in the OData v2 protocol.
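In Kafka Connect, a source partition is a Map<String, ?>, and offsets are looked up per partition (for example via OffsetStorageReader.offset(partition)). A hypothetical partition map built from the service path and entity set name might look like this; the key names service and entityset are assumptions.

```java
import java.util.Map;

// Hypothetical source partition construction; key names are assumptions.
class SourcePartitions {

    static Map<String, String> of(String servicePath, String entitySet) {
        return Map.of("service", servicePath, "entityset", entitySet);
    }

    public static void main(String[] args) {
        Map<String, String> p1 = of("/sap/opu/odata/sap/ZSRV", "Orders");
        Map<String, String> p2 = of("/sap/opu/odata/sap/ZSRV", "Orders");
        // Equal maps address the same stored offset.
        System.out.println(p1.equals(p2)); // true
    }
}
```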
Data type mapping
The following table gives an overview of how the source connector maps OData v2 data types to Kafka Connect data types:
OData v2 | Kafka Connect Schema Type | Java data type |
---|---|---|
Bit | INT8 | java.lang.Byte |
Uint7 | INT8 | java.lang.Byte |
EdmSByte | INT8 | java.lang.Byte |
EdmByte | INT16 | java.lang.Short |
EdmGuid | STRING | java.lang.String |
EdmTime | TIME | java.util.Date |
EdmInt16 | INT16 | java.lang.Short |
EdmInt32 | INT32 | java.lang.Integer |
EdmInt64 | INT64 | java.lang.Long |
EdmBinary | BYTES | java.nio.ByteBuffer |
EdmDouble | FLOAT64 | java.lang.Double |
EdmSingle | FLOAT32 | java.lang.Float |
EdmString | STRING | java.lang.String |
EdmBoolean | BOOLEAN | java.lang.Boolean |
EdmDecimal | DECIMAL | java.math.BigDecimal |
EdmDateTime | TIMESTAMP | java.util.Date |
EdmDateTimeOffset | TIMESTAMP | java.util.Date |
EdmStructuralType | STRUCT | java.util.Map[String, Object] |
Hints
- The OData v2 standard defines the literal form of EdmDateTimeOffset by the lexical representation for datetime (including the timezone offset), which allows nanosecond precision. However, nanoseconds are truncated during conversion by the Apache Olingo v2 client library.
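Independently of Olingo, java.util.Date, the Java type listed for TIMESTAMP above, only stores milliseconds. The following Java snippet illustrates the resulting precision loss for an EdmDateTimeOffset-style value; it uses only java.time and is not the connector's conversion code.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Date;

// Illustration of sub-millisecond precision loss when converting to java.util.Date.
class DateTimeOffsetPrecision {

    static Date toDate(String isoOffsetDateTime) {
        OffsetDateTime parsed = OffsetDateTime.parse(isoOffsetDateTime); // keeps nanoseconds
        return Date.from(parsed.toInstant());                            // keeps milliseconds only
    }

    public static void main(String[] args) {
        Instant back = toDate("2024-05-01T12:34:56.123456789+02:00").toInstant();
        System.out.println(back);           // 2024-05-01T10:34:56.123Z
        System.out.println(back.getNano()); // 123000000
    }
}
```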
JMX metrics
The OData v2 source connector supports all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, the OData v2 source connector provides extra JMX metrics for accessing state managed by the connector.
MBean: org.init.ohja.kafka.connect:type=odatav2-source-task-metrics,connector=([-.\w]+),task=([\d]+)
Metric | Explanation |
---|---|
retries | Count of retries performed in the connector task that is in retrying state. |
last-extraction | Timestamp of the last execution of the poll() method in the connector task. |
active-subscriptions | Number of active subscriptions registered by the connector. |
${configGroup}-service | Service path (root) for OData v2 service configured in the configuration group. |
${configGroup}-entityset | Name of the OData entity set for OData v2 service configured in the configuration group. |
${configGroup}-topic | Name of target topic for OData v2 service configured in the configuration group. |
${configGroup}-service-url | Service URL of the current data extraction for the OData v2 service configured in the configuration group. |
${configGroup}-uri-type | URI type of current data extraction for OData v2 service configured in the configuration group. |
${configGroup}-position | Record position of current data extraction for OData v2 service configured in the configuration group. |
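From code running inside the Connect worker JVM, the task MBeans can be discovered with the standard JMX API by querying the object name pattern shown above. This sketch assumes that the metric names from the table are exposed as MBean attribute names (e.g. retries); for a remote worker, a connection via javax.management.remote would be needed instead of the platform MBean server.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: list OData v2 source task MBeans registered in the current JVM.
class ODataTaskMetrics {

    static Set<ObjectName> findTaskMBeans() {
        try {
            // Property-list pattern: fixed domain and type, any connector and task.
            ObjectName pattern = new ObjectName(
                    "org.init.ohja.kafka.connect:type=odatav2-source-task-metrics,*");
            return ManagementFactory.getPlatformMBeanServer().queryNames(pattern, null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException("invalid ObjectName pattern", e);
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> names = findTaskMBeans();
        if (names.isEmpty()) {
            System.out.println("no OData v2 source task MBeans registered in this JVM");
        }
        for (ObjectName name : names) {
            // Attribute name "retries" assumed to match the metric name in the table.
            Object retries = server.getAttribute(name, "retries");
            System.out.println(name.getKeyProperty("connector") + "/"
                    + name.getKeyProperty("task") + " retries=" + retries);
        }
    }
}
```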