Data Management
Supported Data Types and Formats
Data Serialization
The connector comes with support for Confluent JSON Converter as well as the AVRO Converter. Using Avro for data serialization requires the connector to translate field names provided by a webservice data source into valid Avro names by replacing illegal characters by an underscore (_
).
Data Type Mapping
Message Structure:
- The connector expects a flat structure in the value part of the Kafka messages.
- Information in the key part of a message will not be pushed into a webservice data source.
Field Mapping:
- Mapping of the fields in the value structure of messages to the fields defined for a webservice data source is done based on the field names.
- SAP data sources are often defined based on InfoObjects containing slashes (
/
) in the field names. For most data formats slashes are a forbidden character in field names, so they are replaced by an underscore (_
) before applying the field name mapping.
Missing Fields, Field Type Mismatch and Conversion Errors:
- If a field in the webservice data source does not have a corresponding field in the Kafka message, it will be assigned a default value based on its specific data type.
- If a Kafka message contains a field with a data type, that has no supported mapping for the corresponding field type of the webservice data source, then a conversion connect exception will be thrown.
Supported SAP® Data Types:
SAP JCo defines internal data types in com.sap.conn.jco.JCoMetaData
, each corresponding to one of the built-in types of SAP ABAP. The webservice sink connector supports flat structured tables containing the following SAP basic data types and mappings to Kafka Connect org.apache.kafka.connect.data
data/schema types:
JCo | Kafka Connect Schema Type | Restrictions |
---|---|---|
TYPE_UTCSECOND | INT8 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT16 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT32 | between 0 and 315538070400 |
TYPE_UTCSECOND | INT64 | between 0 and 315538070400 |
TYPE_UTCMINUTE | INT8 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT16 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT32 | between 0 and 525896784 |
TYPE_UTCMINUTE | INT64 | between 0 and 525896784 |
TYPE_UTCLONG | INT8 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT16 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT32 | between 0 and 3155380704000000000 |
TYPE_UTCLONG | INT6 | between 0 and 3155380704000000000 |
TYPE_BYTE | INT8 | INT8 interpreted as Byte in an array of length 1 |
TYPE_BYTE | BYTES | |
TYPE_TSECOND | INT8 | between 0 and 86401 |
TYPE_TSECOND | INT16 | between 0 and 86401 |
TYPE_TSECOND | INT32 | between 0 and 86401 |
TYPE_TMINUTE | INT8 | between 0 and 1441 |
TYPE_TMINUTE | INT16 | between 0 and 1441 |
TYPE_DTMONTH | INT8 | between 0 and 119988 |
TYPE_DTMONTH | INT16 | between 0 and 119988 |
TYPE_DTMONTH | INT32 | between 0 and 119988 |
TYPE_XSTRING | BOOLEAN | “X”=true, "“=false | | TYPE_XSTRING | STRING | | | TYPE_XSTRING | schema type not in (MAP,STRUCT,ARRAY) | | | TYPE_STRING | BOOLEAN | ”X“=true, ”"=false |
TYPE_STRING | STRING | |
TYPE_STRING | schema type not in (MAP,STRUCT,ARRAY) | |
TYPE_DTWEEK | INT8 | between 0 and 521725 |
TYPE_DTWEEK | INT16 | between 0 and 521725 |
TYPE_DTWEEK | INT32 | between 0 and 521725 |
TYPE_FLOAT | FLOAT32 | |
TYPE_FLOAT | FLOAT64 | |
TYPE_DTDAY | INT8 | between 0 and 3652061 |
TYPE_DTDAY | INT16 | between 0 and 3652061 |
TYPE_DTDAY | INT32 | between 0 and 3652061 |
TYPE_TIME | INT32/logical type Time | java.lang.Integer or java.util.Date |
TYPE_TIME | STRING | pattern HHmmss |
TYPE_INT8 | INT8 | |
TYPE_INT8 | INT16 | |
TYPE_INT8 | INT32 | |
TYPE_INT8 | INT64 | |
TYPE_INT2 | INT8 | |
TYPE_INT2 | INT16 | |
TYPE_INT1 | INT8 | between 0 and 255 |
TYPE_INT1 | INT16 | between 0 and 255 |
TYPE_DATE | INT32/logical type Date | java.lang.Integer or java.util.Date |
TYPE_DATE | STRING | pattern yyyyMMdd |
TYPE_CHAR | BOOLEAN | ‘X’=true, ‘ ’=false |
TYPE_CHAR(length) | STRING | string.length <= length |
TYPE_CDAY | INT8 | between 0 and 366 |
TYPE_CDAY | INT16 | between 0 and 366 |
TYPE_BYTE | INT8 | INT8 interpreted as Byte |
TYPE_NUM(length) | INT8 | INT8 > 0 and INT8.length <= length |
TYPE_NUM(length) | INT16 | INT16 > 0 and INT16.length <= length |
TYPE_NUM(length) | INT32 | INT32 > 0 and INT32.length <= length |
TYPE_NUM(length) | INT64 | INT64 > 0 and INT64.length <= length |
TYPE_NUM(length) | STRING | string.length <= length and string only contains digits |
TYPE_INT | INT8 | |
TYPE_INT | INT16 | |
TYPE_INT | INT32 | |
TYPE_BCD | FLOAT32 | |
TYPE_BCD | FLOAT64 | |
TYPE_BCD | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
TYPE_DECF16 | FLOAT32 | |
TYPE_DECF16 | FLOAT64 | |
TYPE_DECF16 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
TYPE_DECF34 | FLOAT32 | |
TYPE_DECF34 | FLOAT64 | |
TYPE_DECF34 | BYTES/logical type Decimal | b[] or java.math.BigDecimal |
Single Message Transforms (SMT)
Single Message Transforms (SMTs) allow for lightweight, real-time modifications of data as it passes through Kafka Connect. SMTs can be applied in source connectors before writing data to Kafka topics or in sink connectors before sending data to external systems.
Use Cases for SMTs
- Data Filtering: Remove unnecessary fields or records based on conditions, focusing only on relevant data.
- Field Manipulation: Modify fields by renaming, masking sensitive data, or changing formats, ensuring consistency and compatibility.
- Field Enrichment: Add metadata or default values to provide more context to messages.
- Transformation Chains: Combine multiple SMTs for complex transformations.
- Routing and Partitioning: Dynamically change the Kafka topic or partition for records based on their contents.
The Webservice Data Source Sink Connector supports the chaining of multiple SMTs, allowing for flexible and powerful data processing, enhancing the capabilities of data pipelines in Kafka Connect.
Schema Registry Integration
The connector is fully compatible with Confluent Schema Registry, allowing seamless integration of data with Kafka topics that use Avro, JSON Schema, or Protobuf formats. This compatibility ensures that all schema definitions for messages are stored and managed centrally, simplifying the development and maintenance of applications that consume the data.
Advantages of Schema Registry Compatibility:
- Data Consistency: Enforces a well-defined schema for messages, preventing issues like missing fields or type mismatches.
- Schema Evolution: Supports backward and forward schema compatibility, allowing you to update IDoc structures without breaking existing consumers.
- Reduced Data Size: Avro and Protobuf serialization minimize the payload size, improving data transmission efficiency.
- Centralized Schema Management: Simplifies handling multiple message formats by storing all schema versions in one place.
This integration enhances data governance and ensures robust handling of messages in Kafka.