Data Management

Supported Data Types and Formats

Data Serialization

The connector supports the Confluent JSON Converter and the Avro Converter. When Avro is used for data serialization, the connector translates field names provided by a webservice data source into valid Avro names by replacing illegal characters with an underscore (_).
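The name translation can be illustrated with a minimal Java sketch. It is not the connector's actual implementation: the class name AvroNameSanitizer is hypothetical, and the handling of a leading digit (prefixing an underscore) is an assumption, since the text above only states that illegal characters are replaced. The rule used is the Avro naming rule that a name starts with a letter or underscore and otherwise contains only letters, digits, and underscores.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the connector's API.
final class AvroNameSanitizer {
    // Every character outside [A-Za-z0-9_] is illegal in an Avro name.
    private static final Pattern ILLEGAL = Pattern.compile("[^A-Za-z0-9_]");

    static String sanitize(String fieldName) {
        String cleaned = ILLEGAL.matcher(fieldName).replaceAll("_");
        // Assumption: an Avro name must not start with a digit (or be empty),
        // so this sketch prefixes an underscore in that case.
        if (cleaned.isEmpty() || Character.isDigit(cleaned.charAt(0))) {
            return "_" + cleaned;
        }
        return cleaned;
    }

    public static void main(String[] args) {
        System.out.println(sanitize("/BIC/ZMATERIAL")); // prints _BIC_ZMATERIAL
    }
}
```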

Data Type Mapping

Message Structure:

  • The connector expects a flat structure in the value part of the Kafka messages.
  • Information in the key part of a message will not be pushed into a webservice data source.

Field Mapping:

  • Fields in the value structure of a message are mapped to the fields defined for a webservice data source by field name.
  • SAP data sources are often defined based on InfoObjects whose field names contain slashes (/). Most data formats do not allow slashes in field names, so slashes are replaced by an underscore (_) before the field name mapping is applied.
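As a rough illustration of name-based mapping, the following Java sketch looks up each data source field in a flat message value after replacing slashes with underscores. The class FieldNameMapper, the use of a plain Map for the message value, and the sample field names are assumptions made for this example only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of name-based field mapping; not the connector's code.
final class FieldNameMapper {
    // "/BIC/ZMATERIAL" in the data source corresponds to "_BIC_ZMATERIAL" in the message.
    static String toMessageFieldName(String sapFieldName) {
        return sapFieldName.replace('/', '_');
    }

    // Returns data source field name -> value for every field present in the message.
    static Map<String, Object> mapByName(List<String> sapFields, Map<String, Object> messageValue) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String sapField : sapFields) {
            String key = toMessageFieldName(sapField);
            if (messageValue.containsKey(key)) {
                row.put(sapField, messageValue.get(key));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> value = Map.of("_BIC_ZMATERIAL", "M-100", "QUANTITY", 5);
        System.out.println(mapByName(List.of("/BIC/ZMATERIAL", "QUANTITY"), value));
    }
}
```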

Missing Fields, Field Type Mismatch and Conversion Errors:

  • If a field in the webservice data source does not have a corresponding field in the Kafka message, it will be assigned a default value based on its specific data type.
  • If a Kafka message contains a field whose data type has no supported mapping to the corresponding field type of the webservice data source, a conversion connect exception is thrown.
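The two rules above can be sketched in plain Java. Everything here is an assumption for illustration: the reduced set of target types, the concrete default values (the text does not say which defaults the connector uses), and IllegalArgumentException as a stand-in for the connector's conversion exception.

```java
import java.util.Map;

// Hypothetical sketch of default values and type-mismatch handling.
final class FieldConversionSketch {
    enum TargetType { CHAR, INT, FLOAT }

    // Assumed defaults; the connector's real defaults may differ.
    static Object defaultFor(TargetType type) {
        switch (type) {
            case INT: return 0;
            case FLOAT: return 0.0d;
            default: return "";
        }
    }

    static Object convert(String field, TargetType type, Map<String, Object> messageValue) {
        if (!messageValue.containsKey(field)) {
            return defaultFor(type); // missing field: fall back to a type-specific default
        }
        Object value = messageValue.get(field);
        switch (type) {
            case CHAR:
                if (value instanceof String) return value;
                break;
            case INT:
                if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
                    return ((Number) value).intValue();
                }
                break;
            case FLOAT:
                if (value instanceof Float || value instanceof Double) {
                    return ((Number) value).doubleValue();
                }
                break;
        }
        // Stand-in for the conversion exception raised by the connector.
        String actual = value == null ? "null" : value.getClass().getSimpleName();
        throw new IllegalArgumentException(
                "No supported mapping from " + actual + " to " + type + " for field " + field);
    }

    public static void main(String[] args) {
        System.out.println(convert("QUANTITY", TargetType.INT, Map.of())); // prints 0
    }
}
```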

Supported SAP® Data Types:

SAP JCo defines internal data types in com.sap.conn.jco.JCoMetaData, each corresponding to one of the built-in types of SAP ABAP. The webservice sink connector supports flat structured tables containing the following SAP basic data types and mappings to Kafka Connect org.apache.kafka.connect.data data/schema types:

| JCo | Kafka Connect Schema Type | Restrictions |
|---|---|---|
| TYPE_UTCSECOND | INT8 | between 0 and 315538070400 |
| TYPE_UTCSECOND | INT16 | between 0 and 315538070400 |
| TYPE_UTCSECOND | INT32 | between 0 and 315538070400 |
| TYPE_UTCSECOND | INT64 | between 0 and 315538070400 |
| TYPE_UTCMINUTE | INT8 | between 0 and 525896784 |
| TYPE_UTCMINUTE | INT16 | between 0 and 525896784 |
| TYPE_UTCMINUTE | INT32 | between 0 and 525896784 |
| TYPE_UTCMINUTE | INT64 | between 0 and 525896784 |
| TYPE_UTCLONG | INT8 | between 0 and 3155380704000000000 |
| TYPE_UTCLONG | INT16 | between 0 and 3155380704000000000 |
| TYPE_UTCLONG | INT32 | between 0 and 3155380704000000000 |
| TYPE_UTCLONG | INT64 | between 0 and 3155380704000000000 |
| TYPE_BYTE | INT8 | INT8 interpreted as Byte in an array of length 1 |
| TYPE_BYTE | BYTES | |
| TYPE_TSECOND | INT8 | between 0 and 86401 |
| TYPE_TSECOND | INT16 | between 0 and 86401 |
| TYPE_TSECOND | INT32 | between 0 and 86401 |
| TYPE_TMINUTE | INT8 | between 0 and 1441 |
| TYPE_TMINUTE | INT16 | between 0 and 1441 |
| TYPE_DTMONTH | INT8 | between 0 and 119988 |
| TYPE_DTMONTH | INT16 | between 0 and 119988 |
| TYPE_DTMONTH | INT32 | between 0 and 119988 |
| TYPE_XSTRING | BOOLEAN | "X"=true, ""=false |
| TYPE_XSTRING | STRING | |
| TYPE_XSTRING | schema type not in (MAP,STRUCT,ARRAY) | |
| TYPE_STRING | BOOLEAN | "X"=true, ""=false |
| TYPE_STRING | STRING | |
| TYPE_STRING | schema type not in (MAP,STRUCT,ARRAY) | |
| TYPE_DTWEEK | INT8 | between 0 and 521725 |
| TYPE_DTWEEK | INT16 | between 0 and 521725 |
| TYPE_DTWEEK | INT32 | between 0 and 521725 |
| TYPE_FLOAT | FLOAT32 | |
| TYPE_FLOAT | FLOAT64 | |
| TYPE_DTDAY | INT8 | between 0 and 3652061 |
| TYPE_DTDAY | INT16 | between 0 and 3652061 |
| TYPE_DTDAY | INT32 | between 0 and 3652061 |
| TYPE_TIME | INT32/logical type Time | java.lang.Integer or java.util.Date |
| TYPE_TIME | STRING | pattern HHmmss |
| TYPE_INT8 | INT8 | |
| TYPE_INT8 | INT16 | |
| TYPE_INT8 | INT32 | |
| TYPE_INT8 | INT64 | |
| TYPE_INT2 | INT8 | |
| TYPE_INT2 | INT16 | |
| TYPE_INT1 | INT8 | between 0 and 255 |
| TYPE_INT1 | INT16 | between 0 and 255 |
| TYPE_DATE | INT32/logical type Date | java.lang.Integer or java.util.Date |
| TYPE_DATE | STRING | pattern yyyyMMdd |
| TYPE_CHAR | BOOLEAN | 'X'=true, ' '=false |
| TYPE_CHAR(length) | STRING | string.length <= length |
| TYPE_CDAY | INT8 | between 0 and 366 |
| TYPE_CDAY | INT16 | between 0 and 366 |
| TYPE_BYTE | INT8 | INT8 interpreted as Byte |
| TYPE_NUM(length) | INT8 | INT8 > 0 and INT8.length <= length |
| TYPE_NUM(length) | INT16 | INT16 > 0 and INT16.length <= length |
| TYPE_NUM(length) | INT32 | INT32 > 0 and INT32.length <= length |
| TYPE_NUM(length) | INT64 | INT64 > 0 and INT64.length <= length |
| TYPE_NUM(length) | STRING | string.length <= length and string only contains digits |
| TYPE_INT | INT8 | |
| TYPE_INT | INT16 | |
| TYPE_INT | INT32 | |
| TYPE_BCD | FLOAT32 | |
| TYPE_BCD | FLOAT64 | |
| TYPE_BCD | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
| TYPE_DECF16 | FLOAT32 | |
| TYPE_DECF16 | FLOAT64 | |
| TYPE_DECF16 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
| TYPE_DECF34 | FLOAT32 | |
| TYPE_DECF34 | FLOAT64 | |
| TYPE_DECF34 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
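Some of the listed restrictions can be checked on the producer side before a record is sent. The Java sketch below covers a numeric range (for example TYPE_TSECOND, 0 to 86401), a TYPE_NUM(length) string, and the yyyyMMdd and HHmmss string patterns. The class RestrictionChecks is hypothetical, and treating the range bounds as inclusive is an assumption, since the mapping only says "between".

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical pre-validation helpers; not part of the connector.
final class RestrictionChecks {
    private static final DateTimeFormatter SAP_DATE = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter SAP_TIME = DateTimeFormatter.ofPattern("HHmmss");

    // Assumption: both bounds are inclusive.
    static boolean inRange(long value, long min, long max) {
        return value >= min && value <= max;
    }

    // TYPE_NUM(length) as STRING: at most `length` characters, ASCII digits only.
    static boolean isValidNum(String value, int length) {
        return value.length() <= length && value.chars().allMatch(c -> c >= '0' && c <= '9');
    }

    // TYPE_DATE as STRING: must parse with the pattern yyyyMMdd.
    static boolean isValidDate(String value) {
        try {
            LocalDate.parse(value, SAP_DATE);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // TYPE_TIME as STRING: must parse with the pattern HHmmss.
    static boolean isValidTime(String value) {
        try {
            LocalTime.parse(value, SAP_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(inRange(3600, 0, 86401));   // TYPE_TSECOND candidate
        System.out.println(isValidNum("00123", 6));    // TYPE_NUM(6) candidate
        System.out.println(isValidDate("20240131"));   // TYPE_DATE candidate
    }
}
```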

Single Message Transforms (SMT)

Single Message Transforms (SMTs) allow for lightweight, real-time modifications of data as it passes through Kafka Connect. SMTs can be applied in source connectors before writing data to Kafka topics or in sink connectors before sending data to external systems.

Use Cases for SMTs

  1. Data Filtering: Remove unnecessary fields or records based on conditions, focusing only on relevant data.
  2. Field Manipulation: Modify fields by renaming, masking sensitive data, or changing formats, ensuring consistency and compatibility.
  3. Field Enrichment: Add metadata or default values to provide more context to messages.
  4. Transformation Chains: Combine multiple SMTs for complex transformations.
  5. Routing and Partitioning: Dynamically change the Kafka topic or partition for records based on their contents.

The Webservice Data Source Sink Connector supports chaining multiple SMTs, so several lightweight transformations can be applied to each record in sequence before it is sent to the webservice data source.
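A chain of SMTs is declared in the connector configuration. The fragment below is a sketch using two transforms that ship with Apache Kafka: Flatten, which can turn a nested value into the flat structure the connector expects, and ReplaceField, which renames fields to match the data source. The aliases flatten and rename and the field names are hypothetical.

```properties
# Transforms are applied in the order listed here.
transforms=flatten,rename

# Flatten nested structures, e.g. material.id becomes material_id.
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

# Rename message fields to the data source field names (hypothetical names).
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=material_id:MATERIAL,order_qty:QUANTITY
```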

Schema Registry Integration

The connector is fully compatible with Confluent Schema Registry, allowing seamless integration of data with Kafka topics that use Avro, JSON Schema, or Protobuf formats. This compatibility ensures that all schema definitions for messages are stored and managed centrally, simplifying the development and maintenance of applications that consume the data.
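As an illustration, a connector that reads Avro data registered in Schema Registry is typically configured with the Confluent Avro converter. The fragment below is a sketch; the registry URL is a placeholder that must be replaced with the address of your own Schema Registry.

```properties
# Deserialize record values with the Confluent Avro converter.
value.converter=io.confluent.connect.avro.AvroConverter
# Placeholder URL, assumed for this example.
value.converter.schema.registry.url=http://localhost:8081
```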

Advantages of Schema Registry Compatibility:

  • Data Consistency: Enforces a well-defined schema for messages, preventing issues like missing fields or type mismatches.
  • Schema Evolution: Supports backward and forward schema compatibility, allowing you to update message structures without breaking existing consumers.
  • Reduced Data Size: Avro and Protobuf use compact binary serialization, which reduces payload size and improves data transmission efficiency.
  • Centralized Schema Management: Simplifies handling multiple message formats by storing all schema versions in one place.

This integration enhances data governance and ensures robust handling of messages in Kafka.