You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

414 lines
15 KiB

import pandas as pd
import os
import glob
import numpy as np
import datetime
import itertools as it
import argparse
import re
from urllib.parse import urlparse
import pysftp
# TODO make sure these are not case sensitive
# all of the columns we want to extract from the csv file
# excluding the question ids (they are found using regex)
# Maps each final student column name to the source-column names ("aliases")
# that should be folded into it. combine_cols() lower-cases the CSV header
# before looking it up here, so every alias MUST be written in lower case;
# an alias containing capitals can never match.
final_columns_student = {
    'Start Date': ['startdate', 'start date'],
    'End Date': ['enddate', 'end date'],
    'Status': ['status'],
    'Ip Address': ['ip address', 'ipaddress'],
    'Progress': ['progress'],
    'Duration': ['duration', 'duration..in.seconds', 'duration (in seconds)'],
    'District': ['district', 'please select your school district.'],
    'LASID': [
        'lasid',
        'please enter your locally assigned student id number (lasid, or student lunch number).',
    ],
    'Grade': ['grade', 'what grade are you in?'],
    'Gender': ['gender', 'what is your gender?', 'what is your gender? - selected choice'],
    'Race': ['race'],
    'Recorded Date': ['recorded date', 'recordeddate'],
    'Response Id': ['responseid', 'response id'],
    'Dese Id': ['deseid', 'dese id', 'school'],
}
# Final teacher column name -> lower-case source-column names that are
# merged into it by combine_cols().
final_columns_teacher = {
    "Start Date": ["startdate", "start date"],
    "End Date": ["enddate", "end date"],
    "Status": ["status"],
    "Ip Address": ["ip address", "ipaddress"],
    "Progress": ["progress"],
    "Duration": [
        "duration",
        "duration..in.seconds",
        "duration (in seconds)",
    ],
    "District": [
        "district",
        "please select your school district.",
    ],
    "Recorded Date": ["recorded date", "recordeddate"],
    "Response Id": ["responseid", "response id"],
    "Dese Id": ["deseid", "dese id", "school"],
}
class Sftp:
    """Small wrapper around a ``pysftp.Connection`` that prints progress.

    Progress messages are suppressed when the script's ``--quiet`` flag is
    set. The class also works when this file is imported as a module (where
    the command-line ``args`` global does not exist); messages are then
    always printed.
    """

    def __init__(self, hostname, username, password, cnopts, port=22):
        """Store the connection settings; no network access happens here.

        Args:
            hostname: SFTP server host name.
            username: login name.
            password: login password.
            cnopts: ``pysftp.CnOpts`` instance passed through to pysftp.
            port: TCP port of the SFTP server.
        """
        # Stays None until connect() succeeds
        self.connection = None
        self.hostname = hostname
        self.username = username
        self.password = password
        self.cnopts = cnopts
        self.port = port

    @staticmethod
    def _log(message):
        """Print *message* unless the command-line ``--quiet`` flag is set."""
        # `args` is only defined when the file runs as a script, so look it
        # up defensively instead of referencing the global directly.
        cli_args = globals().get('args')
        if not getattr(cli_args, 'quiet', False):
            print(message)

    def connect(self):
        """Open the SFTP connection and store it in ``self.connection``.

        Raises:
            Exception: whatever ``pysftp.Connection`` raises is propagated
                unchanged, keeping its original type and traceback.
        """
        self.connection = pysftp.Connection(
            host=self.hostname,
            username=self.username,
            password=self.password,
            cnopts=self.cnopts,
            port=self.port,
        )
        # Only reached when the connection was actually established
        self._log(f"Connected to {self.hostname} as {self.username}.")

    def disconnect(self):
        """Close the SFTP connection; does nothing if it was never opened."""
        if self.connection is None:
            return
        self.connection.close()
        self.connection = None
        self._log(f"Disconnected from host {self.hostname}")

    def listdir(self, remote_path):
        """Yield the name of every entry in *remote_path*."""
        yield from self.connection.listdir(remote_path)

    def listdir_attr(self, remote_path):
        """Yield every entry in *remote_path* together with its attributes."""
        yield from self.connection.listdir_attr(remote_path)

    def download(self, remote_path, target_local_path):
        """Download *remote_path* from the server to *target_local_path*.

        The local target directory is created when it does not exist yet.
        Errors from the file system or from pysftp propagate unchanged.
        """
        self._log(
            f"downloading from {self.hostname} as {self.username} [(remote path : {remote_path});(local path: {target_local_path})]"
        )
        # A bare file name has no directory part; os.makedirs('') would fail
        path, _ = os.path.split(target_local_path)
        if path:
            os.makedirs(path, exist_ok=True)
        # Download from remote sftp server to local
        self.connection.get(remote_path, target_local_path)
        self._log("download completed")

    def upload(self, source_local_path, remote_path):
        """Upload the local file *source_local_path* to *remote_path*.

        Errors from pysftp propagate unchanged.
        """
        self._log(
            f"uploading to {self.hostname} as {self.username} [(remote path: {remote_path});(source local path: {source_local_path})]"
        )
        # Upload file to the SFTP server
        self.connection.put(source_local_path, remote_path)
        self._log("upload completed")
