import pandas as pd import os import glob import numpy as np import datetime import itertools as it import argparse import re from urllib.parse import urlparse import pysftp # TODO make sure these are not case sensitive # all of the columns we want to extract from the csv file # excluding the question ids (they are found using regex) final_columns_student = { 'Start Date': ['startdate', 'start date'], 'End Date': ['enddate', 'end date'], 'Status': ['status'], 'Ip Address': ['ip address', 'ipaddress'], 'Progress': ['progress'], 'Duration': ['duration', 'duration..in.seconds', 'duration (in seconds)'], 'District': ['district', 'please select your school district.'], 'LASID': ['lasid', 'Please enter your Locally Assigned Student ID Number (LASID, or student lunch number).'], 'Grade': ['grade', 'what grade are you in?'], 'Gender': ['gender', 'what is your gender?', 'what is your gender? - selected choice'], 'Race': ['race'], 'Recorded Date': ['recorded date', 'recordeddate'], 'Response Id': ['responseid', 'response id'], 'Dese Id': ['deseid', 'dese id', 'school'], } final_columns_teacher = { 'Start Date': ['startdate', 'start date'], 'End Date': ['enddate', 'end date'], 'Status': ['status'], 'Ip Address': ['ip address', 'ipaddress'], 'Progress': ['progress'], 'Duration': ['duration', 'duration..in.seconds', 'duration (in seconds)'], 'District': ['district', 'please select your school district.'], 'Recorded Date': ['recorded date', 'recordeddate'], 'Response Id': ['responseid', 'response id'], 'Dese Id': ['deseid', 'dese id', 'school'], } class Sftp: def __init__(self, hostname, username, password, cnopts, port=22): """Constructor Method""" # Set connection object to None (initial value) self.connection = None self.hostname = hostname self.username = username self.password = password self.cnopts = cnopts self.port = port def connect(self): """Connects to the sftp server and returns the sftp connection object""" try: # Get the sftp connection object self.connection = pysftp.Connection( host=self.hostname, username=self.username, password=self.password, cnopts=self.cnopts, port=self.port, ) except Exception as err: raise Exception(err) finally: if not args.quiet: print(f"Connected to {self.hostname} as {self.username}.") def disconnect(self): """Closes the sftp connection""" self.connection.close() if not args.quiet: print(f"Disconnected from host {self.hostname}") def listdir(self, remote_path): """lists all the files and directories in the specified path and returns them""" for obj in self.connection.listdir(remote_path): yield obj def listdir_attr(self, remote_path): """lists all the files and directories (with their attributes) in the specified path and returns them""" for attr in self.connection.listdir_attr(remote_path): yield attr def download(self, remote_path, target_local_path): """ Downloads the file from remote sftp server to local. Also, by default extracts the file to the specified target_local_path """ try: if not args.quiet: print( f"downloading from {self.hostname} as {self.username} [(remote path : {remote_path});(local path: {target_local_path})]" ) # Create the target directory if it does not exist path, _ = os.path.split(target_local_path) if not os.path.isdir(path): try: os.makedirs(path) except Exception as err: raise Exception(err) # Download from remote sftp server to local self.connection.get(remote_path, target_local_path) if not args.quiet: print("download completed") except Exception as err: raise Exception(err) def upload(self, source_local_path, remote_path): """ Uploads the source files from local to the sftp server. """ try: if not args.quiet: print( f"uploading to {self.hostname} as {self.username} [(remote path: {remote_path});(source local path: {source_local_path})]" ) # Download file from SFTP self.connection.put(source_local_path, remote_path) if not args.quiet: print("upload completed") except Exception as err: raise Exception(err) # prepare csv and merged csv directories def prep_dir(folder=''): # prepare directories cwd = os.path.join(os.getcwd(), folder) mwd = os.path.join(cwd, 'merged') if not os.path.exists(mwd): if args.verbose: print(f'Creating directory {mwd}') os.mkdir(mwd) if args.verbose: print('Source data directory: ' + cwd) if args.verbose: print('Merged data directory: ' + mwd) return cwd, mwd # get current date in Month-XX-YYYY format def get_date(): return datetime.date.today().strftime("%B-%d-%Y") # in dataframe df, merges any column in possibilities into the final column col def combine_cols(df, col, possibilities): # if final column doesn't exist, create it if col not in df.columns: tmpdf = pd.DataFrame([np.nan], columns=[col]) df = pd.concat((df, tmpdf), axis=1) # list to store replaced columns drops = [] # for every column possibility that does exist... for cl in df.columns: if cl.lower() in possibilities: # we don't want to merge and drop our final column if cl == col: continue # replace the column... if args.verbose: print(f'Replacing column {cl}') df[col] = df[col].replace(r'^\s*$', np.nan, regex=True).fillna(df[cl]) # and add it to the drop list drops.append(cl) # drop spent columns df = df.drop(columns=drops) if args.verbose: print(f'Dropped columns: {drops}') return df # removes unused columns from student data def clean_cols_student(df): keep = list(final_columns_student.keys()) keep = list(map(str.lower, keep)) drops = [] question_pattern = re.compile("^s-[a-zA-Z]{4}-q[0-9][0-9]?$") for col in df.columns: if col.lower() not in keep and not bool(question_pattern.match(col)): drops.append(col) df = df.drop(columns=drops) if args.verbose: print(f'Dropped columns: {drops}') return df # removes unused columns from teacher data def clean_cols_teacher(df): keep = list(final_columns_teacher.keys()) keep = list(map(str.lower, keep)) drops = [] question_pattern = re.compile("^t-[a-zA-Z]{4}-q[0-9][0-9]?$") for col in df.columns: if col.lower() not in keep and not bool(question_pattern.match(col)): drops.append(col) df = df.drop(columns=drops) if args.verbose: print(f'Dropped columns: {drops}') return df # performs all merging operations for student data def do_merge_student(cwd, mwd): # identify and merge student files if not args.quiet: print('---Merging Student Data---') all_files = glob.glob(os.path.join(cwd, "*student*.csv")) if not args.quiet: print(f'Found {len(all_files)} Student CSV files') if len(all_files) < 1: if not args.quiet: print('No files found. Skipping merge...') return if not args.quiet: print('Merging...') files = [pd.read_csv(f, low_memory=False) for f in all_files] # count lines in read csv files lines = 0 for fi in files: lines += fi.shape[0] # combine csv files df = pd.concat(files, axis=0) # combine related columns if not args.quiet: print('Repairing rows...') df = repair_student_columns(df) # clean out unnecessary columns if not args.quiet: print('Cleaning out columns...') df = clean_cols_student(df) # ensure line count matches what is expected if df.shape[0] != lines: print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}') # save merged file date = get_date() if args.project: proj = '-' + args.project else: proj = '' fn = f'{date}{proj}-student-data-merged.csv' df.to_csv(os.path.join(mwd, fn), index=False) if not args.quiet: print('Student data merged successfully!') return fn # performs all merging operations for teacher data def do_merge_teacher(cwd, mwd): # identify and merge teacher files if not args.quiet: print('---Merging Teacher Data---') all_files = glob.glob(os.path.join(cwd, "*teacher*.csv")) if not args.quiet: print(f'Found {len(all_files)} Teacher CSV files') if len(all_files) < 1: if not args.quiet: print('No files found. Skipping merge...') return if not args.quiet: print('Merging...') files = [pd.read_csv(f, low_memory=False) for f in all_files] # count lines in read csv files lines = 0 for f in files: lines += f.shape[0] # combine csv files df = pd.concat(files, axis=0) # combine related columns if not args.quiet: print('Repairing columns...') df = repair_teacher_columns(df) # clean out unnecessary columns if not args.quiet: print('Cleaning out columns...') df = clean_cols_teacher(df) # ensure line count matches what is expected if df.shape[0] != lines: print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}') # save merged file date = get_date() if args.project: proj = '-' + args.project else: proj = '' fn = f'{date}{proj}-teacher-data-merged.csv' df.to_csv(os.path.join(mwd, fn), index=False) if not args.quiet: print('Teacher data merged successfully!') return fn # merges teacher columns that may have mismatched names def repair_teacher_columns(df): for col in final_columns_teacher: df = combine_cols(df, col, final_columns_teacher[col]) return df # merges student columns that may have mismatched names, # and combines question variants def repair_student_columns(df): for col in final_columns_student: df = combine_cols(df, col, final_columns_student[col]) if not args.quiet: print('Combining Question Variants...') df = combine_variants(df) return df # combines question variants into non-variant columns def combine_variants(df): drops = [] for col in df: x = re.search(r'^s-[a-z]{4}-q[0-9][0-9]?-1$', col) if x is not None: # get non variant version nonvar = col[:-2] # combine into non variant df[nonvar] = df[nonvar].replace(r'^\s*$', np.nan, regex=True).fillna(df[col]) # and add it to the drop list drops.append(col) df = df.drop(columns=drops) return df if __name__ == '__main__': # parse flags parser = argparse.ArgumentParser( prog='merge-csv', description='Merges CSV Files containing student and teacher data', epilog='Usage: python merge-csv.py (-stq) (-d directory) (-r remote) (-p project)') parser.add_argument('-d', '--directory', action='store', help='directory for local csv , defaults to current directory') parser.add_argument('-t', '--teacher', action='store_true', dest='teacher', help='merge teacher data') parser.add_argument('-s', '--student', action='store_true', dest='student', help='merge student data') parser.add_argument('-q', '--quiet', action='store_true', dest='quiet', help='run without output (besides errors and warnings)') parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help='run with extra output information') parser.add_argument('-p', '--project', action='store', help='add a project name to the merged csv file name') parser.add_argument('-r', '--remote-url', action='store', dest='remote_url', help='sftp url for remote merging') args = parser.parse_args() #quiet takes precedence over verbose if args.quiet: args.verbose = False # make sure -s or -t is set if not (args.student or args.teacher): if not args.quiet: print('Notice: Neither -s nor -t are specified. No merge will be performed.') if args.directory and not args.remote_url: c, m = prep_dir(args.directory) elif not args.directory: if not args.quiet: print('Notice: No directory specified. Defaulting to current directory.') c, m = prep_dir() # prepare sftp if flagged if args.remote_url: if not args.quiet: print(f'Remote destination set, fetching files...') parsed_url = urlparse(args.remote_url) cnopts = pysftp.CnOpts() cnopts.hostkeys = None sftp = Sftp( hostname=parsed_url.hostname, username=parsed_url.username, password=parsed_url.password, cnopts=cnopts, ) sftp.connect() # current directory is used for remote file merging c, m = prep_dir() # prepare remote path path = args.directory if not path: path = "/" # ensure trailing slash if not path.endswith("/"): path += "/" filelist = [] # get csv list from remote for file in sftp.listdir_attr(path): if file.filename.endswith(".csv"): filelist.append(file.filename) if not args.quiet: print(f'Fetching file {file.filename}...') sftp.download(path + file.filename, c + file.filename) # perform merges if args.teacher: tmd = do_merge_teacher(c, m) if args.student: smd = do_merge_student(c, m) if args.remote_url: # upload tmd and smd to remote if not args.quiet: print('Uploading merged data...') sftp.upload(m + '/' + tmd, path + 'merged/' + tmd) sftp.upload(m + '/' + smd, path + 'merged/' + smd) # remove merged directory if not args.quiet: print('Cleaning up...') os.remove(m + '/' + tmd) os.remove(m + '/' + smd) os.rmdir(m) # remove downloaded files for f in filelist: if os.path.exists(f): os.remove(f) sftp.disconnect() if not args.quiet: print('Done!')