Stream Changes from Oracle Database to Confluent Platform
The connector creates change events for database changes and sends them to Kafka topics. A change event includes a key and a value, with their structures based on the schema of the table where the change occurred.
The key and the value each consist of two components: a schema and a payload. The schema defines the structure of the payload, and the payload holds the actual data.
Keys
The key includes a field for each column in the table’s primary key. If the table does not have a primary key but has a unique index, then the connector uses the column(s) from the unique index as the change event key(s).
For example, consider the following employees table from the sample database schema:
CREATE TABLE sample.employees (
  id NUMBER(9) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(64) NOT NULL,
  last_name VARCHAR2(64),
  hire_date TIMESTAMP
);
The connector is configured with the AVRO data format. The topic.prefix configuration property is set to cflt.
The schema describes the structure of the key column(s). In this example, there is a single field ID of type int corresponding to the id column in the employees table.
{
  "connect.name": "cflt.SAMPLE.EMPLOYEES.Key",
  "fields": [
    {
      "name": "ID",
      "type": "int"
    }
  ],
  "name": "Key",
  "namespace": "cflt.SAMPLE.EMPLOYEES",
  "type": "record"
}
The payload contains the value of the key column(s).
{
  "ID": 1
}
You can interpret this key as identifying the row in the sample.employees table (produced by the connector configured with the cflt topic prefix) where the id primary key column has a value of 1.
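As a rough illustration of the naming and key rules above, the following Python sketch derives the key schema name and the key payload for a row. The helper names schema_name and key_payload are hypothetical and not part of the connector, and the upper-casing of the schema and table names is an assumption based on the sample output shown above.

```python
def schema_name(topic_prefix: str, db_schema: str, table: str, suffix: str) -> str:
    """Build a name of the form <topic_prefix>.<schema_name>.<table_name>.<suffix>."""
    # Assumption: unquoted Oracle identifiers appear in upper case,
    # as in the sample key schema (cflt.SAMPLE.EMPLOYEES.Key).
    return f"{topic_prefix}.{db_schema.upper()}.{table.upper()}.{suffix}"


def key_payload(row: dict, key_columns: list) -> dict:
    """Keep only the primary key (or unique index) columns of a row."""
    return {column: row[column] for column in key_columns}


# Hypothetical row from the employees table, with upper-case column names.
row = {"ID": 1, "FIRST_NAME": "Jack", "LAST_NAME": "Reacher"}

key_schema = schema_name("cflt", "sample", "employees", "Key")
key = key_payload(row, ["ID"])
```

Here key_schema matches the connect.name of the key schema shown earlier, and key matches the sample key payload.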
Values
The structure of the value in a change event message mirrors that of the key. It includes both a schema section, which describes the structure of the payload, and a payload section, which holds the actual data.
The schema of the change event’s values describes the envelope structure of the payload and the nested fields within it.
{
  "connect.name": "cflt.SAMPLE.EMPLOYEES.Envelope",
  "connect.version": 2,
  "fields": [
    {
      "default": null,
      "name": "before",
      "type": [
        "null",
        {
          "connect.name": "cflt.SAMPLE.EMPLOYEES.Value",
          "fields": [
            ...
          ],
          "name": "Value",
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "after",
      "type": [
        "null",
        "Value"
      ]
    },
    {
      "name": "source",
      "type": {
        "connect.name": "io.confluent.connect.oracle.xstream.Source",
        "fields": [
          ...
        ],
        "name": "Source",
        "namespace": "io.confluent.connect.oracle.xstream",
        "type": "record"
      }
    },
    {
      "default": null,
      "name": "transaction",
      "type": [
        "null",
        {
          "connect.name": "event.block",
          "connect.version": 1,
          "fields": [
            ...
          ],
          "name": "block",
          "namespace": "event",
          "type": "record"
        }
      ]
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "default": null,
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "ts_us",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "ts_ns",
      "type": [
        "null",
        "long"
      ]
    }
  ],
  "name": "Envelope",
  "namespace": "cflt.SAMPLE.EMPLOYEES",
  "type": "record"
}
The value schema includes the following fields:
Field | Description |
---|---|
before | (Optional) Contains the state of the row before the change occurred. The name of this schema takes the form <topic_prefix>.<schema_name>.<table_name>.Value . This is null for read (snapshot), create and truncate events. |
after | (Optional) Contains the state of the row after the change occurred. The name of this schema takes the form <topic_prefix>.<schema_name>.<table_name>.Value . This is null for delete and truncate events. |
transaction | (Optional) Contains metadata information associated with the transaction, such as the transaction id . This field is not part of the current release. |
op | (Mandatory) Contains a string value describing the type of operation. It includes one of the following values: c (create or insert), u (update), d (delete), r (read, which indicates a snapshot), or t (truncate). |
ts_ms (ts_us, ts_ns) | (Optional) Contains the time (based on the system clock in the JVM that runs the Kafka Connect task) at which the connector generated the event. |
source | (Mandatory) Contains metadata about the source of the change event, such as the table name and the timestamp when the change occurred. The name of this schema is io.confluent.connect.oracle.xstream.Source . |
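The rules in the table above for op, before, and after can be expressed as a small consumer-side check. This is a minimal sketch, not connector code: the function check_envelope and the constant names are hypothetical, and the event is assumed to be already deserialized into a Python dict.

```python
# Operation codes listed in the value schema table.
OPERATIONS = {
    "c": "create",
    "u": "update",
    "d": "delete",
    "r": "read",
    "t": "truncate",
}

# Per the table: before is null for read, create, and truncate events;
# after is null for delete and truncate events.
BEFORE_IS_NULL = {"r", "c", "t"}
AFTER_IS_NULL = {"d", "t"}


def check_envelope(event: dict) -> str:
    """Return the operation name, or raise ValueError if the envelope is inconsistent."""
    op = event["op"]
    if op not in OPERATIONS:
        raise ValueError(f"unknown op: {op!r}")
    if op in BEFORE_IS_NULL and event.get("before") is not None:
        raise ValueError(f"before must be null for op={op}")
    if op in AFTER_IS_NULL and event.get("after") is not None:
        raise ValueError(f"after must be null for op={op}")
    return OPERATIONS[op]


create_name = check_envelope({"op": "c", "before": None, "after": {"ID": 2}})
```

A check like this can be useful in tests of downstream consumers, where a malformed envelope should fail loudly rather than be silently applied.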
The source block contains the following fields:
Field | Description |
---|---|
connector | Connector name |
version | Connector version |
name | Value of topic.prefix configuration property, for example, cflt |
db | Database name, for example, ORCLPDB1 |
schema | Schema name, for example, SAMPLE |
table | Table name, for example, EMPLOYEES |
snapshot | Indicates whether the event is part of an ongoing snapshot. For example, last |
txId | The transaction ID (n/a for snapshots). For example, 4.31.4721 |
ts_ms (ts_us, ts_ns) | Timestamp when the record in the source database changed (for snapshots, the timestamp indicates when the snapshot occurred). |
scn | SCN of the change. For snapshots, this is the snapshot SCN, for example, 49066206 |
lcr_position | Position of the LCR (null for snapshots). For example, 0000000002ed814d00000001000000010000000002ed814c000000010000000102 |
user_name | Username who made the change (not applicable for snapshots) |
row_id | Row ID associated with the changed row (not applicable for snapshots) |
The change event’s value includes a field for each column in the table. The following examples show the different types of change events that can be generated from the employees table:
A read event (op=r) is generated for each snapshot record. In the following example, the after field contains the values of the row at the time of the snapshot:
{
  "before": null,
  "after": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 1,
      "FIRST_NAME": "Jack",
      "LAST_NAME": {
        "string": "Reacher"
      },
      "HIRE_DATE": {
        "long": 1704088800000000
      }
    }
  },
  ...
}
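In the example above, nullable columns such as LAST_NAME and HIRE_DATE are wrapped in a single-key object naming the Avro type, and the row itself is wrapped in the record name. The following Python sketch shows one way a consumer might flatten that shape. The helper names are hypothetical, and treating HIRE_DATE as microseconds since the Unix epoch in UTC is an assumption based on the magnitude of the sample value.

```python
from datetime import datetime, timedelta, timezone

# Avro type names that can appear as the single key of a wrapped nullable value.
PRIMITIVE_BRANCHES = {"string", "long", "int", "boolean", "float", "double", "bytes"}


def unwrap_column(value):
    """Turn {"string": "Reacher"} into "Reacher"; leave plain values unchanged."""
    if isinstance(value, dict) and len(value) == 1:
        branch, inner = next(iter(value.items()))
        if branch in PRIMITIVE_BRANCHES:
            return inner
    return value


def flatten_row(wrapped, record_name: str):
    """Remove the record-name wrapper and the per-column type wrappers."""
    if wrapped is None:
        return None
    return {col: unwrap_column(val) for col, val in wrapped[record_name].items()}


def micros_to_datetime(micros: int) -> datetime:
    """Assumption: the value counts microseconds since 1970-01-01 UTC."""
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=micros)


after = {
    "cflt.SAMPLE.EMPLOYEES.Value": {
        "ID": 1,
        "FIRST_NAME": "Jack",
        "LAST_NAME": {"string": "Reacher"},
        "HIRE_DATE": {"long": 1704088800000000},
    }
}

row = flatten_row(after, "cflt.SAMPLE.EMPLOYEES.Value")
hired = micros_to_datetime(row["HIRE_DATE"])
```

Under these assumptions, row is a plain column-to-value dict and hired is 2024-01-01 06:00:00 UTC.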
A create event (op=c) is generated when a row is inserted into the source table. In the following example, the after field contains the values inserted into the columns of the row:
{
  "before": null,
  "after": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 2,
      "FIRST_NAME": "Ethan",
      "LAST_NAME": {
        "string": "Hunt"
      },
      "HIRE_DATE": {
        "long": 1711963800000000
      }
    }
  },
  ...
}
An update event (op=u) is generated when an existing row is updated in the source table. In the following example, the before field contains the state of the row before the update, and the after field contains the updated state of the row:
{
  "before": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 2,
      "FIRST_NAME": "Ethan",
      "LAST_NAME": {
        "string": "Hunt"
      },
      "HIRE_DATE": {
        "long": 1711963800000000
      }
    }
  },
  "after": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 2,
      "FIRST_NAME": "Ethan",
      "LAST_NAME": {
        "string": "Blake"
      },
      "HIRE_DATE": {
        "long": 1711963800000000
      }
    }
  },
  ...
}
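Because an update event carries both the before and after states, a consumer can work out which columns changed by comparing the two. The following Python sketch does this for the sample update event. The function changed_columns and the RECORD constant are hypothetical names, and the event is assumed to be already deserialized into a dict.

```python
RECORD = "cflt.SAMPLE.EMPLOYEES.Value"  # record-name wrapper used in the examples


def changed_columns(event: dict, record_name: str = RECORD) -> dict:
    """Map each changed column to its (before, after) pair for an update event."""
    before = event["before"][record_name]
    after = event["after"][record_name]
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }


update_event = {
    "op": "u",
    "before": {RECORD: {
        "ID": 2,
        "FIRST_NAME": "Ethan",
        "LAST_NAME": {"string": "Hunt"},
        "HIRE_DATE": {"long": 1711963800000000},
    }},
    "after": {RECORD: {
        "ID": 2,
        "FIRST_NAME": "Ethan",
        "LAST_NAME": {"string": "Blake"},
        "HIRE_DATE": {"long": 1711963800000000},
    }},
}

diff = changed_columns(update_event)
```

For the sample event, diff contains only LAST_NAME, with the values still in their wrapped form.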
The connector emits three events when the primary key column(s) of an existing row are updated:
- A DELETE event with the old key for the row.
- A tombstone event with the old key for the row.
- An INSERT event that provides the new key for the row.
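To make the three-event sequence concrete, the following Python sketch models what a consumer would observe when a row's primary key changes. It is an illustration only: events_for_key_change is a hypothetical helper, rows are simplified to plain dicts, and envelope fields other than op, before, and after are omitted.

```python
def events_for_key_change(old_row: dict, new_row: dict, key_columns: list) -> list:
    """Return the (key, value) pairs expected for a primary key update."""
    old_key = {column: old_row[column] for column in key_columns}
    new_key = {column: new_row[column] for column in key_columns}
    return [
        # 1. Delete event carrying the old key and the old row state.
        (old_key, {"op": "d", "before": old_row, "after": None}),
        # 2. Tombstone: same old key, null value.
        (old_key, None),
        # 3. Insert (create) event carrying the new key and the new row state.
        (new_key, {"op": "c", "before": None, "after": new_row}),
    ]


old = {"ID": 2, "FIRST_NAME": "Ethan"}
new = {"ID": 7, "FIRST_NAME": "Ethan"}
sequence = events_for_key_change(old, new, ["ID"])
```

A consumer that keys its state on the message key therefore sees the old entry removed and a new entry added, rather than a single update.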
A delete event (op=d) is generated when a row is deleted from the source table. In the following example, the before field contains the state of the row before it was deleted:
{
  "before": {
    "cflt.SAMPLE.EMPLOYEES.Value": {
      "ID": 1,
      "FIRST_NAME": "Jack",
      "LAST_NAME": {
        "string": "Reacher"
      },
      "HIRE_DATE": {
        "long": 1704088800000000
      }
    }
  },
  "after": null,
  ...
}
By default, the connector follows a delete event with a tombstone event that has the same key and a null value, which allows Kafka log compaction to remove all records for that key. You can modify this behavior by setting the tombstones.on.delete connector configuration property.
A truncate event (op=t) is generated when a source table is truncated. The message key is null, and both the before and after fields are set to null:
{
  "before": null,
  "after": null,
  ...
}
By default, the connector does not capture truncate events. You can change this using the skipped.operations configuration property.
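Taken together, the event types described above are enough to maintain a copy of the table on the consumer side. The following Python sketch applies events to an in-memory dict. The function apply_event is hypothetical, rows are simplified to plain dicts, and events are assumed to arrive in order, which holds only for topics with a single partition.

```python
def apply_event(table: dict, key, value) -> None:
    """Apply one change event (key, value) to an in-memory copy of the table."""
    # Message keys are dicts, so convert them to a hashable form for lookup.
    row_id = tuple(sorted(key.items())) if key is not None else None
    if value is None:
        # Tombstone: the preceding delete event already removed the row.
        table.pop(row_id, None)
        return
    op = value["op"]
    if op in ("r", "c", "u"):
        table[row_id] = value["after"]
    elif op == "d":
        table.pop(row_id, None)
    elif op == "t":
        # Truncate events have a null key and clear the whole table.
        table.clear()
    else:
        raise ValueError(f"unknown op: {op!r}")


employees = {}
events = [
    ({"ID": 1}, {"op": "r", "before": None, "after": {"ID": 1, "LAST_NAME": "Reacher"}}),
    ({"ID": 2}, {"op": "c", "before": None, "after": {"ID": 2, "LAST_NAME": "Hunt"}}),
    ({"ID": 2}, {"op": "u", "before": {"ID": 2, "LAST_NAME": "Hunt"},
                 "after": {"ID": 2, "LAST_NAME": "Blake"}}),
    ({"ID": 1}, {"op": "d", "before": {"ID": 1, "LAST_NAME": "Reacher"}, "after": None}),
    ({"ID": 1}, None),
]
for event_key, event_value in events:
    apply_event(employees, event_key, event_value)

snapshot_after_changes = dict(employees)
apply_event(employees, None, {"op": "t", "before": None, "after": None})
```

After the first five events, only the row with ID 2 remains, with LAST_NAME set to Blake. The truncate event then empties the table.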
Note
For change event topics with multiple partitions, there is no ordering guarantee for the change events (create, update, and so on) or truncate events related to a table. Ordering is guaranteed only for change event topics that use a single partition.