# resolve the source csv directory and make sure its 'merged' subdirectory exists
def prep_dir(folder=''):
    """Return ``(source_dir, merged_dir)`` for *folder*.

    *folder* is joined onto the current working directory; the ``merged``
    subdirectory is created when it does not exist yet.
    """
    source_dir = os.path.join(os.getcwd(), folder)
    merged_dir = os.path.join(source_dir, 'merged')
    merged_missing = not os.path.exists(merged_dir)
    if merged_missing:
        if args.verbose:
            print(f'Creating directory {merged_dir}')
        os.mkdir(merged_dir)
    if args.verbose:
        print('Source data directory: ' + source_dir)
        print('Merged data directory: ' + merged_dir)
    return source_dir, merged_dir
# today's date formatted as Month-DD-YYYY (full month name)
def get_date():
    """Return today's local date as a ``%B-%d-%Y`` string."""
    today = datetime.date.today()
    return f"{today:%B-%d-%Y}"
# in dataframe df, merges any column in possibilities into the final column col
def combine_cols(df, col, possibilities):
    """Fold every alias column of *df* into the final column *col*.

    Args:
        df: DataFrame to repair. When *col* already exists it is updated in
            place on the passed frame before the alias columns are dropped.
        col: name of the final column; created (all NaN) when missing.
        possibilities: alias names; compared case-insensitively against the
            column labels of *df*.

    Returns:
        A DataFrame in which blank or missing values of *col* have been
        filled from the alias columns and the alias columns are removed.
    """
    # Lower-case both sides so an alias written with capitals still matches
    aliases = {str(name).lower() for name in possibilities}
    # if final column doesn't exist, create it
    if col not in df.columns:
        # assign() adds the column on the frame's own index. Concatenating a
        # one-row frame along axis=1 would align on index label 0 instead,
        # which adds a phantom row whenever the index lacks that label.
        df = df.assign(**{col: np.nan})
    # list to store replaced columns
    drops = []
    for cl in df.columns:
        # we don't want to merge and drop our final column
        if cl == col or str(cl).lower() not in aliases:
            continue
        if args.verbose:
            print(f'Replacing column {cl}')
        # whitespace-only cells count as missing before filling from the alias
        df[col] = df[col].replace(r'^\s*$', np.nan, regex=True).fillna(df[cl])
        drops.append(cl)
    # drop spent columns
    df = df.drop(columns=drops)
    if args.verbose:
        print(f'Dropped columns: {drops}')
    return df
