Oracle XStream CDC Source Connector for Confluent Platform

The self-managed Oracle XStream CDC Source connector for Confluent Platform captures all changes made to rows in an Oracle database and represents the changes as change event records in Apache Kafka® topics. The connector uses Oracle’s XStream API to read changes from the database redo log.

Oracle XStream is a set of components and APIs in Oracle Database that enables client applications, such as the connector, to receive changes from an Oracle database.

The connector leverages XStream Out to capture both Data Manipulation Language (DML) and Data Definition Language (DDL) changes from the database redo log. When XStream Out is used, a capture process captures changes made to an Oracle database, converts the changes into Logical Change Records (LCRs), and sends the LCRs to an outbound server. The outbound server then sends the LCRs to the connector.

Note

The connector is built using the Debezium and Kafka Connect frameworks.

Features

The Oracle XStream CDC Source connector includes the following features:

Snapshot

When you start the connector for the first time, it takes a snapshot of the schema for each captured table and, optionally, captures a consistent snapshot of the current state of the rows for these tables. As part of this snapshot process, the connector acquires a lock (ROW SHARE MODE) on each of the captured tables. Because this lock is required only to capture the table schema, not the row data, it is held only for a short duration. The connector uses an Oracle Flashback query to capture the state of the existing rows. You can customize the snapshot behavior by using the snapshot.mode configuration property.
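For illustration, the fragment below shows how this property might appear in a connector configuration. Only the snapshot.mode property name comes from this documentation; the value initial is an assumption borrowed from the Debezium convention for a mode that snapshots both the schema and the existing rows.

  {
    "snapshot.mode": "initial"
  }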

Note

If the connector is interrupted, stopped, or fails during the snapshot process of any tables, upon recovery or restart, the connector restarts all snapshots from the beginning. It is currently not possible to resume a snapshot of a table that is changing while ensuring that all changes to that table have been captured.

The connector supports parallel snapshots. Although the connector runs a single task, you can use multiple threads to speed up the snapshot process. The connector distributes the captured tables across the threads, but it does not split a single table across multiple threads. The snapshot.max.threads configuration property controls the number of threads the connector uses during the initial snapshot; to enable parallel processing, set this property to a value greater than 1. With parallel snapshots, the connector processes multiple tables at the same time, improving performance by distributing the workload across the available threads. Each thread uses a separate database connection.
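As a sketch, the fragment below requests four snapshot threads (and therefore four database connections); the value 4 is purely illustrative.

  {
    "snapshot.max.threads": "4"
  }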

Streaming

After the initial snapshot is completed, the connector starts streaming changes for the specified tables. The connector streams changes from the Oracle database using Oracle’s XStream Out API. During this phase of operation:

  • The connector starts by attaching to the XStream outbound server specified in the database.out.server.name configuration property.
  • After successfully attaching to the outbound server, the connector receives changes made to the captured tables, and writes these changes as records to the appropriate change event topics in Kafka. Each change includes the full state of the row.

The connector receives changes from the database in transaction commit order. It ensures that events for each table are written to the change event topic in the same order as they occurred in the database.
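A minimal, illustrative fragment of the streaming-related settings is shown below. The database.out.server.name property name comes from this documentation; the outbound server name XOUT is a placeholder for the name configured in your database.

  {
    "database.out.server.name": "XOUT"
  }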

Note

An XStream outbound server can support only one active client session at a time. This means multiple connectors cannot be attached to the same outbound server simultaneously. As a result, separate outbound servers must be configured for each connector.

Change event topics

The connector writes change events for all changes in a table to a specific Apache Kafka® topic dedicated to that table.

The connector uses two configuration properties to identify which tables to capture from the database:

  • The table.include.list configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should be captured.
  • The table.exclude.list configuration specifies a comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes should not be captured.
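Each property takes a comma-separated list of regular expressions that are matched against fully-qualified identifiers (for example, SCHEMA.TABLE). The fragment below is an illustrative sketch of the include form; the schema and table names are placeholders.

  {
    "table.include.list": "SALES\\.CUSTOMERS,SALES\\.ORDERS"
  }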

Note

The tables to be captured from the database must be specified in both the connector configuration (for example, using the table.include.list configuration property) and in the rule sets of the capture process and outbound server to which the connector is attached.

The connector can capture changes from tables across different schemas within the same database. A separate change event topic is created for each table being captured, ensuring that changes are streamed to distinct topics per table.

Schema changes

The connector stores the schema of captured tables over time in a dedicated topic, known as the schema history topic.

  • This topic is initially populated with the table schema during the initial snapshot.
  • It is subsequently updated as the connector processes DDL statements (like CREATE, ALTER) during the streaming phase.

Upon a connector restart, the connector reads from this topic to rebuild the schema of each captured table as it existed at the point in time when streaming resumes. This ensures that the connector can correctly interpret the change events based on the schema at the time the changes were made.

You can configure the name of the database schema history topic by using the schema.history.internal.kafka.topic configuration property. This topic should be configured with a single partition and an infinite retention period.
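For example, the topic name could be set explicitly as in the fragment below; the topic name is a placeholder. The single-partition and infinite-retention requirements are settings on the Kafka topic itself (for example, retention.ms=-1), not connector configuration properties.

  {
    "schema.history.internal.kafka.topic": "orders-connector-schema-history"
  }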

Note

The database schema history topic is intended for internal connector use only.

At-least-once delivery

The connector guarantees that records are delivered at least once to the Kafka topic.

Before and after state for change events

For update operations, the connector emits:

  • The state of the row before the update, with the original values.
  • The state of the row after the update, with the modified values.
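The sketch below shows the general shape of an update event value, assuming the Debezium-style envelope used by the underlying framework; the column names, the values, and any fields other than before and after are illustrative.

  {
    "before": { "ID": 1001, "STATUS": "PENDING" },
    "after":  { "ID": 1001, "STATUS": "SHIPPED" },
    "op": "u"
  }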

Oracle multi-tenant architecture support

Each instance of the connector can capture tables from a single Pluggable Database (PDB). You can configure the name of the PDB that contains the tables by using the database.pdb.name configuration property.
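For example, to capture tables located in a pluggable database named ORCLPDB1 (an illustrative name), the configuration would include a fragment like the following:

  {
    "database.pdb.name": "ORCLPDB1"
  }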

Note

If you need to read from tables in the Container Database (CDB), do not specify a value for the database.pdb.name configuration property.

Customizable data type handling

For certain data types, such as numeric and temporal, you can customize how the connector maps them to Connect data types by modifying configuration properties. This allows for greater flexibility in handling different types of data, ensuring that the change events reflect the desired format and meet specific requirements.

Tombstone events

When a row is deleted in the source table, a delete change event is generated and sent to the Kafka topic. Subsequently, the connector emits a tombstone event with the same key as the original record, but with a null value. Tombstone records are used in Kafka’s log compaction process to ensure that only the most recent state of a record is retained in the log.

You can modify this behavior using the tombstones.on.delete configuration property.
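For example, assuming the property accepts a boolean value as in the Debezium connectors, the fragment below would suppress tombstone events:

  {
    "tombstones.on.delete": "false"
  }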

Heartbeats

The connector periodically updates the outbound server with the position of the latest change it has processed, enabling the database to purge archived redo logs containing already processed transactions. However, if the database is inactive or no changes are being made to the captured tables, the connector cannot advance the position and update the outbound server.

Heartbeats are a mechanism that allows the connector to continue advancing the position even when the database is inactive or no changes are occurring to the captured tables. When enabled, the connector:

  • Creates a dedicated heartbeat topic.
  • Emits a simple event to this topic at regular intervals as needed.

You can configure this interval using the heartbeat.interval.ms configuration property. It is recommended to set heartbeat.interval.ms to a value on the order of minutes to hours. The default value of heartbeat.interval.ms is 0, which disables emission of heartbeat records from the connector.
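For example, to emit heartbeat events roughly every ten minutes, the interval could be set as shown below; the value is illustrative and is expressed in milliseconds.

  {
    "heartbeat.interval.ms": "600000"
  }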

Note

The heartbeat topic is intended for internal connector use only.

Automated error recovery

The connector has automated retries for handling various retriable errors. When a retriable error occurs, the connector automatically restarts in an attempt to recover. It will retry up to three times before stopping and entering a failed state, which requires user intervention to resolve.

The list of retriable errors is fixed and cannot be configured by the user.

Metrics

The connector provides built-in metrics for tracking the progress and performance of snapshot, streaming, and schema history processes. These metrics allow for better monitoring and insight into the connector’s operations, ensuring efficient data capture and processing.

Oracle Real Application Clusters (RAC) support

The connector fully supports Oracle RAC, enabling seamless integration with Oracle's clustered databases and ensuring high availability and fault tolerance.

Requirements and current limitations

The following sections provide usage requirements and current limitations.

Oracle versions

The connector is compatible with the following Oracle versions:

  1. Oracle 19c Enterprise Edition
  2. Oracle 21c Enterprise Edition

Java versions

The connector requires Java version 17 or higher.

Confluent Platform versions

The connector can be installed in Kafka Connect clusters running Confluent Platform 7.6 or higher.

Limitations

Be sure to review the following information:

  • The connector has not been tested against Oracle Exadata or against managed database services from cloud service providers (CSPs), such as OCI and RDS.
  • The connector does not work with Oracle Autonomous Database or with Oracle standby databases (using Oracle Data Guard).
  • The connector does not support Downstream Capture configurations.
  • The connector does not support the following Single Message Transforms (SMTs): GzipDecompress, TimestampRouter, and MessageTimestampRouter.

Configuration properties

For a complete list of configuration properties for the Oracle XStream CDC Source connector, see Configuration Reference for Oracle XStream CDC Source Connector for Confluent Platform.

Supported data types

The connector creates change events for database changes. Each change event mirrors the table’s schema, with a field for every column value. The data type of each table column determines how the connector represents the column values in the corresponding change event fields.

For certain data types, such as numeric data types, you can customize how the connector maps them by modifying the default configuration settings. This allows more control over handling various data types, ensuring that the change events reflect the desired format and meet specific requirements.

Character data types

The following table describes how the connector maps character types.

Oracle data type      Connect type
CHAR                  STRING
VARCHAR / VARCHAR2    STRING
NCHAR                 STRING
NVARCHAR2             STRING

In all cases, the connector ensures that character data is converted to a string type in Kafka Connect when creating change events.

Numeric data types

You can adjust how the connector maps numeric data types by changing the decimal.handling.mode configuration property.

The following shows the mapping of numeric types when decimal.handling.mode is set to precise.

NUMBER(P, S <= 0)
  Connect type: INT8 / INT16 / INT32 / INT64 / BYTES
  Based on the precision and scale, the connector selects a matching Kafka Connect integer type:
  • If the precision minus the scale (P - S) is less than 3, it uses INT8.
  • If P - S is less than 5, it uses INT16.
  • If P - S is less than 10, it uses INT32.
  • If P - S is less than 19, it uses INT64.
  • If P - S is 19 or greater, it uses BYTES (org.apache.kafka.connect.data.Decimal).
  NUMBER columns with a scale of 0 represent integer numbers. A negative scale indicates rounding in Oracle; for example, a scale of -2 causes rounding to hundreds.

NUMBER(P, S > 0)
  Connect type: BYTES
  org.apache.kafka.connect.data.Decimal

NUMBER(P[, *])
  Connect type: STRUCT
  io.debezium.data.VariableScaleDecimal. Contains a structure with two fields: scale (of type INT32), which contains the scale of the transferred value, and value (of type BYTES), which contains the original value in unscaled form.

SMALLINT, INT, INTEGER
  Connect type: BYTES
  org.apache.kafka.connect.data.Decimal. Oracle maps SMALLINT, INT, and INTEGER to NUMBER(38,0). As a result, these types can hold values that exceed the maximum range of any of the INT types.

NUMERIC, DECIMAL
  Connect type: INT8 / INT16 / INT32 / INT64 / BYTES
  Handled in the same way as the NUMBER data type (note that scale defaults to 0 for NUMERIC).

FLOAT[(P)]
  Connect type: STRUCT
  io.debezium.data.VariableScaleDecimal. Maps to FLOAT(126) when P is not specified. Contains a structure with two fields: scale (of type INT32), which contains the scale of the transferred value, and value (of type BYTES), which contains the original value in unscaled form.

REAL, DOUBLE PRECISION
  Connect type: STRUCT
  io.debezium.data.VariableScaleDecimal. REAL maps to FLOAT(63) and DOUBLE PRECISION maps to FLOAT(126). Contains a structure with two fields: scale (of type INT32), which contains the scale of the transferred value, and value (of type BYTES), which contains the original value in unscaled form.

BINARY_FLOAT
  Connect type: FLOAT32

BINARY_DOUBLE
  Connect type: FLOAT64

Note

When decimal.handling.mode is set to:

  • string: The Oracle numeric data types are mapped to the Kafka Connect STRING type.
  • double: The Oracle numeric data types are mapped to the Kafka Connect FLOAT64 type.
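As an illustration, the fragment below selects the string mode described in the note above, which represents numeric values as lossless strings instead of Connect numeric types:

  {
    "decimal.handling.mode": "string"
  }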

Temporal data types

You can adjust how the connector maps some of the temporal data types by changing the time.precision.mode configuration property.

The following shows the mapping of temporal types.

DATE
  Connect type: INT64
  Based on time.precision.mode:
  • adaptive: io.debezium.time.Timestamp
  • connect: org.apache.kafka.connect.data.Timestamp

TIMESTAMP[(P)]
  Connect type: INT64
  Based on time.precision.mode:
  • adaptive, with precision <= 3: io.debezium.time.Timestamp. Represents the number of milliseconds since the UNIX epoch, without timezone information.
  • adaptive, with precision <= 6: io.debezium.time.MicroTimestamp. Represents the number of microseconds since the UNIX epoch, without timezone information.
  • adaptive, with precision greater than 6: io.debezium.time.NanoTimestamp. Represents the number of nanoseconds since the UNIX epoch, without timezone information.
  • connect: org.apache.kafka.connect.data.Timestamp. Represents the number of milliseconds since the UNIX epoch, without timezone information.

TIMESTAMP WITH TIME ZONE
  Connect type: STRING
  io.debezium.time.ZonedTimestamp. A string representation of a timestamp with timezone information.

TIMESTAMP WITH LOCAL TIME ZONE
  Connect type: STRING
  io.debezium.time.ZonedTimestamp. A string representation of the timestamp in UTC.

INTERVAL YEAR[(P)] TO MONTH
  Connect type: STRING
  io.debezium.time.Interval. The string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S.

INTERVAL DAY[(P)] TO SECOND[(FP)]
  Connect type: STRING
  io.debezium.time.Interval. The string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S.

Note

When time.precision.mode is set to connect, there could be a loss of precision if the fractional second precision of a column exceeds 3, because Oracle supports a higher level of precision than the logical types in Kafka Connect.
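For example, to use the built-in Kafka Connect logical types, accepting the millisecond-precision limitation noted above, the mode could be set as follows:

  {
    "time.precision.mode": "connect"
  }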

Oracle End User Terms

In addition to the terms of your applicable agreement with Confluent, your use of the Oracle XStream CDC Source connector for Confluent Platform is subject to the following flow down terms from Oracle:

  • You must provide Confluent with prior notice if you transfer, assign, or grant any rights or interests to another individual or entity with respect to your use of the Oracle XStream CDC Source connector for Confluent Platform.
  • You agree, to the extent permitted by applicable law, that Oracle has no liability for (a) any damages, whether direct, indirect, incidental, special, punitive or consequential, and (b) any loss of profits, revenue, data or data use, arising from the use of the programs with respect to your use of the Oracle XStream CDC Source connector for Confluent Platform.
  • You agree that Oracle is not required to perform any obligations to you as part of your use of the Oracle XStream CDC Source connector for Confluent Platform.
  • Only applicable if you are an end user at any government level. If Oracle suspends any authorization or licenses in connection with the Oracle XStream CDC Source connector for Confluent Platform, Confluent may immediately suspend your access to the Oracle XStream CDC Source connector for Confluent Platform until Confluent resolves the issue with Oracle.