Kafka Connect - OData v2 Connectors

The OData v2 connectors package consists of Kafka Connect source and sink connectors that enable seamless interaction with OData v2 services.

While these connectors are optimized for SAP OData v2, they can be used with other compatible services as well.

Dependencies

The connectors use the Apache Olingo v2 client library contained in the connectors package.

Restrictions and pending features

  • Currently, the connectors are restricted to performing standard OData v2 CRUD operations on entity sets.
  • The source connector only accepts media type application/atom+xml in delta requests. This is due to the fact that the Olingo implementation for OData v2 lacks support for delta links in JSON.
  • The source connector does not provide support for query options such as filters and projections in expanded navigation properties.
  • The sink connector does not support processing record values without a value schema.

Configuration

The OData v2 connectors offer complete support for the configuration UI in Confluent Control Center. Unlike using property files, configuring the connectors through Confluent Control Center provides several benefits, including proposed properties, a comprehensive set of recommended property values, incremental visibility of applicable configurations, an extensive range of interactive validations, and more.

To enable the SAP OData service recommendations within the connector configuration of Confluent Control Center, the /sap/opu/odata/iwfnd/CATALOGSERVICE SAP OData catalog service must be activated within the relevant SAP system.

The evaluation version of the connector is limited to 5 OData v2 service entities. Once the mandatory fields of a service entity have been entered in the configuration UI, a new configuration group for configuring an additional service entity will appear. To ensure optimal performance, the UI of the Confluent Control Center only displays a maximum of 1000 recommendations.

Please refer to the Source Configuration Details and Sink Configuration Details (the files doc/source/configuration.html and doc/sink/configuration.html in the connector package). Furthermore, the etc subfolder of the connectors package contains configuration template properties files.

Service destination configuration

A minimal OData v2 service destination configuration looks like this:

# OData v2 server host as either DNS or IP
sap.odata.host.address = services.odata.org
# OData v2 server port
sap.odata.host.port = 443
# OData v2 protocol (supported values are http or https)
sap.odata.host.protocol = https
# OData v2 user name for basic authentication.
# For services not requiring authentication this can be set to any value.
sap.odata.user.name = anonymous
# OData v2 user password for basic authentication.
# For services not requiring authentication this can be set to any value.
sap.odata.user.pwd = anonymous

Encrypted communication is supported by using HTTPS.
The supported authentication type is basic authentication with a username and password.

Retriable HTTP codes

Repeating the request may be appropriate for certain HTTP response codes.

# OData v2 retriable http codes
sap.odata.http.codes = 408,502,503,504

The connector will retry requests that return one of the listed HTTP response codes.

Service entity set configuration

A minimal service entity set configuration looks like this:

# OData v2 URL service path
sap.odata#00.service = /V2/Northwind/Northwind.svc
# OData v2 entity set name.
sap.odata#00.entityset = Order_Details
# Kafka topic name
sap.odata#00.topic = Order_Details
  • The service and entityset properties uniquely identify the service entity set.
  • To load the recommender, a prefix must be entered.
  • topic defines the Kafka output topic the connector producer will use to publish extracted data.
  • decimal.mapping can optionally be used to transform DECIMAL types to other appropriate data types if needed.
    decimal.mapping = primitive will transform decimals to double or long, depending on the scale.
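
A hedged example, assuming decimal.mapping follows the same sap.odata#<nn> prefix pattern as the other service entity set properties:

# Transform DECIMAL types to double or long depending on the scale (assumed prefix usage)
sap.odata#00.decimal.mapping = primitive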

Modern web browsers allow for direct use and querying of OData services. This feature can be used, e.g. to test the service or to identify the properties and values required for the connectors’ configuration.

Custom KeyStore and TrustStore

A KeyStore is used to store the private key and identity certificates, which enables the connector to verify its identity to the corresponding application server in an SSL connection. A TrustStore, on the other hand, is used to store certificates issued by Certificate Authorities (CAs) that authenticate the certificate presented by the application server in an SSL connection.

The OData V2 connectors enable secure communication with the corresponding application server using both the default Java KeyStore and TrustStore, as well as a custom KeyStore and TrustStore.

To enable custom KeyStore and TrustStore set the following properties:

# Absolute path to the custom KeyStore on the worker machine.
sap.odata.keystore.location = path/to/your/custom/KeyStore
# The password for the custom KeyStore.
sap.odata.keystore.password = password
# The type of the custom KeyStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.keystore.type = jks
# Absolute path to the custom TrustStore on the worker machine.
sap.odata.truststore.location = path/to/your/custom/TrustStore
# The password for the custom TrustStore.
sap.odata.truststore.password = password
# The type of the custom TrustStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.truststore.type = jks
# The protocol that is applied for the encrypted communication between connector and server, like SSL or TLS.
# e.g. SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3
sap.odata.sslcontext.protocol = SSL

The following file types are supported for the KeyStore and TrustStore: jceks, jks, dks or pkcs12.

The sslcontext.protocol configuration sets the encryption protocol that is used for the secure communication.

Supported protocols:

SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2 and TLSv1.3

Supported KeyStore and TrustStore types:

jceks, jks, dks, pkcs11, pkcs12

Hints

  • The custom TrustStore and KeyStore connector configurations override the SSL connection configuration only for the involved connector, not the global JVM configuration.
  • In the absence of any custom TrustStore or KeyStore definition, the system default is used.
  • A password is always required for utilizing a custom TrustStore or KeyStore.
  • The configuration parameters for the encryption protocol as well as the KeyStore and TrustStore types are case sensitive.

Information

Source Connector

Parallelism

To define the parallelism of the source connector, each OData service entity set is assigned to exactly one worker task. This restriction results from the Connect API, which handles REST calls and data loads package-wise in multiple tasks: by the time the poll() method is called to retrieve new data from the source, the tasks have already been created, and rebalancing is required to change task assignments. If the number of configured OData service entity sets exceeds the maxTasks configuration or the number of available tasks, a single task will handle extractions for multiple entity sets. Scaling by adding tasks is therefore only practical if the number of configured entity sets is greater than the number of available tasks. In addition, scaling by adding topic partitions is not effective, as the connector uses only one partition to guarantee sequential order.
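
As an illustration, a hedged sketch of the standard Kafka Connect task setting in relation to the number of configured entity sets (the value is a placeholder):

# Standard Kafka Connect connector property limiting the number of tasks.
# With e.g. three configured service entity sets, at most three tasks will do useful work;
# a lower value makes a single task handle several entity sets.
tasks.max = 3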

Delivery semantics

The OData v2 protocol does not provide a standard method for identifying unique datasets, handling delta data, change data capture, or tracking successive data extractions. Change data capture was officially introduced with the OASIS OData v4 protocol. Nevertheless, Apache Olingo v2 and some OData v2 services implement a lightweight specification of change tracking, which the source connector uses to implement a delta mode including offset recovery and exactly-once delivery semantics.

In general the source and sink connectors offer at-least-once delivery semantics. However, the OData source connector stands out by ensuring that data is processed exactly once when operating in delta mode, or in full mode with the execution period set to -1 for one-time polls.

Enabling Exactly-Once in your Kafka cluster

To enable exactly-once support for source connectors in your Kafka cluster, you will need to update the worker-level configuration property exactly.once.source.support to either preparing or enabled. For further information, see KIP-618.

When deploying a source connector, you can configure the exactly.once.support property to make exactly-once delivery either requested (default) or required.

  • required: the connector will undergo a preflight check to ensure it can provide exactly-once delivery. If the connector or worker cannot support exactly-once delivery, creation or validation requests will fail.

  • requested: the connector will not undergo a preflight check for exactly-once delivery.
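
A minimal sketch of the corresponding settings, using the property names defined by KIP-618:

# Kafka Connect worker configuration (distributed mode)
exactly.once.source.support = enabled

# Source connector configuration
exactly.once.support = required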

Operation Mode

The source connector can operate in two modes: full mode or delta mode.
Each OData v2 service can be configured to use either mode by setting the track-changes configuration property to 0 for full mode or 1 for delta mode.

Full mode sends a periodic data request to the corresponding source OData v2 service requesting data from an entity set.

Delta mode uses the change-tracking functionality supported by some OData v2 services to perform an initial data request for the corresponding entity set first. In delta mode (Change Data Capture), only new, changed, or deleted records for the set of entities requested in the initial data request are extracted in subsequent requests. If a record has been deleted, only the key properties will be included in the value part of the message, with all other properties set to null. Additionally, deleted records will include a RECORDMODE or ODQ_CHANGEMODE field set to “D” and a DELETED_AT timestamp indicating when the entity was deleted. After completing an extraction, the source connector waits for exec-period seconds before initiating the next extraction from the OData v2 service.
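
A hedged sketch of an entity set configured for delta mode, assuming track-changes and exec-period follow the same sap.odata#<nn> prefix pattern as the other service entity set properties:

# 1 = delta mode (change tracking), 0 = full mode (assumed prefix usage)
sap.odata#00.track-changes = 1
# Wait time in seconds between two extractions (placeholder value, assumed prefix usage)
sap.odata#00.exec-period = 300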

Push-based subscriptions and notifications

The SAP Gateway Foundation offers a Subscription and Notification Flow that supports push-based source connector scenarios. Upon subscribing to a particular OData v2 service and entity set, which is handled by the source connector during startup, SAP will send push notifications to the source connector whenever there are creates, changes, or deletes for the entities that the connector has subscribed to. The source connector achieves this by executing an HTTP listener that handles event notification callbacks received from SAP.

The source connector offers configuration properties for subscriptions, including:

  • sap.odata.subscription.dest specifies the name of the type 'G' RFC destination customized in SAP that targets the connector task, containing the IP/DNS and port for addressing a Connect node directly or an intermediate proxy node. To ensure each connector task instance has a unique port, it is recommended to set tasks.max per connector instance to 1.
  • sap.odata.subscription.ports defines a list of ports for the HTTP notification listener executed by the connector tasks. When the HTTP listener is started, the connector will search for available ports in the order of listing.
  • subscription.enable can be used to enable subscriptions and notifications for each individual service and entity set. To enable subscriptions, the OData service must be implemented to support them. In SAP, subscription-enabled services offer entity sets named SubscriptionCollection and NotificationCollection (see the configuration sketch below).
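
A hedged configuration sketch with subscriptions enabled for one service entity set; the RFC destination name and port numbers are placeholders, and the prefix usage of subscription.enable is assumed:

# Name of the type 'G' RFC destination customized in SAP (placeholder name)
sap.odata.subscription.dest = ZKAFKA_CONNECT
# Candidate ports for the HTTP notification listener, tried in the order listed
sap.odata.subscription.ports = 8085,8086
# Enable subscriptions and notifications for this service entity set (assumed prefix and value format)
sap.odata#00.subscription.enable = 1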

Periodic extractions customized by setting configuration property exec-period can be combined with push-based notifications. Notifications sent to the connector are always processed immediately and with the highest priority during the next call to the poll() method by the Kafka Connect service. Periodic data extractions will follow as soon as the next execution interval is reached.
If the exec-period property is set to -1, no periodic data extractions will be executed. This makes most sense in combination with enabled subscriptions, as most often periodic data extractions are not needed if all change data capture events are handled by notifications. There is a slightly different behaviour between full and delta operation mode:

  • in delta mode each notification leads to a new delta request call to the OData service. Setting exec-period to -1 and disabling subscriptions at the same time will lead to validation errors.
  • in full mode the first request will always be a regular OData request including fetching all pages. After that only changed entities from notification events will be processed and requested from the OData service if exec-period equals -1.

Offset handling

The logical offset for each message produced by the source connector contains:

  • the URL from which the message was requested,
  • a record number identifying the position of the message in the response, and
  • optionally a second URL for the succeeding request in case of server-side paging (next link) or change tracking (delta link).

In case of any issues or task rebalances, the connector retrieves the latest offsets from Kafka Connect and restarts extraction of the last processed requests from the OData v2 service to ensure no data loss. All messages up to the record number of the corresponding logical offset are regarded as duplicates and filtered out before the SourceRecords are handed over to the producer, so that no duplicate messages are stored in the output topic. This means that in all cases of a restart the source connector will immediately try to recover from the last request before going into a wait loop for the next execution interval configured by exec-period.
In delta/change tracking mode this enables exactly-once semantics; in full mode the source system has to guarantee that the next call to the last request URL after a connector restart returns the same data as before the connector went down, at least up to the record index stored in the offset.

Graceful backoff

In case of connection or communication issues with the configured OData v2 service endpoint, the source connector applies a retry backoff strategy. The maximum number of retry attempts can be configured using property sap.odata.max.retries. After each failed connection attempt, the connector blocks execution for a random number of milliseconds in the range between the values of configuration properties sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms.

To give a more precise control over the duration of time to wait while establishing a connection and waiting for the server to send a response before retrying an OData request, you can utilize the configuration properties sap.odata.connection.connect.timeout.ms and sap.odata.connection.read.timeout.ms.
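
A hedged example of the retry and timeout properties named above (all values are placeholders, not recommendations):

# Maximum number of retry attempts
sap.odata.max.retries = 3
# Lower and upper bound in milliseconds for the random retry backoff
sap.odata.min.retry.backoff.ms = 1000
sap.odata.max.retry.backoff.ms = 10000
# Milliseconds to wait when establishing a connection and when waiting for the server response
sap.odata.connection.connect.timeout.ms = 5000
sap.odata.connection.read.timeout.ms = 30000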

Partitioning

The partition provided with each SourceRecord equals the combination of service and entity set name, as this combination uniquely identifies an entity set as defined by the OData v2 protocol.

Supported data types

The following table gives an overview of the data type mapping from OData v2 data types to Kafka Connect data types applied by the source connector:

OData v2            Kafka Connect Schema Type   Java data type
Bit                 INT8                        java.lang.Byte
Uint7               INT8                        java.lang.Byte
EdmSByte            INT8                        java.lang.Byte
EdmByte             INT16                       java.lang.Short
EdmGuid             STRING                      java.lang.String
EdmTime             TIME                        java.util.Date
EdmInt16            INT16                       java.lang.Short
EdmInt32            INT32                       java.lang.Integer
EdmInt64            INT64                       java.lang.Long
EdmBinary           BYTES                       java.nio.ByteBuffer
EdmDouble           FLOAT64                     java.lang.Double
EdmSingle           FLOAT32                     java.lang.Float
EdmString           STRING                      java.lang.String
EdmBoolean          BOOL                        java.lang.Boolean
EdmDecimal          DECIMAL                     java.math.BigDecimal
EdmDateTime         TIMESTAMP                   java.util.Date
EdmDateTimeOffset   TIMESTAMP                   java.util.Date
EdmStructuralType   STRUCT                      java.util.Map[String, Object]

Sink Connector

Delivery semantics

The OData v2 protocol defines no generic way of identifying unique datasets, handling delta data, change data capture or keeping track of successive data extractions. The sink connector makes use of Kafka's message commit functionality and is therefore able to guarantee at-least-once semantics.

The connector first stores consumed records in a task cache and delivers them to their target service at time intervals specified by the connect worker configuration property offset.flush.interval.ms. Additionally, deliveries can be requested independent of this time interval based on the number of accumulated records in the task cache, using the configuration property sap.odata.flush.trigger.size.
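
A hedged sketch combining the standard worker flush interval with the connector's cache-size trigger (values are placeholders):

# Kafka Connect worker configuration: offset flush interval in milliseconds
offset.flush.interval.ms = 60000

# Sink connector configuration: deliver as soon as this many records have accumulated in the task cache
sap.odata.flush.trigger.size = 500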

Dead Letter Queue

The sink connector supports the Dead Letter Queue (DLQ) functionality.
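
Routing failed records to a DLQ is configured with the standard Kafka Connect sink error-handling properties; a minimal sketch (the topic name is a placeholder):

# Tolerate record-level failures instead of failing the task
errors.tolerance = all
# Topic that receives records which could not be delivered
errors.deadletterqueue.topic.name = dlq-odata-sink
# Optionally add error context to the record headers
errors.deadletterqueue.context.headers.enable = true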

Graceful backoff

In case of connection or communication issues with the configured OData v2 service endpoint the sink connector applies a retry backoff strategy similar to the source connector.

Operation Mode

The sink connector currently provides the operation modes Insert, Update and Dynamic, set by the configuration property sap.odata.operation.mode. The sink connector is currently limited to standard OData v2 CRUD operations, therefore an Upsert mode is currently not supported, but can be offered by the OData service provider as part of the insert or update implementation.

  • In Insert mode the sink connector uses the HTTP POST method to export entries. Depending on the OData v2 service implementation this might lead to failures if the entry to be inserted is already available in the target system.
    The OData standard has support for deep inserts, with which nested business objects can be inserted by an OData service as multiple entities and their associations to each other. See OData deep inserts for more details.
  • In Update mode the sink connector uses the HTTP PUT method to export entities. Depending on the OData v2 service implementation this might lead to failures if the entry to be updated is not available in the target system.
    The sink connector expects an entry containing all required fields. Partial updates for entries with the HTTP MERGE method are currently not supported.
    Deep updates for nested business objects are not part of the OData standard and will lead to exceptions on the service provider side.
  • In Dynamic mode the sink connector either uses HTTP POST or HTTP PUT to export entries. The value of the record field specified by the configuration property sap.odata.operation.mode.property is used at record level to decide which HTTP method is used. If the value of the specified field is 'I', HTTP POST is used; if it is set to 'U', HTTP PUT is used.
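
A hedged sketch of a sink connector in Dynamic mode; the exact spelling of the mode value is assumed and OPERATION is a hypothetical field in the record value:

# Operation mode: insert, update or dynamic (assumed value spelling)
sap.odata.operation.mode = dynamic
# Record value field that decides per record between HTTP POST ('I') and HTTP PUT ('U')
sap.odata.operation.mode.property = OPERATION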

Delete Operation

The sink connector can be configured with sap.odata.enable.delete to delete records at the target system when consuming a tombstone record, which is a Kafka record with a non-null key and a null value. If enabled, the sink connector converts tombstone records into HTTP DELETE requests. If not, all tombstone records will be ignored.

To use this feature, all key properties of an entry must be included in the record key, which must be of type struct.
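
A hedged example enabling delete processing for tombstone records (the boolean value format is assumed):

# Convert tombstone records (non-null key, null value) into HTTP DELETE requests
sap.odata.enable.delete = true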

Batching Requests

To reduce HTTP transmission overhead and improve performance, the sink connector groups multiple operations into a single HTTP request payload by default. The maximum batch size is configurable using the sap.odata.max.batch.size property. For more information about general tweaking of the batch size for connectors, see Batch size for sink connector.

If batching requests is disabled, each record read from Kafka will be sent to the target system individually. This means that an HTTP connection is opened and closed for each record, which may impact performance.
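
A hedged example of the batching property (the value is a placeholder):

# Maximum number of operations grouped into a single batched HTTP request
sap.odata.max.batch.size = 100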

Data Type Mapping

The following table gives an overview of the data type mapping from Kafka Connect data types to OData v2 data types applied by the sink connector. The default conversion type is the first one listed in the Kafka Connect Schema Types column.

OData v2            Kafka Connect Schema Types
Bit                 INT8
EdmSByte            INT8, INT16, INT32, INT64
EdmByte             INT8, INT16, INT32, INT64
EdmGuid             STRING
EdmTime             TIME, TIMESTAMP, DATE, INT64
EdmInt16            INT16, INT8, INT32, INT64
EdmInt32            INT32, INT8, INT16, INT64
EdmInt64            INT64, INT8, INT16, INT32
EdmBinary           BYTES
EdmDouble           FLOAT64, FLOAT32, DECIMAL, INT8, INT16, INT32, INT64
EdmSingle           FLOAT32, FLOAT64, DECIMAL, INT8, INT16, INT32, INT64
EdmString           STRING
EdmBoolean          BOOL
EdmDecimal          DECIMAL, FLOAT32, FLOAT64, INT8, INT16, INT32, INT64
EdmDateTime         TIMESTAMP, DATE, INT64
EdmDateTimeOffset   TIMESTAMP, DATE, INT64
EdmStructuralType   STRUCT

Hints

  • Type conversions into OData types from Kafka Connect Schema Types other than the default conversion type might result in inaccurate data or loss of information. For example, some integer values that can be represented as INT32 may not be accurately represented as EdmInt16.
  • The sink connector does not actively use ETags for concurrency control. When issuing a PUT or DELETE request, the sink connector uses the value '*' in the If-Match HTTP request header.

Installation

Manual

Put the connector jars and library jars from the lib directory into the configured plugin path of Kafka Connect.

Confluent Hub CLI

The Confluent platform provides a command line installation interface that can be used to install the connectors package zip file from a local file system.
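
For example, assuming the package zip file is available on the local file system (the version is a placeholder):

confluent-hub install /path/to/init-kafka-connect-odata-<connector version>.zip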

License

See the Lizenzvereinbarung (license agreement) for INIT's evaluation license and Dependencies for more information on the dependencies' licenses (file doc/licenses.html in the connector package).

Usage

Packaging

The OData v2 connectors package name is: init-kafka-connect-odata-<connector version>.zip

The zip archive includes one folder called init-kafka-connect-odata-<connector version>, which itself contains the nested folders lib, etc, assets and doc.

  • lib/ contains the java archives that need to be extracted into the plugin.path of Kafka Connect.
  • etc/ contains sample connector configuration property files. These can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
  • assets/ contains media files like icons and company logos.
  • doc/ contains more detailed documentation about the connectors like licenses, readme, configurations and so on.

Data Serialization

The OData v2 connectors support the Confluent JSON Converter as well as the AVRO Converter.
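
A hedged converter configuration sketch for the two options; the Schema Registry URL is a placeholder:

# JSON converter with embedded schemas
value.converter = org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable = true

# Alternative: Avro converter backed by Schema Registry
# value.converter = io.confluent.connect.avro.AvroConverter
# value.converter.schema.registry.url = http://localhost:8081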

Single Message Transformations (SMTs)

Single Message Transformations (SMTs) are applied to messages as they flow through Kafka Connect. Using SMTs in a source connector allows each record to be passed through one or a chain of several simple transformations before writing the data to the Kafka topic. In a sink scenario, SMTs transform outbound messages before they are sent to the sink connector. The OData v2 connectors support SMTs and have been successfully tested with a concatenation of two SMTs.
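
As an illustration, a hedged connector configuration chaining two of the standard Kafka Connect transformations (alias names and field names are examples):

transforms = addSource,renameField
# Insert a static field into each record value
transforms.addSource.type = org.apache.kafka.connect.transforms.InsertField$Value
transforms.addSource.static.field = source_system
transforms.addSource.static.value = northwind
# Rename a field in the record value
transforms.renameField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameField.renames = OrderID:order_id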

Error Handling

The OData connectors apply different kinds of validations and error handling mechanisms like configuration validation, offset recovery, upstream connection tests, HTTP status checks and connection retries.

Validation of single configuration parameters is implemented by extending the Validator class and throws exceptions of type ConfigException in case of invalid configuration values. Additionally, the connector overrides the validate method to validate interdependent configuration parameters and adds error messages to the ConfigValue class in case of invalid parameter values. Parameters containing invalid values are framed in red in the Confluent Control Center, together with an appropriate error message.

The connectors map known exceptions to the exception type ConnectException, which can be handled by Kafka Connect accordingly. Errors and warnings are logged using SLF4J, as described in section Logging.

Logging

The connectors make use of SLF4J for logging integration. The loggers use the logger names org.init.ohja.kafka.connect.odatav2.source and org.init.ohja.kafka.connect.odatav2.sink and can be configured e.g., in the log4j configuration properties in the Confluent Platform.

The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.

Example Log4j 1.x appender:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n

Field projection

Each OData v2 service entity set defines a collection of properties that can be read, updated, deleted, or inserted. An entity distinguishes between key and non-key properties. The configuration of a source connector allows defining a subset of non-key properties that will be extracted to Kafka. Regardless of this configuration, the source connector always extracts all of the entity's key properties.
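
In OData v2 terms such a projection presumably maps to the $select system query option; an illustrative request against the Northwind example service could look like this:

/V2/Northwind/Northwind.svc/Order_Details?$select=OrderID,ProductID,Quantity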

Selections

The source connector has built-in support for OData query filters. According to SAP Note 1574568, logical operators for SAP OData service query filters are restricted to: 'eq', 'ne', 'le', 'lt', 'ge' and 'gt'. The source connector supports three additional operators: bt (between), nb (not between) and in (in specified set).
A single filter condition consists of:

  • a property name from the list of filterable properties supplied by the respective service entity, e.g. annotated with sap:filterable=true
  • an option defining the OData v2 query filter operator
  • a low value defining the input value for the selected operator or the lower bound of an interval
  • a high value defining the upper bound of an interval

Multiple filter conditions are combined with an implicit logical and.
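
As an illustration, a bt (between) condition on the filterable Quantity property presumably translates into a standard OData v2 $filter expression such as the following (Northwind example):

/V2/Northwind/Northwind.svc/Order_Details?$filter=Quantity ge 10 and Quantity le 20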

CLI

The kafka-connect-odatav2 connectors package contains a command line interface to validate connector properties, ping OData v2 services, retrieve a list of entity set names for a service, extract the schema for entity sets of OData v2 services, list available OData v2 services of an SAP system, get a list of active subscriptions and delete an active subscription.

Since the CLI is written in Scala you can execute it in Scala or Java. To run the CLI you need to provide the following dependencies in the CLASSPATH:

  1. Scala runtime libraries
  2. kafka-clients
  3. kafka-connect-odata connector libraries

Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java does not require installing Scala.

Java

java -cp <kafka-connect-odata>:<kafka-clients> org.init.ohja.kafka.connect.odatav2.OData2App <command> <options>

The Confluent Platform has Kafka libraries available in /usr/share/java/kafka/. When the OData connectors package is installed in the plugin path of Connect, the command could look like this:

java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-odatav2-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.odatav2.OData2App \
<commands> <options>

Scala

If an appropriate version of Scala is installed, the scala command can be used. This command already provides the necessary Scala runtime libraries.

scala -cp <kafka-connect-odata>:<kafka-clients> org.init.ohja.kafka.connect.odatav2.OData2App <command> <options>

The output will look like this:

usage:
  OData2App <command> <options>

commands:
  ping -s <relative service path>
  list-entitysets -s <relative service path>
  extract-schema -s <relative service path> -e <entity-set>
  list-services                                              (SAP only)
  active-subscriptions -s <relative service path>            (SAP only)
  delete-subscription -u <subscription URL>                  (SAP only)

mandatory options:
  -p <path to connector properties file>

Support

Our full enterprise support offers you access to top-tier support from our connector developers, tailored to meet your specific needs. This includes:

  • 8/5 support
  • 60-minute response time, depending on the support plan
  • Full application lifecycle support from development to operations
  • Access to expert support engineers
  • Consultancy in case of individual SAP integration requirements

Please contact connector-support@init-software.de for more information.