From 7465ee94e1eee71a0ca1275b2f7d541273b9c191 Mon Sep 17 00:00:00 2001 From: Gabe Farrell Date: Fri, 21 Apr 2023 21:00:13 -0400 Subject: [PATCH] SFTP Support, Quiet Mode, Bug Fixes, Removed Unused Deps --- .gitignore | 4 +- README.md | Bin 844 -> 1302 bytes merge-csv.py | 243 +++++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 209 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index 16768bd..c861176 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ .env .idea test-csv/merged -ecp-csv \ No newline at end of file +ecp-csv +test-csv-large +merged \ No newline at end of file diff --git a/README.md b/README.md index e8eb477804d642e7a37d188f8c160a129e3536df..d3f39f9937e820c95983e3f84128e7c58169bbba 100644 GIT binary patch literal 1302 zcma)6TTa6;5S(u$?qI1utwh^1Ai)v%Cp3Ko(Z+RBN-q!0tP>|~Q-rL<@jhmEXV>}p z9>HOYB?8ubPq9FSPu%0qo)M&fqQ>*tA-DGe1^3DJhh%)1??-;)O>I` zo(_!Uar)EB(_V3erz~!v38%t}_`a8iJYvo-)Mi$bWJ6h==(|D5XhRIlEK`RuIZ+jR zOlYT5&OK*!fgCKTDfx9u+CZ%|Tgq$h@``@q#8X$|z44q3X8uTxPHx3LJCU z;DM-9`&yD^#po~fq{NXj)mjkCac?cv_lhg> zD9*WsI;j~@7lc(MEU|iMCaNC9y0o~&+9#|%ku;MXjxr0Bs=G(4Mob*cBJRF9jXAyI zp|D)UkZMz(0;L@*M^+Jtr@3BJ6T7FH@|4H*Tqutr>B{gOdsdQ9-4WN^y>1#~q?Wmm zqV(r$$|{0+x3|G1NgJbYe+%6w^z#{Qdz`TfO@7Ab(qc!cW|V@k-n)yxyoJ*6 delta 102 zcmbQnb%t%i6!A)i5{3+he1<#*U4|4OUBZwGWS32@XB3@yD}XU$vNogQgqrl`@EPNB6*iC-Hq6Yw> C#2iNe diff --git a/merge-csv.py b/merge-csv.py index ab81b71..f65154b 100644 --- a/merge-csv.py +++ b/merge-csv.py @@ -1,16 +1,103 @@ import pandas as pd import os import glob -from dotenv import load_dotenv import numpy as np import datetime import itertools as it import argparse import re -import pprint +from urllib.parse import urlparse +import pysftp -def prep_dir(folder): +class Sftp: + def __init__(self, hostname, username, password, cnopts, port=22): + """Constructor Method""" + # Set connection object to None (initial value) + self.connection = None + self.hostname = hostname + self.username = username + self.password = password + self.cnopts = cnopts + self.port = port + + def connect(self): + """Connects to the sftp server and returns the sftp connection object""" + + try: + # Get the sftp connection object + self.connection = pysftp.Connection( + host=self.hostname, + username=self.username, + password=self.password, + cnopts=self.cnopts, + port=self.port, + ) + except Exception as err: + raise Exception(err) + finally: + if not args.quiet: print(f"Connected to {self.hostname} as {self.username}.") + + def disconnect(self): + """Closes the sftp connection""" + self.connection.close() + if not args.quiet: print(f"Disconnected from host {self.hostname}") + + def listdir(self, remote_path): + """lists all the files and directories in the specified path and returns them""" + for obj in self.connection.listdir(remote_path): + yield obj + + def listdir_attr(self, remote_path): + """lists all the files and directories (with their attributes) in the specified path and returns them""" + for attr in self.connection.listdir_attr(remote_path): + yield attr + + def download(self, remote_path, target_local_path): + """ + Downloads the file from remote sftp server to local. + Also, by default extracts the file to the specified target_local_path + """ + + try: + if not args.quiet: print( + f"downloading from {self.hostname} as {self.username} [(remote path : {remote_path});(local path: {target_local_path})]" + ) + + # Create the target directory if it does not exist + path, _ = os.path.split(target_local_path) + if not os.path.isdir(path): + try: + os.makedirs(path) + except Exception as err: + raise Exception(err) + + # Download from remote sftp server to local + self.connection.get(remote_path, target_local_path) + if not args.quiet: print("download completed") + + except Exception as err: + raise Exception(err) + + def upload(self, source_local_path, remote_path): + """ + Uploads the source files from local to the sftp server. + """ + + try: + if not args.quiet: print( + f"uploading to {self.hostname} as {self.username} [(remote path: {remote_path});(source local path: {source_local_path})]" + ) + + # Download file from SFTP + self.connection.put(source_local_path, remote_path) + if not args.quiet: print("upload completed") + + except Exception as err: + raise Exception(err) + + +def prep_dir(folder=''): # prepare directories cwd = os.path.join(os.getcwd(), folder) mwd = os.path.join(cwd, 'merged') @@ -33,7 +120,8 @@ def cap_permutations(s): def combine_rows(df, col, possibilities): # if final column doesn't exist, create it if col not in df.columns: - df[col] = np.nan + tmpdf = pd.DataFrame([np.nan], columns=[col]) + df = pd.concat((df, tmpdf), axis=1) # generate all upper/lowercase possibilities for columns allp = [] for p in possibilities: @@ -44,7 +132,7 @@ def combine_rows(df, col, possibilities): allp.remove(col) safety += 1 if safety > 100: - print(f'Infinite loop detected, shutting down.') + print(f'Fatal: Infinite loop detected, shutting down.') exit(1) # list to store replaced columns drops = [] @@ -64,42 +152,60 @@ def combine_rows(df, col, possibilities): def do_merge_student(cwd, mwd): # identify and merge student files - print('---Merging Student Data---') + if not args.quiet: print('---Merging Student Data---') all_files = glob.glob(os.path.join(cwd, "*student*.csv")) - print(f'Found {len(all_files)} CSV files') - print('Merging...') - files = [pd.read_csv(f) for f in all_files] + if not args.quiet: print(f'Found {len(all_files)} Student CSV files') + if len(all_files) < 1: + if not args.quiet: print('No files found. Skipping merge...') + return + if not args.quiet: print('Merging...') + files = [pd.read_csv(f, low_memory=False) for f in all_files] lines = 0 for f in files: lines += f.shape[0] - df = pd.concat(files, ignore_index=True) - print('Repairing rows...') + df = pd.concat(files, axis=0) + if not args.quiet: print('Repairing rows...') df = repair_student_rows(df) if df.shape[0] != lines: - print(f'Warning! Line count mismatch: {lines} expected, but got {df.shape[0]}') + print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}') date = get_date() - df.to_csv(os.path.join(mwd, f'{date}-student-data-merged.csv')) - print('Student data merged successfully!') + if args.project: + proj = '-' + args.project + else: + proj = '' + fn = f'{date}{proj}-student-data-merged.csv' + df.to_csv(os.path.join(mwd, fn)) + if not args.quiet: print('Student data merged successfully!') + return fn def do_merge_teacher(cwd, mwd): # identify and merge teacher files - print('---Merging Teacher Data---') + if not args.quiet: print('---Merging Teacher Data---') all_files = glob.glob(os.path.join(cwd, "*teacher*.csv")) - print(f'Found {len(all_files)} CSV files') - print('Merging...') - files = [pd.read_csv(f) for f in all_files] + if not args.quiet: print(f'Found {len(all_files)} Teacher CSV files') + if len(all_files) < 1: + if not args.quiet: print('No files found. Skipping merge...') + return + if not args.quiet: print('Merging...') + files = [pd.read_csv(f, low_memory=False) for f in all_files] lines = 0 for f in files: lines += f.shape[0] - df = pd.concat(files, ignore_index=True) - print('Repairing rows...') + df = pd.concat(files, axis=0) + if not args.quiet: print('Repairing rows...') df = repair_teacher_rows(df) if df.shape[0] != lines: - print(f'Warning! Line count mismatch: {lines} expected, but got {df.shape[0]}') + print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}') date = get_date() - df.to_csv(os.path.join(mwd, f'{date}-teacher-data-merged.csv')) - print('Teacher data merged successfully!') + if args.project: + proj = '-' + args.project + else: + proj = '' + fn = f'{date}{proj}-teacher-data-merged.csv' + df.to_csv(os.path.join(mwd, fn)) + if not args.quiet: print('Teacher data merged successfully!') + return fn def repair_teacher_rows(df): @@ -116,7 +222,7 @@ def repair_student_rows(df): df = combine_rows(df, 'Grade', ['grade', 'What grade are you in?']) df = combine_rows(df, 'Gender', ['gender', 'Gender - self report', 'What is your gender?', 'What is your gender? - Selected Choice']) df = combine_rows(df, 'Race', ['Race- self report', 'race', 'Race - self report']) - print('Combining Question Variants...') + if not args.quiet: print('Combining Question Variants...') df = combine_variants(df) return df @@ -137,33 +243,96 @@ def combine_variants(df): if __name__ == '__main__': - # load environment vars - load_dotenv() + # parse flags parser = argparse.ArgumentParser( prog='merge-csv', description='Merges CSV Files containing student and teacher data', - epilog='Usage: python merge-csv.py (-sth) (directory)') - parser.add_argument('-d', '--folder', + epilog='Usage: python merge-csv.py (-stq) (-d directory) (-r remote) (-p project)') + parser.add_argument('-d', '--directory', action='store', - help='directory for local csv merging') + help='directory for local csv , defaults to current directory') parser.add_argument('-t', '--teacher', action='store_true', dest='teacher', - help='merge teacher data') # only merge teacher data + help='merge teacher data') parser.add_argument('-s', '--student', action='store_true', dest='student', - help='merge student data') # on/off flag + help='merge student data') + parser.add_argument('-q', '--quiet', + action='store_true', + dest='quiet', + help='run without output (besides errors and warnings)') + parser.add_argument('-p', '--project', + action='store', + help='add a project name to the merged csv file name') + parser.add_argument('-r', '--remote-url', + action='store', + dest='remote_url', + help='sftp url for remote merging') args = parser.parse_args() + # make sure -s or -t is set if not (args.student or args.teacher): - print('Warning: Neither -s nor -t are specified. No merge will be performed.') - # do merge - c, m = prep_dir(args.folder) + if not args.quiet: print('Notice: Neither -s nor -t are specified. No merge will be performed.') + + if args.directory and not args.remote_url: + c, m = prep_dir(args.directory) + elif not args.directory: + if not args.quiet: print('Notice: No directory specified. Defaulting to current directory.') + c, m = prep_dir() + + # prepare sftp if flagged + if args.remote_url: + if not args.quiet: print(f'Remote destination set, fetching files...') + parsed_url = urlparse(args.remote_url) + cnopts = pysftp.CnOpts() + cnopts.hostkeys = None + sftp = Sftp( + hostname=parsed_url.hostname, + username=parsed_url.username, + password=parsed_url.password, + cnopts=cnopts, + ) + sftp.connect() + # current directory is used for remote file merging + c, m = prep_dir() + + # prepare remote path + path = args.directory + if not path: + path = "/" + # ensure trailing slash + if not path.endswith("/"): path += "/" + + filelist = [] + # get csv list from remote + for file in sftp.listdir_attr(path): + if file.filename.endswith(".csv"): + filelist.append(file.filename) + if not args.quiet: print(f'Fetching file {file.filename}...') + sftp.download(path + file.filename, c + file.filename) + + # perform merges if args.teacher: - do_merge_teacher(c, m) + tmd = do_merge_teacher(c, m) if args.student: - do_merge_student(c, m) + smd = do_merge_student(c, m) -# TODO: Regex match cols with title s-****-q#-1 and merge with col s-****-q# \ No newline at end of file + if args.remote_url: + # upload tmd and smd to remote + if not args.quiet: print('Uploading merged data...') + sftp.upload(m + '/' + tmd, path + 'merged/' + tmd) + sftp.upload(m + '/' + smd, path + 'merged/' + smd) + # remove merged directory + if not args.quiet: print('Cleaning up...') + os.remove(m + '/' + tmd) + os.remove(m + '/' + smd) + os.rmdir(m) + # remove downloaded files + for f in filelist: + if os.path.exists(f): + os.remove(f) + sftp.disconnect() + if not args.quiet: print('Done!')