Advanced Features
Delta Link Recovery
A common pattern for extracting delta data from SAP is ODP-based data extraction via OData. This extraction scenario adheres to a specific pattern that enables the recovery of delta links through the delta link history exposed by SAP via an OData service. If the connector has used an invalidated delta link, enabling this configuration allows it to proceed with the oldest delta link provided by the source system.
This recovery mechanism is relevant in case of the error “536 - Recovery of old requests not possible. Close delta …”.
# Recovery mechanism used in the event of an invalidated delta link
# 0: off
# 1: Recovering from the oldest available delta link using entity "DeltaLinksOf...", e.g. for OData enabled ODP sources
sap.odata#00.delta.link.recovery = 0
With delta.link.regex and delta.link.replacement, you can replace parts of the stored delta link using a regular expression when the connector starts. In the example below, the delta token is overwritten directly.
# Optional: manual delta link override using a regular expression match
sap.odata#00.delta.link.regex = deltatoken='(.*)'
# Optional: delta link replacement string
sap.odata#00.delta.link.replacement = deltatoken='DXXXXXXXXXXXXXX_XXXXXXXXX'
Writing Null Messages
With the property null-messages, the connector updates offsets (delta links) in the connect-offset topic without needing to produce actual source data. By default, this feature is turned off (0). This is helpful when the source system does not produce new data for a long time and older delta links get cleaned up by the source system.
# Create null messages and persist logical connect offsets for empty requests in the source system
# 0(default): false
# 1: true
sap.odata#00.null-messages = 0
Scheduling
The connector is meant to run continuously according to streaming principles. Nevertheless, a clock period can be set per OData V2 service/entity set combination using the configuration property exec-period. A data extraction will then take place only once per clock period interval.
As an alternative, the configuration properties cron.expression and cron.security.time.sec can be used to define a cron-like schedule, e.g. if a source system specifies strict timeframes for extraction processes.
For more information on Quartz cron expressions, see the Quartz - Cron Trigger Tutorial.
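A minimal sketch of both scheduling variants, following the sap.odata#00 property prefix used above; the values, and the assumption that exec-period is specified in seconds, are illustrative:
# Periodic extraction: at most one extraction per interval (interval unit assumed to be seconds)
sap.odata#00.exec-period = 900
# Alternative: cron-like schedule in Quartz syntax, e.g. daily at 04:00, with a 60-second security buffer
sap.odata#00.cron.expression = 0 0 4 * * ?
sap.odata#00.cron.security.time.sec = 60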
Push-based Subscriptions
The SAP Gateway Foundation offers a Subscription and Notification Flow that supports push-based source connector scenarios. Upon subscribing to a particular OData V2 service and entity set, which is handled by the source connector during startup, SAP will send push notifications to the source connector whenever there are creates, changes, or deletes for the entities that the connector has subscribed to. The source connector achieves this by executing an HTTP listener that handles event notification callbacks received from SAP.
The source connector offers configuration properties for subscriptions, including:
- sap.odata.subscription.dest: This property specifies the name of the type ‘G’ RFC destination customized in SAP that targets the connector task, containing the IP/DNS and port for addressing a Connect node directly or an intermediate proxy node. To ensure each connector task instance has a unique port, it is recommended to set max.tasks per connector instance to 1.
- sap.odata.subscription.ports: This defines a list of ports for the HTTP notification listener executed by the connector tasks. When the HTTP listener is started, the connector searches for available ports in the order of listing.
- subscription.enable: This property can be used to enable subscriptions and notifications for each individual service and entity set. To enable subscriptions, the OData service must be implemented to support them. In SAP, subscription-enabled services offer the entity sets SubscriptionCollection and NotificationCollection.
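A minimal configuration sketch combining these properties; the RFC destination name, the ports, and the per-entity-set prefix for subscription.enable are assumptions:
# Type 'G' RFC destination in SAP that targets the connector's HTTP listener (name is hypothetical)
sap.odata.subscription.dest = Z_KAFKA_CONNECT_DEST
# Candidate ports for the HTTP notification listener, tried in the order listed
sap.odata.subscription.ports = 8090,8091,8092
# Enable subscriptions and notifications for this service/entity set (assumed: 1 = enabled)
sap.odata#00.subscription.enable = 1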
Periodic extractions customized via the configuration property exec-period can be combined with push-based notifications. Notifications sent to the connector are always processed immediately and with the highest priority during the next call to the poll() method by the Kafka Connect service. Periodic data extractions will follow as soon as the next execution interval is reached.
If the exec-period property is set to -1, no periodic data extractions will be executed. This makes most sense in combination with enabled subscriptions, as periodic data extractions are usually not needed if all change data capture events are handled by notifications.
There is a slightly different behaviour between full and delta operation mode:
- Delta mode: Each notification leads to a new delta request call to the OData service. Setting exec-period to -1 and disabling subscriptions at the same time will lead to validation errors.
- Full mode: The first request will always be a regular OData request, including fetching all pages. After that, only changed entities from notification events will be processed and requested from the OData service if exec-period equals -1.
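As a sketch, a purely notification-driven setup for a subscribed entity set might combine both properties as follows (prefix and value semantics as assumed above):
# Disable periodic extractions and rely on push notifications for this entity set
sap.odata#00.exec-period = -1
sap.odata#00.subscription.enable = 1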
Advanced Querying
Field Projection
Each OData V2 service entity set defines a collection of properties that can be read, updated, deleted, or inserted. An entity distinguishes between key and non-key properties. The source connector configuration allows you to define a subset of non-key properties that will be extracted to Kafka. Regardless of this configuration, the source connector will always extract all of the entity’s key properties.
Selections
The source connector has built-in support for OData query filters. According to SAP Note 1574568, logical operators for SAP OData service query filters are restricted to eq, ne, le, lt, ge and gt. The source connector supports three additional operators: bt (between), nb (not between) and in (in a specified set).
A single filter condition consists of:
- A property name from the list of filterable properties supplied by the respective service entity, e.g. annotated with sap:filterable=true
- An option defining the OData V2 query filter operator
- A low value defining the input value for the selected operator or the lower bound of an interval
- A high value defining the upper bound of an interval
Multiple filter conditions will be combined by an implicit logical AND.
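For illustration, assuming the bt operator denotes an inclusive range, a filter condition with the hypothetical property Price, option bt, low value 100 and high value 500 would correspond to an OData V2 request along the lines of:
# Service path, entity set and property name are placeholders
GET /sap/opu/odata/sap/ZEXAMPLE_SRV/Products?$filter=Price ge 100 and Price le 500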
Batch Requests
To reduce HTTP transmission overhead and improve performance, the sink connector groups multiple operations into a single HTTP request payload by default. The maximum batch size is configurable using the sap.odata.max.batch.size property.
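A minimal sketch; the value is illustrative:
# Maximum number of records grouped into a single batch request
sap.odata.max.batch.size = 500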
For more information about general tweaking of the batch size for connectors, see Batch size for sink connector.
If batching requests is disabled, each record read from Kafka will be sent to the target system individually. This means that an HTTP connection is opened and closed for each record, which may impact performance.
Timeout in Precommit
A timeout can occur if no data could be loaded during pre-commit. If the task should wait longer before shutting down, adjust the configuration property sap.odata.flush.wait.ms. If the connector still runs into a timeout after raising this value, you may have to raise consumer.override.max.poll.interval.ms and add connector.client.config.override.policy = All to the connector's properties file.
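A sketch of the properties involved; all values are illustrative:
# Allow the task more time to flush during pre-commit before timing out
sap.odata.flush.wait.ms = 60000
# If timeouts persist, raise the consumer poll interval and allow client config overrides
consumer.override.max.poll.interval.ms = 600000
connector.client.config.override.policy = All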
Parallelism
To define the parallelism of the source connector, each OData service entity set is assigned to exactly one worker task. This restriction is due to the Connect API, which handles REST calls and data loads package-wise in multiple tasks. However, at the point when the poll() method is called to retrieve new data from the source, the tasks have already been created, and rebalancing is required to change task assignments.
If the number of configured OData service entity sets exceeds the maxTasks configuration or the available tasks, a single task will handle extractions for multiple entity sets.
- Scaling by adding tasks is only practical if the number of configured entity sets is greater than the number of available tasks.
- Scaling by adding topic partitions should be used with care, since using a single partition guarantees sequential order.
Custom HTTP Header
Custom HTTP headers are optional fields that can be included in the header section of an HTTP request to pass specific information or metadata along with each request. These headers are not part of the standard set defined by the HTTP/1.1 specification but can be defined and used by the connector for various purposes, such as authentication, tracking, logging, or any other custom functionality that requires passing specific details along with each HTTP request.
Every HTTP request will include the specified custom headers. The custom HTTP headers should be formatted as key-value pairs separated by commas.
To activate a custom HTTP header, configure the associated property as follows:
# All HTTP requests will include provided custom headers.
sap.odata.custom.http.headers = key1:val1,key2:val2
Plugin Discovery
Plugin Discovery is the strategy the Connect worker uses to find plugin classes and make them available for configuration and execution. This process is controlled by the worker’s plugin.discovery setting.
By default, the plugin.discovery setting is HYBRID_WARN, which is compatible with all plugins and logs a warning if it encounters any plugins incompatible with the SERVICE_LOAD mode. The SERVICE_LOAD option, which uses the faster ServiceLoader mechanism, may improve performance during worker startup, but will not load incompatible plugins. See Connect Worker Configuration for all plugin.discovery values.
The OData V2 Connectors support the ServiceLoader mechanism.
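Since the connectors support the ServiceLoader mechanism, the faster discovery mode can be enabled in the Connect worker configuration, for example:
# Connect worker property: use ServiceLoader-based plugin discovery
plugin.discovery = SERVICE_LOAD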
For more information about Plugin Discovery and the Connect worker configuration refer to Kafka Connect Plugin Discovery as well as KIP-898: Modernize Connect plugin discovery.