mirror of
https://github.com/edcommonwealth/merge-csv.git
synced 2026-03-07 21:48:13 -08:00
SFTP Support, Quiet Mode, Bug Fixes, Removed Unused Deps
This commit is contained in:
parent
ecb7903851
commit
7465ee94e1
3 changed files with 211 additions and 40 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -2,3 +2,5 @@
|
|||
.idea
|
||||
test-csv/merged
|
||||
ecp-csv
|
||||
test-csv-large
|
||||
merged
|
||||
BIN
README.md
BIN
README.md
Binary file not shown.
247
merge-csv.py
247
merge-csv.py
|
|
@ -1,16 +1,103 @@
|
|||
import pandas as pd
|
||||
import os
|
||||
import glob
|
||||
from dotenv import load_dotenv
|
||||
import numpy as np
|
||||
import datetime
|
||||
import itertools as it
|
||||
import argparse
|
||||
import re
|
||||
import pprint
|
||||
from urllib.parse import urlparse
|
||||
import pysftp
|
||||
|
||||
|
||||
def prep_dir(folder):
|
||||
class Sftp:
|
||||
def __init__(self, hostname, username, password, cnopts, port=22):
|
||||
"""Constructor Method"""
|
||||
# Set connection object to None (initial value)
|
||||
self.connection = None
|
||||
self.hostname = hostname
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.cnopts = cnopts
|
||||
self.port = port
|
||||
|
||||
def connect(self):
|
||||
"""Connects to the sftp server and returns the sftp connection object"""
|
||||
|
||||
try:
|
||||
# Get the sftp connection object
|
||||
self.connection = pysftp.Connection(
|
||||
host=self.hostname,
|
||||
username=self.username,
|
||||
password=self.password,
|
||||
cnopts=self.cnopts,
|
||||
port=self.port,
|
||||
)
|
||||
except Exception as err:
|
||||
raise Exception(err)
|
||||
finally:
|
||||
if not args.quiet: print(f"Connected to {self.hostname} as {self.username}.")
|
||||
|
||||
def disconnect(self):
|
||||
"""Closes the sftp connection"""
|
||||
self.connection.close()
|
||||
if not args.quiet: print(f"Disconnected from host {self.hostname}")
|
||||
|
||||
def listdir(self, remote_path):
|
||||
"""lists all the files and directories in the specified path and returns them"""
|
||||
for obj in self.connection.listdir(remote_path):
|
||||
yield obj
|
||||
|
||||
def listdir_attr(self, remote_path):
|
||||
"""lists all the files and directories (with their attributes) in the specified path and returns them"""
|
||||
for attr in self.connection.listdir_attr(remote_path):
|
||||
yield attr
|
||||
|
||||
def download(self, remote_path, target_local_path):
|
||||
"""
|
||||
Downloads the file from remote sftp server to local.
|
||||
Also, by default extracts the file to the specified target_local_path
|
||||
"""
|
||||
|
||||
try:
|
||||
if not args.quiet: print(
|
||||
f"downloading from {self.hostname} as {self.username} [(remote path : {remote_path});(local path: {target_local_path})]"
|
||||
)
|
||||
|
||||
# Create the target directory if it does not exist
|
||||
path, _ = os.path.split(target_local_path)
|
||||
if not os.path.isdir(path):
|
||||
try:
|
||||
os.makedirs(path)
|
||||
except Exception as err:
|
||||
raise Exception(err)
|
||||
|
||||
# Download from remote sftp server to local
|
||||
self.connection.get(remote_path, target_local_path)
|
||||
if not args.quiet: print("download completed")
|
||||
|
||||
except Exception as err:
|
||||
raise Exception(err)
|
||||
|
||||
def upload(self, source_local_path, remote_path):
|
||||
"""
|
||||
Uploads the source files from local to the sftp server.
|
||||
"""
|
||||
|
||||
try:
|
||||
if not args.quiet: print(
|
||||
f"uploading to {self.hostname} as {self.username} [(remote path: {remote_path});(source local path: {source_local_path})]"
|
||||
)
|
||||
|
||||
# Download file from SFTP
|
||||
self.connection.put(source_local_path, remote_path)
|
||||
if not args.quiet: print("upload completed")
|
||||
|
||||
except Exception as err:
|
||||
raise Exception(err)
|
||||
|
||||
|
||||
def prep_dir(folder=''):
|
||||
# prepare directories
|
||||
cwd = os.path.join(os.getcwd(), folder)
|
||||
mwd = os.path.join(cwd, 'merged')
|
||||
|
|
@ -33,7 +120,8 @@ def cap_permutations(s):
|
|||
def combine_rows(df, col, possibilities):
|
||||
# if final column doesn't exist, create it
|
||||
if col not in df.columns:
|
||||
df[col] = np.nan
|
||||
tmpdf = pd.DataFrame([np.nan], columns=[col])
|
||||
df = pd.concat((df, tmpdf), axis=1)
|
||||
# generate all upper/lowercase possibilities for columns
|
||||
allp = []
|
||||
for p in possibilities:
|
||||
|
|
@ -44,7 +132,7 @@ def combine_rows(df, col, possibilities):
|
|||
allp.remove(col)
|
||||
safety += 1
|
||||
if safety > 100:
|
||||
print(f'Infinite loop detected, shutting down.')
|
||||
print(f'Fatal: Infinite loop detected, shutting down.')
|
||||
exit(1)
|
||||
# list to store replaced columns
|
||||
drops = []
|
||||
|
|
@ -64,42 +152,60 @@ def combine_rows(df, col, possibilities):
|
|||
|
||||
def do_merge_student(cwd, mwd):
|
||||
# identify and merge student files
|
||||
print('---Merging Student Data---')
|
||||
if not args.quiet: print('---Merging Student Data---')
|
||||
all_files = glob.glob(os.path.join(cwd, "*student*.csv"))
|
||||
print(f'Found {len(all_files)} CSV files')
|
||||
print('Merging...')
|
||||
files = [pd.read_csv(f) for f in all_files]
|
||||
if not args.quiet: print(f'Found {len(all_files)} Student CSV files')
|
||||
if len(all_files) < 1:
|
||||
if not args.quiet: print('No files found. Skipping merge...')
|
||||
return
|
||||
if not args.quiet: print('Merging...')
|
||||
files = [pd.read_csv(f, low_memory=False) for f in all_files]
|
||||
lines = 0
|
||||
for f in files:
|
||||
lines += f.shape[0]
|
||||
df = pd.concat(files, ignore_index=True)
|
||||
print('Repairing rows...')
|
||||
df = pd.concat(files, axis=0)
|
||||
if not args.quiet: print('Repairing rows...')
|
||||
df = repair_student_rows(df)
|
||||
if df.shape[0] != lines:
|
||||
print(f'Warning! Line count mismatch: {lines} expected, but got {df.shape[0]}')
|
||||
print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}')
|
||||
date = get_date()
|
||||
df.to_csv(os.path.join(mwd, f'{date}-student-data-merged.csv'))
|
||||
print('Student data merged successfully!')
|
||||
if args.project:
|
||||
proj = '-' + args.project
|
||||
else:
|
||||
proj = ''
|
||||
fn = f'{date}{proj}-student-data-merged.csv'
|
||||
df.to_csv(os.path.join(mwd, fn))
|
||||
if not args.quiet: print('Student data merged successfully!')
|
||||
return fn
|
||||
|
||||
|
||||
def do_merge_teacher(cwd, mwd):
|
||||
# identify and merge teacher files
|
||||
print('---Merging Teacher Data---')
|
||||
if not args.quiet: print('---Merging Teacher Data---')
|
||||
all_files = glob.glob(os.path.join(cwd, "*teacher*.csv"))
|
||||
print(f'Found {len(all_files)} CSV files')
|
||||
print('Merging...')
|
||||
files = [pd.read_csv(f) for f in all_files]
|
||||
if not args.quiet: print(f'Found {len(all_files)} Teacher CSV files')
|
||||
if len(all_files) < 1:
|
||||
if not args.quiet: print('No files found. Skipping merge...')
|
||||
return
|
||||
if not args.quiet: print('Merging...')
|
||||
files = [pd.read_csv(f, low_memory=False) for f in all_files]
|
||||
lines = 0
|
||||
for f in files:
|
||||
lines += f.shape[0]
|
||||
df = pd.concat(files, ignore_index=True)
|
||||
print('Repairing rows...')
|
||||
df = pd.concat(files, axis=0)
|
||||
if not args.quiet: print('Repairing rows...')
|
||||
df = repair_teacher_rows(df)
|
||||
if df.shape[0] != lines:
|
||||
print(f'Warning! Line count mismatch: {lines} expected, but got {df.shape[0]}')
|
||||
print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}')
|
||||
date = get_date()
|
||||
df.to_csv(os.path.join(mwd, f'{date}-teacher-data-merged.csv'))
|
||||
print('Teacher data merged successfully!')
|
||||
if args.project:
|
||||
proj = '-' + args.project
|
||||
else:
|
||||
proj = ''
|
||||
fn = f'{date}{proj}-teacher-data-merged.csv'
|
||||
df.to_csv(os.path.join(mwd, fn))
|
||||
if not args.quiet: print('Teacher data merged successfully!')
|
||||
return fn
|
||||
|
||||
|
||||
def repair_teacher_rows(df):
|
||||
|
|
@ -116,7 +222,7 @@ def repair_student_rows(df):
|
|||
df = combine_rows(df, 'Grade', ['grade', 'What grade are you in?'])
|
||||
df = combine_rows(df, 'Gender', ['gender', 'Gender - self report', 'What is your gender?', 'What is your gender? - Selected Choice'])
|
||||
df = combine_rows(df, 'Race', ['Race- self report', 'race', 'Race - self report'])
|
||||
print('Combining Question Variants...')
|
||||
if not args.quiet: print('Combining Question Variants...')
|
||||
df = combine_variants(df)
|
||||
return df
|
||||
|
||||
|
|
@ -137,33 +243,96 @@ def combine_variants(df):
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# load environment vars
|
||||
load_dotenv()
|
||||
|
||||
# parse flags
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='merge-csv',
|
||||
description='Merges CSV Files containing student and teacher data',
|
||||
epilog='Usage: python merge-csv.py (-sth) (directory)')
|
||||
parser.add_argument('-d', '--folder',
|
||||
epilog='Usage: python merge-csv.py (-stq) (-d directory) (-r remote) (-p project)')
|
||||
parser.add_argument('-d', '--directory',
|
||||
action='store',
|
||||
help='directory for local csv merging')
|
||||
help='directory for local csv , defaults to current directory')
|
||||
parser.add_argument('-t', '--teacher',
|
||||
action='store_true',
|
||||
dest='teacher',
|
||||
help='merge teacher data') # only merge teacher data
|
||||
help='merge teacher data')
|
||||
parser.add_argument('-s', '--student',
|
||||
action='store_true',
|
||||
dest='student',
|
||||
help='merge student data') # on/off flag
|
||||
help='merge student data')
|
||||
parser.add_argument('-q', '--quiet',
|
||||
action='store_true',
|
||||
dest='quiet',
|
||||
help='run without output (besides errors and warnings)')
|
||||
parser.add_argument('-p', '--project',
|
||||
action='store',
|
||||
help='add a project name to the merged csv file name')
|
||||
parser.add_argument('-r', '--remote-url',
|
||||
action='store',
|
||||
dest='remote_url',
|
||||
help='sftp url for remote merging')
|
||||
args = parser.parse_args()
|
||||
|
||||
# make sure -s or -t is set
|
||||
if not (args.student or args.teacher):
|
||||
print('Warning: Neither -s nor -t are specified. No merge will be performed.')
|
||||
# do merge
|
||||
c, m = prep_dir(args.folder)
|
||||
if args.teacher:
|
||||
do_merge_teacher(c, m)
|
||||
if args.student:
|
||||
do_merge_student(c, m)
|
||||
if not args.quiet: print('Notice: Neither -s nor -t are specified. No merge will be performed.')
|
||||
|
||||
# TODO: Regex match cols with title s-****-q#-1 and merge with col s-****-q#
|
||||
if args.directory and not args.remote_url:
|
||||
c, m = prep_dir(args.directory)
|
||||
elif not args.directory:
|
||||
if not args.quiet: print('Notice: No directory specified. Defaulting to current directory.')
|
||||
c, m = prep_dir()
|
||||
|
||||
# prepare sftp if flagged
|
||||
if args.remote_url:
|
||||
if not args.quiet: print(f'Remote destination set, fetching files...')
|
||||
parsed_url = urlparse(args.remote_url)
|
||||
cnopts = pysftp.CnOpts()
|
||||
cnopts.hostkeys = None
|
||||
sftp = Sftp(
|
||||
hostname=parsed_url.hostname,
|
||||
username=parsed_url.username,
|
||||
password=parsed_url.password,
|
||||
cnopts=cnopts,
|
||||
)
|
||||
sftp.connect()
|
||||
# current directory is used for remote file merging
|
||||
c, m = prep_dir()
|
||||
|
||||
# prepare remote path
|
||||
path = args.directory
|
||||
if not path:
|
||||
path = "/"
|
||||
# ensure trailing slash
|
||||
if not path.endswith("/"): path += "/"
|
||||
|
||||
filelist = []
|
||||
# get csv list from remote
|
||||
for file in sftp.listdir_attr(path):
|
||||
if file.filename.endswith(".csv"):
|
||||
filelist.append(file.filename)
|
||||
if not args.quiet: print(f'Fetching file {file.filename}...')
|
||||
sftp.download(path + file.filename, c + file.filename)
|
||||
|
||||
# perform merges
|
||||
if args.teacher:
|
||||
tmd = do_merge_teacher(c, m)
|
||||
if args.student:
|
||||
smd = do_merge_student(c, m)
|
||||
|
||||
if args.remote_url:
|
||||
# upload tmd and smd to remote
|
||||
if not args.quiet: print('Uploading merged data...')
|
||||
sftp.upload(m + '/' + tmd, path + 'merged/' + tmd)
|
||||
sftp.upload(m + '/' + smd, path + 'merged/' + smd)
|
||||
# remove merged directory
|
||||
if not args.quiet: print('Cleaning up...')
|
||||
os.remove(m + '/' + tmd)
|
||||
os.remove(m + '/' + smd)
|
||||
os.rmdir(m)
|
||||
# remove downloaded files
|
||||
for f in filelist:
|
||||
if os.path.exists(f):
|
||||
os.remove(f)
|
||||
sftp.disconnect()
|
||||
if not args.quiet: print('Done!')
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue