Kafka Connect - Webservice Data Source Sink Connector
The Webservice Data Source Sink Connector is a Kafka Connector for writing data to SAP BI Webservice DataSources. The connector builds upon i-OhJa, a collection of libraries and components written in Scala for interaction with SAP systems.
Dependencies
The connector depends on the SAP® Java Connector 3.1 SDK library to connect to SAP® systems. To run the WebDS sink connector you need to provide a copy of SAP® JCo library v3.1.7 (jar and native library) in the classpath or in the plugin path configured for Kafka Connect.
JCo needs to be obtained separately from the SAP® Marketplace. For more detailed information about licensing terms and how to obtain a license visit the SAP® FAQ and the SAP® connectors’ homepage.
Restrictions and pending features
- SAP BI webservice data sources only support flat, two-dimensional structures.
- The amount of Webservice Data Sources in the evaluation version of the connector is restricted.
- The sink connector does not support processing record values without a value schema.
Configuration
The WebDS connector is fully compatible with the configuration user interface in Confluent Control Center. Compared to using properties files, the configuration UI in Confluent Control Center offers a wider range of features, such as suggested properties, extensive property value recommendations, incremental visibility of applicable configurations, and a rich set of interactive validations.
Once the required fields for an WebDS sink are entered in the configuration UI, a new configuration group for an additional sink will appear. To ensure optimal performance, the maximum number of displayed recommendations in the Confluent Control Center UI is limited to 1000.
For more information, please refer to Configuration Details or the configuration file (doc/sink/configuration.html) in the connector package.
Sink system connection
The WebDS Sink Connector includes configuration properties from SAPĀ® JCo, which are identified by the prefix jco. For a detailed description of these properties, please refer to the Java documentation of the com.sap.conn.jco.ext.DestinationDataProvider interface. The JCo JavaDoc can be found within the sapjco3.jar package.
A minimal SAP JCo client destination configuration looks like this:
# SAP Netweaver application host DNS or IP
jco.client.ashost = 127.0.0.1
# SAP system number
jco.client.sysnr = 20
# SAP client number
jco.client.client = 100
# SAP RFC user
jco.client.user = user
# SAP user password
jco.client.passwd = password
Instead of addressing the application host directly by defining jco.client.ashost, some could use jco.client.mshost to use the Message Server instead for rebalancing system load in SAP.
Encryption and data integrity for the communication between the connector and SAP can be enabled by using SAP Secure network connection (SNC) and the corresponding JCo configuration properties.
The supported authentication types are user/password, SNC Single Sign On (SSO) using X509 certificates and SAP cookie v2 logon ticket.
To establish communication between the connector and the SAP® system and within the framework of authorization management, you need an SAP® user. The user must be either of type Communications or Dialog. To save SAP® dialog resources, the Communications type is recommended. The user must have the following minimum permissions:
-
Object S_RFC:
ACTVT: 16
RFC_NAME: RFCPING, SMLG_GET_DEFINED_GROUPS, RFC_GET_FUNCTION_INTERFACE, RFC_METADATA_GET, /BIC/CQ*
RFC_TYPE: FUNC -
Object S_RFC:
ACTVT: 16
RFC_NAME: RFC1, RSDS_BAPI, RSDS_WS
RFC_TYPE: FUGR
WebDS sink configuration
A minimal connector sink configuration looks like this:
# A SAP BI webservice data source is identified by a primary key combination
# containing a system name and a data source name.
sap.webds#00.system=MYSELF
sap.webds#00.name=ZJCOTESTVERT1
sap.webds#00.topic=ZDO1VERT
topic defines the Kafka input topic from which the connector will consume data.
Available Webservice data sources in the SAP sink system can be looked up by executing transaction RSA1, opening the Modeling - Source Systems - Web Service perspective and clicking on the available system. This will guide you to a list of data sources available for the selected system.
SAP BI Webservice data sources offer real-time data acquisition, allowing source systems to push data sets of any batch size or single events to the webservice data source. These datasets are grouped into requests by the BI system. A request is closed either when the customizable limits for time or size in the corresponding Infopackage are exceeded, or if an RDA daemon actively closes the request. DTPs for RDA and daemon jobs can then be used to automatically transform and write the data to targets like data store objects.
Batching Requests
The connector sends records to SAP BW webservice data sources in batches. The batch size is determined by common consumer configuration settings, and will not be changed by any custom connector configuration settings.
For more information about adjusting the batch size for connectors in general, see batch size for sink connector.
Information
Parallelism
Within Kafka Connect, each task instance is responsible for handling all records received from a set of partitions assigned by the Connect framework. The connector applies to this standard and does not introduce any custom parallelism to alter this behavior. To limit the maximum number of tasks for any sink task, you can use the tasks.max configuration property.
Offset handling
The connector completely applies to the standard way of handling offsets and commits in Kafka connect sink connectors.
Delivery semantics
Kafka Connect commits offsets for data delivered to the sink connector to prevent duplicate data in the target system. To achieve exactly-once delivery, the target system must keep track of message and offset information, which is not provided by default for web service data sources. There may be scenarios where the connector cannot guarantee exactly-once delivery, but rather at-least-once delivery semantics.
Graceful backoff
In case of connection or communication issues with the configured SAP system the connector applies a retry backoff strategy. The maximum number of retry attempts can be configured using property sap.webds.max.retries. After each unsuccessful connection attempt, the connector will pause for a random amount of time between the values specified in the configuration properties sap.webds.min.retry.backoff.ms and sap.webds.max.retry.backoff.ms before executing a retry.
Any exceptions for connection attempts and communication with SAP are assigned an internal exception group. The list of exception groups for which the backoff retry strategy is applied can be configured using property sap.webds.retry.exception.groups. A complete list of exception groups can be found in the SAP JCo JavaDocs of com.sap.conn.jco.JCoException. BAPI return messages of any message class can be included, too.
i-OhJa
i-OhJa is the SAP® integration framework used by the connectors to establish connections to SAP® instances from 3rd-party tools. It provides implementations of various APIs and interface protocols, and enables remote function calls, data transfer, and data format transformation.
- Download the i-ohja One Pager
- SAP® Blog: Big Data integration of SAP Netweaver using i-OhJa
- SAP® Blog: Connect to SAP Netweaver in a Jupyter notebook using i-OhJa
Installation
Manual
Put the connector jar and the SAP JCo jar together with its native libraries in the configured plugin path of Kafka Connect.
Confluent Hub CLI
The Confluent platform provides a command line installation interface that can be used to install a connector zip file from a local file system. In addition to that SAP JCo needs to be copied manually to the lib directory of the connectors target installation path.
License
See Lizenzvereinbarung for INITs evaluation license and Dependencies for more information on the dependencies’ licenses. (doc/licenses.html file in connector package)
Supported data types
The connector expects a flat structure in the value part of the Kafka messages. Information in the key part of a message will not be pushed into a webservice data source.
Mapping of the fields in the value structure of messages to the fields defined for a webservice data source is done based on the field names. SAP data sources are often defined based on infoobjects containing slashes in the field names. For most data formats slashes are a forbidden character in field names, so they are replaced by a *_* before applying the field name mapping.
Fields of the webservice data source for which no matching field in the corresponding kafka message is defined will be assigned a default value according to the specific data type. If a Kafka messages contains a field with a data type that has no supported data type mapping for the corresponding field type of the webservice data source, then a conversion connect exception will be thrown.
SAP JCo defines internal data types in com.sap.conn.jco.JCoMetaData, each corresponding to one of the built-in types of SAP ABAP. The webservice sink connector supports flat structured tables containing the following SAP basic data types and mappings to Kafka connect org.apache.kafka.connect.data data / schema types:
JCo | Kafka Connect Schema Type | Restrictions |
---|---|---|
TYPE_UTCSECOND | INT8 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT16 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT32 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT64 | between 0 and 315538070400 |
TYPE_UTCMINUTE | INT8 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT16 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT32 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT64 | between 0 and 525896784 |
TYPE_UTCLONG | INT8 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT16 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT32 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT6 | between 0 and 3155380704000000000 |
TYPE_BYTE | INT8 | INT8 interpreted as Byte in an array of length 1 |
TYPE_BYTE | BYTES | |
TYPE_TSECOND | INT8 | between 0 and 86401 |
TYPE_TSECOND | INT16 | between 0 and 86401 |
TYPE_TSECOND | INT32 | between 0 and 86401 |
TYPE_TMINUTE | INT8 | between 0 and 1441 |
TYPE_TMINUTE | INT16 | between 0 and 1441 |
TYPE_DTMONTH | INT8 | between 0 and 119988 |
TYPE_DTMONTH | INT16 | between 0 and 119988 |
TYPE_DTMONTH | INT32 | between 0 and 119988 |
TYPE_XSTRING | BOOLEAN | “X”=true, "“=false | | TYPE_XSTRING | STRING | | | TYPE_XSTRING | schema type not in (MAP,STRUCT,ARRAY) | | | TYPE_STRING | BOOLEAN | ”X“=true, ”"=false |
TYPE_STRING | STRING | |
TYPE_STRING | schema type not in (MAP,STRUCT,ARRAY) | |
TYPE_DTWEEK | INT8 | between 0 and 521725 |
TYPE_DTWEEK | INT16 | between 0 and 521725 |
TYPE_DTWEEK | INT32 | between 0 and 521725 |
TYPE_FLOAT | FLOAT32 | |
TYPE_FLOAT | FLOAT64 | |
TYPE_DTDAY | INT8 | between 0 and 3652061 |
TYPE_DTDAY | INT16 | between 0 and 3652061 |
TYPE_DTDAY | INT32 | between 0 and 3652061 |
TYPE_TIME | INT32/logical type Time | java.lang.Integer or java.util.Date |
TYPE_TIME | STRING | pattern HHmmss |
TYPE_INT8 | INT8 | |
TYPE_INT8 | INT16 | |
TYPE_INT8 | INT32 | |
TYPE_INT8 | INT64 | |
TYPE_INT2 | INT8 | |
TYPE_INT2 | INT16 | |
TYPE_INT1 | INT8 | between 0 and 255 |
TYPE_INT1 | INT16 | between 0 and 255 |
TYPE_DATE | INT32/logical type Date | java.lang.Integer or java.util.Date |
TYPE_DATE | STRING | pattern yyyyMMdd |
TYPE_CHAR | BOOLEAN | ‘X’=true, ‘ ’=false |
TYPE_CHAR(length) | STRING | string.length <= length |
TYPE_CDAY | INT8 | between 0 and 366 |
TYPE_CDAY | INT16 | between 0 and 366 |
TYPE_BYTE | INT8 | INT8 interpreted as Byte |
TYPE_NUM(length) | INT8 | INT8 > 0 and INT8.length <= length |
TYPE_NUM(length) | INT16 | INT16 > 0 and INT16.length <= length |
TYPE_NUM(length) | INT32 | INT32 > 0 and INT32.length <= length |
TYPE_NUM(length) | INT64 | INT64 > 0 and INT64.length <= length |
TYPE_NUM(length) | STRING | string.length <= length and string only contains digits |
TYPE_INT | INT8 | |
TYPE_INT | INT16 | |
TYPE_INT | INT32 | |
TYPE_BCD | FLOAT32 | |
TYPE_BCD | FLOAT64 | |
TYPE_BCD | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
TYPE_DECF16 | FLOAT32 | |
TYPE_DECF16 | FLOAT64 | |
TYPE_DECF16 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
TYPE_DECF34 | FLOAT32 | |
TYPE_DECF34 | FLOAT64 | |
TYPE_DECF34 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
Usage
Packaging
The webservice sink connector package name is init-kafka-connect-webds-<connector version>.zip
The zip archive includes one folder called init-kafka-connect-webds-<connector version>, which itself contains the nested folders lib, etc, _assets, and doc.
- lib/ contains the java archive(s) needed to be extracted into the plugin.path of Kafka Connect.
- etc/ contains a sample properties file for the connector that can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
- doc/ contains more detailed documentation about the connector like licenses, readme, configurations and so on.
- assets/ contains media files like icons and company logos.
Data Serialization
The connector comes with support for Confluent JSON Converter as well as the AVRO Converter. Using Avro for data serialization requires the connector to translate field names provided by a webservice data source into valid Avro names by replacing illegal characters with "_".
SMT
The use of Single Message Transforms in a source connector allows for each record to be passed through one or a chain of several simple transformations before writing to the Kafka topic. The connector supports SMT and has been successfully tested with a concatenation of two SMTs.
Error Handling
The connector applies different kinds of validations and error handling mechanisms like configuration validation, upstream connection tests and connection retries with graceful back-off.
Single configuration parameter validation extending the Validator class will throw exceptions of type ConfigException in case of invalid configuration values. The corresponding parameters containing invalid values will be framed in red in the Confluent Control Center. The connector maps known exceptions to exception type ConnectException, which can be handled by Kafka connect accordingly.
Errors and warnings are logged using SLF4J, as described in section Logging.
The sink connector supports error handling using the Kafka Connect Reporter and can be configured to write erroneous records to a dead letter queue.
Logging
The connector makes use of SLF4J for logging integration. The logger uses the logger name org.init.ohja.kafka.connect.webds.sink and can be configured e.g., in the log4j configuration properties in the Confluent Platform.
SAP JCo includes a logger called com.sap.conn.jco which can only be used with log4j. In addition to setting the logging level for the JCo logger one can use configuration property jco.trace_level to fine tune the level of logging.
The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.
Example Log4j 1.x appender:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n
Data Source monitoring in SAP
SAP BI related transaction RSA1 can be used for monitoring and administration purposes. In the Modeling - Data Sources perspective you fill be able to see the open and closed data load requests and you will have the possibility to inspect the data persisted in the so called Persisting Staging Area.
CLI
The kafka-connect-webds connector contains a command line interface to validate connector properties, test the connection to a SAP system, retrieve a list of available webservice data source and query details of a webservice data source.
Since the CLI is written in Scala you can execute it in Scala or Java. To run the CLI you need to provide the following dependencies in the CLASSPATH:
- Scala runtime libraries
- kafka-clients
- kafka-connect-webds connector libraries
- SAP Java Connector 3.1 SDK
Since a java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java would not require installing Scala.
Java
java -cp <kafka-connect-webds>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.webds.sink.WebDSDetailsApp <command> <options>
The Confluent Platform has kafka libraries available in /usr/share/java/kafka/. When webds sink connector package and sapjco3 are installed to the plugin path of Connect, the command could look like this:
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-webds-x.x.x/lib/*:\
/usr/share/java/kafka/* \
org.init.ohja.kafka.connect.webds.sink.WebDSDetailsApp \
<commands> <options>
Scala
If an appropriate version of Scala is installed the scala command can be used. This command already provides the necessary Scala runtime libraries.
scala -cp <kafka-connect-webds>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.webds.sink.WebDSDetailsApp <command> <options>
The output will look like this:
usage:
WebDSDetailsApp <command> <options>
commands:
ping
list-webds
webds-details -s <source system> -w <data source name>
mandatory options:
-p <path to connector properties file>
Support
Full enterprise support provides expert support from the developers of the connector at a service level agreement suitable for your needs, which may include
- 8/5 support
- 60-minute response times depending on support plan
- Full application lifecycle support from development to operations
- Access to expert support engineers
- Consultancy in case of individual SAP integration requirements
Please contact connector-support@init-software.de for more information.