Kafka Connect - OData Business Events Source Connector
The OData Business Events Source Connector is a Kafka Connect source for data replication out of SAP OData v2 business event services.
Dependencies
The connectors use the Apache Olingo v2 client library contained in the connectors package.
License
See licenses for INIT’s evaluation license (DE and EN) or for more information on the dependencies’ licenses.
Installation
Packaging
The OData Business Events Source Connector package name is: init-kafka-connect-odatabusevent-<connector version>.zip
The zip archive includes one folder called init-kafka-connect-odatabusevent-<connector version>, which itself contains the nested folders lib, etc, assets and doc.
- lib/ contains the Java archives that need to be extracted into the plugin.path of Kafka Connect.
- etc/ contains sample connector configuration property files. These can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
- assets/ contains media files like icons and company logos.
- doc/ contains more detailed documentation about the connectors like licenses, readme, configurations and so on.
Manual
Put the connector jars and library jars from directory /lib in the configured plugin path of Kafka Connect.
Confluent CLI
The Confluent Platform provides a command line installation interface that can be used to install the connectors package zip file from a local file system; see Confluent CLI Command Reference.
Configuration
The OData Business Events Source Connector offers complete support for the configuration UI in Confluent Control Center. Unlike using property files, configuring the connectors through Confluent Control Center provides several benefits, including proposed properties, a comprehensive set of recommended property values, incremental visibility of applicable configurations, an extensive range of interactive validations, and more.
The evaluation version of the connector is limited to 5 SAP business event object types. Once the mandatory fields of a service entity have been entered in the configuration UI, a new configuration group for configuring an additional service entity will appear. To ensure optimal performance, the UI of the Confluent Control Center only displays a maximum of 1000 recommendations.
For more detailed information, please refer to Configuration Details or the source configuration file (doc/source/configuration.html) in the connector package. Additionally, the subfolder etc within the connectors package contains configuration template properties files.
SAP subscriber configuration
The subscriber code used in the connectors’ configuration needs to be maintained in the corresponding SAP system configuration. You can either use maintenance view V_SUBSCRBMAINT, e.g. to create the subscriber ID ‘KBES’, or navigate through the SAP menu via transaction SPRO -> Cross-Application Components -> Processes and Tools for Enterprise Applications -> Business Event Handling -> Subscriber -> Create Subscriber ID.
The SAP user needs at least the following authorizations to subscribe to business events:
- Object BEH_SUBSCR:
BEH_SUBSCR: KBES
The OData Business Events Source Connector can only add subscriptions; it cannot delete them. If you reduce the number of subscriptions for a connector in the connector configuration, the subscriptions are not deleted in the backend. Instead, the connector filters for the required task codes.
Service destination configuration
A minimal OData v2 service destination configuration looks like this:
# OData v2 server host as either DNS or IP
sap.odata.host.address = services.odata.org
# OData v2 server port
sap.odata.host.port = 443
# OData v2 protocol (supported values are http or https)
sap.odata.host.protocol = https
# OData v2 user name for basic authentication.
# For services not requiring authentication this can be set to any value.
sap.odata.user.name = anonymous
# OData v2 user password for basic authentication.
# For services not requiring authentication this can be set to any value.
sap.odata.user.pwd = anonymous
# Unique business event subscriber ID used to subscribe to SAP business events.
sap.busevent.subscriber.code = KBES
Encrypted communication is supported by using HTTPS.
The supported authentication types are basic authentication with a username and password and OAuth client credentials flow.
Service entity set configuration
A minimal service entity set configuration looks like this:
# Business event object type
sap.busevent#00.object.type = SalesOrder
# Business event object task codes
sap.busevent#00.object.task-code = Deleted,Created,Changed
# Kafka topic name
sap.busevent#00.topic = Order_Details
- The object.type and object.task-code properties uniquely identify the business events to be consumed.
- To load the recommenders for the object type, its name or a prefix of the name must be entered.
- topic defines the Kafka output topic the connector producer will use to publish extracted data.
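The naming scheme of the properties above can be illustrated with a short Python sketch that collects the settings of each configuration group. This is not the connector's own parsing logic. The function name group_entity_settings and the assumption that the group index is a run of digits after `#` are made up for illustration, based only on the `#00` sample shown.

```python
import re
from collections import defaultdict

# Matches keys such as "sap.busevent#00.object.type". The numeric group
# index after '#' is an assumption derived from the sample configuration.
GROUP_KEY = re.compile(r"^sap\.busevent#(\d+)\.(.+)$")


def group_entity_settings(props):
    """Collect per-entity settings by their '#NN' configuration group index."""
    groups = defaultdict(dict)
    for key, value in props.items():
        match = GROUP_KEY.match(key)
        if match:
            index, name = match.groups()
            groups[index][name] = value
    return dict(groups)


props = {
    "sap.busevent#00.object.type": "SalesOrder",
    "sap.busevent#00.object.task-code": "Deleted,Created,Changed",
    "sap.busevent#00.topic": "Order_Details",
    "sap.busevent.subscriber.code": "KBES",  # not part of any group
}
groups = group_entity_settings(props)
task_codes = groups["00"]["object.task-code"].split(",")
```

Here groups holds a single group "00" with the three entity settings, and task_codes is the list of the three configured task codes.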
Modern web browsers allow for direct use and querying of OData services. This feature can be used, e.g. to test the service or to identify the properties and values required for the connectors’ configuration.
Parallelism
To define the parallelism of the source connector, each business event object type is assigned to exactly one worker task. This restriction is due to the Connect API, which handles REST calls and data loads package-wise in multiple tasks. However, at the point when the poll() method is called to retrieve new data from the source, the tasks have already been created, and rebalancing is required to change task assignments.
If the number of configured object types exceeds the maxTasks configuration or the available tasks, a single task will handle extractions for multiple business event object types. Scaling by adding tasks therefore only makes sense if the number of configured object types is greater than the number of available tasks. In addition, scaling by adding topic partitions makes no sense at all, as the connector only uses one partition to guarantee sequential order.
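The rule that every object type belongs to exactly one task can be sketched in Python as follows. The round-robin distribution is an assumption chosen for illustration, since the actual assignment strategy of the connector is not documented here. The function name assign_object_types and the object types other than SalesOrder are hypothetical.

```python
def assign_object_types(object_types, max_tasks):
    """Distribute object types over tasks; each type lands in exactly one task.

    Round-robin is an illustrative assumption, not the connector's
    documented strategy.
    """
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    # Never create more tasks than there are object types to extract.
    task_count = min(max_tasks, len(object_types))
    assignments = [[] for _ in range(task_count)]
    for position, object_type in enumerate(object_types):
        assignments[position % task_count].append(object_type)
    return assignments


# Three object types but only two tasks: one task handles two types.
two_tasks = assign_object_types(["SalesOrder", "PurchaseOrder", "BusinessPartner"], 2)
# More tasks allowed than object types: the extra tasks stay unused.
five_tasks = assign_object_types(["SalesOrder", "PurchaseOrder"], 5)
```

With two tasks, the first task receives SalesOrder and BusinessPartner and the second receives PurchaseOrder. With five allowed tasks, only two are created, one per object type.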
Delivery semantics
Business events issued by SAP include a unique identifier (GUID). Together with the message offset semantics provided by Kafka Connect, the connector implements offset recovery and exactly-once delivery semantics in the sense of Kafka Connect.
Subscription and business event services
The connector makes use of the following OData v2 service endpoints to handle SAP business events:
- /sap/opu/odata/sap/CA_BEH_SUBSCRIPTION_SRV This service is used for managing subscriptions for business events. You can get a list of active subscriptions from entity set SubscriptionRead.
Transaction code SWETYPV can be used in SAP to see event subscriptions for business events of receiver type BEH.
- /sap/opu/odata/sap/C_BEHQUEUEDATA_CDS This service is used to retrieve actual business events for an active subscription. You can get a list of available business events object types and task codes using entity set I_BusObjects.
Transaction code SWEQADM can be used in SAP to administer, monitor and trace the business event queue.
SAP Business events are generated through BOR events and stored in an event queue. These events are not immediately pushed to a subscriber, but require a poll-based approach through the OData service mentioned earlier. Furthermore, every business event carries a group of shared attributes:
- BusinessEvent: Unique business event identifier (GUID)
- SAPObjectTaskTypeName: object change task code description
- SAPObjectTypeName: object type description
- SAPObjectType: object type name
- BusEventSubscriberCode: subscription identifier, equals configuration property sap.busevent.subscriber.code
- SAPObjectTaskCode: object change task code name
- BusinessEventSubscriberName: Subscription name
- BusEventPriority: business event priority
- CreationUTCDateTime: business event creation timestamp in format YYYYMMDDhhmmss
Business events containing the above-mentioned attributes can be pushed directly to the Kafka target topic. In this case, only the key fields of the affected business object are provided. Alternatively, by setting configuration property object.resolve to 1, you can specify another OData service and entity set from which the connector looks up more detailed information about the affected business object. See SAP API Business Hub for a set of OData business APIs provided by SAP. The OData service entity used to query detailed business object data must contain the same key properties as the corresponding business event object. The resulting business object data is enriched with the following business event data: BusinessEvent, SAPObjectType, SAPObjectTaskCode and CreationUTCDateTime.
Because the connector performs the lookup of the business object data itself during event extraction, the business data is not guaranteed to be read at exactly the point in time at which the business event occurred. For example, if multiple events affect the same business object within a short period of time, the resolved data for all of these events could be identical and reflect the latest state of the business object.
See Business Event Handling APIs for more details.
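The enrichment of resolved business object data with the four business event attributes can be pictured with the following Python sketch. It is an illustration only: the helper names enrich and parse_creation_time, the sample GUID, the property TotalNetAmount and all sample values are made up, and the timestamp is assumed to arrive as a 14-digit string in the format YYYYMMDDhhmmss.

```python
from datetime import datetime, timezone

# Business event attributes that are added to the resolved business object.
EVENT_FIELDS = ("BusinessEvent", "SAPObjectType", "SAPObjectTaskCode", "CreationUTCDateTime")


def parse_creation_time(value):
    """Parse a CreationUTCDateTime value in the format YYYYMMDDhhmmss as UTC."""
    return datetime.strptime(value, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)


def enrich(business_object, event):
    """Return a copy of the looked-up object data plus the event attributes."""
    enriched = dict(business_object)
    for field in EVENT_FIELDS:
        enriched[field] = event[field]
    return enriched


event = {
    "BusinessEvent": "0050569A-0000-1EEB-A1B2-C3D4E5F60718",  # made-up GUID
    "SAPObjectType": "SalesOrder",
    "SAPObjectTaskCode": "Changed",
    "CreationUTCDateTime": "20240131123045",
    "BusEventPriority": "1",  # not copied into the enriched record
}
resolved = {"SalesOrder": "4711", "TotalNetAmount": "100.00"}  # hypothetical lookup result
record = enrich(resolved, event)
created = parse_creation_time(record["CreationUTCDateTime"])
```

Two events that hit the same sales order in quick succession would both be enriched from the same resolved dictionary, which is why their records can carry identical business data while differing in BusinessEvent and CreationUTCDateTime.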
Offset handling
The logical offset for each message produced by the source connector contains a
- business event identifier (GUID)
- business event creation timestamp
In case of any issues or task rebalances, the connector will get the latest offsets from Kafka Connect and restart extracting the last processed requests from the OData v2 service to ensure no data loss. All the messages up to the record number of the corresponding logical offset will be regarded as duplicates and then be filtered out before the SourceRecords are handed over to the producer, to make sure that no duplicate messages will be stored in the output topic. This means that in all cases of a restart the source connector will immediately try to recover from the last request, before going into a wait-loop for the next execution interval configured by exec-period.
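The duplicate filtering after a restart can be sketched in Python as shown below. This is a simplified illustration under stated assumptions, not the connector's implementation: the offset keys guid and timestamp, the function name drop_already_delivered and the fallback for an offset GUID that is missing from the response are all hypothetical.

```python
def drop_already_delivered(events, last_offset):
    """Filter out events up to and including the one recorded in the offset.

    `events` is the re-requested list in extraction order; `last_offset`
    is the logical offset of the last delivered message, or None.
    """
    if last_offset is None:
        return list(events)
    for position, event in enumerate(events):
        if event["BusinessEvent"] == last_offset["guid"]:
            # Everything up to this record was delivered before the restart.
            return list(events[position + 1:])
    # Assumption: if the GUID is not in this response, nothing is a duplicate.
    return list(events)


events = [
    {"BusinessEvent": "guid-1", "CreationUTCDateTime": "20240131120000"},
    {"BusinessEvent": "guid-2", "CreationUTCDateTime": "20240131120005"},
    {"BusinessEvent": "guid-3", "CreationUTCDateTime": "20240131120010"},
]
last_offset = {"guid": "guid-2", "timestamp": "20240131120005"}
remaining = drop_already_delivered(events, last_offset)
```

After recovery, only the event with GUID guid-3 is handed over to the producer; the first two are treated as duplicates.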
Graceful backoff
In case of connection or communication issues with the configured OData v2 service endpoint, the source connector applies a retry backoff strategy. The maximum number of retry attempts can be configured using property sap.odata.max.retries. After each failed connection attempt, the connector blocks execution for a random number of milliseconds in the range between the values of configuration properties sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms.
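The retry behavior described above can be approximated by the following Python sketch. It is a minimal illustration, assuming that the three parameters correspond to sap.odata.max.retries, sap.odata.min.retry.backoff.ms and sap.odata.max.retry.backoff.ms. The function name call_with_backoff, the injectable sleep parameter and the use of ConnectionError as the failure signal are hypothetical.

```python
import random
import time


def call_with_backoff(request, max_retries, min_backoff_ms, max_backoff_ms, sleep=time.sleep):
    """Call `request`, retrying failed attempts after a random pause."""
    attempt = 0
    while True:
        try:
            return request()
        except ConnectionError:
            if attempt >= max_retries:
                raise  # retries exhausted, surface the error
            attempt += 1
            # Block for a random number of milliseconds within the range.
            pause_ms = random.randint(min_backoff_ms, max_backoff_ms)
            sleep(pause_ms / 1000.0)


# Demo: a request that fails twice before succeeding. The pauses are
# recorded instead of actually sleeping.
pauses = []
calls = {"count": 0}


def flaky_request():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("service unavailable")
    return "ok"


result = call_with_backoff(flaky_request, 5, 100, 200, sleep=pauses.append)
```

The demo performs two retries, each preceded by a pause between 0.1 and 0.2 seconds, and then returns the result of the third attempt.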
Partitioning
The partition provided with each SourceRecord equals the object type name, as this is the primary key of a business event type as defined by SAP.
Supported data types
The following table gives an overview of the mapping from OData v2 data types to Kafka Connect data types applied by the source connector:
OData v2 | Kafka Connect Schema Type | Java data type |
---|---|---|
Bit | INT8 | java.lang.Byte |
Uint7 | INT8 | java.lang.Byte |
EdmSByte | INT8 | java.lang.Byte |
EdmByte | INT16 | java.lang.Short |
EdmGuid | STRING | java.lang.String |
EdmTime | TIME | java.util.Date |
EdmInt16 | INT16 | java.lang.Short |
EdmInt32 | INT32 | java.lang.Integer |
EdmInt64 | INT64 | java.lang.Long |
EdmBinary | BYTES | java.nio.ByteBuffer |
EdmDouble | FLOAT64 | java.lang.Double |
EdmSingle | FLOAT32 | java.lang.Float |
EdmString | STRING | java.lang.String |
EdmBoolean | BOOL | java.lang.Boolean |
EdmDecimal | DECIMAL | java.math.BigDecimal |
EdmDateTime | TIMESTAMP | java.util.Date |
EdmDateTimeOffset | TIMESTAMP | java.util.Date |
EdmStructuralType | STRUCT | java.util.Map[String, Object] |
JMX metrics
The OData Business Events Source Connector supports all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, the OData Business Events Source Connector provides extra JMX metrics for accessing state managed by the connector.
MBean: org.init.ohja.kafka.connect:type=busevent-source-task-metrics,connector=([-.\w]+),task=([\d]+)
Metric | Explanation |
---|---|
retries | Count of retries performed in the connector task that is in retrying state. |
${configGroup}-object-type | Name of the business object type for Business Event configured in the configuration group. |
${configGroup}-object-task-code | Task code of the business object type for Business Event configured in the configuration group. |
${configGroup}-latest-event | GUID and timestamp of latest extracted Business Event configured in the configuration group. |
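When filtering MBeans in a monitoring script, the object name pattern can be applied as a regular expression. The Python sketch below assumes that the character classes in the pattern are `\w` for the connector name and `\d` for the task number, as in the standard Kafka Connect metric names. The function name parse_mbean_name and the connector name in the example are hypothetical.

```python
import re

# Object name pattern of the connector's task metrics MBean. The \w and \d
# character classes are an assumption about the intended pattern.
MBEAN_NAME = re.compile(
    r"org\.init\.ohja\.kafka\.connect:type=busevent-source-task-metrics,"
    r"connector=([-.\w]+),task=(\d+)"
)


def parse_mbean_name(name):
    """Return (connector name, task number), or None if the name does not match."""
    match = MBEAN_NAME.fullmatch(name)
    if match is None:
        return None
    connector, task = match.groups()
    return connector, int(task)


parsed = parse_mbean_name(
    "org.init.ohja.kafka.connect:type=busevent-source-task-metrics,"
    "connector=busevent-source,task=0"
)
```

For the sample name, parsed contains the connector name busevent-source and the task number 0.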
Usage
Data serialization
The OData Business Events Source Connector supports various data converters, such as the JSON Converter and the Avro Converter.
SMT
The use of Single Message Transforms in a source connector allows for each record to be passed through one or a chain of several simple transformations before writing data to the Kafka topic. The OData v2 source connector supports SMT and has been successfully tested with a concatenation of two SMTs.
Error handling
The Business Events Source Connector applies different kinds of validations and error handling mechanisms like configuration validation, offset recovery, upstream connection tests, HTTP status checks and connection retries.
Single configuration parameter validation extending the Validator class will throw exceptions of type ConfigException in case of invalid configuration values. Additionally, the connector overrides the validate method to validate interdependent configuration parameters and adds error messages to class ConfigValue in case of any invalid parameter values. The corresponding parameters containing invalid values will be framed in red in the Confluent Control Center together with an appropriate error message.
The connector maps known exceptions to exception type ConnectException, which can be handled by Kafka Connect accordingly. Errors and warnings are logged using SLF4J, as described in section Logging.
Logging
The connectors make use of SLF4J for logging integration. The logger uses the logger name org.init.ohja.kafka.connect.odata.busevent.source and can be configured e.g., in the log4j configuration properties in the Confluent Platform.
The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.
Example Log4j 1.x appender:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n
Field projection
Each OData v2 service entity set defines a set of properties that can be read. An entity distinguishes between key and non-key properties. If configuration property object.resolve is set to 1, the connector configuration allows defining a subset of non-key properties that will be extracted to Kafka. Regardless of this configuration, the source connector will always extract all the business event object key properties, together with information about the corresponding business event.
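The effect of a field projection can be sketched in Python as follows. This only illustrates the rule that key properties are always kept. The function name project and the property names other than SalesOrder are hypothetical, and the business event information that the connector adds to each record is left out for brevity.

```python
def project(entity, key_properties, selected_properties):
    """Keep all key properties plus the selected non-key properties."""
    keep = list(key_properties)
    for name in selected_properties:
        if name not in keep:
            keep.append(name)
    # Properties missing from the entity are skipped instead of failing.
    return {name: entity[name] for name in keep if name in entity}


entity = {
    "SalesOrder": "4711",          # key property
    "SalesOrderType": "OR",        # hypothetical non-key property
    "TotalNetAmount": "100.00",    # hypothetical non-key property
    "CreatedByUser": "JDOE",       # hypothetical non-key property
}
projected = project(entity, ["SalesOrder"], ["TotalNetAmount"])
keys_only = project(entity, ["SalesOrder"], [])
```

The projected record contains SalesOrder and TotalNetAmount. With an empty selection, only the key property SalesOrder remains.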
CLI tool
The kafka-connect-odatabusevent connector contains a command line interface to validate connector properties, test the connection to a SAP system, retrieve a list of available business event object types and active subscriptions and delete active subscriptions from the SAP source system.
Since the CLI is written in Scala you can execute it in Scala or Java. To run the CLI you need to provide the following dependencies in the CLASSPATH:
- Scala runtime libraries
- kafka-clients
- kafka-connect-odatabusevent connector libraries
Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java would not require installing Scala.
Java
java -cp <kafka-connect-odatabusevent>:<kafka-clients> org.init.ohja.kafka.connect.odata.busevent.source.ODataBusinessEventApp <command> <options>
The Confluent Platform has Kafka libraries available in /usr/share/java/kafka/. When the odatabusevent source connector package is installed to the plugin path of Connect, the command could look like this:
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-odatabusevent-x.x.x/lib/*:\
/usr/share/java/kafka/*:\
/usr/share/java/kafka-serde-tools/* \
org.init.ohja.kafka.connect.odata.busevent.source.ODataBusinessEventApp \
<command> <options>
Scala
If an appropriate version of Scala is installed the Scala command can be used. This command already provides the necessary Scala runtime libraries.
scala -cp <kafka-connect-odatabusevent>:<kafka-clients> org.init.ohja.kafka.connect.odata.busevent.source.ODataBusinessEventApp <command> <options>
The output will look like this:
usage:
ODataBusinessEventApp <command> <options>
commands:
ping
object-codes
active-subscriptions
delete-subscription -c <subscriber code> -o <object type> -t <object task>
extract-schema -s <relative service path> -e <entity-set>
mandatory options:
-p <path to connector properties file>
Hint: Avro schemas may differ if Single Message Transforms are used in the connector configuration.
Restrictions and pending features
- The connector is currently limited to SAP OData v2 services and the business events released by SAP.
- The connector does not provide support for query options such as projections in expanded navigation properties.
- The connector currently supports Basic Authentication and the OAuth client credentials flow over HTTP and HTTPS, as well as KeyStores/TrustStores.
Full enterprise support
Our full enterprise support offers you access to top-tier support from our connector developers, tailored to meet your specific needs. This includes:
- 8/5 support
- 60-minute response times depending on support plan
- Full application lifecycle support from development to operations
- Access to expert support engineers
- Consultancy in case of individual SAP integration requirements
Please contact connector-contact@init-software.de for more information.