# strips every column that is not part of the final student layout
def clean_cols_student(df):
    """Drop columns that are neither final student columns nor question ids.

    A column survives when its lower-cased name equals a lower-cased key of
    ``final_columns_student`` or when it looks like ``s-xxxx-qN`` / ``s-xxxx-qNN``.
    """
    wanted = [name.lower() for name in final_columns_student]
    question_id = re.compile("^s-[a-zA-Z]{4}-q[0-9][0-9]?$")
    drops = [
        label
        for label in df.columns
        if not (label.lower() in wanted or question_id.match(label))
    ]
    df = df.drop(columns=drops)
    if args.verbose:
        print(f'Dropped columns: {drops}')
    return df
# strips every column that is not part of the final teacher layout
def clean_cols_teacher(df):
    """Drop columns that are neither final teacher columns nor question ids.

    A column survives when its lower-cased name equals a lower-cased key of
    ``final_columns_teacher`` or when it looks like ``t-xxxx-qN`` / ``t-xxxx-qNN``.
    """
    wanted = [name.lower() for name in final_columns_teacher]
    question_id = re.compile("^t-[a-zA-Z]{4}-q[0-9][0-9]?$")
    drops = [
        label
        for label in df.columns
        if not (label.lower() in wanted or question_id.match(label))
    ]
    df = df.drop(columns=drops)
    if args.verbose:
        print(f'Dropped columns: {drops}')
    return df
# performs all merging operations for student data
def do_merge_student(cwd, mwd):
    """Merge every ``*student*.csv`` file found in *cwd* into one CSV in *mwd*.

    Args:
        cwd: directory that is searched for the source CSV files.
        mwd: directory the merged CSV file is written to.

    Returns:
        The file name (not the full path) of the merged CSV, or ``None``
        when no student CSV file was found.
    """
    # identify and merge student files
    if not args.quiet:
        print('---Merging Student Data---')
    # glob order is unspecified; sort so the merged row order is reproducible
    all_files = sorted(glob.glob(os.path.join(cwd, "*student*.csv")))
    if not args.quiet:
        print(f'Found {len(all_files)} Student CSV files')
    if not all_files:
        if not args.quiet:
            print('No files found. Skipping merge...')
        return None
    if not args.quiet:
        print('Merging...')
    frames = [pd.read_csv(f, low_memory=False) for f in all_files]
    # count lines in read csv files
    lines = sum(frame.shape[0] for frame in frames)
    # ignore_index gives the merged frame a unique 0..n-1 index; without it
    # each file contributes its own 0..k labels, and the duplicated labels
    # break the index-aligned column repairs that follow.
    df = pd.concat(frames, axis=0, ignore_index=True)
    # combine related columns
    if not args.quiet:
        print('Repairing rows...')
    df = repair_student_columns(df)
    # clean out unnecessary columns
    if not args.quiet:
        print('Cleaning out columns...')
    df = clean_cols_student(df)
    # ensure line count matches what is expected
    if df.shape[0] != lines:
        print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}')
    # save merged file
    date = get_date()
    proj = '-' + args.project if args.project else ''
    fn = f'{date}{proj}-student-data-merged.csv'
    df.to_csv(os.path.join(mwd, fn), index=False)
    if not args.quiet:
        print('Student data merged successfully!')
    return fn
