Kafka Connect - SAP RFC Source Connector
The SAP RFC Source Connector is a Kafka connector for retrieving data from SAP remote-enabled function modules (RFC/RFM).
The connector builds upon i-OhJa, a collection of libraries and components written in Scala for interaction with SAP systems.
Dependencies
The connector depends on the SAP® Java Connector 3.1 SDK library to connect to SAP® systems. To run the RFC source connector, you need to provide a copy of the SAP® JCo library v3.1.8 (jar and native library) in the classpath or in the plugin path configured for Kafka Connect.
JCo needs to be obtained separately from the SAP® Marketplace. For more detailed information about licensing terms and how to obtain a license visit the SAP® FAQ and the SAP® connectors’ homepage.
License
See the licenses for INIT's evaluation license (DE and EN) and for more information on the licenses of the dependencies.
Installation
Packaging
The RFC source connector package name is: init-kafka-connect-rfc-<connector version>.zip
The zip archive includes one folder called init-kafka-connect-rfc-<connector version>, which itself contains the nested folders lib, etc, assets and doc.
- lib/ contains the uber Java archive that needs to be extracted into the plugin.path of Kafka Connect.
- etc/ contains a sample properties file for the connector that can be supplied as an argument in the CLI during startup of Kafka Connect or by upload to the Confluent Control Center.
- doc/ contains more detailed documentation about the connector, such as licenses, the readme, and configuration descriptions.
- assets/ contains media files like icons and company logos.
Manual
Put the connector jar and the SAP JCo jar together with its native libraries in the configured plugin path of Kafka Connect.
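For example, a manual installation could look like this (all paths are hypothetical and depend on your plugin.path setting):
# Copy the connector uber jar into a folder in the plugin path
cp init-kafka-connect-rfc-x.x.x/lib/*.jar /usr/share/kafka/plugins/init-kafka-connect-rfc/
# Copy the SAP JCo jar and its native library next to it
cp sapjco3.jar libsapjco3.so /usr/share/kafka/plugins/init-kafka-connect-rfc/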
Confluent CLI
The Confluent Platform provides a command line installation interface that can be used to install the connector zip file from a local file system; see the Confluent CLI Command Reference. In addition, SAP JCo needs to be copied manually to the lib directory of the connector's target installation path.
Configuration
The RFC/RFM connector is fully compatible with the configuration user interface in Confluent Control Center. Compared to using properties files, the configuration UI in Confluent Control Center offers a wider range of features, such as suggested properties, extensive property value recommendations, incremental visibility of applicable configurations, and a rich set of interactive validations.
Once the required fields for an RFC/RFM source are entered in the configuration UI, a new configuration group for an additional source will appear.
Value recommendations for the group name are loaded only after entering a prefix with an asterisk, such as “Z*”. For performance reasons, the maximum number of recommendations displayed in the Confluent Control Center UI is limited to 1000.
For more information, please refer to Configuration Details or the configuration file (doc/source/configuration.html) in the connector package.
Source system connection
The RFC/RFM Source Connector includes configuration properties from SAP® JCo, which are identified by the prefix jco. For a detailed description of these properties, please refer to the Java documentation of the com.sap.conn.jco.ext.DestinationDataProvider interface. The JCo JavaDoc can be found within the sapjco3.jar package.
A minimal SAP JCo client destination configuration looks like this:
# SAP Netweaver application host DNS or IP
jco.client.ashost = 127.0.0.1
# SAP system number
jco.client.sysnr = 20
# SAP client number
jco.client.client = 100
# SAP RFC user
jco.client.user = user
# SAP user password
jco.client.passwd = password
Instead of addressing the application server host directly via jco.client.ashost, one can set jco.client.mshost to connect through the message server, which allows SAP to balance the system load; see the sketch below.
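A minimal message-server logon could look like this (host name, system ID, and logon group are hypothetical placeholders):
# SAP message server host (hypothetical)
jco.client.mshost = sap-ms.example.com
# SAP system ID (hypothetical)
jco.client.r3name = PRD
# SAP logon group (hypothetical)
jco.client.group = PUBLIC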
Encryption and data integrity for the communication between the connector and SAP can be enabled by using SAP Secure Network Communications (SNC) and the corresponding JCo configuration properties; a sketch follows below.
The supported authentication types are user/password, SNC Single Sign-On (SSO) using X.509 certificates, and SAP cookie v2 logon tickets.
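A sketch of the corresponding SNC properties, assuming a hypothetical partner name and library path:
# Activate SNC
jco.client.snc_mode = 1
# Quality of protection (9 = maximum security settings)
jco.client.snc_qop = 9
# SNC name of the communication partner (hypothetical)
jco.client.snc_partnername = p:CN=PRD, O=Example, C=DE
# Path to the SNC library, e.g. the SAP Cryptographic Library (hypothetical)
jco.client.snc_lib = /usr/sap/lib/libsapcrypto.so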
The SAP user needs at least the following authorizations for extracting data out of, e.g., an SAPI DataSource:
- Object S_RFC:
  ACTVT: 16
  RFC_NAME: RFCPING, SMLG_GET_DEFINED_GROUPS, RFC_GET_FUNCTION_INTERFACE, RFC_METADATA_GET
  RFC_TYPE: FUNC
- Object S_RFC:
  ACTVT: 16
  RFC_NAME: RFC1, RSDS_BAPI
  RFC_TYPE: FUGR
RFC source configuration
A minimal connector source configuration looks like this:
sap.rfc#00.name = ZFUNCTIONNAME
sap.rfc#00.group = ZFUNCTIONGROUP
sap.rfc#00.offset-parameter.in = IS_OFFSET
sap.rfc#00.offset-parameter.out = ES_OFFSET
sap.rfc#00.topic = TESTTOPIC
- The name and group properties have to match the name and function group configured in SAP for the corresponding remote-enabled function module. To load value recommendations, a prefix must be entered. The group name does not have to be picked from the recommended values; the function name, on the other hand, must be unique and can be selected from the recommendations.
- offset-parameter.out is an optional configuration parameter and refers to an exporting parameter of the remote enabled function module. Its intent is to provide a unique logical offset for the results of each function call. Simple data structures as well as deeply nested structures are supported.
- offset-parameter.in refers to an importing parameter of the remote enabled function module and needs a data type compatible with that of offset-parameter.out. At runtime the connector passes the latest offset it has processed to the next function call.
- topic defines the Kafka output topic the connector producer will use to publish extracted data.
Optional data conversion settings
- decimal.mapping can be used to transform DECIMAL types into other appropriate data types if needed. Setting decimal.mapping = primitive transforms decimals to double or long, depending on the scale.
- SAP uses its own internal format for storing currency amounts, e.g. 1 Japanese yen is stored as 0.01. The representation of currency amounts can be changed by setting the configuration property currency.conversion; see the sketch below.
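A sketch of these settings in a connector properties file (the currency.conversion value is left as a placeholder; see doc/source/configuration.html for the supported values):
# Map DECIMAL types to primitive types (double or long, depending on scale)
decimal.mapping = primitive
# Change the representation of currency amounts (placeholder value)
currency.conversion = ...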
Parallelism
The RFC source connector achieves parallelism by assigning each RFC/RFM to a single worker task. If the number of configured RFCs/RFMs exceeds either the maxTasks configuration or the number of available tasks, a single task will handle extractions for multiple RFMs. Scaling by adding tasks therefore only makes sense if the number of configured RFMs is greater than the number of available tasks, as sketched below.
Scaling by adding topic partitions, on the other hand, makes no sense at all, as the connector uses only one partition to guarantee sequential ordering.
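A sketch of a configuration where adding a second task pays off (function, group, and topic names are hypothetical):
# Two configured RFMs can be distributed across up to two tasks
tasks.max = 2
sap.rfc#00.name = ZFUNCTION_A
sap.rfc#00.group = ZGROUP_A
sap.rfc#00.topic = TOPIC_A
sap.rfc#01.name = ZFUNCTION_B
sap.rfc#01.group = ZGROUP_B
sap.rfc#01.topic = TOPIC_B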
Offset handling
The RFM called by the connector can return an arbitrary offset through an exporting parameter, configured via offset-parameter.out. The offset parameter can have an arbitrary type, as long as the contained data types are supported by the connector. In the succeeding call to the RFM, the connector passes the offset value returned by the preceding call into the RFM's offset importing parameter, configured via offset-parameter.in.
Delivery guarantees
The connector itself offers no special delivery guarantees; the delivery guarantees depend on the implementation of the remote enabled function module in the SAP source system.
At-least-once semantics can be achieved if the RFM makes proper use of the offset parameters configured by offset-parameter.in and offset-parameter.out. These parameters can be used to implement a kind of source system change data capture and will automatically be synchronized with Kafka offsets by the connector.
Exactly-once semantics can only be achieved if a function call is regarded as a single message, as offsets relate to single function calls and not, e.g., to single records within a table parameter of the RFM. In this sense, the connector ensures that data is processed exactly once whenever offset-parameter.in and offset-parameter.out are configured.
To enable exactly-once support for source connectors in your Kafka cluster, you will need to update the worker-level configuration property exactly.once.source.support to either preparing or enabled.
When using a source connector, you can configure the exactly.once.support property to make exactly-once delivery either requested or required (default is requested); a configuration sketch follows the list below.
- required: the connector will undergo a preflight check to ensure it can provide exactly-once delivery. If the connector or the worker cannot support exactly-once delivery, creation or validation requests will fail.
- requested: the connector will not undergo a preflight check for exactly-once delivery.
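A sketch of both settings, assuming a Kafka version that supports exactly-once source connectors:
# Worker-level configuration, e.g. connect-distributed.properties
exactly.once.source.support = enabled

# Connector-level configuration
exactly.once.support = required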
Graceful backoff
In case of connection or communication issues with the configured SAP system the connector applies a retry backoff strategy. The maximum number of retry attempts can be configured using property sap.rfc.max.retries. After each unsuccessful connection attempt, the connector will pause for a random amount of time between the values specified in the configuration properties sap.rfc.min.retry.backoff.ms and sap.rfc.max.retry.backoff.ms.
Any exceptions for connection attempts and communication with SAP are assigned an internal exception group. The list of exception groups for which the backoff retry strategy is applied can be configured using property sap.rfc.retry.exception.groups. A complete list of exception groups can be found in the SAP JCo JavaDocs of com.sap.conn.jco.JCoException. BAPI return messages of any message class can be included, too.
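A sketch of the retry settings with hypothetical values (the exception group selection is illustrative; see the JCo JavaDocs for valid names):
# Give up after 5 unsuccessful retries
sap.rfc.max.retries = 5
# Pause between 1 and 30 seconds before each retry
sap.rfc.min.retry.backoff.ms = 1000
sap.rfc.max.retry.backoff.ms = 30000
# Exception groups that trigger a retry (illustrative value)
sap.rfc.retry.exception.groups = JCO_ERROR_COMMUNICATION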
Partitioning
The source partition assigned to each RFC/RFM result equals the name of the function module, as each function module name is unique per SAP instance.
JMX metrics
The RFC Source Connector supports all the connector and task metrics provided by Kafka Connect through Java Management Extensions (JMX). In addition, the RFC Source Connector provides extra JMX metrics for accessing state managed by the connector.
MBean: org.init.ohja.kafka.connect:type=rfc-source-task-metrics,connector=([-.\w]+),task=([\d]+)
Metric | Explanation |
---|---|
retries | Count of retries performed by a connector task that is in retrying state. |
${configGroup}-rfc-group | RFC function group name configured in the configuration group. |
${configGroup}-rfc-function | RFC function name configured in the configuration group. |
${configGroup}-offset | Current logical connector offset for the RFC function configured in the configuration group. |
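For illustration, these metrics could be inspected with Kafka's JmxTool, assuming the Connect worker was started with JMX enabled on port 9999 (port and URL are hypothetical):
kafka-run-class kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name 'org.init.ohja.kafka.connect:type=rfc-source-task-metrics,connector=*,task=*'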
i-OhJa
i-OhJa is the SAP® integration framework used by the connectors to establish connections to SAP® instances from 3rd-party tools. It provides implementations of various APIs and interface protocols, and enables remote function calls, data transfer, and data format transformation.
- SAP® Blog: Big Data integration of SAP Netweaver using i-OhJa
- SAP® Blog: Connect to SAP Netweaver in a Jupyter notebook using i-OhJa
Supported data types
SAP JCo defines internal data types in com.sap.conn.jco.JCoMetaData, each corresponding to one of the built-in types of SAP ABAP. The RFC source connector supports nested data structures and adheres to the following mapping of SAP data types to Kafka Connect schema types:
JCo | Kafka Connect Schema Type | Java data type |
---|---|---|
TYPE_CHAR | STRING | java.lang.String |
TYPE_DECF16 | Decimal | java.math.BigDecimal |
TYPE_DECF34 | Decimal | java.math.BigDecimal |
TYPE_DATE | Date | java.util.Date |
TYPE_BCD | Decimal | java.math.BigDecimal |
TYPE_FLOAT | FLOAT64 | java.lang.Double |
TYPE_INT1 | INT16 | java.lang.Short |
TYPE_INT2 | INT16 | java.lang.Short |
TYPE_INT | INT32 | java.lang.Integer |
TYPE_INT8 | INT64 | java.lang.Long |
TYPE_BYTE | BYTES | java.nio.ByteBuffer |
TYPE_NUM | STRING | java.lang.String |
TYPE_XSTRING | STRING | java.lang.String |
TYPE_TIME | Time | java.util.Date |
TYPE_STRING | STRING | java.lang.String |
TYPE_UTCLONG | INT64 | java.lang.Long |
TYPE_UTCMINUTE | INT64 | java.lang.Long |
TYPE_UTCSECOND | INT64 | java.lang.Long |
TYPE_DTDAY | INT32 | java.lang.Integer |
TYPE_DTWEEK | INT32 | java.lang.Integer |
TYPE_DTMONTH | INT32 | java.lang.Integer |
TYPE_TSECOND | INT32 | java.lang.Integer |
TYPE_TMINUTE | INT16 | java.lang.Short |
TYPE_CDAY | INT32 | java.lang.Integer |
TYPE_STRUCTURE | STRUCT | java.util.Map[String, Object] |
TYPE_TABLE | ARRAY | java.util.List[Struct] |
Supported features
Data serialization
The RFC connector supports both the Confluent JSON converter and the Avro converter; a converter configuration sketch follows below. Using Avro for data serialization requires the RFC source connector to translate field names provided by an RFC/RFM into valid Avro names by replacing illegal characters with "_".
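A sketch of enabling the Avro converter for the connector (the Schema Registry URL is a placeholder):
# Use the Confluent Avro converter for record values
value.converter = io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url = http://localhost:8081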
SMT
The use of Single Message Transforms in a source connector allows each record to be passed through one or a chain of several simple transformations before it is written to the Kafka topic. The RFC source connector supports SMTs and has been successfully tested with a concatenation of two SMTs, as sketched below.
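A sketch of such a chain of two SMTs using transforms shipped with Apache Kafka (topic pattern and field name are hypothetical):
transforms = route,insertSource
# Prefix the output topic name with "sap-"
transforms.route.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex = (.*)
transforms.route.replacement = sap-$1
# Add a static field identifying the source system
transforms.insertSource.type = org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertSource.static.field = source_system
transforms.insertSource.static.value = SAP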
Error handling
The RFC connector applies different kinds of validation and error handling mechanisms, such as configuration validation, offset recovery, upstream connection tests, connection retries with graceful backoff, and repeatable source request extractions.
Single configuration parameter validations extending the Validator class throw exceptions of type ConfigException in case of invalid configuration values. Additionally, the connector overrides the validate method to check interdependent configuration parameters and adds error messages to the corresponding ConfigValue instances in case of invalid parameter values. Parameters containing invalid values will be framed in red in the Confluent Control Center.
The connector maps known exceptions to the exception type ConnectException, which can be handled by Kafka Connect accordingly. Errors and warnings are logged using SLF4J, as described in section Logging.
Logging
The RFC connector uses SLF4J for logging integration. The logger uses the name org.init.ohja.kafka.connect.rfc.source and can be configured, e.g., in the Log4j configuration properties of the Confluent Platform.
SAP JCo includes a logger called com.sap.conn.jco which can only be used with Log4j. In addition to setting the logging level for the JCo logger, the configuration property jco.trace_level can be used to fine-tune the amount of logging.
The connector provides additional log location information ohja.location using MDC (mapped diagnostic context). The log location contains the name of the nearest enclosing definition of val, class, trait, object or package and the line number.
Example Log4j 1.x appender:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m %X{ohja.location}%n
Message schema
When a remote enabled function module is called, the resulting message and schema include all of the parameters defined in the function module, including the IMPORTING parameters.
CLI tool
The kafka-connect-rfc connector contains a command line interface to validate connector properties, test the connection to a SAP system, retrieve a list of available RFMs and query details of an RFM.
Since the CLI is written in Scala, it can be executed with either Scala or Java. To run the CLI you need to provide the following dependencies in the CLASSPATH:
- Scala runtime libraries
- kafka-clients
- kafka-connect-rfc connector libraries
- SAP Java Connector 3.1 SDK
Since a Java runtime is provided by the Confluent Platform and the Scala runtime libraries are part of the connector package, executing the CLI with Java does not require a Scala installation.
Java
java -cp <kafka-connect-rfc>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.rfc.source.RFCDetailsApp <command> <options>
The Confluent Platform provides the Kafka libraries in /usr/share/java/kafka/. When the RFC source connector package and sapjco3 are installed in the configured plugin path of Kafka Connect, the command could look like this:
java -cp \
<CONNECT_PLUGIN_PATH>/init-kafka-connect-rfc-x.x.x/lib/*:\
/usr/share/java/kafka/*:\
/usr/share/java/kafka-serde-tools/* \
org.init.ohja.kafka.connect.rfc.source.RFCDetailsApp \
<command> <options>
Scala
If an appropriate version of Scala is installed, the scala command can be used; it already provides the necessary Scala runtime libraries.
scala -cp <kafka-connect-rfc>:<kafka-clients>:<sapjco3> org.init.ohja.kafka.connect.rfc.source.RFCDetailsApp <command> <options>
The output will look like this:
usage:
RFCDetailsApp <command> <options>
commands:
ping
list-groups
list-rfm -g <group>
rfm-details -n <rfm name>
extract-schema -n <rfm name>
mandatory options:
-p <path to connector properties file>
Hint: Avro schemas may differ if Single Message Transforms are used in the connector configuration.
Restrictions and pending features
- The number of RFCs/RFMs that can be configured in the evaluation version of the connector is restricted.
- Offsets are assigned to single function calls and not to e.g. single records as part of a table typed parameter.
Full enterprise support
Full enterprise support provides expert support from the developers of the connector at a service level agreement suitable for your needs, which may include
- 8/5 support
- 60-minute response times depending on support plan
- Full application lifecycle support from development to operations
- Access to expert support engineers
- Consultancy in case of individual SAP integration requirements
Please contact connector-contact@init-software.de for more information.