Stream Changes from Oracle Database to Confluent Platform

The connector creates change events for database changes and sends them to Kafka topics. A change event includes a key and a value, with their structures based on the schema of the table where the change occurred.

Both the key and the value of a message consist of two components: a schema and a payload. The schema defines the structure of the payload, and the payload holds the actual data.

Keys

The key includes a field for each column in the table’s primary key. If the table does not have a primary key but has a unique index, the connector uses the columns of the unique index as the change event key.

For example, consider the following employees table from the sample database schema:

CREATE TABLE sample.employees (
  id NUMBER(9) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(64) NOT NULL,
  last_name VARCHAR2(64),
  hire_date TIMESTAMP
);

In this example, the connector is configured to use the Avro data format, and the topic.prefix configuration property is set to cflt.

The key schema describes the structure of the key column(s). In this example, it has a single field, ID, of type int, which corresponds to the id column in the employees table.

{
  "connect.name": "cflt.SAMPLE.EMPLOYEES.Key",
  "fields": [
   {
     "name": "ID",
     "type": "int"
   }
  ],
  "name": "Key",
  "namespace": "cflt.SAMPLE.EMPLOYEES",
  "type": "record"
}
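
To make the naming convention concrete, the following minimal Python sketch parses the key schema shown above and reads back its field types and full name. The helper names key_field_types and full_name are hypothetical and are not part of the connector; only the standard json module is used.

```python
import json

# Key schema copied from the example above.
KEY_SCHEMA_JSON = """
{
  "connect.name": "cflt.SAMPLE.EMPLOYEES.Key",
  "fields": [{"name": "ID", "type": "int"}],
  "name": "Key",
  "namespace": "cflt.SAMPLE.EMPLOYEES",
  "type": "record"
}
"""


def key_field_types(schema_json: str) -> dict:
    """Map each key field name to its Avro type (hypothetical helper)."""
    schema = json.loads(schema_json)
    return {field["name"]: field["type"] for field in schema["fields"]}


def full_name(schema_json: str) -> str:
    """Join namespace and name into the Avro full name (hypothetical helper)."""
    schema = json.loads(schema_json)
    return f'{schema["namespace"]}.{schema["name"]}'


key_fields = key_field_types(KEY_SCHEMA_JSON)
key_full_name = full_name(KEY_SCHEMA_JSON)
print(key_fields)     # {'ID': 'int'}
print(key_full_name)  # cflt.SAMPLE.EMPLOYEES.Key
```

The full name matches the connect.name attribute, which follows the pattern of topic prefix, schema name, table name, and the Key suffix.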

Values

The structure of the value in a change event message mirrors that of the key. It includes both a schema section, which describes the structure of the payload, and a payload section, which holds the actual data.

The value schema of a change event describes the envelope structure of the payload and the fields nested within it.

{
  "connect.name": "cflt.SAMPLE.EMPLOYEES.Envelope",
  "connect.version": 2,
  "fields": [
   {
     "default": null,
     "name": "before",
     "type": [
     "null",
      {
        "connect.name": "cflt.SAMPLE.EMPLOYEES.Value",
        "fields": [
         ...
        ],
        "name": "Value",
        "type": "record"
      }
     ]
   },
   {
     "default": null,
     "name": "after",
     "type": [
     "null",
     "Value"
     ]
   },
   {
     "name": "source",
     "type": {
     "connect.name": "io.confluent.connect.oracle.xstream.Source",
     "fields": [
      ...
     ],
     "name": "Source",
     "namespace": "io.confluent.connect.oracle.xstream",
     "type": "record"
     }
   },
   {
     "default": null,
     "name": "transaction",
     "type": [
     "null",
      {
        "connect.name": "event.block",
        "connect.version": 1,
        "fields": [
         ...
        ],
        "name": "block",
        "namespace": "event",
        "type": "record"
      }
     ]
   },
   {
     "name": "op",
     "type": "string"
   },
   {
     "default": null,
     "name": "ts_ms",
     "type": [
     "null",
     "long"
     ]
   },
   {
     "default": null,
     "name": "ts_us",
     "type": [
     "null",
     "long"
     ]
   },
   {
     "default": null,
     "name": "ts_ns",
     "type": [
     "null",
     "long"
     ]
   }
  ],
  "name": "Envelope",
  "namespace": "cflt.SAMPLE.EMPLOYEES",
  "type": "record"
}
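
In the envelope schema above, a field is nullable when its type is a union that contains "null". The following Python sketch separates the optional top-level fields from the mandatory ones. It is an illustration only: the nested record definitions are reduced to stubs with empty field lists because the example above elides them, and is_optional is a hypothetical helper.

```python
def is_optional(avro_type) -> bool:
    """Return True when the Avro type is a union that includes "null"."""
    return isinstance(avro_type, list) and "null" in avro_type


# Top-level envelope fields from the schema above. Nested records are stubs
# (empty "fields") because their contents are elided in the example.
ENVELOPE_FIELDS = [
    {"name": "before", "default": None,
     "type": ["null", {"type": "record", "name": "Value", "fields": []}]},
    {"name": "after", "default": None, "type": ["null", "Value"]},
    {"name": "source",
     "type": {"type": "record", "name": "Source", "fields": []}},
    {"name": "transaction", "default": None,
     "type": ["null", {"type": "record", "name": "block", "fields": []}]},
    {"name": "op", "type": "string"},
    {"name": "ts_ms", "default": None, "type": ["null", "long"]},
    {"name": "ts_us", "default": None, "type": ["null", "long"]},
    {"name": "ts_ns", "default": None, "type": ["null", "long"]},
]

optional = [f["name"] for f in ENVELOPE_FIELDS if is_optional(f["type"])]
mandatory = [f["name"] for f in ENVELOPE_FIELDS if not is_optional(f["type"])]

print(optional)   # ['before', 'after', 'transaction', 'ts_ms', 'ts_us', 'ts_ns']
print(mandatory)  # ['source', 'op']
```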

The value schema includes the following fields:

| Field | Description |
| --- | --- |
| before | (Optional) Contains the state of the row before the change occurred. The name of this schema takes the form <topic_prefix>.<schema_name>.<table_name>.Value. This field is null for read (snapshot), create, and truncate events. |
| after | (Optional) Contains the state of the row after the change occurred. The name of this schema takes the form <topic_prefix>.<schema_name>.<table_name>.Value. This field is null for delete and truncate events. |
| transaction | (Optional) Contains metadata associated with the transaction, such as the transaction ID. This field is not part of the current release. |
| op | (Mandatory) Contains a string value describing the type of operation. It is one of the following values: c (create or insert), u (update), d (delete), r (read, which indicates a snapshot), or t (truncate). |
| ts_ms (ts_us, ts_ns) | (Optional) Contains the time at which the connector generated the event, based on the system clock of the JVM that runs the Kafka Connect task. The three fields express this time in milliseconds, microseconds, and nanoseconds, respectively. |
| source | (Mandatory) Contains metadata about the source of the change event, such as the table name and the timestamp when the change occurred. The name of this schema is io.confluent.connect.oracle.xstream.Source. |
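
As an illustration of how a consumer might use the op, before, and after fields, the following Python sketch applies already-deserialized change events to an in-memory copy of the employees table. Everything here is an assumption made for the example: the apply_change function, the dict-based replica, and the sample rows are hypothetical, and the events are plain dicts rather than records read from Kafka.

```python
def apply_change(replica: dict, key, value: dict) -> None:
    """Apply one change event to a dict keyed by the ID column (hypothetical)."""
    op = value["op"]
    if op in ("c", "r", "u"):
        # Create, snapshot read, and update events carry the new row in "after".
        replica[key["ID"]] = value["after"]
    elif op == "d":
        # Delete events have a null "after"; the key identifies the row.
        replica.pop(key["ID"], None)
    elif op == "t":
        # Truncate events have null "before" and "after"; drop every row.
        replica.clear()
    else:
        raise ValueError(f"unknown op: {op!r}")


# Hypothetical sample events as (key, value) pairs.
ada = {"ID": 1, "FIRST_NAME": "Ada", "LAST_NAME": "Lovelace"}
grace = {"ID": 2, "FIRST_NAME": "Grace", "LAST_NAME": "Hopper"}
grace_updated = {"ID": 2, "FIRST_NAME": "Grace", "LAST_NAME": "Murray"}
row_events = [
    ({"ID": 1}, {"op": "r", "before": None, "after": ada}),
    ({"ID": 2}, {"op": "c", "before": None, "after": grace}),
    ({"ID": 2}, {"op": "u", "before": grace, "after": grace_updated}),
    ({"ID": 1}, {"op": "d", "before": ada, "after": None}),
]

replica = {}
for key, value in row_events:
    apply_change(replica, key, value)
before_truncate = dict(replica)
print(before_truncate)  # {2: {'ID': 2, 'FIRST_NAME': 'Grace', 'LAST_NAME': 'Murray'}}

apply_change(replica, None, {"op": "t", "before": None, "after": None})
print(replica)  # {}
```

The sketch reads the row ID from the key rather than from before or after, because those fields can be null depending on the operation.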

The source block contains the following fields:

| Field | Description |
| --- | --- |
| connector | Connector name |
| version | Connector version |
| name | Value of the topic.prefix configuration property, for example, cflt |
| db | Database name, for example, ORCLPDB1 |
| schema | Schema name, for example, SAMPLE |
| table | Table name, for example, EMPLOYEES |
| snapshot | Whether the event is part of an ongoing snapshot, for example, last |
| txId | Transaction ID (not applicable for snapshots), for example, 4.31.4721 |
| ts_ms (ts_us, ts_ns) | Timestamp when the record in the source database changed. For snapshots, the timestamp indicates when the snapshot occurred. |
| scn | System change number (SCN) of the change. For snapshots, this is the snapshot SCN. For example, 49066206 |
| lcr_position | Position of the LCR (null for snapshots), for example, 0000000002ed814d00000001000000010000000002ed814c000000010000000102 |
| user_name | Name of the user who made the change (not applicable for snapshots) |
| row_id | Row ID associated with the changed row (not applicable for snapshots) |
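
The following Python sketch shows one way a consumer could read the source block. It is a sketch under stated assumptions: the helper functions are hypothetical, the ts_ms value is made up for the example, the scn is shown as a string although its serialized type is not stated here, and the snapshot check relies only on lcr_position being null for snapshots, as described above.

```python
from datetime import datetime, timezone

# Sample source block for a snapshot event. Names and example values come from
# the field list above; ts_ms is a made-up value and the scn type is assumed.
SOURCE = {
    "name": "cflt",
    "db": "ORCLPDB1",
    "schema": "SAMPLE",
    "table": "EMPLOYEES",
    "snapshot": "last",
    "ts_ms": 1700000000000,
    "scn": "49066206",
    "lcr_position": None,
}


def qualified_table(source: dict) -> str:
    """Build a database.schema.table identifier (hypothetical helper)."""
    return f'{source["db"]}.{source["schema"]}.{source["table"]}'


def changed_at(source: dict) -> datetime:
    """Convert ts_ms (milliseconds since the epoch) to a UTC datetime."""
    return datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc)


def is_snapshot(source: dict) -> bool:
    """Treat a null lcr_position as the marker of a snapshot event."""
    return source["lcr_position"] is None


print(qualified_table(SOURCE))         # ORCLPDB1.SAMPLE.EMPLOYEES
print(changed_at(SOURCE).isoformat())  # 2023-11-14T22:13:20+00:00
print(is_snapshot(SOURCE))             # True
```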