# performs all merging operations for teacher data
def do_merge_teacher(cwd, mwd):
    """Merge every ``*teacher*.csv`` file found in *cwd* into one CSV in *mwd*.

    Args:
        cwd: directory that is searched for the source CSV files.
        mwd: directory the merged CSV file is written to.

    Returns:
        The file name (not the full path) of the merged CSV, or ``None``
        when no teacher CSV file was found.
    """
    # identify and merge teacher files
    if not args.quiet:
        print('---Merging Teacher Data---')
    # glob order is unspecified; sort so the merged row order is reproducible
    all_files = sorted(glob.glob(os.path.join(cwd, "*teacher*.csv")))
    if not args.quiet:
        print(f'Found {len(all_files)} Teacher CSV files')
    if not all_files:
        if not args.quiet:
            print('No files found. Skipping merge...')
        return None
    if not args.quiet:
        print('Merging...')
    frames = [pd.read_csv(f, low_memory=False) for f in all_files]
    # count lines in read csv files
    lines = sum(frame.shape[0] for frame in frames)
    # ignore_index gives the merged frame a unique 0..n-1 index; without it
    # each file contributes its own 0..k labels, and the duplicated labels
    # break the index-aligned column repairs that follow.
    df = pd.concat(frames, axis=0, ignore_index=True)
    # combine related columns
    if not args.quiet:
        print('Repairing columns...')
    df = repair_teacher_columns(df)
    # clean out unnecessary columns
    if not args.quiet:
        print('Cleaning out columns...')
    df = clean_cols_teacher(df)
    # ensure line count matches what is expected
    if df.shape[0] != lines:
        print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}')
    # save merged file
    date = get_date()
    proj = '-' + args.project if args.project else ''
    fn = f'{date}{proj}-teacher-data-merged.csv'
    df.to_csv(os.path.join(mwd, fn), index=False)
    if not args.quiet:
        print('Teacher data merged successfully!')
    return fn
# folds differently-named teacher columns into their final column names
def repair_teacher_columns(df):
    """Apply ``combine_cols`` once for every entry of ``final_columns_teacher``."""
    for final_name, aliases in final_columns_teacher.items():
        df = combine_cols(df, final_name, aliases)
    return df
# folds differently-named student columns into their final column names,
# then merges question variants into their base question columns
def repair_student_columns(df):
    """Apply ``combine_cols`` for every entry of ``final_columns_student``,
    then run ``combine_variants`` on the result."""
    for final_name, aliases in final_columns_student.items():
        df = combine_cols(df, final_name, aliases)
    if not args.quiet:
        print('Combining Question Variants...')
    return combine_variants(df)
# combines question variants into non-variant columns
def combine_variants(df):
    """Fold ``s-xxxx-qN-1`` variant columns into their ``s-xxxx-qN`` column.

    Blank or missing values of the non-variant column are filled from the
    variant, which is then dropped. A variant whose non-variant column does
    not exist is renamed to the non-variant name instead of raising KeyError.

    Args:
        df: DataFrame holding the question columns; existing non-variant
            columns are updated in place on the passed frame.

    Returns:
        A DataFrame without any ``-1`` variant columns.
    """
    # compiled once instead of on every loop iteration
    variant_pattern = re.compile(r'^s-[a-z]{4}-q[0-9][0-9]?-1$')
    drops = []
    renames = {}
    for col in df.columns:
        if not isinstance(col, str) or variant_pattern.search(col) is None:
            continue
        # get non variant version (strip the trailing '-1')
        nonvar = col[:-2]
        if nonvar in df.columns:
            # combine into non variant; whitespace-only cells count as missing
            df[nonvar] = df[nonvar].replace(r'^\s*$', np.nan, regex=True).fillna(df[col])
            # and add it to the drop list
            drops.append(col)
        else:
            # only the variant exists: keep its data under the non-variant name
            renames[col] = nonvar
    df = df.drop(columns=drops)
    if renames:
        df = df.rename(columns=renames)
    return df
