Mastering Timestamp-Based CDC Hurdles: Solution Implementation

Introduction

In this execution phase of mastering the hurdles of Timestamp-based Change Data Capture (CDC), we run INSERT, DELETE, and UPDATE operations against our PostgreSQL database and use the proven solution from the previous blog to replicate every change. That blog described the challenges of Timestamp-based CDC and the Pentaho Data Integration (PDI) features we used to overcome them. In this blog, we execute the solution and review its results.

This blog post continues the previous one. If you have already designed the CDC pipeline in Pentaho Data Integration (PDI), you are all set. If you are new to this series, design the CDC pipeline described in the previous blog before proceeding.

Execution of the CDC pipeline

Using this CDC pipeline, we will execute INSERT, DELETE, and UPDATE operations and then compare the Source and Target tables to check whether each change is replicated. Before that, we need to state the assumptions below.

Assumptions

Before executing INSERT, DELETE, and UPDATE operations using this CDC pipeline, let us assume:

  • The tables in the Source and Target databases have the same structure, comprising these columns: { id (primary key), name, last_update }
  • The last_update (timestamp) column is set on every insert and update.
  • Both tables contain the same 5 pre-existing records.
# Check the Data in Source table.
postgres=# select * from t1;
 id |  name  |        last_update         
----+--------+----------------------------
  1 | Akhil  | 2024-01-02 11:21:34.28668
  2 | Venkat | 2024-01-02 11:29:04.582673
  3 | Sai    | 2024-01-02 11:29:14.701115
  4 | Nalini | 2024-01-02 11:29:26.612513
  5 | Ram    | 2024-01-02 11:29:40.322224
(5 rows)

# Check the Data in Target table.
postgres=# select * from t2;
 id |  name  |        last_update         
----+--------+----------------------------
  1 | Akhil  | 2024-01-02 11:21:34.28668
  2 | Venkat | 2024-01-02 11:29:04.582673
  3 | Sai    | 2024-01-02 11:29:14.701115
  4 | Nalini | 2024-01-02 11:29:26.612513
  5 | Ram    | 2024-01-02 11:29:40.322224
(5 rows)
  • We can see that the data in both tables (Source & Target) is identical.
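
Rather than comparing the two listings by eye, a set-difference query can confirm that the tables match. The following is a minimal sketch, assuming t1 and t2 live in the same database (as in the psql sessions above) and have identical column lists; the `issue` label column is introduced here only for illustration.

```sql
-- Rows that exist in the source (t1) but are missing or different in the target (t2).
SELECT 'missing_in_target' AS issue, d.*
FROM (SELECT * FROM t1 EXCEPT SELECT * FROM t2) AS d
UNION ALL
-- Rows that exist in the target (t2) but are missing or different in the source (t1).
SELECT 'extra_in_target' AS issue, d.*
FROM (SELECT * FROM t2 EXCEPT SELECT * FROM t1) AS d;
```

An empty result means the two tables hold exactly the same rows. The same check can be repeated after each operation below.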

Insert operation

  • Insert a record into the source table and check the records in the table using the following statements.
# Insert a record into Source table.
postgres=# insert into t1 values (6,'Krishna',now());
INSERT 0 1

# Check the Data in Source table after INSERT operation.
postgres=# select * from t1;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  5 | Ram     | 2024-01-02 11:29:40.322224
  6 | Krishna | 2024-01-02 13:11:57.373751
(6 rows)
  • Run the transformation in the Pentaho Data Integration (PDI) tool.
  • Compare the data in the target table with the source table using the following code.
# Verify the data in Target table. 
postgres=# select * from t2;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  5 | Ram     | 2024-01-02 11:29:40.322224
  6 | Krishna | 2024-01-02 13:11:57.373751
(6 rows)
  • We can see that the data is replicated successfully from the Source table to the Target table for INSERT operations. Let us verify the DELETE operation as well.

Delete operation

  • Delete a record from the source table and check the records using the following statements.
# Delete a record from Source table.
postgres=# delete from t1 where id = 5;
DELETE 1

# Check the Data in Source table after DELETE operation.
postgres=# select * from t1;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  6 | Krishna | 2024-01-02 13:11:57.373751
(5 rows)
  • Run the transformation in the Pentaho Data Integration (PDI) tool.
  • Compare the data in the target table with the source table using the following code.
# Verify the data in Target table.
postgres=# select * from t2;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  6 | Krishna | 2024-01-02 13:11:57.373751
(5 rows)
  • We can see that the record is deleted from the Target table accordingly. Let us move on to the last operation, UPDATE.
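
A deleted row leaves no timestamp behind in the source, so a delete has to be detected by comparing keys rather than by reading last_update. The sketch below shows one common way to express that idea in plain SQL. It is an illustration only and not necessarily what the PDI transformation does internally; it assumes both tables are reachable from the same session and that id identifies a row in both.

```sql
-- Remove target rows whose primary key no longer exists in the source.
DELETE FROM t2
WHERE NOT EXISTS (
    SELECT 1
    FROM t1
    WHERE t1.id = t2.id
);
```

Run against the state shown above, this would remove only the row with id = 5 from t2.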

Update operation

  • Update a record in the source table and check the records using the following statements.
# Update a record in the Source table. 
postgres=# update t1 set id=7,last_update=now() where id=4;
UPDATE 1

# Check the data in the Source table after UPDATE operation.
postgres=# select * from t1;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  6 | Krishna | 2024-01-02 13:11:57.373751
  7 | Nalini  | 2024-01-02 13:15:36.956686
(5 rows)
  • Run the transformation in the Pentaho Data Integration (PDI) tool.
  • Compare the data in the target table with the source table using the following code.
# Verify the data in Target table.
postgres=# select * from t2;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  6 | Krishna | 2024-01-02 13:11:57.373751
  7 | Nalini  | 2024-01-02 13:15:36.956686
(5 rows)

As we can see, the Target table is updated to match the changes in the Source table.
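
For readers who want to see the timestamp comparison itself, here is a hedged sketch of how new and changed rows could be copied with a single SQL statement. It is not the PDI transformation from this series. It assumes id is the primary key of t2 and, as a simplification, uses the newest last_update already present in t2 as the point from which to pick up changes.

```sql
-- Copy rows that are newer than anything already in the target.
-- New ids are inserted; existing ids are overwritten with the source values.
INSERT INTO t2 (id, name, last_update)
SELECT id, name, last_update
FROM t1
WHERE last_update > (SELECT COALESCE(max(last_update), '-infinity') FROM t2)
ON CONFLICT (id) DO UPDATE
SET name        = EXCLUDED.name,
    last_update = EXCLUDED.last_update;
```

Note that the UPDATE in this section changes the primary key from 4 to 7. A statement like the one above would therefore add id 7 as a new row, and the old row with id 4 would still have to be removed from the target by whatever step handles deletes.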

Note

  • When performing INSERT and UPDATE operations, always set the timestamp column (last_update) together with the other changes. Timestamp-based CDC relies on this value to identify changed rows.
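
One way to avoid forgetting the timestamp is to let the database set it. The sketch below uses a PostgreSQL row-level trigger on the source table; the function name set_last_update and the trigger name t1_set_last_update are hypothetical and chosen only for this example. It assumes PostgreSQL 11 or later (older versions use EXECUTE PROCEDURE instead of EXECUTE FUNCTION).

```sql
-- Hypothetical helper: stamps last_update on every INSERT or UPDATE of t1.
CREATE OR REPLACE FUNCTION set_last_update()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
    -- now() returns the start time of the current transaction.
    NEW.last_update := now();
    RETURN NEW;
END;
$$;

CREATE TRIGGER t1_set_last_update
BEFORE INSERT OR UPDATE ON t1
FOR EACH ROW
EXECUTE FUNCTION set_last_update();
```

With this in place, a statement such as `update t1 set name = 'Sai K' where id = 3;` would refresh last_update without mentioning it. The trigger is meant for the source table only, since a similar trigger on the target would overwrite the replicated timestamps.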

Summary

To sum up, we have implemented CDC for INSERT, DELETE, and UPDATE operations in our PostgreSQL database. Using the proven solution, we overcame the hurdles of Timestamp-based Change Data Capture described in the previous blog.

We highly recommend that you follow this series on Data Integration with PostgreSQL (the World’s Most Advanced Open Source Relational Database System) for a better understanding. So far, we have installed the Pentaho Data Integration (PDI) tool and connected it to a PostgreSQL database, learned about Change Data Capture and its methods, implemented Timestamp-based Change Data Capture, and overcome its hurdles with the proven solution built on the PDI toolset. Try setting up CDC for all INSERT, DELETE, and UPDATE operations in your PostgreSQL database as outlined here. Connect with us if you need any additional information.

Stay tuned!
