Kafka Connect - OData v2 Connectors

The OData v2 connectors package consists of Kafka Connect source and sink connectors that enable seamless interaction with OData v2 services.

While these connectors are optimized for SAP OData v2, they can be used with other compatible services as well.

Dependencies

The connectors use the Apache Olingo v2 client library contained in the connectors package.

Licensing

See the licenses for INIT's evaluation license (DE and EN) and for more information on the dependencies' licenses.

Installation

Packaging

The OData v2 connectors package name is: init-kafka-connect-odata-<connector version>.zip

The zip archive includes one folder called init-kafka-connect-odata-<connector version>, which itself contains the nested folders lib, etc, assets and doc.

  • lib/ contains the Java archives that need to be extracted into the plugin.path of Kafka Connect.
  • etc/ contains sample connector configuration property files. These can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
  • assets/ contains media files like icons and company logos.
  • doc/ contains more detailed documentation about the connectors like licenses, readme, configurations and so on.

Manual

Put the connector JARs and library JARs from the lib/ directory into the configured plugin.path of Kafka Connect.
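For reference, the plugin path is set in the Kafka Connect worker configuration; the directories below are examples, not fixed locations:

```properties
# Kafka Connect worker configuration (e.g. connect-distributed.properties)
# Example paths; use the plugin directory of your installation.
plugin.path = /usr/share/java,/opt/kafka-connect/plugins
```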

Confluent CLI

The Confluent Platform provides a command line installation interface that can be used to install the connectors package zip file from a local file system; see the Confluent CLI Command Reference.

General configuration

The OData v2 connectors offer complete support for the configuration UI in Confluent Control Center. Unlike using property files, configuring the connectors through Confluent Control Center provides several benefits, including proposed properties, a comprehensive set of recommended property values, incremental visibility of applicable configurations, an extensive range of interactive validations, and more.

To enable the SAP OData service recommendations within the connector configuration of Confluent Control Center, the /sap/opu/odata/iwfnd/CATALOGSERVICE SAP OData catalog service must be activated within the relevant SAP system.
The OData v2 source connector has built-in support for ODP-based extraction.

The evaluation version of the connector is limited to 5 OData v2 service entities. Once the mandatory fields of a service entity have been entered in the configuration UI, a new configuration group for configuring an additional service entity will appear. To ensure optimal performance, the UI of the Confluent Control Center only displays a maximum of 1000 recommendations.

Please refer to Source Configuration Details and Sink Configuration Details. Furthermore, sub-folder etc of the connectors package contains configuration template properties files.

Service destination configuration

A minimal OData v2 service destination configuration looks like this:

# OData v2 server host as either DNS or IP
sap.odata.host.address = services.odata.org
# OData v2 server port
sap.odata.host.port = 443
# OData v2 protocol (supported values are http or https)
sap.odata.host.protocol = https
# OData v2 user name for basic authentication.
# For services not requiring authentication this can be set to any value.
sap.odata.user.name = anonymous
# OData v2 user password for basic authentication.
# For services not requiring authentication this can be set to any value.
sap.odata.user.pwd = anonymous

Encrypted communication is supported via HTTPS.
The supported authentication types are basic authentication (username and password) and the OAuth client credentials flow.

Retriable HTTP codes

Repeating the request may be appropriate for certain HTTP response codes.

# OData v2 retriable http codes
sap.odata.http.codes = 408,502,503,504

The connector will retry requests that fail with one of the listed HTTP response codes.

Service entity set configuration

A minimal service entity set configuration looks like this:

# OData v2 URL service path
sap.odata#00.service = /V2/Northwind/Northwind.svc
# OData v2 entity set name.
sap.odata#00.entityset = Order_Details
# Kafka topic name
sap.odata#00.topic = Order_Details
  • The service and entityset properties uniquely identify the service entity set.
  • To load the recommender, a prefix must be entered.
  • topic defines the Kafka output topic the connector producer will use to publish extracted data.
  • decimal.mapping can optionally be used to transform DECIMAL types to other appropriate data types if needed.
    decimal.mapping = primitive will transform decimals to double or long, depending on the scale.
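As an illustration, and assuming that decimal.mapping uses the same sap.odata#00 prefix as the other service entity set properties (an assumption based on the scheme above, not confirmed here), the option could be set like this:

```properties
# Assumed prefix scheme: transforms DECIMAL types to long or double, depending on the scale
sap.odata#00.decimal.mapping = primitive
```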

Modern web browsers allow for direct use and querying of OData services. This feature can be used, e.g. to test the service or to identify the properties and values required for the connectors’ configuration.
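For example, the public Northwind demo service configured above can be queried directly in a browser using standard OData v2 system query options ($top and $format are part of the OData v2 URI conventions):

```text
https://services.odata.org/V2/Northwind/Northwind.svc/Order_Details?$top=2&$format=json
```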

Custom KeyStore and TrustStore

A KeyStore is used to store private keys and identity certificates, enabling the connector to verify its identity to the corresponding application server in an SSL connection. A TrustStore, on the other hand, is used to store certificates issued by Certificate Authorities (CAs), which authenticate the certificate presented by the application server in an SSL connection.

The OData V2 connectors enable secure communication with the corresponding application server using both the default Java KeyStore and TrustStore, as well as a custom KeyStore and TrustStore.

To enable custom KeyStore and TrustStore set the following properties:

# Absolute path to the custom KeyStore on the worker machine.
sap.odata.keystore.location = path/to/your/custom/KeyStore
# The password for the custom KeyStore.
sap.odata.keystore.password = password
# The type of the custom KeyStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.keystore.type = jks
# Absolute path to the custom TrustStore on the worker machine.
sap.odata.truststore.location = path/to/your/custom/TrustStore
# The password for the custom TrustStore.
sap.odata.truststore.password = password
# The type of the custom TrustStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.truststore.type = jks
# The protocol that is applied for the encrypted communication between connector and server, like SSL or TLS.
# e.g. SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3
sap.odata.sslcontext.protocol = SSL

The following file types are supported for KeyStore and TrustStore: jceks, jks, dks or pkcs12.

The configuration sslcontext.protocol sets the encryption protocol used for the secure communication.

Supported protocols

SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2 and TLSv1.3

Supported KeyStore and TrustStore types

jceks, jks, dks, pkcs11, pkcs12

Hints

  • The custom TrustStore and KeyStore connector configurations override the SSL connection configuration only for the involved connector, not the global JVM configuration.
  • In the absence of any custom TrustStore or KeyStore definition, the system default is used.
  • A password is always required for utilizing a custom TrustStore or KeyStore.
  • The configuration parameters for the encryption protocol as well as the KeyStore and TrustStore types are case-sensitive.

Supported features

Data serialization

The OData v2 connectors support the Confluent JSON Converter as well as the AVRO Converter.

Single Message Transformations (SMTs)

Single Message Transformations (SMTs) are applied to messages as they flow through Kafka Connect. Using SMTs in a source connector allows each record to be passed through one transformation, or a chain of several simple transformations, before the data is written to the Kafka topic. In a sink scenario, SMTs transform outbound messages before they are sent to a sink connector. The OData v2 connectors support SMTs and have been successfully tested with a concatenation of two SMTs.
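For illustration, a chain of two of Kafka Connect's built-in SMTs could be configured as follows (the transform names and values are examples):

```properties
# Chain of two SMTs, applied in the listed order
transforms = route,insertOrigin
# Rename the output topic by adding a prefix
transforms.route.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex = (.*)
transforms.route.replacement = sap-$1
# Add a static field to each record value
transforms.insertOrigin.type = org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertOrigin.static.field = origin
transforms.insertOrigin.static.value = odata-v2
```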

Error handling

The OData connectors apply different kinds of validations and error handling mechanisms like configuration validation, offset recovery, upstream connection tests, HTTP status checks and connection retries.

Validations of single configuration parameters extend the Validator class and throw exceptions of type ConfigException in case of invalid configuration values. Additionally, the connector overrides the validate method to validate interdependent configuration parameters and adds error messages to the class ConfigValue in case of invalid parameter values. In Confluent Control Center, parameters containing invalid values are framed in red and shown together with an appropriate error message.

The connectors map known exceptions to the exception type ConnectException, which can be handled by Kafka Connect accordingly. Errors and warnings are logged using SLF4J, as described in section Logging.

Logging

The connectors make use of SLF4J for logging integration. The loggers use the logger names org.init.ohja.kafka.connect.odatav2.source and org.init.ohja.kafka.connect.odatav2.sink and can be configured e.g., in the log4j configuration properties in the Confluent Platform.

The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.

Example Log4j 1.x appender:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n
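To adjust the log level of the connector loggers named above, standard Log4j 1.x logger configuration applies, e.g.:

```properties
# Example: more verbose logging for the OData v2 source and sink connectors
log4j.logger.org.init.ohja.kafka.connect.odatav2.source=DEBUG
log4j.logger.org.init.ohja.kafka.connect.odatav2.sink=DEBUG
```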

Field projection

Each OData v2 service entity set defines a collection of properties that can be read, updated, deleted, or inserted. An entity distinguishes between key and non-key properties. The configuration of a source connector allows defining a subset of non-key properties that will be extracted to Kafka. Regardless of this configuration, the source connector will always extract all of the entity's key properties.

Selections

The source connector has built-in support for OData query filters. According to SAP Note 1574568, the logical operators for SAP OData service query filters are restricted to: eq, ne, le, lt, ge and gt. The source connector supports three additional operators: bt (between), nb (not between) and in (in specified set).
A single filter condition consists of:

  • a property name from the list of filterable properties supplied by the respective service entity, e.g. annotated with sap:filterable="true"
  • an option defining the OData v2 query filter operator
  • a low value defining the input value for the selected operator or the lower bound of an interval
  • a high value defining the upper bound of an interval

Multiple filter conditions are combined with an implicit logical and.
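As an example of how such a condition maps to an OData v2 query, a hypothetical bt (between) condition on a filterable property Freight with low value 10 and high value 100 would correspond to the filter expression:

```text
$filter=Freight ge 10 and Freight le 100
```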

CLI tool

The kafka-connect-odatav2 connectors package contains a command line interface to validate connector properties, ping OData v2 services, retrieve a list of entity set names for a service, extract the schema for entity sets of OData v2 services, list available OData v2 services of an SAP system, get a list of active subscriptions and delete an active subscription.

Since the CLI is written in Scala, you can execute it with Scala or Java. To run the CLI you need to provide the following dependencies on the CLASSPATH:

  1. Scala runtime libraries
  2. kafka-clients
  3. kafka-connect-odata connector libraries

Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java would not require installing Scala.

Java

java -cp <kafka-connect-odata>:<kafka-clients> org.init.ohja.kafka.connect.odatav2.OData2App <command> <options>

The Confluent Platform has kafka libraries available in /usr/share/java/kafka/. When the odata connectors package is installed to the plugin path of Connect, the command could look like this:

java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-odatav2-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.odatav2.OData2App \
<commands> <options>

Scala

If an appropriate version of Scala is installed, the scala command can be used. It already provides the necessary Scala runtime libraries.

scala -cp <kafka-connect-odata>:<kafka-clients> org.init.ohja.kafka.connect.odatav2.OData2App <command> <options>

The output will look like this:

usage:
  OData2App <command> <options>

commands:
  ping -s <relative service path>
  list-entitysets -s <relative service path>
  extract-schema -s <relative service path> -e <entity-set>
  list-services                                              (SAP only)
  active-subscriptions -s <relative service path>            (SAP only)
  delete-subscription -u <subscription URL>                  (SAP only)

mandatory options:
  -p <path to connector properties file>

Hint: Avro schemas may differ if the Single Message Transform(s) in the connector configuration are used.
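For illustration, combining the Java classpath shown above with the ping command might look like this (the properties file name is a placeholder):

```text
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-odatav2-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.odatav2.OData2App \
ping -s /V2/Northwind/Northwind.svc -p source.properties
```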

Restrictions and pending features

  • The connectors are restricted to performing standard OData v2 CRUD operations on entity sets.
  • The connectors currently only support Basic Authentication and the OAuth client credentials flow over HTTP and HTTPS, as well as custom KeyStores/TrustStores.
  • The source connector only accepts media type application/atom+xml in delta requests. This is due to the fact that the Olingo implementation for OData v2 lacks support for delta links in JSON.
  • The source connector does not provide support for query options such as filters and projections in expanded navigation properties.
  • The sink connector does not support processing record values without a value schema.

Full enterprise support

Our full enterprise support offers you access to top-tier support from our connector developers, tailored to meet your specific needs. This includes:

  • 8/5 support
  • 60-minute response time, depending on the support plan
  • Full application lifecycle support from development to operations
  • Access to expert support engineers
  • Consultancy in case of individual SAP integration requirements

Please contact connector-contact@init-software.de for more information.