Kafka Connect - BW aDSO Sink Connector
The BW aDSO Sink Connector is a Kafka Connect connector for writing data to SAP® BW advanced DataStore Objects (aDSOs).
The connector builds upon i-OhJa, a collection of libraries and components written in Scala for interaction with SAP systems.
Dependencies
The connector depends on the SAP® Java Connector 3.1 SDK library to connect to SAP® systems. To run the aDSO sink connector, you need to provide a copy of the SAP® JCo library v3.1.7 (jar and native library) on the classpath or in the plugin path configured for Kafka Connect.
JCo needs to be obtained separately from the SAP® Marketplace. For more detailed information about licensing terms and how to obtain a license, visit the SAP® FAQ and the SAP® connectors’ homepage.
SAP system requirements
The connector can be used with any Advanced DataStore Object that has the property “Write Change Log” enabled.
Restrictions and pending features
- SAP® aDSOs only support flat, two-dimensional structures.
- SAP® aDSOs of type “direct update” are not supported yet.
- The number of aDSOs in the evaluation version of the connector is restricted.
- “Exactly-once” delivery is not supported yet; the semantics provided is at-least-once.
- The sink connector does not support processing record values without a value schema.
Configuration
The aDSO connector is fully compatible with the configuration user interface in Confluent Control Center. Compared to using properties files, the configuration UI in Confluent Control Center offers a wider range of features, such as suggested properties, extensive property value recommendations, incremental visibility of applicable configurations, and a rich set of interactive validations.
Once the required fields for an aDSO sink are entered in the configuration UI, a new configuration group for an additional sink will appear. To ensure optimal performance, the maximum number of displayed recommendations in the Confluent Control Center UI is limited to 1000.
For more information, please refer to Configuration Details or the configuration file (doc/sink/configuration.html) in the connector package.
Sink system connection
The aDSO Sink Connector includes configuration properties from SAP® JCo, which are identified by the prefix jco. For a detailed description of these properties, please refer to the Java documentation of the com.sap.conn.jco.ext.DestinationDataProvider interface. The JCo JavaDoc can be found within the sapjco3.jar package.
A minimal SAP® JCo client destination configuration looks like this:
# SAP Netweaver application host DNS or IP
jco.client.ashost = 127.0.0.1
# SAP system number
jco.client.sysnr = 20
# SAP client number
jco.client.client = 100
# SAP RFC user
jco.client.user = user
# SAP user password
jco.client.passwd = password
Instead of addressing the application server host directly via jco.client.ashost, you can set jco.client.mshost to connect through the SAP® Message Server, which balances the system load across application servers.
Encryption and data integrity for the communication between the connector and SAP® can be enabled by using SAP® Secure Network Communications (SNC) and the corresponding JCo configuration properties.
The supported authentication types are user/password, SNC Single Sign-On (SSO) using X.509 certificates, and SAP® cookie v2 logon tickets.
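A minimal sketch of a Message Server logon combined with SNC, assuming illustrative system names and an SNC library path (see the JCo documentation for the complete list of SNC properties):
# logon via the Message Server instead of a specific application server
jco.client.mshost = msg.example.com
jco.client.r3name = PRD
jco.client.group = PUBLIC
jco.client.client = 100
jco.client.user = user
jco.client.passwd = password
# enable SNC encryption and data integrity
jco.client.snc_mode = 1
jco.client.snc_partnername = p:CN=PRD, OU=IT, O=EXAMPLE, C=DE
jco.client.snc_qop = 9
jco.client.snc_lib = /usr/lib/libsapcrypto.so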
To establish communication between the connector and the SAP® system and within the framework of authorization management, you need an SAP® user. The user must be either of type Communications or Dialog. To save SAP® dialog resources, the Communications type is recommended. The user must have the following minimum permissions:
- Object S_RFC:
  - ACTVT: 16
  - RFC_NAME: RFCPING, SMLG_GET_DEFINED_GROUPS, RFC_METADATA_GET
  - RFC_TYPE: FUNC
- Object S_RFC:
  - ACTVT: 16
  - RFC_NAME: RFC1, /INI/SERADSO
  - RFC_TYPE: FUGR
- Object S_RS_ADSO:
  - ACTVT: 03, 23
  - RSINFOAREA: <name of the info area for the aDSO>
  - RSOADSONM: <name of the aDSO>
  - RSOADSOPAR: DEFINITION
aDSO Sink configuration
A minimal connector data sink configuration looks like this:
# technical name of the aDSO
sap.adso#00.name=ZADSOTEST
# name of the topic where data will be fetched from
sap.adso#00.topic=ZADSOTEST
topic defines the Kafka input topic from which the connector will consume data.
SAP® aDSOs have built-in support for real-time data acquisition, allowing source systems to push data sets of any batch size, or even single events, to the aDSO. The BI system bundles these data sets in so-called requests. A request is activated when the customizable time or size limits in the corresponding connector instance are exceeded.
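Putting it together, a complete connector configuration combines the sink settings with the JCo destination properties. A minimal sketch, assuming the connector class name shown below (not confirmed by this documentation) and illustrative values:
name=bw-adso-sink
# assumption: the fully qualified sink connector class
connector.class=org.init.ohja.kafka.connect.adso.sink.ADSOSinkConnector
tasks.max=1
# Kafka Connect sink connectors consume the topics listed here
topics=ZADSOTEST
sap.adso#00.name=ZADSOTEST
sap.adso#00.topic=ZADSOTEST
jco.client.ashost=127.0.0.1
jco.client.sysnr=20
jco.client.client=100
jco.client.user=user
jco.client.passwd=password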
Batching Requests
The connector sends records in batches to SAP® standard advanced DataStore Objects enabled for writing change logs. The batch size depends on common consumer configuration settings and is not changed by any custom connector configuration settings.
For more information about general tweaking of the batch size for connectors, see batch size for sink connector.
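Since batching follows the consumer settings of the Connect worker, the effective batch size can be influenced through consumer overrides in the connector configuration. A sketch, assuming the worker permits client configuration overrides (connector.client.config.override.policy=All) and illustrative values:
consumer.override.max.poll.records=500
consumer.override.fetch.min.bytes=1048576
consumer.override.fetch.max.wait.ms=1000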
Information
Parallelism
In Kafka Connect, each task instance is assigned a set of topic partitions by the framework and handles all records received from those partitions. The connector adheres to this standard and does not implement any custom parallelism to change it.
To limit the maximum number of tasks for the sink connector, use the tasks.max configuration property.
Offset handling
The connector fully adheres to the standard offset and commit handling of Kafka Connect sink connectors.
Delivery semantics
Kafka Connect commits offsets for the data delivered to the sink connector in order to prevent duplicate delivery to the target system. Achieving exactly-once delivery would require the target system to track messages and offsets itself. This functionality is not provided by default for aDSOs, so there are scenarios in which the connector cannot ensure exactly-once, but only at-least-once, delivery semantics.
Graceful backoff
In case of connection or communication issues with the configured SAP® system, the connector applies a retry backoff strategy. The maximum number of retry attempts can be set via the sap.adso.max.retries property. After each failed connection attempt, the connector tells Kafka Connect to wait for a random number of milliseconds between the values of the configuration properties sap.adso.min.retry.backoff.ms and sap.adso.max.retry.backoff.ms before retrying.
Any exceptions related to connection attempts and communication with SAP® are assigned to an internal exception group. The list of exception groups for which the backoff retry strategy is applied can be configured using the sap.adso.retry.exception.groups property. A complete list of exception groups can be found in the SAP® JCo JavaDoc of com.sap.conn.jco.JCoException.
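A sketch of the retry-related settings, using the property names from this section with illustrative values:
sap.adso.max.retries=10
sap.adso.min.retry.backoff.ms=1000
sap.adso.max.retry.backoff.ms=60000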
i-OhJa
i-OhJa is the SAP® integration framework used by the connectors to establish connections to SAP® instances from 3rd-party tools. It provides implementations of various APIs and interface protocols, and enables remote function calls, data transfer, and data format transformation.
- Download the i-OhJa One Pager
- SAP® Blog: Big Data integration of SAP Netweaver using i-OhJa
- SAP® Blog: Connect to SAP Netweaver in a Jupyter notebook using i-OhJa
Installation
Manual
Put the connector jar and the SAP® JCo jar, together with its native libraries, into the configured plugin path of Kafka Connect.
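A minimal sketch of a manual installation, assuming /usr/share/confluent-hub-components as the configured plugin.path and Linux x86_64 as the platform:
# extract the connector package into the plugin path
unzip init-kafka-connect-adso-x.x.x.zip -d /usr/share/confluent-hub-components/
# place the SAP JCo jar and its native library next to the connector jars
cp sapjco3.jar libsapjco3.so /usr/share/confluent-hub-components/init-kafka-connect-adso-x.x.x/lib/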
Confluent Hub CLI
The Confluent Platform provides a command line installation interface that can be used to install a connector zip file from the local file system. In addition, SAP® JCo needs to be copied manually to the lib directory of the connector’s target installation path.
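A sketch of a local installation via the Confluent Hub CLI, assuming illustrative paths:
confluent-hub install --no-prompt /tmp/init-kafka-connect-adso-x.x.x.zip
# SAP JCo still has to be copied manually into the installed lib directory
cp sapjco3.jar libsapjco3.so /usr/share/confluent-hub-components/<installed connector directory>/lib/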
License
See Lizenzvereinbarung for INIT’s evaluation license and Dependencies for more information on the dependencies’ licenses (doc/licenses.html in the connector package).
Supported data types
The connector expects a flat structure in the value part of the Kafka messages. Information in the key part of a message will not be pushed into an aDSO.
Mapping of the fields in the value structure of messages to the fields defined for an aDSO is done based on field names. SAP® data sources are often defined based on InfoObjects whose field names contain slashes. In most data formats slashes are forbidden in field names, so they are replaced by an underscore ("_") before the field name mapping is applied.
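For example, a hypothetical InfoObject-based aDSO field /BIC/ZAMOUNT would be matched against a record field carrying the escaped name _BIC_ZAMOUNT, e.g. in a JSON record value:
{"_BIC_ZAMOUNT": 42.5}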
Fields of the aDSO for which no matching field is defined in the corresponding Kafka message are assigned a default value according to their specific data type. If a Kafka message contains a field whose data type has no supported mapping to the data type of the corresponding aDSO field, a conversion exception of type ConnectException is thrown.
SAP® JCo defines internal data types in com.sap.conn.jco.JCoMetaData, each corresponding to one of the built-in types of SAP ABAP®. The aDSO sink connector supports flat structured tables containing the following SAP® basic data types and their mappings to Kafka Connect org.apache.kafka.connect.data schema types:
JCo | Kafka Connect Schema Type | Restrictions |
---|---|---|
TYPE_UTCSECOND | INT8 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT16 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT32 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT64 | between 0 and 315538070400 |
TYPE_UTCMINUTE | INT8 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT16 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT32 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT64 | between 0 and 525896784 |
TYPE_UTCLONG | INT8 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT16 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT32 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT64 | between 0 and 3155380704000000000 |
TYPE_BYTE | INT8 | INT8 interpreted as Byte in an array of length 1 |
TYPE_BYTE | BYTES | |
TYPE_BYTE | BYTES/logical type Decimal | java.math.BigDecimal |
TYPE_TSECOND | INT8 | between 0 and 86401 |
TYPE_TSECOND | INT16 | between 0 and 86401 |
TYPE_TSECOND | INT32 | between 0 and 86401 |
TYPE_TMINUTE | INT8 | between 0 and 1441 |
TYPE_TMINUTE | INT16 | between 0 and 1441 |
TYPE_DTMONTH | INT8 | between 0 and 119988 |
TYPE_DTMONTH | INT16 | between 0 and 119988 |
TYPE_DTMONTH | INT32 | between 0 and 119988 |
TYPE_XSTRING | BOOLEAN | "X"=true, ""=false |
TYPE_XSTRING | STRING | |
TYPE_XSTRING | schema type not in (MAP,STRUCT,ARRAY) | |
TYPE_STRING | BOOLEAN | "X"=true, ""=false |
TYPE_STRING | STRING | |
TYPE_STRING | schema type not in (MAP,STRUCT,ARRAY) | |
TYPE_DTWEEK | INT8 | between 0 and 521725 |
TYPE_DTWEEK | INT16 | between 0 and 521725 |
TYPE_DTWEEK | INT32 | between 0 and 521725 |
TYPE_FLOAT | FLOAT32 | |
TYPE_FLOAT | FLOAT64 | |
TYPE_DTDAY | INT8 | between 0 and 3652061 |
TYPE_DTDAY | INT16 | between 0 and 3652061 |
TYPE_DTDAY | INT32 | between 0 and 3652061 |
TYPE_TIME | INT32/logical type Time | java.lang.Integer or java.util.Date |
TYPE_TIME | STRING | pattern HHmmss |
TYPE_INT8 | INT8 | |
TYPE_INT8 | INT16 | |
TYPE_INT8 | INT32 | |
TYPE_INT8 | INT64 | |
TYPE_INT8 | INT64/logical type Timestamp | java.util.Date (milliseconds since Unix epoch) |
TYPE_INT2 | INT8 | |
TYPE_INT2 | INT16 | |
TYPE_INT1 | INT8 | between 0 and 255 |
TYPE_INT1 | INT16 | between 0 and 255 |
TYPE_DATE | INT32/logical type Date | java.lang.Integer or java.util.Date |
TYPE_DATE | STRING | pattern yyyyMMdd |
TYPE_CHAR | BOOLEAN | ‘X’=true, ‘ ’=false |
TYPE_CHAR(length) | STRING | string.length <= length |
TYPE_CDAY | INT8 | between 0 and 366 |
TYPE_CDAY | INT16 | between 0 and 366 |
TYPE_BYTE | INT8 | INT8 interpreted as Byte |
TYPE_NUM(length) | INT8 | INT8 > 0 and INT8.length <= length |
TYPE_NUM(length) | INT16 | INT16 > 0 and INT16.length <= length |
TYPE_NUM(length) | INT32 | INT32 > 0 and INT32.length <= length |
TYPE_NUM(length) | INT64 | INT64 > 0 and INT64.length <= length |
TYPE_NUM(length) | STRING | string.length <= length and string only contains digits |
TYPE_NUM(length) | INT64/logical type Timestamp | java.util.Date (milliseconds since Unix epoch) and INT64.length <= length |
TYPE_INT | INT8 | |
TYPE_INT | INT16 | |
TYPE_INT | INT32 | |
TYPE_BCD | FLOAT32 | |
TYPE_BCD | FLOAT64 | |
TYPE_BCD | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
TYPE_DECF16 | FLOAT32 | |
TYPE_DECF16 | FLOAT64 | |
TYPE_DECF16 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
TYPE_DECF34 | FLOAT32 | |
TYPE_DECF34 | FLOAT64 | |
TYPE_DECF34 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
Usage
Packaging
The aDSO sink connector package name is: init-kafka-connect-adso-<connector version>.zip
The zip archive includes one folder called init-kafka-connect-adso-<connector version>, which itself contains the nested folders lib, etc, assets, doc and sap.
- lib/ contains the Java archive(s) that need to be placed in the configured plugin.path of Kafka Connect.
- etc/ contains a sample properties file for the connector that can be supplied as an argument on the CLI during startup of Kafka Connect or uploaded to the Confluent Control Center.
- doc/ contains more detailed documentation about the connector, such as licenses, readme, and configuration details.
- assets/ contains media files like icons and company logos.
- sap/ contains SAP® transports.
Data Serialization
The connector comes with support for the Confluent JSON converter as well as the Avro converter. Using Avro for data serialization requires the connector to translate field names provided by an aDSO into valid Avro names by replacing illegal characters with "_".
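A sketch of the corresponding converter settings (the Schema Registry URL is an illustrative assumption):
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# alternatively, JSON with embedded schemas
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true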
SMT
The use of Single Message Transforms (SMTs) in a sink connector allows each record to be passed through one or a chain of several simple transformations after it is read from the Kafka topic and before it is handed to the connector for writing to the target system. The connector supports SMTs and has been successfully tested with a concatenation of two SMTs.
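A sketch of a chain of two standard Kafka Connect SMTs, assuming a hypothetical source field amount that should be renamed and cast before being written to the aDSO:
transforms=renameFields,castTypes
transforms.renameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameFields.renames=amount:ZAMOUNT
transforms.castTypes.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castTypes.spec=ZAMOUNT:float64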
Error Handling
The connector applies different kinds of validations and error handling mechanisms like configuration validation, upstream connection tests and connection retries with graceful back-off.
Single configuration parameter validations extending the Validator class throw exceptions of type ConfigException in case of invalid configuration values. The corresponding parameters containing invalid values are framed in red in the Confluent Control Center. The connector maps known exceptions to the exception type ConnectException, which can be handled by Kafka Connect accordingly.
Errors and warnings are logged using SLF4J, as described in section Logging.
The sink connector supports error handling using the Kafka Connect Reporter and can be configured to write erroneous records to a dead letter queue.
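A sketch of a dead letter queue setup using the standard Kafka Connect error handling properties (topic name and replication factor are illustrative assumptions):
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-adso-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true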
Logging
The connector uses SLF4J for logging. The logger name is org.init.ohja.kafka.connect.adso.sink, and it can be configured, e.g., in the Log4j configuration properties of the Confluent Platform.
SAP® JCo includes a logger named com.sap.conn.jco which can only be used with Log4j. In addition to setting the logging level for the JCo logger, the configuration property jco.trace_level can be used to fine-tune the level of logging.
The connector provides additional log location information under the MDC (mapped diagnostic context) key ohja.location. The log location contains the name of the nearest enclosing definition of a val, class, trait, object, or package, plus the line number.
Example Log4j 1.x appender:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n
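The matching logger levels could be set as follows (the levels shown are illustrative assumptions):
log4j.logger.org.init.ohja.kafka.connect.adso.sink=INFO
log4j.logger.com.sap.conn.jco=ERROR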
Data sink monitoring in SAP
The SAP® BI transaction RSA1 can be used for monitoring and administration purposes.
CLI
The kafka-connect-adso connector contains a command line interface to validate connector properties, test the connection to an SAP® system, retrieve a list of available aDSOs, and query details of an aDSO.
Since the CLI is written in Scala, you can execute it with Scala or Java. To run the CLI, you need to provide the following dependencies on the CLASSPATH:
- Scala runtime libraries
- kafka-clients
- kafka-connect-adso connector libraries
- SAP® Java Connector 3.1 SDK
Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java does not require a Scala installation.
Java
java -cp <kafka-connect-adso>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp <command> <options>
The Confluent Platform provides the Kafka libraries in /usr/share/java/kafka/. When the aDSO sink connector package and sapjco3 are installed in the plugin path of Kafka Connect, the command could look like this:
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-adso-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp \
<command> <options>
Scala
If an appropriate version of Scala is installed, the scala command can be used; it already provides the necessary Scala runtime libraries.
scala -cp <kafka-connect-adso>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp <command> <options>
Output
The output will look like this:
usage:
ADSODetailsApp <command> <options>
commands:
ping
list-adsos
adso-details -s <sink system> -n <adso name>
mandatory options:
-p <path to connector properties file>
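For example, querying the details of the aDSO configured above could look like this (the sink system identifier 00 and the properties file name are illustrative assumptions):
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-adso-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.adso.sink.ADSODetailsApp \
adso-details -s 00 -n ZADSOTEST -p adso-sink.properties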
Support
Full enterprise support provides expert assistance from the developers of the connector under a service level agreement suited to your needs, which may include:
- 8/5 support
- 60-minute response times depending on support plan
- Full application lifecycle support from development to operations
- Access to expert support engineers
- Consultancy in case of individual SAP integration requirements
Please contact connector-support@init-software.de for more information.