Open-source Data Engineering with PostgreSQL

Blog-3: Data Loading with Apache Spark

INTRODUCTION:

Welcome to the next installment of our series on Open-source Data Engineering with PostgreSQL. In this blog, we’ll delve into the practicalities of transforming table data from PostgreSQL into the Parquet format and storing it in an S3 bucket. Leveraging Python, Pandas, Apache Spark, and the power of open-source tools, we aim to guide you through a seamless data transformation process.

Installing Required Libraries:

First, make sure the required libraries are installed. Run the following commands to install Pandas, PySpark, PyArrow, psycopg2, SQLAlchemy, and boto3:

pip install pandas
pip install pyspark
pip install pyarrow
pip install psycopg2
pip install sqlalchemy
pip install boto3
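
Before running the scripts, it can help to confirm that each package is importable. The following is a minimal sketch and not part of the original setup; the helper name `find_missing_modules` is hypothetical, and it uses only the standard library's `importlib.util.find_spec`.

```python
from importlib.util import find_spec

# Import names for the packages installed above
REQUIRED_MODULES = ["pandas", "pyspark", "pyarrow", "psycopg2", "sqlalchemy", "boto3"]


def find_missing_modules(module_names):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in module_names if find_spec(name) is None]


if __name__ == "__main__":
    missing = find_missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All required packages are importable.")
```

If anything is reported missing, re-run the corresponding `pip install` command in the same Python environment.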

Python code for Transformation:

Here is the Python code for exporting a PostgreSQL table to a local Parquet file. The script measures the total run time and compresses the output with Gzip:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime

# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'spark_db',
    'user': 'postgres',
    'password': 'postgres'
}

# Table name in PostgreSQL
postgres_table_name = 'trail_tab'

# Parquet file path
parquet_output_path = 'table1.parquet'

# Chunk size
chunk_size = 1000000

# Start measuring time
total_starttime = datetime.datetime.now()

# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")

# Get the total number of rows in the PostgreSQL table
total_rows = pd.read_sql_query(f"SELECT COUNT(*) FROM {postgres_table_name}", engine).iloc[0, 0]

# Collect each chunk in a list; concatenating once at the end avoids
# re-copying the growing DataFrame on every iteration
chunks = []

# Loop through chunks
for chunk_start in range(0, total_rows, chunk_size):

    # Read the current chunk from PostgreSQL into a Pandas DataFrame.
    # Note: without an ORDER BY, PostgreSQL does not guarantee a stable row order
    # across OFFSET/LIMIT queries; add ORDER BY on a unique column if the table has one.
    query = f"SELECT * FROM {postgres_table_name} OFFSET {chunk_start} LIMIT {chunk_size}"
    chunks.append(pd.read_sql_query(query, engine))

# Combine all chunks into one DataFrame (an empty table yields an empty DataFrame)
combined_df = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()

# Write the combined DataFrame to a single Gzip-compressed Parquet file
pq.write_table(pa.Table.from_pandas(combined_df), parquet_output_path, compression='gzip')

# End measuring time
total_endtime = datetime.datetime.now()

# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Data from PostgreSQL table '{postgres_table_name}' has been successfully written to a single Parquet file.")

Here is the output from running the PostgreSQL-to-Parquet script:

Python code for transferring PostgreSQL data to an S3 bucket:

# Export PostgreSQL data to a Parquet file and upload it to S3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime
import boto3

# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'spark_db',
    'user': 'postgres',
    'password': 'postgres'
}

# Table name in PostgreSQL
postgres_table_name = 'trail_tab'

# Parquet file path (local temporary path)
parquet_local_path = 'table1_combined.parquet'

# S3 bucket and key
s3_bucket = 'table-parquetbucket'
s3_key = 'table1_combined.parquet'

# Chunk size
chunk_size = 1000000

# Start measuring time
total_starttime = datetime.datetime.now()

# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")

# Get the total number of rows in the PostgreSQL table
total_rows = pd.read_sql_query(f"SELECT COUNT(*) FROM {postgres_table_name}", engine).iloc[0, 0]

# Collect each chunk in a list; concatenating once at the end avoids
# re-copying the growing DataFrame on every iteration
chunks = []

# Loop through chunks
for chunk_start in range(0, total_rows, chunk_size):

    # Read the current chunk from PostgreSQL into a Pandas DataFrame.
    # Note: without an ORDER BY, PostgreSQL does not guarantee a stable row order
    # across OFFSET/LIMIT queries; add ORDER BY on a unique column if the table has one.
    query = f"SELECT * FROM {postgres_table_name} OFFSET {chunk_start} LIMIT {chunk_size}"
    chunks.append(pd.read_sql_query(query, engine))

# Combine all chunks into one DataFrame (an empty table yields an empty DataFrame)
combined_df = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()

# Write the combined DataFrame to a local Parquet file
pq.write_table(pa.Table.from_pandas(combined_df), parquet_local_path)

# Upload the local Parquet file to S3
s3_client = boto3.client('s3', region_name='ap-south-1')
with open(parquet_local_path, 'rb') as data:
    s3_client.upload_fileobj(data, s3_bucket, s3_key)

# End measuring time
total_endtime = datetime.datetime.now()

# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Data from PostgreSQL table '{postgres_table_name}' has been successfully written to the S3 bucket.")

Here is the output from running the PostgreSQL-to-S3 script:

Python code for loading Parquet data into a new PostgreSQL table:

# Load data from a Parquet file into a new PostgreSQL table
import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime

# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'db_1',
    'user': 'postgres',
    'password': 'postgres'
}

# Table name in PostgreSQL
postgres_table_name = 'parquetinput'

# Parquet file path
parquet_input_path = 'compressedtable1.parquet'

# Chunk size for insertion into PostgreSQL
insert_chunk_size = 100000

# Start measuring time
total_starttime = datetime.datetime.now()

# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")

# Read the Parquet file into a Pandas DataFrame
parquet_data = pq.read_table(parquet_input_path).to_pandas()

# Chunked insertion into PostgreSQL table
for chunk_start in range(0, len(parquet_data), insert_chunk_size):
    chunk_end = min(chunk_start + insert_chunk_size, len(parquet_data))
    chunk_df = parquet_data.iloc[chunk_start:chunk_end]

    # Use the pandas to_sql function to insert the chunk into the PostgreSQL table;
    # if_exists='append' creates the table on the first chunk and appends afterwards
    chunk_df.to_sql(postgres_table_name, engine, if_exists='append', index=False)

# End measuring time
total_endtime = datetime.datetime.now()

# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Parquet data from file '{parquet_input_path}' has been successfully loaded into the new PostgreSQL table '{postgres_table_name}'.")

Here is the output from running the Parquet-to-PostgreSQL script:

Python code for loading S3 bucket data back into a PostgreSQL table:

# Download a Parquet file from S3 and load it into a new PostgreSQL table
import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import datetime
import boto3

# Connection parameters for PostgreSQL
db_params = {
    'host': 'localhost',
    'port': 5432,
    'database': 'db_1',
    'user': 'postgres',
    'password': 'postgres'
}

# S3 bucket and key
s3_bucket = 'table-parquetbucket'
s3_key = 'table1_combined.parquet'

# Parquet file path (local temporary path)
parquet_local_path = 'table1_combined.parquet'

# Table name in PostgreSQL
postgres_table_name = 't1'

# Chunk size for insertion into PostgreSQL
insert_chunk_size = 100000

# Start measuring time
total_starttime = datetime.datetime.now()

# Download the Parquet file from S3
s3_client = boto3.client('s3', region_name='ap-south-1')
s3_client.download_file(s3_bucket, s3_key, parquet_local_path)

# Read the Parquet file into a Pandas DataFrame
parquet_data = pq.read_table(parquet_local_path).to_pandas()

# Create a connection to PostgreSQL
engine = create_engine(f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}")

# Chunked insertion into PostgreSQL table
for chunk_start in range(0, len(parquet_data), insert_chunk_size):
    chunk_end = min(chunk_start + insert_chunk_size, len(parquet_data))
    chunk_df = parquet_data.iloc[chunk_start:chunk_end]

    # Use pandas to_sql function to insert the chunk into the PostgreSQL table
    chunk_df.to_sql(postgres_table_name, engine, if_exists='append', index=False)

# End measuring time
total_endtime = datetime.datetime.now()

# Calculate and print the total time taken
total_time_taken = total_endtime - total_starttime
print(f"Total time taken: {total_time_taken}")
print(f"Parquet data from S3 has been successfully loaded into the new PostgreSQL table '{postgres_table_name}'.")

Here is the output from running the S3-to-PostgreSQL script:

What’s Next!

We are not wrapping up this blog series just yet. With the steps outlined in this post, you can move data from PostgreSQL into Parquet files, store them on Amazon S3, and load them back into PostgreSQL tables. The journey doesn’t end here: in the next installment, we’ll shift our focus to querying with Apache Drill. Stay tuned as we look at how Apache Drill can empower business analysts.
