Kafka Connect - OData v4 Connectors

The OData v4 connectors package consists of Kafka Connect source and sink connectors that enable seamless interaction with OData v4 services. While these connectors are optimized for SAP OData v4, they can be used with other compatible services as well.

Dependencies

The connectors use the Apache Olingo v4 client library, which is bundled in the connectors package.

License

See licenses for INIT's evaluation license (DE and EN) and for more information on the dependencies' licenses.

Installation

Packaging

The OData v4 connectors package name is init-kafka-connect-odatav4-<connector version>.zip

The zip archive includes one folder called init-kafka-connect-odatav4-<connector version>, which itself contains the nested folders lib, etc, assets and doc.

  • lib/ contains the Java archives that need to be extracted into the plugin.path of Kafka Connect.
  • etc/ contains sample connector configuration property files. These can be supplied as a CLI argument during startup of Kafka Connect or uploaded to the Confluent Control Center.
  • assets/ contains media files like icons and company logos.
  • doc/ contains more detailed documentation about the connectors like licenses, readme, configurations and so on.

Manual

Put the connector jars and library jars from the lib directory into the configured plugin.path of Kafka Connect.

Confluent Hub CLI

The Confluent Platform offers a command-line interface for installation, which can be used to install the connector package zip file from a local file system.
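
For example (replace the version number with the actual release; this assumes the zip has been downloaded to the current directory):

```shell
confluent-hub install --no-prompt ./init-kafka-connect-odatav4-x.x.x.zip
```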

General configuration

The OData v4 connectors offer full support for the configuration UI in Confluent Control Center, which provides significant advantages over using property files.
This UI offers a wide range of features, including value recommendations, incremental visibility of applicable configurations, and a rich set of interactive validations. For OData v4 service recommendations in the connector configuration of Confluent Control Center, the SAP OData catalog service /sap/opu/odata4/iwfnd/config/default/iwfnd/catalog/0002 must be activated in the corresponding SAP system. Additionally, the /n/IWFND/V4_ADMIN transaction must be used to publish the OData v4 catalog service.

In the evaluation version of the connector, the number of OData v4 service entities is limited to 5. Once mandatory fields of an OData v4 service entity have been entered in the configuration UI, a new configuration group will appear for configuring another service entity. For performance reasons, the UI of the Confluent Control Center only displays a maximum of 1000 recommendations.

Please refer to the Source Configuration Details and Sink Configuration Details (or the files doc/sink/configuration.html and doc/source/configuration.html in the connector package). Furthermore, the etc subfolder of the connector package contains configuration template properties files.

Service destination configuration

A minimal OData v4 service destination configuration looks like this:

# OData server host as either DNS or IP
sap.odata.host.address = services.odata.org
# OData server port
sap.odata.host.port = 443
# OData protocol (supported values are http or https)
sap.odata.host.protocol = https
# OData user name for basic authentication
# For services not requiring authentication this can be set to any value
sap.odata.user.name = anonymous
# OData user password for basic authentication
# For services not requiring authentication this can be set to any value
sap.odata.user.pwd = anonymous

Encrypted communication is supported by using HTTPS.
The supported authentication types are basic authentication with a username and password and OAuth client credentials flow.

Service entity set configuration

A minimal service entity set configuration looks like this:

# OData v4 URL service path
sap.odata#00.service = /V4/Northwind/Northwind.svc/
# OData v4 entity set name
# The entity set name can be queried from the /$metadata service URL
sap.odata#00.entityset = Order_Details
# Kafka topic name the data for this OData service entity set will be pushed to
sap.odata#00.topic = Order_Details

  • The service and entityset properties uniquely identify the service entity set.
  • Recommendations are loaded only after the configuration prefix (e.g. sap.odata#00) has been entered.
  • topic defines the Kafka output topic the connector producer will use to publish extracted data.
  • decimal.mapping can optionally be used to transform DECIMAL types to other appropriate data types if needed.
    decimal.mapping = primitive will transform decimals to double or long, depending on the scale.
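
As a sketch, assuming decimal.mapping uses the same sap.odata#<nn> prefix as the other entity set properties shown above:

```properties
# Optional: map OData DECIMAL types to primitive types
# (double or long, depending on the scale)
sap.odata#00.decimal.mapping = primitive
```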

OData services can be queried directly in a modern web browser, e.g. to test a service or to identify the properties and values you need to set in the connector configuration.
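
For example, the Northwind demo service configured above can be inspected directly in a browser; $metadata lists the available entity sets, and standard system query options such as $top and $format shape the response:

```text
https://services.odata.org/V4/Northwind/Northwind.svc/$metadata
https://services.odata.org/V4/Northwind/Northwind.svc/Order_Details?$top=2&$format=json
```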

Custom KeyStore and TrustStore

A KeyStore is used to store the private key and identity certificates so the connector can verify its identity to the corresponding application server in an SSL connection. A TrustStore is used to store certificates from Certificate Authorities (CAs) that verify the certificate presented by the application server in an SSL connection.

The OData V4 connectors support secure communication with the corresponding application server using the default Java KeyStore and TrustStore as well as a custom KeyStore and TrustStore.

To enable custom KeyStore and TrustStore set the following properties:

# Absolute path to the custom KeyStore on the worker machine.
sap.odata.keystore.location = path/to/your/custom/KeyStore
# The password for the custom KeyStore.
sap.odata.keystore.password = password
# The type of the custom KeyStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.keystore.type = jks
# Absolute path to the custom TrustStore on the worker machine.
sap.odata.truststore.location = path/to/your/custom/TrustStore
# The password for the custom TrustStore.
sap.odata.truststore.password = password
# The type of the custom TrustStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.truststore.type = jks
# The protocol that is applied for the encrypted communication between connector and server, like SSL or TLS.
# e.g. SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3
sap.odata.sslcontext.protocol = SSL
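
As a hedged example, a custom TrustStore could be created with the JDK keytool by importing the server's CA certificate (server-ca.crt is a hypothetical file name):

```shell
# Import a CA certificate into a new JKS TrustStore
keytool -importcert \
  -file server-ca.crt \
  -alias sap-server-ca \
  -keystore TrustStore.jks \
  -storetype jks \
  -storepass password \
  -noprompt
```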

The following file types are supported for KeyStore and TrustStore: jceks, jks, dks, and pkcs12.

The sslcontext.protocol configuration sets the encryption protocol that will be used for the secure communication.

Supported protocols

SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2 and TLSv1.3

Supported KeyStore and TrustStore types

jceks, jks, dks, pkcs11, pkcs12

Hints

  • The custom TrustStore and KeyStore connector configurations override the SSL connection configuration for the involved connector only, not the global JVM configuration.
  • If no custom TrustStore or KeyStore is defined, the system default is used.
  • A password is always required for using a custom TrustStore or KeyStore.
  • The configuration parameters for the encryption protocol as well as the KeyStore and TrustStore types are case-sensitive.

Custom HTTP header

Custom HTTP headers are optional fields that can be included in the header section of an HTTP request. A custom HTTP header field allows specific information or metadata to be included with each HTTP request. These headers are not part of the standard set defined by the HTTP/1.1 specification but can be defined and used by the connector for various purposes, such as authentication, tracking, logging, or any other custom functionality that requires passing specific details along with each HTTP request.

Every HTTP request will include the specified custom headers. The custom HTTP headers should be formatted as key-value pairs separated by commas. To activate a custom HTTP header, configure the associated property as follows:

# All HTTP requests will include provided custom headers.
sap.odata.custom.http.header = key1:val1, key2:val2
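
The key-value format above can be illustrated with a short sketch; this is not the connector's actual parsing code, just a demonstration of how headers in this format map to name/value pairs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderSketch {
    // Parse "key1:val1, key2:val2" into an ordered map of header names to values.
    static Map<String, String> parseHeaders(String config) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (String pair : config.split(",")) {
            String[] kv = pair.trim().split(":", 2); // split on the first ':' only
            if (kv.length == 2) {
                headers.put(kv[0].trim(), kv[1].trim());
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(parseHeaders("key1:val1, key2:val2"));
        // {key1=val1, key2=val2}
    }
}
```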

Supported features

Data Serialization

The OData v4 connectors are tested with and therefore support the Confluent JSON Converter as well as the AVRO Converter.

SMT

The use of Single Message Transforms in a source connector allows each record to be passed through one or a chain of several simple transformations before the data is written to the Kafka topic. The OData v4 connectors support SMTs and have been successfully tested with a concatenation of two SMTs.
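
As an illustrative sketch using two standard Kafka Connect transforms (not specific to this connector), a chain of two SMTs in a source connector configuration could look like this:

```properties
# Two chained SMTs, applied in the listed order
transforms = addOrigin, routeTopic
# 1) Insert a static field into each record value
transforms.addOrigin.type = org.apache.kafka.connect.transforms.InsertField$Value
transforms.addOrigin.static.field = origin
transforms.addOrigin.static.value = odatav4
# 2) Route records to a prefixed topic name
transforms.routeTopic.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.routeTopic.regex = (.*)
transforms.routeTopic.replacement = sap-$1
```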

Error Handling

The OData connectors apply different kinds of validations and error handling mechanisms like configuration validation, offset recovery, upstream connection tests, HTTP status checks and connection retries.

Validation of single configuration parameters is implemented by extending the Validator class, which throws exceptions of type ConfigException in case of invalid configuration values. Additionally, the connector overrides the validate method to validate interdependent configuration parameters and adds error messages to the ConfigValue class in case of invalid parameter values. The corresponding parameters containing invalid values will be framed in red in the Confluent Control Center, together with an appropriate error message.

The connectors map known exceptions to the exception type ConnectException, which can be handled by Kafka Connect accordingly. Errors and warnings are logged using SLF4J, as described in the section Logging.

Logging

The connectors make use of SLF4J for logging integration. The loggers use the names org.init.ohja.kafka.connect.odatav4.source and org.init.ohja.kafka.connect.odatav4.sink and can be configured, e.g., in the log4j configuration properties within the Confluent Platform.

The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.

Example Log4j 1.x appender:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n

Field projection

Each OData v4 service entity set defines a set of properties that can be read, updated, deleted or inserted. An entity distinguishes between key and non-key properties. The configuration of a source connector allows defining a subset of non-key properties that will be extracted to Kafka. Regardless of this configuration, the source connector will always extract all the entity key properties.

Selections

The source connector has built-in support for the system query option $filter. According to SAP Note 1574568, logical operators for SAP OData service query filters are restricted to: 'eq', 'ne', 'le', 'lt', 'ge' and 'gt'. The source connector supports three additional operators: bt (between), nb (not between) and in (in specified set).
A single filter condition consists of:

  • a property name from the list of filterable properties supplied by the respective service entity, e.g. annotated with sap:filterable=true
  • an option defining the OData v4 query filter operator
  • a low value defining the input value for the selected operator or the lower bound of an interval
  • a high value defining the upper bound of an interval

Multiple filter conditions will be combined with an implicit logical and.
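
How such conditions reduce to standard OData v4 $filter expressions can be sketched as follows; the method below is purely illustrative (the connector builds its filter expressions internally) and shows how the additional bt and nb operators map onto the standard comparison operators:

```java
public class FilterSketch {
    // Translate one filter condition into an OData v4 $filter fragment.
    static String toFilter(String property, String option, String low, String high) {
        switch (option) {
            case "bt": // between: inclusive interval [low, high]
                return property + " ge " + low + " and " + property + " le " + high;
            case "nb": // not between: outside the interval
                return property + " lt " + low + " or " + property + " gt " + high;
            default:   // eq, ne, le, lt, ge, gt map directly
                return property + " " + option + " " + low;
        }
    }

    public static void main(String[] args) {
        System.out.println(toFilter("UnitPrice", "bt", "10", "20"));
        // UnitPrice ge 10 and UnitPrice le 20
        System.out.println(toFilter("Discount", "gt", "0", null));
        // Discount gt 0
    }
}
```

Multiple such fragments would then be joined with " and ", matching the implicit logical and described above.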

CLI tool

The kafka-connect-odatav4 connectors package contains a command line interface to validate connector properties, ping OData v4 services, retrieve a list of entity set names for a service, extract the schema for entity sets of OData v4 services and list available OData v4 services of an SAP system.

Since the CLI is written in Scala, you can execute it with either Scala or Java. To run the CLI, you need to provide the following dependencies in the CLASSPATH:

  1. Scala runtime libraries
  2. kafka-clients
  3. kafka-connect-odata connector libraries

Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java does not require installing Scala.

Java

java -cp <kafka-connect-odatav4>:<kafka-clients> org.init.ohja.kafka.connect.odatav4.OData4App <command> <options>

The Confluent Platform has Kafka libraries available in /usr/share/java/kafka/. When the OData v4 connectors package is installed to the plugin path of Kafka Connect, the command could look like this:

java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-odatav4-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.odatav4.OData4App \
<commands> <options>

Scala

If an appropriate version of Scala is installed, the scala command can be used. This command already provides the necessary Scala runtime libraries.

scala -cp <kafka-connect-odatav4>:<kafka-clients> org.init.ohja.kafka.connect.odatav4.OData4App <command> <options>

The output will look like this:

usage:
  OData4App <command> <options>

commands:
  ping -s <relative service path>
  list-entitysets -s <relative service path>
  extract-schema -s <relative service path> -e <entity-set>
  list-services                                              (SAP only)

mandatory options:
  -p <path to connector properties file>

Hint: Avro schemas may differ if Single Message Transforms are used in the connector configuration.

Restrictions and pending features

  • Authentication is currently limited to basic authentication and the OAuth client credentials flow, over HTTP and HTTPS, with optional custom KeyStores/TrustStores.
  • The source connector does not provide support for query options such as filters and projections in expanded navigation properties.
  • The sink connector does not support processing record values without a value schema.

Full enterprise support

Full enterprise support provides expert support from the developers of the connector under a service level agreement suitable for your needs, which may include

  • 8/5 support
  • 60-minute response times depending on support plan
  • Full application lifecycle support from development to operations
  • Access to expert support engineers
  • Consultancy in case of individual SAP integration requirements

Please contact connector-contact@init-software.de for more information.