Kafka Connect - BW aDSO Sink Connector

The BW aDSO Sink Connector is a Kafka Connector designed to enable writing data to SAP® BW advanced DataStore objects (aDSOs).
The connector builds upon i-OhJa, a collection of libraries and components written in Scala for interaction with SAP systems.

Dependencies

The connector depends on the SAP® Java Connector 3.1 SDK library to connect to SAP® systems. To run the aDSO sink connector you need to provide a copy of SAP® JCo library v3.1.8 (jar and native library) in the classpath or in the plugin path configured for Kafka Connect.

Note

JCo needs to be obtained separately from the SAP® Marketplace. For more detailed information about licensing terms and how to obtain a license visit the SAP® FAQ and the SAP® connectors’ homepage.

License

See licenses for INIT's evaluation license (DE and EN) and for more information on the dependencies' licenses.

SAP system requirements

The connector can be used with any advanced DataStore object that has the property “Write Change Log” enabled.

Installation of RFC Write API Wrapper

The aDSO Sink Connector utilizes the RFC APIs RSDSO_WRITE_API_RFC to load data and RSDSO_ACTIVATE_REQ_API_RFC to activate loaded requests.
The connector zip package contains a custom SAP enhancement as an SAP transport file. The transport contains a small wrapper for the aDSO RFC Write API that introduces specific performance enhancements. It is essential to install this transport across all relevant SAP systems during the connector installation process to ensure that the aDSO Sink Connector functions properly.

Installation

Packaging

The aDSO sink connector package name is: init-kafka-connect-adso-<connector version>.zip

The zip archive includes one folder called init-kafka-connect-adso-<connector version>, which itself contains the nested folders lib, etc, assets, doc and sap.

  • lib/ contains the Java archive(s) that need to be extracted into the plugin.path of Kafka Connect.
  • etc/ contains a sample properties file for the connector that can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
  • doc/ contains more detailed documentation about the connector like licenses, readme, configurations and so on.
  • assets/ contains media files like icons and company logos.
  • sap/ contains SAP transports that must be installed in the SAP system to provide performance improvements for the aDSO RFC Write API.

Manual

Put the connector jar and the SAP® JCo jar together with its native libraries in the configured plugin path of Kafka Connect.

Confluent CLI

The Confluent Platform provides a command line installation interface that can be used to install a connector zip file from a local file system; see the Confluent CLI Command Reference. In addition, SAP® JCo needs to be copied manually to the lib directory of the connector's target installation path.

Connection configuration

The aDSO connector is fully compatible with the configuration user interface in Confluent Control Center. Compared to using properties files, the configuration UI in Confluent Control Center offers a wider range of features, such as suggested properties, extensive property value recommendations, incremental visibility of applicable configurations, and a rich set of interactive validations.

Once the required fields for an aDSO sink are entered in the configuration UI, a new configuration group for an additional sink will appear. To ensure optimal performance, the maximum number of displayed recommendations in the Confluent Control Center UI is limited to 1000.

For more information, please refer to Configuration Details or the configuration file (doc/sink/configuration.html) in the connector package.

Sink system connection

The aDSO Sink Connector includes configuration properties from SAP® JCo, which are identified by the prefix jco. For a detailed description of these properties, please refer to the Java documentation of the com.sap.conn.jco.ext.DestinationDataProvider interface. The JCo JavaDoc can be found within the sapjco3.jar package.

A minimal SAP® JCo client destination configuration looks like this:

# SAP Netweaver application host DNS or IP
jco.client.ashost = 127.0.0.1
# SAP system number
jco.client.sysnr = 20
# SAP client number
jco.client.client = 100
# SAP RFC user
jco.client.user = user
# SAP user password
jco.client.passwd = password

Instead of addressing the application host directly via jco.client.ashost, one can set jco.client.mshost to connect through the SAP® Message Server instead, letting SAP® balance the system load (see the sketch below).
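A minimal message server destination might look like this (host, system ID, and logon group values are placeholders; the property names are standard SAP® JCo destination properties):

# SAP message server host DNS or IP
jco.client.mshost = 127.0.0.1
# SAP system ID (SID)
jco.client.r3name = ABC
# logon group
jco.client.group = PUBLIC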

Encryption and data integrity for the communication between the connector and SAP® can be enabled by using SAP® Secure Network Communications (SNC) and the corresponding JCo configuration properties.
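As a sketch, an SNC-secured destination could add properties like the following (all values are placeholders; jco.client.snc_lib must point to the cryptographic library installed on the Kafka Connect host):

# activate SNC
jco.client.snc_mode = 1
# quality of protection: 3 = authentication, integrity and encryption
jco.client.snc_qop = 3
# SNC name of the SAP® communication partner
jco.client.snc_partnername = p:CN=ABC, O=Example, C=DE
# path to the SNC library
jco.client.snc_lib = /usr/sap/sec/libsapcrypto.so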

The supported authentication types are user/password, SNC Single Sign On (SSO) using X509 certificates and SAP® cookie v2 logon ticket.

To establish communication between the connector and the SAP® system and within the framework of authorization management, you need an SAP® user. The user must be either of type Communication or Dialog. To save SAP® dialog resources, the Communication type is recommended. The user must have the following minimum permissions:

  • Object S_RFC:
    ACTVT: 16
    RFC_NAME: RFCPING, SMLG_GET_DEFINED_GROUPS, RFC_METADATA_GET
    RFC_TYPE: FUNC

  • Object S_RFC:
    ACTVT: 16
    RFC_NAME: RFC1, /INI/SERADSO
    RFC_TYPE: FUGR

  • Object S_RS_ADSO:
    ACTVT: 03, 23
    RSINFOAREA: <name of the info area for the aDSO>
    RSOADSONM: <name of the aDSO>
    RSOADSOPAR: DEFINITION

aDSO Sink configuration

A minimal connector data sink configuration looks like this:

# technical name of the aDSO
sap.adso#00.name=ZADSOTEST
# name of the topic where data will be fetched from
sap.adso#00.topic=ZADSOTEST

topic defines the Kafka input topic from which the connector will consume data.
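Multiple aDSO sinks can be configured in one connector instance by adding further configuration groups. A sketch, assuming the group index is simply incremented (the second aDSO and topic names are hypothetical):

# first sink
sap.adso#00.name=ZADSOTEST
sap.adso#00.topic=ZADSOTEST
# second sink
sap.adso#01.name=ZADSOTEST2
sap.adso#01.topic=ZADSOTEST2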

SAP® aDSOs have built-in support for real-time data acquisition, allowing source systems to push data sets of any batch size, or even single events, to the aDSO. The BI system bundles these data sets in so-called requests. A request gets activated once the customizable time or size limits of the corresponding connector instance are exceeded.

Batching Requests

The connector sends records in batches to SAP® standard advanced DataStore objects enabled for writing change logs. The batch size depends on the common consumer configuration settings and is not changed by any custom connector configuration setting.
For more information about general tweaking of the batch size for connectors, see batch size for sink connector.
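Because the batch size follows the consumer settings, it can be tuned per connector through Kafka Connect's standard consumer.override. prefix, provided the worker permits client overrides (connector.client.config.override.policy=All). An illustrative value:

# maximum number of records fetched per poll, and thus the upper
# bound for the record batch passed to the sink task
consumer.override.max.poll.records=500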

Parallelism

In Kafka Connect, each task instance is assigned a set of partitions by the Connect framework and handles all records received from those partitions. The connector adheres to this standard and does not implement any custom parallelism on top of it.
To limit the maximum number of tasks for the sink connector, use the tasks.max configuration property, as shown below.
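For example, to cap the connector at four tasks:

# sink tasks beyond the number of input topic partitions would stay idle
tasks.max=4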

Offset handling

The connector fully adheres to the standard method for handling offsets and commits in Kafka Connect sink connectors.

Delivery semantics

Kafka Connect commits offsets for the data delivered to the sink connector as input in order to prevent duplicate delivery to the target system. To achieve exactly-once delivery, the target system would need to track messages and offsets. This functionality is not provided by default for aDSOs, so there are scenarios in which the connector cannot ensure exactly-once, but only at-least-once delivery semantics.

Graceful backoff

In case of connection or communication issues with the configured SAP® system, the connector applies a retry backoff strategy. The maximum number of retry attempts can be set via the sap.adso.max.retries property. After each failed connection attempt, the connector tells Kafka Connect to wait for a random number of milliseconds in the range between the values of the configuration properties sap.adso.min.retry.backoff.ms and sap.adso.max.retry.backoff.ms before retrying.

Any exceptions related to connection attempts and communication with SAP® are assigned an internal exception group. The list of exception groups for which the backoff retry strategy is applied can be configured using the sap.adso.retry.exception.groups property. A complete list of exception groups can be found in the SAP® JCo JavaDocs of com.sap.conn.jco.JCoException.
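An illustrative retry configuration using the properties named above (values are examples; the list format for exception groups is assumed to be comma-separated, with group names as defined in com.sap.conn.jco.JCoException):

# retry up to 10 times before failing the task
sap.adso.max.retries=10
# wait between 3 and 30 seconds before each retry
sap.adso.min.retry.backoff.ms=3000
sap.adso.max.retry.backoff.ms=30000
# exception groups that trigger a retry
sap.adso.retry.exception.groups=JCO_ERROR_COMMUNICATION,JCO_ERROR_SYSTEM_FAILURE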

JMX metrics

The BW aDSO Sink Connector supports all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, BW aDSO Sink Connector provides extra JMX metrics for accessing state managed by the connector.

MBean: org.init.ohja.kafka.connect:type=adso-sink-task-metrics,connector=([-.\w]+),task=([\d]+)

Metric | Explanation
retries | Count of retries performed in the connector task that is in retrying state.
${configGroup}-name | Advanced DataStore Object name configured in the configuration group.
${configGroup}-request-begin | Start timestamp of the request for the Advanced DataStore Object configured in the configuration group.
${configGroup}-row-count | Row count of inserted rows in a request for the Advanced DataStore Object configured in the configuration group.
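A minimal sketch for reading these metrics with the standard javax.management API, assuming the code runs inside the Connect JVM (for remote access, attach via JMXConnectorFactory instead); the object name pattern and the retries attribute follow the MBean description above:

import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.jdk.CollectionConverters._

object AdsoSinkMetricsDump extends App {
  val mbs = ManagementFactory.getPlatformMBeanServer
  // wildcard pattern matching all aDSO sink task MBeans
  val pattern = new ObjectName("org.init.ohja.kafka.connect:type=adso-sink-task-metrics,*")
  mbs.queryNames(pattern, null).asScala.foreach { name =>
    val connector = name.getKeyProperty("connector")
    val task = name.getKeyProperty("task")
    // read the retry counter exposed by the connector task
    val retries = mbs.getAttribute(name, "retries")
    println(s"$connector/task-$task: retries=$retries")
  }
}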

i-OhJa

i-OhJa is the SAP® integration framework used by the connectors to establish connections to SAP® instances from 3rd-party tools. It provides implementations of various APIs and interface protocols, and enables remote function calls, data transfer, and data format transformation.

Supported data types

The connector expects a flat structure in the value part of the Kafka messages. Information in the key part of a message will not be pushed into an aDSO.
Mapping of the fields in the value structure of messages to the fields defined for an aDSO is done based on the field names. SAP® data sources are often defined based on InfoObjects containing slashes in the field names. Since slashes are a forbidden character in field names for most data formats, they are replaced by an underscore (_) before the field name mapping is applied.
Fields of the aDSO for which no matching field is defined in the corresponding Kafka message will be assigned a default value according to their specific data type. If a Kafka message contains a field whose data type has no supported mapping to the data type of the corresponding aDSO field, a conversion ConnectException is thrown.
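For illustration, with the Confluent JSON converter the value of a message targeting an aDSO that contains the hypothetical InfoObject-based field /BIC/ZAMOUNT could look like this; note the underscores in place of the slashes:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "_BIC_ZAMOUNT", "type": "double" },
      { "field": "ZCURRENCY", "type": "string" }
    ]
  },
  "payload": {
    "_BIC_ZAMOUNT": 42.5,
    "ZCURRENCY": "EUR"
  }
}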

SAP® JCo defines internal data types in com.sap.conn.jco.JCoMetaData, each corresponding to one of the built-in types of SAP ABAP®. The aDSO sink connector supports flat structured tables containing the following SAP® basic data types and mappings to Kafka Connect org.apache.kafka.connect.data data / schema types:

JCo | Kafka Connect Schema Type | Restrictions
TYPE_UTCSECOND | INT8 | between 0 and 315538070400
TYPE_UTCSECOND | INT16 | between 0 and 315538070400
TYPE_UTCSECOND | INT32 | between 0 and 315538070400
TYPE_UTCSECOND | INT64 | between 0 and 315538070400
TYPE_UTCMINUTE | INT8 | between 0 and 525896784
TYPE_UTCMINUTE | INT16 | between 0 and 525896784
TYPE_UTCMINUTE | INT32 | between 0 and 525896784
TYPE_UTCMINUTE | INT64 | between 0 and 525896784
TYPE_UTCLONG | INT8 | between 0 and 3155380704000000000
TYPE_UTCLONG | INT16 | between 0 and 3155380704000000000
TYPE_UTCLONG | INT32 | between 0 and 3155380704000000000
TYPE_UTCLONG | INT64 | between 0 and 3155380704000000000
TYPE_BYTE | INT8 | INT8 interpreted as Byte in an array of length 1
TYPE_BYTE | BYTES |
TYPE_BYTE | BYTES/logical type Decimal | java.math.BigDecimal
TYPE_TSECOND | INT8 | between 0 and 86401
TYPE_TSECOND | INT16 | between 0 and 86401
TYPE_TSECOND | INT32 | between 0 and 86401
TYPE_TMINUTE | INT8 | between 0 and 1441
TYPE_TMINUTE | INT16 | between 0 and 1441
TYPE_DTMONTH | INT8 | between 0 and 119988
TYPE_DTMONTH | INT16 | between 0 and 119988
TYPE_DTMONTH | INT32 | between 0 and 119988
TYPE_XSTRING | BOOLEAN | "X"=true, ""=false
TYPE_XSTRING | STRING |
TYPE_XSTRING | schema type not in (MAP,STRUCT,ARRAY) |
TYPE_STRING | BOOLEAN | "X"=true, ""=false
TYPE_STRING | STRING |
TYPE_STRING | schema type not in (MAP,STRUCT,ARRAY) |
TYPE_DTWEEK | INT8 | between 0 and 521725
TYPE_DTWEEK | INT16 | between 0 and 521725
TYPE_DTWEEK | INT32 | between 0 and 521725
TYPE_FLOAT | FLOAT32 |
TYPE_FLOAT | FLOAT64 |
TYPE_DTDAY | INT8 | between 0 and 3652061
TYPE_DTDAY | INT16 | between 0 and 3652061
TYPE_DTDAY | INT32 | between 0 and 3652061
TYPE_TIME | INT32/logical type Time | java.lang.Integer or java.util.Date
TYPE_TIME | STRING | pattern HHmmss
TYPE_INT8 | INT8 |
TYPE_INT8 | INT16 |
TYPE_INT8 | INT32 |
TYPE_INT8 | INT64 |
TYPE_INT8 | INT64/logical type Timestamp | java.util.Date (milliseconds since Unix epoch)
TYPE_INT2 | INT8 |
TYPE_INT2 | INT16 |
TYPE_INT1 | INT8 | between 0 and 255
TYPE_INT1 | INT16 | between 0 and 255
TYPE_DATE | INT32/logical type Date | java.lang.Integer or java.util.Date
TYPE_DATE | STRING | pattern yyyyMMdd
TYPE_CHAR | BOOLEAN | 'X'=true, ' '=false
TYPE_CHAR(length) | STRING | string.length <= length
TYPE_CDAY | INT8 | between 0 and 366
TYPE_CDAY | INT16 | between 0 and 366
TYPE_BYTE | INT8 | INT8 interpreted as Byte
TYPE_NUM(length) | INT8 | INT8 > 0 and INT8.length <= length
TYPE_NUM(length) | INT16 | INT16 > 0 and INT16.length <= length
TYPE_NUM(length) | INT32 | INT32 > 0 and INT32.length <= length
TYPE_NUM(length) | INT64 | INT64 > 0 and INT64.length <= length
TYPE_NUM(length) | STRING | string.length <= length and string only contains digits
TYPE_NUM(length) | INT64/logical type Timestamp | java.util.Date (milliseconds since Unix epoch) and INT64.length <= length
TYPE_INT | INT8 |
TYPE_INT | INT16 |
TYPE_INT | INT32 |
TYPE_BCD | FLOAT32 |
TYPE_BCD | FLOAT64 |
TYPE_BCD | BYTES/logical type Decimal | b[] or java.math.BigDecimal
TYPE_DECF16 | FLOAT32 |
TYPE_DECF16 | FLOAT64 |
TYPE_DECF16 | BYTES/logical type Decimal | b[] or java.math.BigDecimal
TYPE_DECF34 | FLOAT32 |
TYPE_DECF34 | FLOAT64 |
TYPE_DECF34 | BYTES/logical type Decimal | b[] or java.math.BigDecimal

Supported features

Data serialization

The connector comes with support for Confluent JSON Converter as well as the AVRO Converter. Using Avro for data serialization requires the connector to translate field names provided by an aDSO into valid Avro names by replacing illegal characters with "_".

SMT

The use of Single Message Transforms (SMTs) in a sink connector allows each record read from a Kafka topic to be passed through one or a chain of several simple transformations before it is written to the sink system. The connector supports SMTs and has been successfully tested with a concatenation of two SMTs.
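As an illustration, two of the standard SMTs shipped with Apache Kafka could be chained in the connector configuration to rename a field and cast its type before the record reaches the sink (field names are hypothetical):

# chain of two transformations, applied in order
transforms=rename,cast
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=amount:_BIC_ZAMOUNT
transforms.cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.cast.spec=_BIC_ZAMOUNT:float64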

Error handling

The connector applies different kinds of validations and error handling mechanisms like configuration validation, upstream connection tests and connection retries with graceful back-off.

Validation of single configuration parameters, implemented by extending the Validator class, throws exceptions of type ConfigException in case of invalid configuration values. The corresponding parameters containing invalid values will be framed in red in the Confluent Control Center. The connector maps known exceptions to the exception type ConnectException, which can be handled by Kafka Connect accordingly.
Errors and warnings are logged using SLF4J, as described in section Logging.
The sink connector supports error handling using the Kafka Connect Reporter and can be configured to write erroneous records to a dead letter queue.
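A sketch of the standard Kafka Connect error handling properties that route failed records to a dead letter queue (the topic name is a placeholder):

# tolerate record-level failures instead of failing the task
errors.tolerance=all
# topic receiving the failed records
errors.deadletterqueue.topic.name=dlq-adso-sink
# add failure context to the record headers
errors.deadletterqueue.context.headers.enable=true
# additionally log errors with their context
errors.log.enable=true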

Logging

The connector makes use of SLF4J for logging integration. The logger uses the name org.init.ohja.kafka.connect.adso.sink and can be configured, e.g., in the Log4j configuration properties of the Confluent Platform.

SAP® JCo includes a logger called com.sap.conn.jco which can only be used with Log4j. In addition to setting the logging level for the JCo logger, one can use the configuration property jco.trace_level to fine-tune the level of logging.
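For example, to keep connector logging at INFO while restricting JCo to errors (Log4j 1.x syntax, matching the appender example below; the trace level value is illustrative):

# log4j configuration properties
log4j.logger.org.init.ohja.kafka.connect.adso.sink=INFO
log4j.logger.com.sap.conn.jco=ERROR

# connector configuration property: JCo trace level (value illustrative)
jco.trace_level=0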

The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.

Example Log4j 1.x appender:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n

Data sink monitoring in SAP

The SAP® BI transaction RSA1 can be used for monitoring and administration purposes.

CLI tool

The kafka-connect-adso connector contains a command line interface to validate connector properties, test the connection to an SAP® system, retrieve a list of available aDSOs, and query details of an aDSO.

Since the CLI is written in Scala, it can be executed with either Scala or Java. To run the CLI, you need to provide the following dependencies in the CLASSPATH:

  1. Scala runtime libraries
  2. kafka-clients
  3. kafka-connect-adso connector libraries
  4. SAP® Java Connector 3.1 SDK

Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java would not require installing Scala.

Java

java -cp <kafka-connect-adso>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp <command> <options>

The Confluent Platform provides the Kafka libraries in /usr/share/java/kafka/. When the aDSO sink connector package and sapjco3 are installed in the plugin path of Kafka Connect, the command could look like this:

java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-adso-x.x.x/lib/*:\
/usr/share/java/kafka/*:\
/usr/share/java/kafka-serde-tools/* \
org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp \
<command> <options>

Scala

If an appropriate version of Scala is installed, the scala command can be used; it already provides the necessary Scala runtime libraries.

scala -cp <kafka-connect-adso>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp <command> <options>

Output

The output will look like this:

usage:
   ADSODetailsApp <command> <options>
   
commands:
   ping
   list-adsos
   adso-details -s <sink system> -n <adso name>
   extract-schema -n <adso name>
   
mandatory options:
   -p <path to connector properties file>

Hint: Avro schemas may differ if the Single Message Transform(s) in the connector configuration are used.
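For example, a connection test using the properties file from the connector package could look like this (paths are placeholders, assuming sapjco3 resides in the connector's lib directory):

java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-adso-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp \
ping -p <path to connector properties file>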

Restrictions and pending features

  • SAP® aDSOs only support flat, two-dimensional structures.
  • SAP® aDSOs of type “direct update” are not supported yet.
  • The number of aDSOs usable in the evaluation version of the connector is restricted.
  • “Exactly-once” delivery is not supported yet; the semantics provided are at-least-once.
  • The sink connector does not support processing record values without a value schema.

Full enterprise support

Full enterprise support provides expert support from the developers of the connector at a service level agreement suitable for your needs, which may include

  • 8/5 support
  • 60-minute response times depending on support plan
  • Full application lifecycle support from development to operations
  • Access to expert support engineers
  • Consultancy in case of individual SAP integration requirements

Please contact connector-contact@init-software.de for more information.