Kafka Connect - Webservice Data Source Sink Connector

The Webservice Data Source Sink Connector is a Kafka Connector for writing data to SAP BI Webservice DataSources. The connector builds upon i-OhJa, a collection of libraries and components written in Scala for interaction with SAP systems.

Dependencies

The connector depends on the SAP® Java Connector 3.1 SDK library to connect to SAP® systems. To run the WebDS sink connector you need to provide a copy of the SAP® JCo library v3.1.8 (jar and native library) in the classpath or in the plugin path configured for Kafka Connect.

Note

JCo needs to be obtained separately from the SAP® Marketplace. For more detailed information about licensing terms and how to obtain a license visit the SAP® FAQ and the SAP® connectors’ homepage.

License

See licenses for INIT's evaluation license (DE and EN) and for more information on the dependencies' licenses.

Installation

Packaging

The webservice sink connector package name is init-kafka-connect-webds-<connector version>.zip

The zip archive includes one folder called init-kafka-connect-webds-<connector version>, which itself contains the nested folders lib, etc, assets, and doc.

  • lib/ contains the Java archive(s) that need to be extracted into the plugin.path of Kafka Connect.
  • etc/ contains a sample properties file for the connector that can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
  • doc/ contains more detailed documentation about the connector, such as licenses, the readme, and configuration references.
  • assets/ contains media files like icons and company logos.

Manual

Put the connector jar and the SAP JCo jar together with its native libraries in the configured plugin path of Kafka Connect.
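A minimal sketch, assuming a Linux worker whose plugin.path contains /usr/share/kafka/plugins (both the path and the connector version are placeholders):

# copy the extracted connector folder into the plugin path
cp -r init-kafka-connect-webds-x.x.x /usr/share/kafka/plugins/
# add SAP JCo (jar and native library, here for Linux x86_64) next to the connector jars
cp sapjco3.jar libsapjco3.so /usr/share/kafka/plugins/init-kafka-connect-webds-x.x.x/lib/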

Confluent CLI

The Confluent Platform provides a command line installation interface that can be used to install a connector zip file from a local file system; see the Confluent CLI Command Reference. In addition, SAP JCo needs to be copied manually to the lib directory of the connector's target installation path.
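As a sketch, assuming the confluent-hub client is available and the connector archive was downloaded to the current directory (the installation target directory is an assumption):

# install the connector zip from the local file system
confluent-hub install ./init-kafka-connect-webds-x.x.x.zip
# afterwards, copy SAP JCo into the lib directory of the installed connector
cp sapjco3.jar libsapjco3.so /usr/share/confluent-hub-components/init-kafka-connect-webds/lib/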

Configuration

The WebDS connector is fully compatible with the configuration user interface in Confluent Control Center. Compared to using properties files, the configuration UI in Confluent Control Center offers a wider range of features, such as suggested properties, extensive property value recommendations, incremental visibility of applicable configurations, and a rich set of interactive validations.

Once the required fields for a WebDS sink are entered in the configuration UI, a new configuration group for an additional sink will appear. To ensure optimal performance, the maximum number of displayed recommendations in the Confluent Control Center UI is limited to 1000.

For more information, please refer to Configuration Details or the configuration file (doc/sink/configuration.html) in the connector package.

Sink system connection

The WebDS Sink Connector includes configuration properties from SAP® JCo, which are identified by the prefix jco. For a detailed description of these properties, please refer to the Java documentation of the com.sap.conn.jco.ext.DestinationDataProvider interface. The JCo JavaDoc can be found within the sapjco3.jar package.

A minimal SAP JCo client destination configuration looks like this:

# SAP Netweaver application host DNS or IP
jco.client.ashost = 127.0.0.1
# SAP system number
jco.client.sysnr = 20
# SAP client number
jco.client.client = 100
# SAP RFC user
jco.client.user = user
# SAP user password
jco.client.passwd = password

Instead of addressing the application host directly by defining jco.client.ashost, one can define jco.client.mshost to connect via the Message Server instead, which allows SAP to balance the system load across application servers.
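A sketch of a Message Server destination, with hypothetical host, system ID, and logon group values:

# SAP message server host (replaces jco.client.ashost)
jco.client.mshost = sap-ms.example.com
# SAP system ID
jco.client.r3name = BW1
# logon group maintained in SAP (transaction SMLG)
jco.client.group = PUBLIC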

Encryption and data integrity for the communication between the connector and SAP can be enabled by using SAP Secure Network Communications (SNC) and the corresponding JCo configuration properties.
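For instance, an SNC-secured destination could add JCo properties like the following (the library path and partner name are placeholders):

# enable SNC
jco.client.snc_mode = 1
# path to the SNC library, e.g. the SAP Cryptographic Library
jco.client.snc_lib = /usr/lib/libsapcrypto.so
# SNC name of the SAP communication partner
jco.client.snc_partnername = p:CN=BW1, OU=IT, O=ACME, C=DE
# quality of protection: 9 = maximum (authentication, integrity and encryption)
jco.client.snc_qop = 9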

The supported authentication types are user/password, SNC Single Sign On (SSO) using X509 certificates and SAP cookie v2 logon ticket.

Establishing communication between the connector and the SAP® system, and managing the required authorizations, requires an SAP® user. The user must be of type Communications or Dialog. To save SAP® dialog resources, the Communications type is recommended. The user must have the following minimum permissions:

  • Object S_RFC:
    ACTVT: 16
    RFC_NAME: RFCPING, SMLG_GET_DEFINED_GROUPS, RFC_GET_FUNCTION_INTERFACE, RFC_METADATA_GET, /BIC/CQ*
    RFC_TYPE: FUNC

  • Object S_RFC:
    ACTVT: 16
    RFC_NAME: RFC1, RSDS_BAPI, RSDS_WS
    RFC_TYPE: FUGR

WebDS sink configuration

A minimal connector sink configuration looks like this:

# A SAP BI webservice data source is identified by a primary key combination 
# containing a system name and a data source name.
sap.webds#00.system=MYSELF
sap.webds#00.name=ZJCOTESTVERT1
sap.webds#00.topic=ZDO1VERT

The topic property defines the Kafka input topic from which the connector will consume data.
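Additional data sources can be configured as further configuration groups by incrementing the group index. A sketch with hypothetical data source and topic names:

sap.webds#00.system=MYSELF
sap.webds#00.name=ZJCOTESTVERT1
sap.webds#00.topic=ZDO1VERT
sap.webds#01.system=MYSELF
sap.webds#01.name=ZJCOTESTVERT2
sap.webds#01.topic=ZDO2VERT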

Available webservice data sources in the SAP sink system can be looked up by executing transaction RSA1, opening the Modeling - Source Systems - Web Service perspective, and clicking on the desired system. This opens a list of the data sources available for the selected system.

SAP BI webservice data sources offer real-time data acquisition (RDA), allowing source systems to push data sets of any batch size, or single events, to the webservice data source. These data sets are grouped into requests by the BI system. A request is closed either when the customizable limits for time or size in the corresponding InfoPackage are exceeded, or when an RDA daemon actively closes the request. DTPs for RDA and daemon jobs can then be used to automatically transform and write the data to targets like DataStore objects.

Batching requests

The connector sends records to SAP BW webservice data sources in batches. The batch size is determined by common consumer configuration settings, and will not be changed by any custom connector configuration settings.
For more information about adjusting the batch size for connectors in general, see batch size for sink connector.
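As a sketch, the batch size can be reduced for a single connector through a consumer override (this requires connector.client.config.override.policy=All in the worker configuration):

# maximum number of records fetched per poll, and thus per batch
consumer.override.max.poll.records = 500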

Parallelism

Within Kafka Connect, each task instance is responsible for handling all records received from the set of topic partitions assigned to it by the Connect framework. The connector adheres to this standard behavior and does not introduce any custom parallelism. The maximum number of tasks for a sink connector can be limited using the tasks.max configuration property.
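For example, to let the Connect framework run at most two tasks that split the assigned topic partitions between them:

tasks.max = 2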

Offset handling

The connector fully adheres to the standard offset handling and commit behavior of Kafka Connect sink connectors.

Delivery semantics

Kafka Connect commits offsets for data delivered to the sink connector in order to prevent duplicate data in the target system. Achieving exactly-once delivery would require the target system to keep track of message and offset information, which webservice data sources do not provide by default. There may therefore be failure scenarios in which the connector cannot guarantee exactly-once but only at-least-once delivery semantics.

Graceful backoff

In case of connection or communication issues with the configured SAP system, the connector applies a retry backoff strategy. The maximum number of retry attempts can be configured using the property sap.webds.max.retries. After each unsuccessful connection attempt, the connector will pause for a random amount of time between the values specified in the configuration properties sap.webds.min.retry.backoff.ms and sap.webds.max.retry.backoff.ms before executing a retry.

Any exceptions for connection attempts and communication with SAP are assigned an internal exception group. The list of exception groups for which the backoff retry strategy is applied can be configured using property sap.webds.retry.exception.groups. A complete list of exception groups can be found in the SAP JCo JavaDocs of com.sap.conn.jco.JCoException. BAPI return messages of any message class can be included, too.
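A sketch of a backoff configuration with hypothetical values; the available exception group identifiers are listed in the JCo JavaDoc of com.sap.conn.jco.JCoException:

# retry up to 3 times before failing the task
sap.webds.max.retries = 3
# pause between 5 and 30 seconds before each retry attempt
sap.webds.min.retry.backoff.ms = 5000
sap.webds.max.retry.backoff.ms = 30000
# exception groups that trigger a retry
sap.webds.retry.exception.groups = JCO_ERROR_COMMUNICATION, JCO_ERROR_PROTOCOL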

JMX metrics

The Webservice Data Source Sink Connector supports all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, the Webservice Data Source Sink Connector provides extra JMX metrics for accessing state managed by the connector.

MBean: org.init.ohja.kafka.connect:type=webds-sink-task-metrics,connector=([-.\w]+),task=([\d]+)

Metric                       Explanation
retries                      Count of retries performed in the connector task that is in retrying state.
${configGroup}-webds-system  Webservice data source identifier: "logical system" configured in the configuration group.
${configGroup}-webds-name    Webservice data source identifier: "data source name" configured in the configuration group.
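Remote JMX access is not enabled by default. As a sketch, a Connect worker could be started with a JMX port set via the standard Kafka environment variable (the port number is arbitrary):

export JMX_PORT=9999
connect-distributed etc/kafka/connect-distributed.properties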

i-OhJa

i-OhJa is the SAP® integration framework used by the connectors to establish connections to SAP® instances from 3rd-party tools. It provides implementations of various APIs and interface protocols, and enables remote function calls, data transfer, and data format transformation.

Supported data types

The connector expects a flat structure in the value part of the Kafka messages. Information in the key part of a message will not be pushed into a webservice data source.
Fields in the value structure of a message are mapped to the fields defined for a webservice data source by field name. SAP data sources are often defined based on InfoObjects whose field names contain slashes. Since slashes are a forbidden character in field names for most data formats, they are replaced by "_" before the field name mapping is applied.
Fields of the webservice data source for which no matching field exists in the corresponding Kafka message are assigned a default value according to their specific data type. If a Kafka message contains a field with a data type that has no supported mapping to the corresponding field type of the webservice data source, a conversion ConnectException will be thrown.
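For illustration (hypothetical field names): a data source field /BIC/ZAMOUNT is matched by a message value field named _BIC_ZAMOUNT, e.g. in a JSON-serialized message value:

{
  "_BIC_ZAMOUNT": 42.5,
  "_BIC_ZCURRENCY": "EUR"
}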

SAP JCo defines internal data types in com.sap.conn.jco.JCoMetaData, each corresponding to one of the built-in types of SAP ABAP. The webservice sink connector supports flat structured tables containing the following SAP basic data types and their mappings to Kafka Connect org.apache.kafka.connect.data schema types:

JCo type           Kafka Connect schema type                 Restrictions
TYPE_UTCSECOND     INT8                                      between 0 and 315538070400
TYPE_UTCSECOND     INT16                                     between 0 and 315538070400
TYPE_UTCSECOND     INT32                                     between 0 and 315538070400
TYPE_UTCSECOND     INT64                                     between 0 and 315538070400
TYPE_UTCMINUTE     INT8                                      between 0 and 525896784
TYPE_UTCMINUTE     INT16                                     between 0 and 525896784
TYPE_UTCMINUTE     INT32                                     between 0 and 525896784
TYPE_UTCMINUTE     INT64                                     between 0 and 525896784
TYPE_UTCLONG       INT8                                      between 0 and 3155380704000000000
TYPE_UTCLONG       INT16                                     between 0 and 3155380704000000000
TYPE_UTCLONG       INT32                                     between 0 and 3155380704000000000
TYPE_UTCLONG       INT64                                     between 0 and 3155380704000000000
TYPE_BYTE          INT8                                      INT8 interpreted as Byte in an array of length 1
TYPE_BYTE          BYTES
TYPE_TSECOND       INT8                                      between 0 and 86401
TYPE_TSECOND       INT16                                     between 0 and 86401
TYPE_TSECOND       INT32                                     between 0 and 86401
TYPE_TMINUTE       INT8                                      between 0 and 1441
TYPE_TMINUTE       INT16                                     between 0 and 1441
TYPE_DTMONTH       INT8                                      between 0 and 119988
TYPE_DTMONTH       INT16                                     between 0 and 119988
TYPE_DTMONTH       INT32                                     between 0 and 119988
TYPE_XSTRING       BOOLEAN                                   "X"=true, ""=false
TYPE_XSTRING       STRING
TYPE_XSTRING       schema type not in (MAP, STRUCT, ARRAY)
TYPE_STRING        BOOLEAN                                   "X"=true, ""=false
TYPE_STRING        STRING
TYPE_STRING        schema type not in (MAP, STRUCT, ARRAY)
TYPE_DTWEEK        INT8                                      between 0 and 521725
TYPE_DTWEEK        INT16                                     between 0 and 521725
TYPE_DTWEEK        INT32                                     between 0 and 521725
TYPE_FLOAT         FLOAT32
TYPE_FLOAT         FLOAT64
TYPE_DTDAY         INT8                                      between 0 and 3652061
TYPE_DTDAY         INT16                                     between 0 and 3652061
TYPE_DTDAY         INT32                                     between 0 and 3652061
TYPE_TIME          INT32/logical type Time                   java.lang.Integer or java.util.Date
TYPE_TIME          STRING                                    pattern HHmmss
TYPE_INT8          INT8
TYPE_INT8          INT16
TYPE_INT8          INT32
TYPE_INT8          INT64
TYPE_INT2          INT8
TYPE_INT2          INT16
TYPE_INT1          INT8                                      between 0 and 255
TYPE_INT1          INT16                                     between 0 and 255
TYPE_DATE          INT32/logical type Date                   java.lang.Integer or java.util.Date
TYPE_DATE          STRING                                    pattern yyyyMMdd
TYPE_CHAR          BOOLEAN                                   'X'=true, ' '=false
TYPE_CHAR(length)  STRING                                    string.length <= length
TYPE_CDAY          INT8                                      between 0 and 366
TYPE_CDAY          INT16                                     between 0 and 366
TYPE_BYTE          INT8                                      INT8 interpreted as Byte
TYPE_NUM(length)   INT8                                      INT8 > 0 and INT8.length <= length
TYPE_NUM(length)   INT16                                     INT16 > 0 and INT16.length <= length
TYPE_NUM(length)   INT32                                     INT32 > 0 and INT32.length <= length
TYPE_NUM(length)   INT64                                     INT64 > 0 and INT64.length <= length
TYPE_NUM(length)   STRING                                    string.length <= length and string only contains digits
TYPE_INT           INT8
TYPE_INT           INT16
TYPE_INT           INT32
TYPE_BCD           FLOAT32
TYPE_BCD           FLOAT64
TYPE_BCD           BYTES/logical type Decimal                byte[] or java.math.BigDecimal
TYPE_DECF16        FLOAT32
TYPE_DECF16        FLOAT64
TYPE_DECF16        BYTES/logical type Decimal                byte[] or java.math.BigDecimal
TYPE_DECF34        FLOAT32
TYPE_DECF34        FLOAT64
TYPE_DECF34        BYTES/logical type Decimal                byte[] or java.math.BigDecimal

Supported features

Data serialization

The connector comes with support for the Confluent JSON converter as well as the Avro converter. Using Avro for data serialization requires the connector to translate field names provided by a webservice data source into valid Avro names by replacing illegal characters with "_".
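A sketch of the converter part of a connector configuration using the Avro converter, assuming a Schema Registry at a placeholder URL:

value.converter = io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url = http://localhost:8081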

SMT

Single Message Transforms (SMTs) allow each record to be passed through one or a chain of several simple transformations before it is handed to the sink connector. The connector supports SMTs and has been successfully tested with a concatenation of two SMTs.
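As an illustration, two chained standard Kafka Connect SMTs (the field names are hypothetical) could rename a field and insert a static field before the records reach the connector:

transforms = rename,insertOrigin
transforms.rename.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames = amount:_BIC_ZAMOUNT
transforms.insertOrigin.type = org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertOrigin.static.field = _BIC_ZORIGIN
transforms.insertOrigin.static.value = kafka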

Error handling

The connector applies different kinds of validations and error handling mechanisms like configuration validation, upstream connection tests and connection retries with graceful back-off.

Validations of single configuration parameters extend the Validator class and throw exceptions of type ConfigException in case of invalid configuration values. The corresponding parameters containing invalid values are framed in red in the Confluent Control Center. The connector maps known exceptions to exception type ConnectException, which can be handled by Kafka Connect accordingly.
Errors and warnings are logged using SLF4J, as described in section Logging.
The sink connector supports error handling using the Kafka Connect Reporter and can be configured to write erroneous records to a dead letter queue.
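A sketch of a dead letter queue setup using the standard Kafka Connect sink error-handling properties (the topic name is a placeholder):

errors.tolerance = all
errors.deadletterqueue.topic.name = dlq-webds-sink
errors.deadletterqueue.topic.replication.factor = 1
errors.deadletterqueue.context.headers.enable = true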

Logging

The connector makes use of SLF4J for logging integration. The logger uses the logger name org.init.ohja.kafka.connect.webds.sink and can be configured, e.g., in the Log4j configuration properties of the Confluent Platform.

SAP JCo includes a logger called com.sap.conn.jco which can only be used with Log4j. In addition to setting the logging level for the JCo logger, the configuration property jco.trace_level can be used to fine-tune the level of logging.
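For example, the two logger levels could be set in the worker's Log4j properties (the levels are examples):

log4j.logger.org.init.ohja.kafka.connect.webds.sink=DEBUG
log4j.logger.com.sap.conn.jco=ERROR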

The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.

Example Log4j 1.x appender:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n

Data source monitoring in SAP

SAP BI related transaction RSA1 can be used for monitoring and administration purposes. In the Modeling - Data Sources perspective you will be able to see the open and closed data load requests, and you can inspect the data persisted in the so-called Persistent Staging Area (PSA).

CLI tool

The kafka-connect-webds connector contains a command line interface to validate connector properties, test the connection to an SAP system, retrieve a list of available webservice data sources, and query details of a webservice data source.

The CLI is written in Scala and can be executed with either a Scala or a Java runtime. To run the CLI you need to provide the following dependencies in the CLASSPATH:

  1. Scala runtime libraries
  2. kafka-clients
  3. kafka-connect-webds connector libraries
  4. SAP Java Connector 3.1 SDK

Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java does not require a separate Scala installation.

Java

java -cp <kafka-connect-webds>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.webds.sink.WebDSDetailsApp <command> <options>

The Confluent Platform has Kafka libraries available in /usr/share/java/kafka/. When the WebDS sink connector package and sapjco3 are installed to the plugin path of Kafka Connect, the command could look like this:

java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-webds-x.x.x/lib/*:\
/usr/share/java/kafka/*:\
/usr/share/java/kafka-serde-tools/* \
org.init.ohja.kafka.connect.webds.sink.WebDSDetailsApp \
<command> <options>

Scala

If an appropriate version of Scala is installed, the scala command can be used; it already provides the necessary Scala runtime libraries.

scala -cp <kafka-connect-webds>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.webds.sink.WebDSDetailsApp <command> <options>

The output will look like this:

usage:
   WebDSDetailsApp <command> <options>

commands:
   ping
   list-webds
   extract-schema -s <source system> -w <data source name>  
   webds-details -s <source system> -w <data source name>

mandatory options:
   -p <path to connector properties file>

Hint: Avro schemas may differ if the Single Message Transform(s) in the connector configuration are used.

Restrictions and pending features

  • SAP BI webservice data sources only support flat, two-dimensional structures.
  • The number of Webservice Data Sources in the evaluation version of the connector is restricted.
  • The sink connector does not support processing record values without a value schema.

Full enterprise support

Full enterprise support provides expert support from the developers of the connector at a service level agreement suitable for your needs, which may include:

  • 8/5 support
  • 60-minute response times depending on support plan
  • Full application lifecycle support from development to operations
  • Access to expert support engineers
  • Consultancy in case of individual SAP integration requirements

Please contact connector-contact@init-software.de for more information.