# Listing metadata: 85 lines, 2.6 KiB, Python.
import glob
import os
import shutil  # Import shutil to move files

import pandas as pd
import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import execute_values

# Load environment variables from a .env file; the database user name and
# password are read from the environment when the connection is opened.
load_dotenv()

# Directory containing the CSV files to import.
csv_dir = '/home/ams/postgres/csv_files/'

# Directory where each CSV file is moved after it has been imported.
csv_dir_old = '/home/ams/postgres/csv_files_old/'

# Make sure the archive directory exists. exist_ok=True replaces a separate
# exists() check, which could race with another process creating it.
os.makedirs(csv_dir_old, exist_ok=True)

# All CSV files in csv_dir. glob returns them in arbitrary order, so sort
# them to make the processing (and log) order reproducible.
csv_files = sorted(glob.glob(os.path.join(csv_dir, '*.csv')))

# Open the PostgreSQL connection. Host and database are fixed here; the user
# name and password come from the environment (populated by load_dotenv()).
connection_kwargs = dict(
    host="172.26.0.3",
    database="analytics_team",
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
)
conn = psycopg2.connect(**connection_kwargs)

# One cursor, reused for every statement issued while importing the files.
cur = conn.cursor()

def _quote_ident(name):
    """Return *name* as a double-quoted PostgreSQL identifier.

    Embedded double quotes are doubled, as PostgreSQL requires inside a
    quoted identifier, so names taken from file names or CSV headers cannot
    break out of the quotes.
    """
    return '"' + str(name).replace('"', '""') + '"'


def _column_definitions(df):
    """Return one ``"<column>" <SQL type>`` fragment per column of *df*.

    Integer dtypes map to BIGINT because pandas parses integer columns as
    64-bit, and values outside PostgreSQL INTEGER's 32-bit range would make
    the INSERT fail. Float dtypes map to FLOAT; everything else (strings,
    booleans, mixed object columns) is stored as TEXT.
    """
    definitions = []
    for col, dtype in df.dtypes.items():
        if pd.api.types.is_integer_dtype(dtype):
            sql_type = 'BIGINT'
        elif pd.api.types.is_float_dtype(dtype):
            sql_type = 'FLOAT'
        else:
            sql_type = 'TEXT'
        definitions.append(f'{_quote_ident(col)} {sql_type}')
    return definitions


def _rows_for_insert(df):
    """Return the rows of *df* as plain tuples, with every missing value as None.

    psycopg2 sends None as SQL NULL, whereas a float NaN is stored as the
    float value 'NaN'. ``df.where(pd.notnull(df), None)`` does not reliably
    remove NaN from float64 columns (pandas may coerce the None back to NaN
    for that dtype), so each value is checked individually here.
    """
    return [
        tuple(None if pd.isna(value) else value for value in row)
        for row in df.itertuples(index=False, name=None)
    ]


for csv_file in csv_files:
    # Parse the whole file before inferring column types (low_memory=False)
    # so a column is not given mixed types by chunk-wise parsing.
    df = pd.read_csv(csv_file, low_memory=False)

    # Drop columns that are completely empty.
    df = df.dropna(axis=1, how='all')

    # Table name: 'survey_data_' followed by the file name without extension.
    # NOTE(review): PostgreSQL truncates identifiers longer than 63 bytes by
    # default, so long file names or long CSV headers (e.g. full survey
    # questions) may be shortened, and two headers sharing a 63-byte prefix
    # would collide in CREATE TABLE -- confirm whether this occurs in practice.
    filename = os.path.splitext(os.path.basename(csv_file))[0]
    table_name = f'survey_data_{filename}'
    quoted_table = _quote_ident(table_name)

    columns = _column_definitions(df)
    rows = _rows_for_insert(df)

    print(f"Creating table {table_name}...")

    # Run DROP, CREATE and INSERT as one transaction: psycopg2's `with conn`
    # commits when the block succeeds and rolls back if it raises, so a failed
    # import keeps the previous table instead of leaving an empty or partial one.
    with conn:
        cur.execute(f'DROP TABLE IF EXISTS {quoted_table};')
        cur.execute(f'CREATE TABLE {quoted_table} ({", ".join(columns)});')
        # If every column was dropped there is nothing to insert, and
        # "VALUES ()" is not valid SQL, so skip the INSERT in that case.
        if columns and rows:
            # execute_values sends many rows per statement instead of one
            # round trip per row. A '%' in the table name is doubled because
            # this statement goes through psycopg2's placeholder parsing.
            execute_values(
                cur,
                f'INSERT INTO {quoted_table.replace("%", "%%")} VALUES %s',
                rows,
            )

    print(f"Data imported into {table_name} successfully!")

    # Move the processed file to the archive directory.
    shutil.move(csv_file, os.path.join(csv_dir_old, os.path.basename(csv_file)))

# Release the database resources: the cursor first, then the connection.
for db_handle in (cur, conn):
    db_handle.close()