if __name__ == '__main__':
    # Command-line entry point: merges local CSV files, or (with -r) fetches
    # them over SFTP, merges them locally and uploads the merged results.
    # `args` is deliberately a module-level name: the functions above read it.

    # parse flags
    parser = argparse.ArgumentParser(
        prog='merge-csv',
        description='Merges CSV Files containing student and teacher data',
        epilog='Usage: python merge-csv.py (-stq) (-d directory) (-r remote) (-p project)')
    parser.add_argument('-d', '--directory',
                        action='store',
                        help='directory for local csv , defaults to current directory')
    parser.add_argument('-t', '--teacher',
                        action='store_true',
                        dest='teacher',
                        help='merge teacher data')
    parser.add_argument('-s', '--student',
                        action='store_true',
                        dest='student',
                        help='merge student data')
    parser.add_argument('-q', '--quiet',
                        action='store_true',
                        dest='quiet',
                        help='run without output (besides errors and warnings)')
    parser.add_argument('-v', '--verbose',
                        action='store_true',
                        dest='verbose',
                        help='run with extra output information')
    parser.add_argument('-p', '--project',
                        action='store',
                        help='add a project name to the merged csv file name')
    parser.add_argument('-r', '--remote-url',
                        action='store',
                        dest='remote_url',
                        help='sftp url for remote merging')
    args = parser.parse_args()
    # quiet takes precedence over verbose
    if args.quiet:
        args.verbose = False
    # make sure -s or -t is set
    if not (args.student or args.teacher):
        if not args.quiet:
            print('Notice: Neither -s nor -t are specified. No merge will be performed.')
    if args.directory and not args.remote_url:
        c, m = prep_dir(args.directory)
    elif not args.directory:
        if not args.quiet:
            print('Notice: No directory specified. Defaulting to current directory.')
        c, m = prep_dir()
    sftp = None
    filelist = []
    try:
        # prepare sftp if flagged
        if args.remote_url:
            if not args.quiet:
                print('Remote destination set, fetching files...')
            parsed_url = urlparse(args.remote_url)
            cnopts = pysftp.CnOpts()
            # SECURITY: setting hostkeys to None disables host-key
            # verification, so the server's identity is not checked.
            # Kept because existing invocations rely on it.
            cnopts.hostkeys = None
            sftp = Sftp(
                hostname=parsed_url.hostname,
                username=parsed_url.username,
                password=parsed_url.password,
                cnopts=cnopts,
                # honour an explicit port in the URL instead of ignoring it
                port=parsed_url.port or 22,
            )
            sftp.connect()
            # current directory is used for remote file merging
            c, m = prep_dir()
            # prepare remote path (-d names the remote directory in this mode)
            path = args.directory or "/"
            # ensure trailing slash
            if not path.endswith("/"):
                path += "/"
            # get csv list from remote
            for file in sftp.listdir_attr(path):
                if file.filename.endswith(".csv"):
                    filelist.append(file.filename)
                    if not args.quiet:
                        print(f'Fetching file {file.filename}...')
                    sftp.download(path + file.filename, os.path.join(c, file.filename))
        # perform merges; each result is None when that merge did not run
        # or found no input files
        tmd = do_merge_teacher(c, m) if args.teacher else None
        smd = do_merge_student(c, m) if args.student else None
        if sftp is not None:
            # only files that were actually produced can be uploaded/removed
            merged_files = [fn for fn in (tmd, smd) if fn]
            if not args.quiet:
                print('Uploading merged data...')
            # NOTE(review): assumes the remote 'merged/' directory already
            # exists — confirm against the server layout.
            for fn in merged_files:
                sftp.upload(os.path.join(m, fn), path + 'merged/' + fn)
            if not args.quiet:
                print('Cleaning up...')
            for fn in merged_files:
                os.remove(os.path.join(m, fn))
            # remove merged directory; best effort, because it may still hold
            # merged files from earlier runs and os.rmdir needs it empty
            try:
                os.rmdir(m)
            except OSError:
                if args.verbose:
                    print(f'Leaving non-empty directory {m} in place')
            # remove downloaded files
            for f in filelist:
                local_copy = os.path.join(c, f)
                if os.path.exists(local_copy):
                    os.remove(local_copy)
    finally:
        # close the connection even when fetching, merging or uploading failed
        if sftp is not None and sftp.connection is not None:
            sftp.disconnect()
    if not args.quiet:
        print('Done!')