Kafka Connect - OData v4 Connectors
The OData v4 connectors package consists of Kafka Connect source and sink connectors that enable seamless interaction with OData v4 services. While these connectors are optimized for SAP OData v4, they can be used with other compatible services as well.
Dependencies
The connectors use the Apache Olingo v4 client library contained in the connectors’ package.
Restrictions and pending features
- OAuth is not yet supported. The connectors currently support Basic Authentication over HTTP and HTTPS as well as custom KeyStores/TrustStores.
- The source connector does not provide support for query options such as filters and projections in expanded navigation properties.
- The sink connector does not support processing record values without a value schema.
Configuration
The OData v4 connectors offer full support for the configuration UI in Confluent Control Center, which provides significant advantages over using property files.
This UI offers a wide range of features, including value recommendations, incremental visibility of applicable configurations, and a rich set of interactive validations. For OData v4 service recommendations in the connector configuration of Confluent Control Center, the SAP OData catalog service /sap/opu/odata4/iwfnd/config/default/iwfnd/catalog/0002 must be activated in the corresponding SAP system. Additionally, transaction /n/IWFND/V4_ADMIN must be used to publish the OData v4 catalog service.
In the evaluation version of the connector, the number of OData v4 service entities is limited to 5. Once mandatory fields of an OData v4 service entity have been entered in the configuration UI, a new configuration group will appear for configuring another service entity. For performance reasons, the UI of the Confluent Control Center only displays a maximum of 1000 recommendations.
Please refer to Source Configuration Details and Sink Configuration Details (or the files doc/source/configuration.html and doc/sink/configuration.html in the connector package). Furthermore, the etc subfolder of the connector package contains configuration template property files.
Service destination configuration
A minimal OData v4 service destination configuration looks like this:
# OData server host as either DNS or IP
sap.odata.host.address = services.odata.org
# OData server port
sap.odata.host.port = 443
# OData protocol (supported values are http or https)
sap.odata.host.protocol = https
# OData user name for basic authentication
# For services not requiring authentication this can be set to any value
sap.odata.user.name = anonymous
# OData user password for basic authentication
# For services not requiring authentication this can be set to any value
sap.odata.user.pwd = anonymous
Encrypted communication is supported by using HTTPS.
The supported authentication type is basic authentication with a username and password.
Service entity set configuration
A minimal service entity set configuration looks like this:
# OData v4 URL service path
sap.odata#00.service = /V4/Northwind/Northwind.svc/
# OData v4 entity set name
# The entity set name can be queried from the /$metadata service URL
sap.odata#00.entityset = Order_Details
# Kafka topic name the data for this OData service entity set will be pushed to
sap.odata#00.topic = Order_Details
- The service and entityset properties uniquely identify the service entity set.
- To load the recommender, a prefix must be entered.
- topic defines the Kafka output topic the connector producer will use to publish extracted data.
- decimal.mapping can optionally be used to transform DECIMAL types to other appropriate data types if needed. Setting decimal.mapping = primitive transforms decimals to double or long, depending on the scale.
OData services can be used and queried directly in a modern web browser, e.g. for testing a service or for identifying the properties and values you need to set in the connector configuration.
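For example, the public Northwind demo service used above can be inspected directly in a browser. The URLs below are illustrative:
# Service metadata document listing the available entity sets and their properties
https://services.odata.org/V4/Northwind/Northwind.svc/$metadata
# First five entries of the Order_Details entity set rendered as JSON
https://services.odata.org/V4/Northwind/Northwind.svc/Order_Details?$top=5&$format=json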
Custom KeyStore and TrustStore
A KeyStore is used to store private keys and identity certificates so the connector can verify its identity to the corresponding application server in an SSL connection. A TrustStore is used to store certificates from Certificate Authorities (CAs) that verify the certificates presented by the application servers in an SSL connection.
The OData V4 connectors support secure communication with the corresponding application server using the default Java KeyStore and TrustStore as well as a custom KeyStore and TrustStore.
To enable custom KeyStore and TrustStore set the following properties:
# Absolute path to the custom KeyStore on the worker machine.
sap.odata.keystore.location = path/to/your/custom/KeyStore
# The password for the custom KeyStore.
sap.odata.keystore.password = password
# The type of the custom KeyStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.keystore.type = jks
# Absolute path to the custom TrustStore on the worker machine.
sap.odata.truststore.location = path/to/your/custom/TrustStore
# The password for the custom TrustStore.
sap.odata.truststore.password = password
# The type of the custom TrustStore.
# e.g. jceks, jks, dks, pkcs11, pkcs12
sap.odata.truststore.type = jks
# The protocol that is applied for the encrypted communication between connector and server, like SSL or TLS.
# e.g. SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3
sap.odata.sslcontext.protocol = SSL
The following file types are supported for KeyStore and TrustStore: jceks, jks, dks or pkcs12.
The configuration sslcontext.protocol sets the encryption protocol which will be used for the secure communication.
Supported protocols:
SSL, SSLv2, SSLv3, TLS, TLSv1, TLSv1.1, TLSv1.2 and TLSv1.3
Supported KeyStore and TrustStore types:
jceks, jks, dks, pkcs11, pkcs12
Hints
- The custom TrustStore and KeyStore connector configurations override the SSL connection configuration for the involved connector only, not the global JVM configuration.
- If no custom TrustStore or KeyStore is defined, the system default is used.
- A password is always required for using a custom TrustStore or KeyStore.
- The configuration parameters for the encryption protocol as well as the KeyStore and TrustStore types are case sensitive.
Information
Source Connector
Parallelism
The OData V4 source connector's parallelism is determined by assigning a single worker task to an OData service entity set. This limitation is primarily due to the Connect API: while it would be possible to handle REST calls and data loads package-wise across multiple tasks, the tasks are already created at the point in time when the poll() method is called to get new data from the source, and a rebalance would be needed to change the task assignments. If the number of configured OData service entity sets is greater than the maxTasks configuration or the number of available tasks, a single task will handle extractions for multiple entity sets. Scaling by adding tasks therefore only makes sense if the number of configured entity sets is greater than the number of available tasks.
In addition, scaling by adding topic partitions is not useful, as only one partition is used to guarantee sequential order.
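As a minimal sketch, a source connector configured with two entity sets can make use of at most two tasks. tasks.max is the standard Kafka Connect property; the entity set prefixes follow the pattern shown above:
# Allow up to two tasks; useful only because two entity sets are configured
tasks.max = 2
sap.odata#00.service = /V4/Northwind/Northwind.svc/
sap.odata#00.entityset = Order_Details
sap.odata#00.topic = Order_Details
sap.odata#01.service = /V4/Northwind/Northwind.svc/
sap.odata#01.entityset = Orders
sap.odata#01.topic = Orders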
Delivery semantics
Change Data Capture was officially introduced with the OASIS OData v4 protocol specification. With the Apache Olingo v4 library, the source connector is capable of implementing delta mode based on the specification. This includes offset recovery and exactly-once delivery semantics.
In general, source and sink connectors provide at-least-once delivery guarantees. However, the OData source connector stands out by ensuring that data is processed exactly once when operating in delta mode, or in full mode with the execution period set to -1 for one-time polls.
To enable exactly-once support for source connectors in your Kafka cluster, you will need to update the worker-level configuration property exactly.once.source.support to either preparing or enabled. For further information, see KIP-618.
When deploying a source connector, you can configure the exactly.once.support property to make exactly-once delivery either requested or required (default is requested).
- required: the connector will undergo a preflight check to ensure it can provide exactly-once delivery. If the connector or the worker cannot support exactly-once delivery, creation or validation requests will fail.
- requested: the connector will not undergo a preflight check for exactly-once delivery.
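A minimal sketch of the relevant settings, assuming a Connect worker version that implements KIP-618:
# Worker-level configuration (e.g. connect-distributed.properties)
exactly.once.source.support = enabled
# Source connector configuration
exactly.once.support = required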
Operation Mode
The source connector can be operated in either full or delta mode. You can choose the operation mode for each individual OData v4 service by setting the configuration property track-changes to either 0 (full mode) or 1 (delta mode).
Full mode sends a periodic data request to the corresponding source OData v4 service requesting data from an entity set.
Delta mode uses the change-tracking functionality supported by some OData v4 services to perform an initial data request for the corresponding entity set first. Subsequent requests extract data in delta mode (Change Data Capture) and only contain new, changed or deleted records for the same set of entities as in the initial data request. Deleted entity records will only have the key properties filled in the value part of the message, while all other properties will be set to null. Furthermore, deleted entity records will contain either a RECORDMODE or ODQ_CHANGEMODE field set to “D”, as well as a timestamp field DELETED_AT that indicates when the entity was removed from the source system.
Once all available entity records have been extracted, the source connector will pause for exec-period seconds before sending the next data request to an OData v4 service.
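A sketch of a delta-mode entity set configuration. The exact property keys for track-changes and exec-period are assumed here to use the sap.odata#00 prefix introduced above; please verify them against the source configuration details:
# 0 = full mode, 1 = delta mode (change tracking)
sap.odata#00.track-changes = 1
# Pause 300 seconds between polls once all available records have been extracted
sap.odata#00.exec-period = 300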
Offset handling
The logical offset for each message produced by the source connector contains
- the URL from which the message was requested,
- a record number identifying the position of the message in the response, and
- optionally a second URL for the succeeding request in case of server-side paging (next link) or change tracking (delta link).
If any problems or task rebalances occur, the connector will obtain the most recent offsets from Kafka Connect and will resume extracting the last processed requests from the OData v4 service to ensure that no data is lost. To prevent duplicate messages from being stored in the output topic, all messages up to the record number of the associated logical offset will be treated as duplicates and filtered out before the SourceRecords are passed on to the producer. This implies that if a restart occurs, the source connector will attempt to recover from the last request immediately, before going into a wait-loop for the next execution interval specified by exec-period.
In delta/change tracking mode, this approach enables exactly-once semantics. However, in full mode, it is the responsibility of the source system to ensure that the subsequent call to the last request URL, after restarting the connector instance, returns identical data up to the record index stored in the offset before the connector stopped working.
Graceful backoff
In case of connection or communication issues with the configured OData v4 service endpoint, the source connector applies a retry backoff strategy. The maximum number of retry attempts can be set using the property sap.odata.max.retries. After each unsuccessful connection attempt, the connector will pause execution for a random duration within the range specified by the configuration properties sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms.
For more precise control over the maximum wait time when establishing a connection, as well as the maximum wait time for the server to respond before attempting to retry an OData request, the configuration properties sap.odata.connection.connect.timeout.ms and sap.odata.connection.read.timeout.ms can be utilized.
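For example, the following illustrative values retry a failed request up to three times, pausing between 10 and 60 seconds before each new attempt, and cap the connection and response wait times:
sap.odata.max.retries = 3
sap.odata.min.retry.backoff.ms = 10000
sap.odata.max.retry.backoff.ms = 60000
# Maximum wait time for establishing a connection
sap.odata.connection.connect.timeout.ms = 3000
# Maximum wait time for the server response
sap.odata.connection.read.timeout.ms = 60000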
Partitioning
The partition provided with each SourceRecord equals the combination of service and entity set name, as this is the primary key of an entity set as defined by the OData v4 protocol.
Supported data types
The following table gives an overview of the data type mapping from OData v4 data types to Kafka Connect data types applied by the source connector:
OData v4 | Kafka Connect Schema Type | Java data type |
---|---|---|
EdmBinary | BYTES | java.nio.ByteBuffer |
EdmBoolean | BOOLEAN | java.lang.Boolean |
EdmByte | INT16 | java.lang.Short |
EdmDate | DATE | java.util.Date |
EdmDateTimeOffset | TIMESTAMP | java.util.Date |
EdmDecimal | DECIMAL | java.math.BigDecimal |
EdmDouble | FLOAT64 | java.lang.Double |
EdmDuration | DECIMAL | java.math.BigDecimal |
EdmGuid | STRING | java.lang.String |
EdmInt16 | INT16 | java.lang.Short |
EdmInt32 | INT32 | java.lang.Integer |
EdmInt64 | INT64 | java.lang.Long |
EdmSByte | INT8 | java.lang.Byte |
EdmSingle | FLOAT32 | java.lang.Float |
EdmStream | STRING | java.lang.String |
EdmString | STRING | java.lang.String |
EdmTimeOfDay | TIME | java.util.Date |
EdmEnumType | STRING | java.lang.String |
EdmStructuredType | STRUCT | java.util.Map[String, Object] |
Hints
- The scale value ‘floating’ for properties of type EdmDecimal is not supported.
- The scale value ‘variable’ for properties of type EdmDecimal is converted to the value of the precision facet. If the precision facet is unspecified, ‘6’ is used as a scale with rounding mode ‘half even’ for the conversion. The fallback scale and the rounding mode can be overridden using sap.odata.decimal.scale.fallback and sap.odata.decimal.rounding.mode.
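A hedged example overriding the fallback scale and rounding mode; the accepted value format for the rounding mode is an assumption and should be checked against the source configuration details:
# Scale used when the precision facet is unspecified (default 6)
sap.odata.decimal.scale.fallback = 4
# Rounding mode applied during the conversion, e.g. HALF_EVEN
sap.odata.decimal.rounding.mode = HALF_EVEN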
Sink Connector
Delivery semantics
The sink connector makes use of Kafka’s message commit functionality and is therefore able to guarantee at-least-once semantics.
The connector first collects consumed records in a task cache and delivers them to the target service at time intervals defined by the Connect worker configuration property offset.flush.interval.ms. Moreover, deliveries can be triggered outside of this interval based on the number of records accumulated in the task cache using the configuration property sap.odata.flush.trigger.size.
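A sketch with illustrative values: records are flushed at least every 60 seconds and additionally whenever 500 records have accumulated in the task cache:
# Connect worker configuration
offset.flush.interval.ms = 60000
# Sink connector configuration
sap.odata.flush.trigger.size = 500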
Dead Letter Queue
The sink connector supports the Dead Letter Queue (DLQ) functionality.
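The standard Kafka Connect DLQ settings can be applied to the sink connector configuration; the topic name below is an illustrative example:
# Tolerate record-level errors instead of stopping the connector task
errors.tolerance = all
# Topic that receives records which could not be processed
errors.deadletterqueue.topic.name = dlq-odatav4-sink
# Add error context to the headers of the DLQ records
errors.deadletterqueue.context.headers.enable = true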
Graceful backoff
In case of connection or communication issues with the configured OData v4 service endpoint, the sink connector applies a retry backoff strategy similar to the source connector.
- In single record mode, the sink connector retries a request if any HttpClient transport exception occurs or if a response with the client error code 408 or any server error code > 500 is returned.
- In batch mode, the same rule applies for the $batch request. Errors contained in the batch response do lead to a connector stop and are not retried.
Operation Mode
The sink connector currently offers the operation modes Insert, Update and Dynamic, which can be set through the configuration property sap.odata.operation.mode. The sink connector is currently limited to standard OData v4 CRUD operations; an Upsert mode is therefore not supported, but it can be offered by the OData service provider as part of its insert or update implementation.
- In Insert mode the sink connector utilizes the HTTP POST method to export entries. Depending on the OData v4 service implementation, this may result in failures if the entry to be inserted already exists in the target system.
The OData standard provides support for deep inserts, which allows nested business objects to be inserted by an OData service as multiple entities along with their associations to each other. See OData deep inserts for more details.
- In Update mode the sink connector uses the HTTP PATCH method to partially update OData entities. As a result, only the fields that are available in the record will be updated in the target system, while any missing record fields will be ignored. Depending on the OData v4 service implementation this might lead to failures if the entry to be updated is not available in the target system.
- In Dynamic mode the sink connector uses either HTTP POST or HTTP PATCH to export records. The value of the record field provided by the configuration property sap.odata.operation.mode.property is used at record level to decide which HTTP method is used. If this field is set to ‘I’, HTTP POST will be used, while a value of ‘U’ will result in the use of HTTP PATCH.
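A sketch of a Dynamic mode configuration; the record field name OPERATION is purely illustrative and the exact value spelling of the mode should be checked against the sink configuration details:
# Supported operation modes: insert, update, dynamic
sap.odata.operation.mode = dynamic
# Record field evaluated per record: 'I' results in HTTP POST, 'U' in HTTP PATCH
sap.odata.operation.mode.property = OPERATION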
Delete Operation
The sink connector has the capability to delete records from the target system when a tombstone record is consumed by configuring the property sap.odata.enable.delete. A tombstone record is identified as a Kafka record that contains a non-null key and a null value. When this feature is enabled, the sink connector will convert tombstone records into delete requests that use the DELETE HTTP method. If the feature is not enabled, all tombstone records will be disregarded.
In order for this feature to function correctly, it is necessary for all key properties of an entry to be present in the record key, and for the record key itself to be of type struct.
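A sketch enabling delete handling for tombstone records; the boolean value format is an assumption:
# Convert tombstone records (non-null key, null value) into HTTP DELETE requests
sap.odata.enable.delete = true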
Batching Requests
By default, the sink connector groups multiple operations into a single HTTP request payload to reduce the overhead of HTTP transmission and improve overall performance. The maximum batch size is configurable with sap.odata.max.batch.size. For more information about general tweaking of the batch size for connectors, see Batch size for sink connector.
If batching requests is disabled, every record that is read from Kafka will be sent to the target system as a separate request. This means that an HTTP connection will be opened and closed for each individual record.
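An illustrative batch size setting; the default and maximum allowed values are not stated here, so check the sink configuration details:
# Group at most 100 operations into a single $batch request
sap.odata.max.batch.size = 100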
Data Type Mapping
The following table gives an overview of the data type mapping from Kafka Connect data types to OData v4 data types applied by the sink connector. The default conversion type is the first one listed in the Kafka Connect Schema Types column.
OData v4 | Kafka Connect Schema Types |
---|---|
EdmBinary | BYTES |
EdmBoolean | BOOLEAN |
EdmByte | INT8, INT16, INT32, INT64 |
EdmDate | DATE, TIMESTAMP, INT64 |
EdmDateTimeOffset | DATE, TIMESTAMP, INT64 |
EdmDecimal | DECIMAL, FLOAT32, FLOAT64, INT8, INT16, INT32, INT64 |
EdmDouble | INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, DECIMAL |
EdmDuration | DECIMAL, INT8, INT16, INT32, INT64 |
EdmGuid | STRING |
EdmInt16 | INT8, INT16, INT32, INT64 |
EdmInt32 | INT8, INT16, INT32, INT64 |
EdmInt64 | INT8, INT16, INT32, INT64 |
EdmSByte | INT8, INT16, INT32, INT64 |
EdmSingle | INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, DECIMAL |
EdmStream | STRING |
EdmString | STRING |
EdmTimeOfDay | TIME, DATE, TIMESTAMP, INT64 |
EdmEnumType | STRING |
EdmStructuredType | STRUCT |
Hints
- Type conversions into OData types from other Kafka Connect Schema Types than the default conversion type might result in inaccurate data or loss of information. E.g. some integer values that can be represented as INT32 may not be accurately represented as EdmInt16.
- The sink connector does not actively use ETags for concurrency control. When issuing a PUT or DELETE request, the sink connector uses the value ’*’ in the If-Match HTTP request header.
Installation
Manual
Put the connector JARs and library JARs from the lib directory into the configured plugin path of Kafka Connect.
Confluent Hub CLI
The Confluent Platform offers a command-line interface for installation, which can be used to install the connector package zip file from a local file system.
License
See Lizenzvereinbarung for INIT's evaluation license and Dependencies for more information on the dependencies' licenses (doc/licenses.html file in the connector package).
Usage
Packaging
The OData v4 connectors package name is init-kafka-connect-odatav4-<connector version>.zip
The zip archive includes one folder called init-kafka-connect-odatav4-<connector version>, which itself contains the nested folders lib, etc, assets and doc.
- lib/ contains the java archives that need to be extracted into the plugin.path of Kafka Connect.
- etc/ contains sample connector configuration property files. These can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
- assets/ contains media files like icons and company logos.
- doc/ contains more detailed documentation about the connectors like licenses, readme, configurations and so on.
Data Serialization
The OData v4 connectors are tested with and therefore support the Confluent JSON Converter as well as the AVRO Converter.
SMT
The use of Single Message Transforms in a source connector allows each record to be passed through one or a chain of several simple transformations before the data is written to the Kafka topic. The OData v4 source connector supports SMTs and has been successfully tested with a concatenation of two SMTs.
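As an illustration, two chained standard Kafka Connect SMTs could be configured as follows; the chosen transformations, alias names and field names are arbitrary examples, not a connector requirement:
transforms = addSource,castOrderId
# Add a static field to each record value
transforms.addSource.type = org.apache.kafka.connect.transforms.InsertField$Value
transforms.addSource.static.field = source_system
transforms.addSource.static.value = northwind
# Cast the OrderID field of the record value to a string
transforms.castOrderId.type = org.apache.kafka.connect.transforms.Cast$Value
transforms.castOrderId.spec = OrderID:string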
Error Handling
The OData connectors apply different kinds of validations and error handling mechanisms like configuration validation, offset recovery, upstream connection tests, HTTP status checks and connection retries.
Single configuration parameter validation extending the Validator class will throw exceptions of type ConfigException in case of invalid configuration values. Additionally, the connector overrides the validate method to validate interdependent configuration parameters and adds error messages to class ConfigValue in case of any invalid parameter values. The corresponding parameters containing invalid values will be framed in red in the Confluent Control Center together with an appropriate error message.
The connectors map known exceptions to the exception type ConnectException, which can be handled by Kafka Connect accordingly. Errors and warnings are logged using SLF4J, as described in the section Logging.
Logging
The connectors make use of SLF4J for logging integration. The loggers use the names org.init.ohja.kafka.connect.odatav4.source and org.init.ohja.kafka.connect.odatav4.sink and can be configured e.g., in the log4j configuration properties within the Confluent Platform.
The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.
Example Log4j 1.x appender:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n
Field projection
Each OData v4 service entity set defines a set of properties that can be read, updated, deleted or inserted. An entity distinguishes between key and non-key properties. The configuration of a source connector allows defining a subset of non-key properties that will be extracted to Kafka. Regardless of this configuration, the source connector will always extract all of the entity key properties.
Selections
The source connector has built-in support for the System Query Option $filter. According to SAP Note 1574568, logical operators for SAP OData service query filters are restricted to: ‘eq’, ‘ne’, ‘le’, ‘lt’, ‘ge’ and ‘gt’. The source connector supports three additional operators: bt (between), nb (not between) and in (in specified set).
A single filter condition consists of:
- a property name from the list of filterable properties supplied by the respective service entity, e.g. annotated with sap:filterable=true
- an option defining the OData v4 query filter operator
- a low value defining the input value for the selected operator or the lower bound of an interval
- a high value defining the upper bound of an interval
Multiple filter conditions will be combined by an implicit logical and.
CLI
The kafka-connect-odatav4 connectors package contains a command line interface to validate connector properties, ping OData v4 services, retrieve a list of entity set names for a service, extract the schema for entity sets of OData v4 services and list available OData v4 services of an SAP system.
Since the CLI is written in Scala, you can execute it with Scala or Java. To run the CLI you need to provide the following dependencies in the CLASSPATH:
- Scala runtime libraries
- kafka-clients
- kafka-connect-odata connector libraries
Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java does not require installing Scala.
Java
java -cp <kafka-connect-odatav4>:<kafka-clients> org.init.ohja.kafka.connect.odatav4.OData4App <command> <options>
The Confluent Platform has Kafka libraries available in /usr/share/java/kafka/. When the OData v4 connectors package is installed to the plugin path of Kafka Connect, the command could look like this:
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-odatav4-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.odatav4.OData4App \
<commands> <options>
Scala
If an appropriate version of Scala is installed, the scala command can be used. This command already provides the necessary Scala runtime libraries.
scala -cp <kafka-connect-odatav4>:<kafka-clients> org.init.ohja.kafka.connect.odatav4.OData4App <command> <options>
The output will look like this:
usage:
OData4App <command> <options>
commands:
ping -s <relative service path>
list-entitysets -s <relative service path>
extract-schema -s <relative service path> -e <entity-set>
list-services (SAP only)
active-subscriptions -s <relative service path> (SAP only)
delete-subscription -u <subscription URL> (SAP only)
mandatory options:
-p <path to connector properties file>
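For instance, assuming the connector package is installed under the Connect plugin path, pinging the Northwind service configured in a source properties file could look like this (paths are placeholders):
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-odatav4-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.odatav4.OData4App \
ping -s /V4/Northwind/Northwind.svc/ -p source.properties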
Support
Full enterprise support provides expert support from the developers of the connector under a service level agreement suitable for your needs, which may include
- 8/5 support
- 60-minute response times depending on support plan
- Full application lifecycle support from development to operations
- Access to expert support engineers
- Consultancy in case of individual SAP integration requirements
Please contact connector-support@init-software.de for more information.