
Performance tuning with Apache Spark – Introduction

Introduction:

Welcome back to our ongoing series on Data transformation with Apache Spark! In our previous posts, we’ve covered essential topics like setting up Apache Spark on Ubuntu, integrating data with Spark, and querying datasets using Apache Drill. Now, we’re stepping into the realm of performance tuning—a critical aspect of maximizing the efficiency and throughput of your Spark applications. In this installment, we’ll look at the intricacies of fine-tuning Apache Spark for optimal performance, specifically focusing on optimizing the transformation of data from a Postgres table to Amazon S3.

Overview:

Apache Spark stands as a powerhouse for processing vast volumes of data. However, harnessing its full potential requires careful calibration of various parameters. In this blog post, we'll walk you through the key performance optimization techniques, tailored specifically to the task of seamlessly transferring data from a Postgres database to the cloud-based storage solution, Amazon S3.

Data Serialization:

Data serialization is the process of converting objects into a byte stream and vice versa. It is an optimization technique that allows data to be passed efficiently between the nodes of a cluster and stored in a variety of formats. Spark supports several serialization methods; here we'll focus on Kryo serialization:

Kryo Serialization:

Kryo serialization is a technique used by Spark to serialize objects more quickly. It's designed to be fast and efficient.

  • Kryo is faster and more compact than Java serialization, often up to 10 times faster. It also has a smaller memory footprint than Java serialization, which is important when shuffling and caching large amounts of data.
  • Kryo is not enabled as the default serializer in Spark; it must be configured explicitly, and registering your custom classes with Kryo yields the best results, since not every serializable type is handled out of the box.
  • To use it, set the spark.serializer configuration property to org.apache.spark.serializer.KryoSerializer as part of your Spark job configuration.

Real-Time Example: Optimizing with Kryo Serialization

Consider a Spark application that processes large datasets from a Postgres table, transforming and storing the results in S3. By switching from Java serialization to Kryo serialization, the application can achieve better performance. Here’s how you can enable Kryo serialization in your Spark configuration:

val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[YourClass]))
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

This configuration change can lead to faster processing times, especially when dealing with large datasets.

Caching:

Spark caching is a feature that helps improve the performance of Spark applications. It stores intermediate results in memory (or on disk) so they can be reused quickly instead of being recomputed.

  • cache() and persist() are the two methods used for this technique.
  • cache() stores the data in memory, while persist() stores it at a user-defined storage level.
  • RDD.cache() always stores the data in memory, whereas RDD.persist() allows part of the data to be kept in memory and part on disk, depending on the storage level chosen.

Real-Time Example: Strategic Caching in Data Transformation

Imagine a scenario where your Spark job involves multiple transformations on data retrieved from a Postgres table before storing it in S3. If certain transformations are reused, caching intermediate datasets can significantly reduce the total execution time. Here’s an example of using caching effectively:

val dataset = spark.read.jdbc(postgresURL, "tableName", connectionProperties)
val intermediateDataset = dataset.filter(...).cache()

This intermediate dataset can then be used in multiple actions without the need to recompute it from the original source, thus optimizing the overall performance.

Data Structure Tuning:

Data structure tuning in Apache Spark is a crucial optimization technique that significantly impacts the performance, efficiency, and scalability of Spark applications. The choice of data structure—RDDs (Resilient Distributed Datasets), DataFrames, and Datasets—plays a vital role in how Spark manages and processes data. Each data structure has its own set of features and optimizations that can either leverage Spark’s capabilities to the fullest or become a bottleneck if not used appropriately. Understanding the characteristics and optimal use cases for each data structure allows developers to design Spark applications that are both fast and resource-efficient.

RDDs (Resilient Distributed Datasets) are the fundamental data structure of Spark, providing low-level functionality and fine-grained control over data. They are immutable, distributed collections of objects that enable fault-tolerant, parallel data processing. RDDs are best suited for scenarios where you need full control over data processing and optimization, such as custom partitioning schemes, or when performing operations that are not easily expressible with the DataFrame or Dataset APIs.

DataFrames are a distributed collection of data organized into named columns, conceptually similar to tables in a relational database. Built on top of RDDs, DataFrames provide a higher-level API that leverages Spark’s Catalyst optimizer for improved performance and efficiency. DataFrames are ideal for handling structured and semi-structured data, allowing for efficient storage and processing of large datasets. They are particularly well-suited for SQL queries, data aggregation, and columnar operations.

Datasets are a type-safe version of DataFrames that provide the benefits of RDDs (type safety and object-oriented programming) with the optimization advantages of DataFrames. Datasets are particularly useful when you require type safety for compile-time checks or prefer working with strongly typed data structures in Scala or Java. They are also beneficial for custom transformations that are not easily expressed with DataFrame operations.

Practical Example: Data Structure Optimization

Imagine a Spark application that processes large volumes of structured log data stored in a distributed file system. The goal is to perform data cleansing, transformation, and aggregation to generate summary reports.

Initial Approach using RDDs:

val logsRDD = sc.textFile("path/to/logs")
val cleanedLogsRDD = logsRDD.filter(_.contains("INFO")).map(transformLogEntry)
// Perform aggregation (key the records with map, then reduceByKey)
val summaryRDD = cleanedLogsRDD.map(...).reduceByKey(...)

Optimized Approach using DataFrames:

val logsDF = spark.read.json("path/to/logs")
val cleanedLogsDF = logsDF.filter($"level" === "INFO").selectExpr("transformLogEntry(columns)")
// Perform aggregation using the DataFrame API for better optimization
val summaryDF = cleanedLogsDF.groupBy("date").agg(count("message"))

In this example, switching from RDDs to DataFrames allows the application to leverage Spark’s Catalyst optimizer for better performance during filtering, transformation, and aggregation operations. The declarative nature of the DataFrame API not only makes the code more concise but also enables Spark to optimize the execution plan more effectively.

Garbage collection tuning:

Garbage collection is an essential component of any Java-based application, and this is especially true for Apache Spark, which heavily relies on the Java Virtual Machine (JVM) to execute its code. GC is the process of reclaiming memory used by objects that are no longer needed by the application, thus preventing memory leaks and ensuring the efficient use of memory resources. However, aggressive or poorly optimized garbage collection can lead to increased pause times, affecting the overall throughput and latency of Spark applications.

In Java Virtual Machine (JVM)-based systems like Spark, garbage collection can sometimes become a bottleneck. This is particularly true for Spark applications that create and destroy a large number of objects rapidly, such as those involving iterative algorithms, large shuffles, or extensive data transformations. Excessive garbage collection can lead to:

  • Increased Pause Times: GC events can cause the application to pause, waiting for memory to be freed up. These pauses can degrade the performance of Spark applications, especially if they occur frequently or last for long durations.
  • Reduced Throughput: Frequent garbage collection can significantly reduce the throughput of Spark applications, as more CPU cycles are spent collecting garbage rather than executing application logic.
  • Jitter and Latency Variability: Inconsistent GC behaviour can introduce variability in application latency, making performance unpredictable and complicating the tuning of Spark applications.

Tuning Garbage Collection

Consider a Spark application experiencing high latency due to frequent garbage collection pauses. To address this, you might start by switching to the G1 garbage collector and adjusting the heap size based on the application’s memory usage patterns.

val sparkConf = new SparkConf()
// Heap size is set through Spark's memory properties; extraJavaOptions carries only the GC flags
sparkConf.set("spark.driver.memory", "4g")
sparkConf.set("spark.executor.memory", "4g")
sparkConf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
val spark = SparkSession.builder().config(sparkConf).appName("GCTunedApp").getOrCreate()

In this configuration:

  • -XX:+UseG1GC enables the G1 Garbage Collector, which is designed to offer a good balance between throughput and pause times and is especially suitable for applications with large heaps.
  • spark.driver.memory and spark.executor.memory set the driver and executor heap size to 4GB each, providing sufficient memory for the application's needs while capping the heap to avoid excessive GC pauses. (Spark does not allow maximum heap size flags such as -Xmx to be passed via extraJavaOptions; heap size is set through these memory properties instead.)

Memory Management:

Apache Spark divides its memory between two main categories: execution memory and storage memory. This division is crucial for balancing the memory usage between computational needs and data storage within an application.

  • Execution Memory: Utilized for computational processes such as joins, shuffles, aggregations, and sorting. This region is dynamic; it can borrow space from storage memory when the demand for execution memory exceeds its initial allocation, provided the storage memory is not being used at its full capacity.
  • Storage Memory: Allocated for caching and persisting RDDs, DataFrames, and Datasets. Storage can also use execution memory that is currently free, but those borrowed blocks may be evicted when execution reclaims the space; only a protected portion of storage memory is guaranteed to survive. This keeps frequently used cached data in memory as long as possible, improving performance by reducing the need to recompute or fetch data from disk.

Within the total memory allocated to Spark tasks, there’s a shared region where execution and storage memory can coexist. However, there’s a protected sub-region within the storage memory (often referred to as R), designed to safeguard a portion of the cached data from being evicted by execution tasks.

Consider a Spark application that performs complex data transformations requiring extensive shuffling and sorting, alongside caching significant datasets for iterative processing. To optimize the performance of this application, you might adjust both the spark.memory.fraction and spark.executor.memory settings to ensure sufficient memory is allocated to both execution and storage tasks.

val sparkConf = new SparkConf()
// Increase total executor memory to 4GB
sparkConf.set("spark.executor.memory", "4g")
// Increase the memory fraction to 75% of the executor memory
sparkConf.set("spark.memory.fraction", "0.75")
// Apply the configuration to SparkSession
val spark = SparkSession.builder().config(sparkConf).appName("MemoryOptimizedApp").getOrCreate()

In this configuration, increasing the executor memory to 4GB and the memory fraction to 75% ensures that more memory is available for both execution and storage, reducing the need for spilling data to disk and improving overall application performance.

Conclusion:

Effective memory management is a cornerstone of optimizing Apache Spark applications for better performance. By carefully configuring memory allocation and leveraging dynamic allocation, you can significantly enhance the efficiency of data transformation processes, such as moving data from a Postgres table to Amazon S3. These strategies ensure that your Spark applications utilize available memory resources optimally, minimizing disk I/O and accelerating processing times for large-scale data analytics tasks.

Using pg_index_watch for PostgreSQL Indexing

Let’s delve into exploring pg_index_watch. In this instalment, I will guide you through the rationale behind its creation and explain its operational nuances.

Meet pg_index_watch – a utility for automagical rebuilding of bloated indexes, an absolutely handy tool designed to tackle index bloat on PostgreSQL tables with a high velocity of UPDATEs.

Index bloat on frequently updated tables is a common problem in PostgreSQL. Even with autovacuum, which is built-in, index bloat often persists despite different settings. pg_index_watch steps in to address this by automatically rebuilding indexes when necessary, offering a solution where autovacuum falls short.

PostgreSQL 12 brought about the introduction of REINDEX CONCURRENTLY, offering a safer method to rebuild indexes and consequently reduce bloat. However, a lingering question remained:

How do we determine when an index is bloated and if it needs to be rebuilt?

We require a straightforward statistical model to help us gauge the level of index bloat without the necessity of conducting a thorough review of the entire index. 

pg_index_watch tackles this challenge with a unique strategy:

Utilizing PostgreSQL’s capabilities, pg_index_watch leverages two key metrics:

  • The count of rows within the index (accessible via pg_class.reltuples)
  • Size of the index itself

By monitoring the ratio of index size to row count, pg_index_watch can detect when an index is bloated – a doubling of the ratio relative to its normal state indicates that the index is nearing 100% bloat. At that point pg_index_watch springs into action and initiates a reindex without requiring manual intervention from database administrators.
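
As a rough illustration of the idea (this is not the extension's own query; 10MB simply mirrors the default size threshold mentioned below), you can eyeball the size-to-row-count ratio of your larger indexes like this:

-- Estimate bytes per indexed row for indexes larger than 10MB
SELECT i.indexrelid::regclass                                   AS index_name,
       c.reltuples                                              AS estimated_rows,
       pg_relation_size(i.indexrelid)                           AS index_size_bytes,
       pg_relation_size(i.indexrelid) / NULLIF(c.reltuples, 0)  AS bytes_per_row
FROM pg_index i
JOIN pg_class c ON c.oid = i.indexrelid
WHERE pg_relation_size(i.indexrelid) > 10 * 1024 * 1024;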

For installing and using pg_index_watch, you’ll need:

  • PostgreSQL version 12.0 or higher
  • Passwordless access to the database as a superuser

Recommendations:

If your server can handle it, consider setting max_parallel_maintenance_workers to a non-zero value to speed up index rebuilds.
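
For example (the value is illustrative – size it to your hardware), the setting can be raised cluster-wide with:

ALTER SYSTEM SET max_parallel_maintenance_workers = 4;
SELECT pg_reload_conf();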

Installation (as a PostgreSQL user): 

Obtain the pg_index_watch code by cloning the repository from GitHub using the command

git clone https://github.com/dataegret/pg_index_watch

Navigate into the pg_index_watch directory. 

Then, execute the SQL script index_watch_tables.sql to create the necessary table structure, followed by importing the code, including stored procedures, using index_watch_functions.sql via:

psql -1 -d osdb_index_bloat -f index_watch_functions.sql

During the initial launch of pg_index_watch, it’s imperative to prepare for a significant transformation. At this crucial stage, all indexes exceeding 10MB (default setting) undergo a comprehensive rebuilding process. This meticulous approach guarantees that your indexes are finely tuned for peak performance.

However, be mindful that this process may require several hours or even days to complete, particularly for larger databases. We recommend initiating this process manually to ensure a seamless transition.

nohup psql -d osdb_index_bloat -qt -c "CALL index_watch.periodic(TRUE);" >> index_watch.log

Continuous Monitoring: 

Following the initial launch, pg_index_watch transitions into continuous monitoring mode. From this point forward, only newly created or newly bloated indexes are rebuilt, ensuring ongoing database optimisation. Furthermore, pg_index_watch offers the flexibility to use current index sizes as the baseline, further enhancing its adaptability to your database's evolving needs.

Setting Up Cron:

To uphold consistent performance, establishing a cron schedule for regular monitoring is essential. Whether scheduling daily checks at midnight or hourly scans during periods of high activity, cron ensures that pg_index_watch remains vigilant without impeding critical maintenance tasks.

00 * * * * psql -d osdb_index_bloat -AtqXc "select not pg_is_in_recovery();" | grep -qx t || exit; psql -d osdb_index_bloat -qt -c "CALL index_watch.periodic(TRUE);"

In Conclusion

With pg_index_watch by your side, attaining peak PostgreSQL performance becomes a straightforward endeavour. As you embark on this path, have confidence that your database’s well-being and effectiveness are being diligently monitored. Embrace the benefits of continuous monitoring, and witness the remarkable enhancements that pg_index_watch brings to your PostgreSQL databases.

PostgreSQL User Management: Best Practices & Security Nuances

PostgreSQL – the world’s most advanced open-source relational database – stands out as a robust and feature-rich solution, offering extensive capabilities for user management. Effective user management is important for ensuring data security, integrity, and accessibility within the platform. Let’s explore the intricacies of PostgreSQL user management, its security considerations, and the best practices that empower DBAs to use these features effectively.

User Management:

At its core, PostgreSQL employs a role-based access control (RBAC) system for managing users and their privileges. Roles are defined at the cluster level and fall into two practical categories: login roles, which can connect to the PostgreSQL cluster and act as users, and group roles, which bundle privileges that are then granted to other roles. Privileges on databases, schemas, and objects are granted to these roles.
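
As a small sketch of how this looks in practice (the role, database, and schema names here are hypothetical):

-- A group role that bundles read-only privileges
CREATE ROLE app_readonly NOLOGIN;
GRANT CONNECT ON DATABASE appdb TO app_readonly;
GRANT USAGE ON SCHEMA public TO app_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_readonly;

-- A login role that receives the group's privileges
CREATE ROLE alice LOGIN PASSWORD 'change-me';
GRANT app_readonly TO alice;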

Security Considerations:

PostgreSQL prioritizes security, offering robust mechanisms to safeguard sensitive data and resources. Key security considerations in PostgreSQL user management include:

  • Password Management: Encouraging strong password policies and regularly rotating passwords helps mitigate the risk of unauthorized access. PostgreSQL supports various authentication methods, including SCRAM-SHA-256 password authentication (the current recommended choice), certificate-based authentication, LDAP authentication, and many more (see the configuration sketch after this list).
  • Role Privileges: Granting roles with minimal privileges necessary for their respective tasks minimizes the potential impact of security breaches. PostgreSQL’s fine-grained privilege system allows granular control over database objects, schemas, and system functions.
  • Encryption: DBAs can leverage PostgreSQL’s built-in SSL/TLS support to secure connections. Transparent Data Encryption (TDE) for data-at-rest encryption further protects data against interception and unauthorized access; although TDE is not available in PostgreSQL core, there are extensions and forks that offer Postgres with TDE.
  • Auditing and Logging: Enabling comprehensive auditing and logging mechanisms provides visibility into user activity, facilitating the detection and investigation of security incidents. PostgreSQL’s logging facilities allow administrators to track authentication attempts, SQL queries, and database modifications.
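
A minimal configuration sketch for the password and logging points above (illustrative values only – review them against your own security policy):

ALTER SYSTEM SET password_encryption = 'scram-sha-256';  -- hash newly set passwords with SCRAM
ALTER SYSTEM SET log_connections = on;                    -- record authentication attempts
ALTER SYSTEM SET log_disconnections = on;
ALTER SYSTEM SET log_statement = 'ddl';                   -- log schema-changing statements
SELECT pg_reload_conf();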

Internal Nuances:

PostgreSQL’s internal architecture influences user management in several ways, including:

  • pg_catalog Schema: The pg_catalog schema houses system catalogs containing metadata about database objects, roles, and privileges. Understanding the structure and contents of the pg_catalog schema is essential for effective user management and troubleshooting.
  • Role Scope: roles in PostgreSQL are defined at the cluster level and are therefore shared across all databases in the cluster. DBAs must carefully consider the implications of this scope when designing user management strategies; a well-defined ACL policy document covering the whole database deployment is very important.
  • Role Hierarchy: PostgreSQL supports role inheritance, allowing roles to inherit privileges from other roles. This hierarchical relationship simplifies user management by facilitating role composition and delegation of responsibilities (a small example follows this list).
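
A sketch of such a hierarchy (the role names are hypothetical):

-- analysts inherit readers' privileges; bob inherits both through membership
CREATE ROLE readers NOLOGIN;
CREATE ROLE analysts NOLOGIN IN ROLE readers;
CREATE ROLE bob LOGIN PASSWORD 'change-me' IN ROLE analysts;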

Best Practices:

To optimize PostgreSQL user management, consider implementing the following best practices:

  • Role Segregation: Segregate user roles based on their functional responsibilities, such as administrators, developers, and analysts, to enforce the principle of least privilege and minimize the risk of unauthorized access.
  • Regular Audits: Conduct regular audits of user roles, privileges, and access patterns to identify potential security vulnerabilities or deviations from established policies. Automated tools and scripts can streamline the auditing process and ensure compliance with security standards.
  • Role Revocation: Immediately revoke access for users or roles that are no longer required or have been compromised. Timely removal of unnecessary privileges reduces the attack surface and mitigates the risk of unauthorized data access or manipulation.
  • Continuous Education: Educate DBAs, DevOps, Developers, and end-users on best practices for user management, password security, and data access control. Regular training sessions and knowledge-sharing initiatives foster a culture of security awareness and accountability.

In conclusion, PostgreSQL user management is a multifaceted aspect of database administration that requires careful consideration of security principles, internal mechanisms, and best practices. By implementing robust security measures, understanding PostgreSQL’s internal nuances, and adhering to established best practices, organizations can effectively manage user access, protect sensitive data, and maintain the integrity of their PostgreSQL databases. With a proactive approach to user management, PostgreSQL administrators can confidently navigate the complexities of database security and ensure the resilience of their database environments.

Open-source Data Engineering with PostgreSQL

Blog-4: Apache Drill Magic across PostgreSQL, Local Parquet, and S3

INTRODUCTION:

Welcome back! Following our exploration of data movement between PostgreSQL and Amazon S3 in the previous blog, we now venture into the realm of querying with Apache Drill. In this sequel, we’ll demonstrate the simplicity of querying PostgreSQL tables, local Parquet files, and Parquet files on S3. Join us as we seamlessly connect diverse data sources using the power of Apache Drill and SQL. Let’s continue our data journey.

Configuring Apache Drill for PostgreSQL

To begin querying PostgreSQL data, the first step is to add the PostgreSQL storage plugin.

  • Open the Apache Drill web UI at http://localhost:8047/storage. Add the configuration below as a PostgreSQL plugin, providing the necessary details such as the database URL, username, and password. Once the configuration is saved, Apache Drill can recognize PostgreSQL as a data source.
  • Alternatively, Apache Drill also supports adding storage plugins through the CLI (command-line interface): the same plugin definition can be added to the storage-plugins-override.conf file.
{
  "type": "jdbc",
  "enabled": true,
  "driver": "org.postgresql.Driver",
  "url": "jdbc:postgresql://localhost:5432/spark_db",
  "username": "postgres",
  "password": "postgres",
  "sourceParameters": {
    "minimumIdle": 0,
    "autoCommit": true,
    "connectionTestQuery": "select version() as postgresql_version",
    "dataSource.cachePrepStmts": true,
    "dataSource.prepStmtCacheSize": 250
  }

}

  • Now we are ready to explore the power of SQL across PostgreSQL. We can see what Drill exposes using commands such as SHOW SCHEMAS or SHOW DATABASES.

  • We can switch to the schema we need (for example, public) with the command USE postgresql.public, and then list the tables present in that schema with the SHOW TABLES command.

  • We can now run a SQL query to read the data in the desired table, as sketched below.
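
A minimal Drill session against the plugin configured above might look like this (postgresql is the plugin name used in the configuration; your_table is a placeholder):

SHOW SCHEMAS;
USE postgresql.public;
SHOW TABLES;
SELECT * FROM your_table LIMIT 10;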

Working on Local Parquet Files:

  • Now, let’s move on to querying Parquet files stored locally.
  • No additional plugin is needed for querying Parquet files, as Drill has built-in Parquet support.
  • The beauty of Drill is that it can query Parquet files directly and display the query output in a readable format, as shown below.
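
For instance, a local file can be queried through Drill's built-in dfs plugin (the file path is a placeholder):

SELECT * FROM dfs.`/path/to/table1.parquet` LIMIT 10;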

Working on S3 parquet Files:

  • As mentioned previously, reading Parquet files requires no special configuration, and Drill interacts with S3 just as seamlessly. The only updates needed in the existing configuration are the AWS Access Key ID, the Secret Key, and the bucket name, so that Drill can access data stored in Amazon S3.
  • An example query for extracting data from Parquet files in the S3 bucket is sketched below.
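
Assuming the S3 storage plugin is enabled under the name s3 and points at the bucket used earlier in this series (the object key is a placeholder), the query might look like:

SELECT * FROM s3.`table1_combined.parquet` LIMIT 10;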

Notice the simplicity – no complex configuration is needed. With Apache Drill, plain SQL is enough to query data across these different formats and locations.

What’s Next?

In this blog, we delved into the magic of Apache Drill: from configuring the PostgreSQL storage plugin to the minimal configuration updates needed for S3, we highlighted the simplicity of querying PostgreSQL tables, local Parquet files, and Parquet files residing in S3.

Stay tuned for the final installment in our series! The upcoming blog focuses on optimizing performance using Apache Spark and Apache Drill. Get ready to elevate your data engineering game with insights into enhancing query speed and maximizing efficiency. Until then, happy querying and exploring.

Open-source Data Engineering with PostgreSQL

Blog-3: Data Loading with Apache Spark

INTRODUCTION:

Welcome to the next installment of our series on Open-source Data Engineering with PostgreSQL. In this blog, we’ll delve into the practicalities of transforming table data from PostgreSQL into the Parquet format and storing it in an S3 bucket. Leveraging Python, Pandas, Apache Spark, and the power of open-source tools, we aim to guide you through a seamless data transformation process.

Installing Required Libraries:

First, ensure you have the necessary libraries installed. Run the following commands to install Pandas, PySpark, PyArrow, psycopg2, SQLAlchemy, and boto3:

pip install pandas
pip install pyspark
pip install pyarrow
pip install psycopg2
pip install sqlalchemy
pip install boto3

Python code for Transformation:

Here is the Python code for transforming PostgreSQL table data into the Parquet format locally, timing the run (compression such as Gzip can be applied through write_table’s compression parameter):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime

# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'spark_db',
    'user': 'postgres',
    'password': 'postgres'
}

# Table name in PostgreSQL
postgres_table_name = 'trail_tab'

# Parquet file path
parquet_output_path = 'table1.parquet'

# Chunk size
chunk_size = 1000000

# Start measuring time
total_starttime = datetime.datetime.now()

# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")

# Get the total number of rows in the PostgreSQL table
total_rows = pd.read_sql_query(f"SELECT COUNT(*) FROM {postgres_table_name}", engine).iloc[0, 0]

# Initialize an empty DataFrame to hold the combined data
combined_df = pd.DataFrame()

# Loop through chunks
for chunk_index, chunk_start in enumerate(range(0, total_rows, chunk_size)):

    # Read data from PostgreSQL table into a Pandas DataFrame for the current chunk
    query = f"SELECT * FROM {postgres_table_name} OFFSET {chunk_start} LIMIT {chunk_size}"
    postgres_df = pd.read_sql_query(query, engine)

    # Concatenate the current chunk to the combined DataFrame
    combined_df = pd.concat([combined_df, postgres_df], ignore_index=True)

# Write the combined DataFrame to a single Parquet file
pq.write_table(pa.Table.from_pandas(combined_df), parquet_output_path)

# End measuring time
total_endtime = datetime.datetime.now()

# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Data from PostgreSQL table '{postgres_table_name}' has been successfully written to a single Parquet file.")

Here is the output for the above code execution:

Python code for transforming PostgreSQL data to S3 bucket:

# This Python code exports PostgreSQL data to S3 in Parquet format
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime
import boto3
from io import BytesIO

# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'spark_db',
    'user': 'postgres',
    'password': 'postgres'
}

# Table name in PostgreSQL
postgres_table_name = 'trail_tab'

# Parquet file path (local temporary path)
parquet_local_path = 'table1_combined.parquet'

# S3 bucket and key
s3_bucket = 'table-parquetbucket'
s3_key = 'table1_combined.parquet'

# Chunk size
chunk_size = 1000000

# Start measuring time
total_starttime = datetime.datetime.now()

# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")

# Get the total number of rows in the PostgreSQL table
total_rows = pd.read_sql_query(f"SELECT COUNT(*) FROM {postgres_table_name}", engine).iloc[0, 0]

# Initialize an empty DataFrame to hold the combined data
combined_df = pd.DataFrame()

# Loop through chunks
for chunk_index, chunk_start in enumerate(range(0, total_rows, chunk_size)):

    # Read data from PostgreSQL table into a Pandas DataFrame for the current chunk
    query = f"SELECT * FROM {postgres_table_name} OFFSET {chunk_start} LIMIT {chunk_size}"
    postgres_df = pd.read_sql_query(query, engine)

    # Concatenate the current chunk to the combined DataFrame
    combined_df = pd.concat([combined_df, postgres_df], ignore_index=True)

# Write the combined DataFrame to a local Parquet file
pq.write_table(pa.Table.from_pandas(combined_df), parquet_local_path)

# Upload the local Parquet file to S3
s3_client = boto3.client('s3', region_name='ap-south-1')
with open(parquet_local_path, 'rb') as data:
    s3_client.upload_fileobj(data, s3_bucket, s3_key)

# End measuring time
total_endtime = datetime.datetime.now()

# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Data from PostgresSQL '{postgres_table_name}' has been successfully written to the S3 Bucket.")

Here is the output for the above code execution

Python code for loading Parquet-format data into a new PostgreSQL table:

# This Python code loads Parquet data into a new PostgreSQL table
import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime
# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'db_1',
    'user': 'postgres',
    'password': 'postgres'
}
# Table name in PostgreSQL
postgres_table_name = 'parquetinput'
# Parquet file path
parquet_input_path = 'compressedtable1.parquet'

# Chunk size for insertion into PostgreSQL
insert_chunk_size = 100000
# Start measuring time
total_starttime = datetime.datetime.now()
# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")
# Read the Parquet file into a Pandas DataFrame
parquet_data = pq.read_table(parquet_input_path).to_pandas()
# Chunked insertion into PostgreSQL table
for chunk_start in range(0, len(parquet_data), insert_chunk_size):
    chunk_end = min(chunk_start + insert_chunk_size, len(parquet_data))
    chunk_df = parquet_data.iloc[chunk_start:chunk_end]
   # Use pandas to_sql function to insert the chunk into the PostgreSQL table
    chunk_df.to_sql(postgres_table_name, engine, if_exists='append', index=False)
# End measuring time
total_endtime = datetime.datetime.now()
# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Parquet data from file '{parquet_input_path}' has been successfully loaded into the new PostgreSQL table '{postgres_table_name}'.")

Here is the Output for this code execution:

Python code for parsing S3 bucket data back to a PostgreSQL table:

# This Python code loads S3 Parquet data back into a new PostgreSQL table
import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime
import boto3
from io import BytesIO

# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'db_1',
    'user': 'postgres',
    'password': 'postgres'
}

# S3 bucket and key
s3_bucket = 'table-parquetbucket'
s3_key = 'table1_combined.parquet'

# Parquet file path (local temporary path)
parquet_local_path = 'table1_combined.parquet'

# Table name in PostgreSQL
postgres_table_name = 't1'

# Chunk size for insertion into PostgreSQL
insert_chunk_size = 100000

# Start measuring time
total_starttime = datetime.datetime.now()

# Download the Parquet file from S3
s3_client = boto3.client('s3', region_name='ap-south-1')
s3_client.download_file(s3_bucket, s3_key, parquet_local_path)

# Read the Parquet file into a Pandas DataFrame
parquet_data = pq.read_table(parquet_local_path).to_pandas()

# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")

# Chunked insertion into PostgreSQL table
for chunk_start in range(0, len(parquet_data), insert_chunk_size):
    chunk_end = min(chunk_start + insert_chunk_size, len(parquet_data))
    chunk_df = parquet_data.iloc[chunk_start:chunk_end]

    # Use pandas to_sql function to insert the chunk into the PostgreSQL table
    chunk_df.to_sql(postgres_table_name, engine, if_exists='append', index=False)

# End measuring time
total_endtime = datetime.datetime.now()

# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Parquet data from S3 has been successfully loaded into the new PostgreSQL table '{postgres_table_name}'.")

Here is the Output

What’s Next!

We are in no mood to wrap up this blog series just yet. With the steps outlined in this post, you can work through the process of seamlessly transferring data from PostgreSQL to Parquet files, storing them on Amazon S3, and bringing them back into PostgreSQL tables. The journey doesn’t end here: in the next installment we’ll shift our focus to the exciting realm of querying using Apache Drill. Stay tuned as we dive into empowering business analysts with the versatile Apache Drill.

Open-source Data Engineering with PostgreSQL

Blog-2: Installation and Setup on Ubuntu

INTRODUCTION:

Welcome back to the series on Open-source Data Engineering with PostgreSQL. In this post, we shall delve into the installation and configuration of Apache Spark and Apache Drill on an Ubuntu environment. Our aim is to guide you through each step, building upon the foundational concepts we’ve previously covered. By following this comprehensive walkthrough, you’ll have all the necessary prerequisites for a streamlined installation process, setting the stage for an optimized data engineering environment that facilitates efficient data loading and exploration. Let’s dive in and set the stage for a seamlessly integrated experience.

Prerequisites:

Before we embark on the installation journey, make sure your Ubuntu operating system meets the following prerequisites:

  • Sufficient Disk storage
  • Apache Spark and Apache Drill require Java 8 or Java 11 to run
    • Here is the command to install Java if it is not already installed
      • sudo apt-get install openjdk-8-jdk
  • Apache Spark interacts with various data sources, including PostgreSQL. To connect to a Postgres database, we need the PostgreSQL JDBC Driver; we can download it from https://jdbc.postgresql.org/download/
  • Place the PostgreSQL JDBC Driver JAR file in /opt/spark/jars so that it is accessible to Apache Spark.

Installation and setup

Apache Spark:

  • Download the latest Apache Spark release (version 3.5.0 as of this writing) from https://spark.apache.org/downloads.html; any other available version can be used as per your needs.
  • First, we will update using the command
    sudo apt-get update
    sudo apt-get upgrade
  • Change to the directory where you want to extract Spark, then extract the tarball
    cd /path/to/your/desired/directory
    tar -xzf spark-3.5.0-bin-hadoop3.tar.gz
  • Update your .bashrc file with the following environment variables
    export HADOOP_HOME=/path/to/your/hadoop
    export SPARK_HOME=/path/to/your/spark
  • Navigate to the Spark directory and run the following commands; the Spark shell should start, confirming the installation
    cd /path/to/your/spark
    spark-shell

Apache Drill:

  • Download the latest version of Apache Drill (version 1.19.0 as of this writing) using
    wget https://www.apache.org/dyn/closer.cgi/drill/drill-1.19.0/apache-drill-1.19.0.tar.gz
  • Firstly we will run 
    sudo apt-get update
    sudo apt-get upgrade
  • Use the following command to extract the tarball at the desired directory 
    cd /path/to/your/desired/directory
    tar -xvf apache-drill-1.19.0.tar.gz
  • Navigate to Drill’s bin directory and run any of the following commands to start the Drill shell or the sqlline shell:
    ./drill-embedded
    ./sqlline
    ./sqlline -u jdbc:drill:zk=local

What’s next:

This installation guide has equipped you with the essential steps to set up Apache Spark and Apache Drill on your Ubuntu environment. By following these instructions, you’ve laid a solid foundation for efficient data engineering, enabling seamless data loading and exploration.

As you embark on your journey with Spark and Drill, remember that this is just the beginning. In upcoming blogs, we will delve deeper into advanced configurations, optimization strategies, and real-world use cases. Stay tuned for more insights and practical tips that will elevate your data engineering experience. The upcoming blogs will provide valuable insights for harnessing the full potential of Apache Spark and Apache Drill in your data projects. Happy exploring!

Open-source Data Engineering with PostgreSQL

Overview – A Curtain raiser

Introduction:

In the ever-evolving landscape of data management, organizations are constantly seeking efficient ways to handle, transform, and query massive datasets, and data archiving has become an important component of data engineering – including the ability to query the archived data itself. This blog series focuses on a robust solution built with open-source tools such as Apache Spark and Apache Drill. Throughout the series, we will delve into the intricacies of transforming table data into the Parquet format (columnar storage optimized for analytics) and using Apache Drill to query this data seamlessly. The important pillars of this blog series are:

Open-source tools

In the architecture described for archiving and data engineering, two key open-source tools play pivotal roles: Apache Spark and Apache Drill. Together, they form a robust foundation for enabling efficient data loading and exploration within a scalable and flexible environment.

Archiving

As organizations accumulate massive volumes of data, archiving becomes imperative for both cost optimization and performance enhancement. Parquet, a highly efficient columnar storage format, has gained prominence due to its ability to compress and store data in a way that facilitates fast analytics. In this series, we delve into the motivations behind archiving data and how Parquet addresses these challenges.

Legacy Datasets

A key emphasis of this series is on how data archiving plays a pivotal role in liberating production environments from the burden of legacy datasets. By effectively storing and archiving historical data in a structured and efficient format, organizations not only ensure smoother operational workflows but also unlock the potential for enhanced analytics.

Choosing Parquet format

Parquet is chosen for its superiority in analytical data storage due to its columnar architecture. The format excels at optimizing query performance through predicate pushdown and efficient encoding and compression techniques such as run-length encoding and dictionary encoding, reducing storage requirements and speeding up query processing. Its flexibility in schema evolution allows seamless modifications, ensuring compatibility across versions for smooth system upgrades.

Apache Spark:

  • Apache Spark is a powerful open-source data processing engine known for its distributed computing capabilities.
  • In this context, Apache Spark serves as the backbone for the data-loading process. Its scalability and ability to handle diverse data sources make it well-suited for efficiently extracting, transforming, and loading (ETL) data into the desired format, which, in this case, involves archiving data into the Parquet format.

Apache Drill:

  • Apache Drill is a schema-free SQL query engine designed for exploring and querying large-scale datasets.
  •  In the described setup, it is instrumental in querying and extracting insights from the Parquet-archived data.
  • The schema-free nature of Apache Drill aligns well with the flexibility of the Parquet format, allowing for seamless querying and analysis without strict schema constraints.

As we conclude this introductory chapter of our blog series on transforming and archiving legacy datasets with Apache Spark, Parquet files, and Apache Drill, we’ve only just scratched the surface.

In the upcoming articles, we’ll walk you through practical implementation strategies and share real-world case studies.

Stay tuned for more!

Mastering Timestamp-Based CDC Hurdles: Solution Implementation

Introduction

In the execution phase of mastering Timestamp-Based Change Data Capture (CDC) hurdles, the focus lies on implementing INSERT, DELETE, and UPDATE operations in our PostgreSQL database and using the solution discussed in the previous blog to achieve comprehensive data replication. That solution overcomes the challenges (also discussed in the previous blog) of timestamp-based Change Data Capture, building on features of the Pentaho Data Integration (PDI) toolset. In this blog, we will walk through the execution of the solution and its results.

As this blog post is the continuation part of the previous blog, if you have designed the CDC pipeline in the Pentaho Data Integration (PDI) toolset, you are all set to go. If you are viewing this series of blogs for the first time, you need to design the CDC pipeline of the solution to proceed further.

Execution of the CDC pipeline

Using this CDC pipeline, we will execute INSERT, DELETE and UPDATE operations and compare the data in both tables (source and target) to verify whether the changes are replicated correctly. Before that, we need to verify the assumptions below.

Assumptions

Before executing INSERT, DELETE and UPDATE operations using this CDC pipeline, let us assume:

  • The tables in both the Source and Target databases have the same structure, comprising the columns { id (primary key), name, last_update }
  • The last_update (timestamp) column is updated on every insert and update.
  • Both the tables consist of 5 similar pre-existing records.
# Check the Data in Source table.
postgres=# select * from t1;
 id |  name  |        last_update         
----+--------+----------------------------
  1 | Akhil  | 2024-01-02 11:21:34.28668
  2 | Venkat | 2024-01-02 11:29:04.582673
  3 | Sai    | 2024-01-02 11:29:14.701115
  4 | Nalini | 2024-01-02 11:29:26.612513
  5 | Ram    | 2024-01-02 11:29:40.322224
(5 rows)

# Check the Data in Target table.
postgres=# select * from t2;
 id |  name  |        last_update         
----+--------+----------------------------
  1 | Akhil  | 2024-01-02 11:21:34.28668
  2 | Venkat | 2024-01-02 11:29:04.582673
  3 | Sai    | 2024-01-02 11:29:14.701115
  4 | Nalini | 2024-01-02 11:29:26.612513
  5 | Ram    | 2024-01-02 11:29:40.322224
(5 rows)
  • We can see the data in both the tables (Source & Target) is similar.

Insert operation

  • Insert a record into the source table and check the records in the table using the following statements.
# Insert a record into Source table.
postgres=# insert into t1 values (6,'Krishna',now());
INSERT 0 1

# Check the Data in Source table after INSERT operation.
postgres=# select * from t1;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  5 | Ram     | 2024-01-02 11:29:40.322224
  6 | Krishna | 2024-01-02 13:11:57.373751
(6 rows)
  • Run the transformation in the Pentaho Data Integration (PDI) tool.
  • Compare the data in the target table with the source table using the following code.
# Verify the data in Target table. 
postgres=# select * from t2;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  5 | Ram     | 2024-01-02 11:29:40.322224
  6 | Krishna | 2024-01-02 12:59:59.028452
(6 rows)
  • We can see the data is replicated successfully from the Source table to the Target table for INSERT operations. Let us verify the DELETE operation as well.

Delete operation

  • Delete a record from the source table and check the records using the following statements.
# Delete a record from Source table.
postgres=# delete from t1 where id = 5;
DELETE 1

# Check the Data in Source table after DELETE operation.
postgres=# select * from t1;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  6 | Krishna | 2024-01-02 13:11:57.373751
(5 rows)
  • Run the transformation in the Pentaho Data Integration (PDI) tool.
  • Compare the data in the target table with the source table using the following code.
# Verify the data in Target table.
postgres=# select * from t2;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  4 | Nalini  | 2024-01-02 11:29:26.612513
  6 | Krishna | 2024-01-02 13:11:57.373751
(5 rows)
  • We can see the data is deleted in the Target table accordingly. Let us now move on to the last operation, UPDATE.

Update operation

  • Update a record in the source table and check the records using the following statements.
# Update a record in the Source table. 
postgres=# update t1 set id=7,last_update=now() where id=4;
UPDATE 1

# Check the data in the Source table after UPDATE operation.
postgres=# select * from t1;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  6 | Krishna | 2024-01-02 13:11:57.373751
  7 | Nalini  | 2024-01-02 13:15:36.956686
(5 rows)
  • Run the transformation in the Pentaho Data Integration (PDI) tool.
  • Compare the data in the target table with the source table using the following code.
# Verify the data in Target table.
postgres=# select * from t2;
 id |  name   |        last_update         
----+---------+----------------------------
  1 | Akhil   | 2024-01-02 11:21:34.28668
  2 | Venkat  | 2024-01-02 11:29:04.582673
  3 | Sai     | 2024-01-02 11:29:14.701115
  6 | Krishna | 2024-01-02 13:11:57.373751
  7 | Nalini  | 2024-01-02 13:15:36.956686
(5 rows)

As we can see the data is updated in the Target table as per the changes in the Source table.

Note

  • While doing INSERT and UPDATE operations, ensure that we insert or update the value of timestamp columns along with other changes every time.

Summary

To sum up, we have implemented the CDC for INSERT, DELETE, and UPDATE operations in our PostgreSQL database. Using a Proven solution, we overcame the hurdles of Timestamp-based Change Data Capture mentioned in the previous blog.

We highly recommend you follow the series of topics around the Data Integration with PostgreSQL (the World’s Most Advanced Open Source Relational Database System) for better understanding. We have so far explored the Installation of the Pentaho Data Integration (PDI) tool and connection to the PostgreSQL database, learned about Change Data Capture and its methods and the implementation of Timestamp-based Change Data Capture, and mastered the hurdles of Timestamp-based Change Data Capture with the proven solution using PDI toolset. Try to set up a CDC for all INSERT, DELETE, and UPDATE operations in your PostgreSQL database as outlined here. Connect with us if you need any additional information.

Stay tuned !!

Mastering Timestamp-Based CDC Hurdles: A Proven Solution

Introduction

Have you experimented with Timestamp-Based Change Data Capture using the Pentaho Data Integration (PDI) tool? Achieving data replication from a source database to a target database through “Timestamp-Based Change Data Capture” with Pentaho Data Integration is indeed straightforward. Perhaps you’ve successfully implemented it. However, there are two drawbacks to achieving comprehensive data replication using this method. Firstly, updating the timestamp column in the source table is required each time rows are inserted or updated. Secondly, ‘Delete’ operations face a challenge, as there is no means to retrieve the value of the timestamp column in deleted rows.

As far as the first case is concerned, it would be taken care of as long as you incorporate these two practices in your database design:

  1. Include a Date/Time column to capture the Last Modified / Last Updated Timestamp in each and every table (or, at least the ones that you plan to replicate onto another database instance).
  2. Associate the column with an insert or update trigger that automatically updates the values in the column with the current timestamp whenever an insert or update action is performed. (or)
    Make sure to set/update the values in the column with the current timestamp value when performing any insert or update operation within your application’s code.

Coming to the second case, here is the effective and simple solution to get complete data replication with some modifications to the design of the Timestamp based CDC that we discussed in the previous blog. We highly recommend you to follow the series of topics around the Data Integration with PostgreSQL (the World’s Most Advanced Open Source Relational Database System) for better understanding. We have so far explored the Installation of Pentaho Data Integration (PDI) tool and connection to the PostgreSQL database, about Change Data Capture and its methods and the implementation of Timestamp based Change Data Capture. If you have already checked these out, you’re good to go!

We are dividing this topic into two parts to make it easier for you to follow-along:

  • Design of the CDC pipeline.
  • Execution of the CDC pipeline(next blog)

Design of the CDC pipeline

In this pipeline, we will identify the differences between two tables (one from the Source database and another from the Target database) and apply the changes to the Target database accordingly. Before designing this pipeline, save the transformation used in the previous blog as ‘Timestamp based CDC’, then open a new transformation.

As we need two inputs for the comparison step, we will export the data from both the Source database and the Target database.

Export Data from Source (Step 1.1)

  • Go to Design → Input → Table input
  • Here, we will extract the data in the table (t1) from the Source database with a SQL statement.
  • Hop the result to the next step 2.0 as input.

Export Data from Target (Step 1.2)

  • Go to Design → Input → Table input
  • Here, we will extract the data in the table (t2) from the Target database with a SQL statement (a sketch of both table-input queries follows this step).
  • Hop the result to the next step 2.0 as input.
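
As a sketch, assuming the t1/t2 tables and columns used in the execution post, the two Table input steps might use queries like the following (Merge rows (diff) expects both inputs sorted on the key field):

-- Step 1.1: Source table
SELECT id, name, last_update FROM t1 ORDER BY id;

-- Step 1.2: Target table
SELECT id, name, last_update FROM t2 ORDER BY id;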

Identify the difference in two tables (Step 2.0)

  • Go to Design → Joins → Merge rows (diff)
  • Here, we will compare the key field in both tables (t1 & t2); the step writes its verdict for each row into a temporary flag field as identical, new, deleted or changed.
  • Hop the result to the next step 3.0 as input.

Separate the difference (Step 3.0)

  • Go to Design → Flow → Filter rows
  • Here, we will filter the non-identical rows (based on the flag field) out of the previous output by specifying a condition.
  • Hop the result to the next step 4.0 as input.

Apply changes in target (Step 4.0)

  • Go to Design → Output → Delete
  • Here, we will use the flagged rows from the previous output to delete the corresponding records from the Target table.
  • Hop the result to the next step 5.0 as input.

Timestamp based CDC (Step 5.0)

  • Go to Design → Flow → Transformation executor
  • Here, we will execute the transformation named ‘Timestamp based CDC’ (discussed in the previous blog).
  • Save and run the whole transformation.

Summary

The Pentaho Data Integration (PDI) toolset is vast, with numerous options built in. We have explored just a few of them to address the drawbacks of simple timestamp-based Change Data Capture and to replicate ‘Delete’ operations effectively from the source to the target database. Understanding these two aspects is crucial, as the timestamp-based approach is the most widely used method of Change Data Capture. In this blog post, we explored the design (part 1) of the CDC pipeline. In the next post, we’ll solidify our understanding of this topic by going through an example that replicates all of the insert, update and delete operations from the source database to the target database.

We highly recommend you to follow the series of topics around the Data Integration with PostgreSQL (the World’s Most Advanced Open Source Relational Database System) for better understanding. We have so far explored the Installation of Pentaho Data Integration (PDI) tool and connection to the PostgreSQL database, learnt about Change Data Capture and its methods and the implementation of Timestamp based Change Data Capture, and topped it up by adding the ability to handle ‘Delete’ operations. Try designing this pipeline, and connect with us if you need any additional information.

Stay tuned !!

Timestamp-based Change Data Capture

Introduction

Hey all!! Hope you are following the series of topics around Data Integration with PostgreSQL, The World’s Most Advanced Open Source Relational Database. In our previous blog, we explored the Change Data Capture (CDC) and its methods. If you missed it, we recommend reading the blog.

Timestamp-based CDC (Change Data Capture) is one approach to capturing and tracking changes in a database by associating a timestamp with each change. It is a versatile and widely used approach in tracking the changes in databases and supports various use cases like data replication and synchronization. In this blog, we will explore the prerequisites to execute the timestamp-based CDC and the setup with the result set.

Setup the Environment

Before diving into CDC, let’s ensure we have a timestamp column in the table (source) and the necessary components installed and configured.

PostgreSQL Database

  • Install PostgreSQL on the server or your local machine.
  • Create a database and the required tables that will be part of the CDC.

Pentaho Data Integration (ETL tool)

  • Install PDI on the server or your local machine – you may refer to this blog for a step-by-step procedure
  • Connect with the databases.
  • Create the data pipeline for CDC.

Designing the CDC Pipeline

In this CDC pipeline, we will create a new transformation and build a pipeline that extracts the data from one table (t1) in one PostgreSQL database and loads it into another table (t2) in another PostgreSQL database on an incremental basis.

Define the date range

  • Go to Design → Input → Get system info
  • Here, we are getting the start and end range of the transformation.
  • Hop the result to the next step as input.

Export data from Source

  • Go to Design → Input → Table input
  • Here, we will extract the data in the table (t1) by filtering on the timestamp column with a SQL statement (see the sketch after this list).
  • Hop the result to the next step as input.
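
A sketch of the Table input query, assuming the t1 schema used later in this post (id, name, last_update); the two ? placeholders are filled by the date range produced by the Get system info step via ‘Insert data from step’:

SELECT id, name, last_update
FROM t1
WHERE last_update > ? AND last_update <= ?;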

Import data into Target

  • Go to Design → Output → Insert/update
  • Here, we insert or update the data (the output of the previous step) in the target table (t2) according to the configured key fields and conditions.

Input and Output

  • Assuming that both tables(t1 & t2) in different databases with columns (id, name & last_update) have no data.
  • Insert the data in the table(t1) using the insert command.
  • Check the data in table(t2) before running the transformation.
  • Run the transformation. If the transformation is successful, you will get the ✅ at the top of the following steps.
  • Now, check the target table(t2).
  • Compare the data in both the tables(t1&t2).

Note: As discussed in the previous blog, CDC using timestamp will not work for the DELETE option.

Conclusion

In conclusion, the adoption of timestamp-based Change Data Capture (CDC) in PostgreSQL marks a pivotal stride in the realm of data integration. By leveraging this sophisticated approach, you can build systems to capture and propagate real-time changes efficiently. The journey through the nuances of timestamp-based CDC in PostgreSQL+Pentaho DI showcases not just a technical evolution but a strategic leap towards a more responsive and interconnected data ecosystem. As we witness the seamless synchronization of data across diverse platforms and applications, the potential for data-driven decision-making becomes boundless. In the ever-evolving landscape of data integration, embracing timestamp-based CDC in PostgreSQL is not just a best practice—it’s a transformative key to unlocking the true power of real-time data.

Stay tuned for more in Data Integration with PostgreSQL blog post series!!