You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

460 lines
16 KiB

import pandas as pd
import os
import glob
import numpy as np
import datetime
import itertools as it
import argparse
import re
from urllib.parse import urlparse
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
import pysftp
# NOTE: for now, every header variant listed below must be all lowercase,
# because combine_cols() lowercases each CSV header before looking it up here.
# TODO: eventually make them case agnostic
#
# All of the columns we want to extract from the student csv files,
# excluding the question ids (they are found using regex).
# Each key is the final column name; its list holds the header spellings
# that get merged into that final column.
final_columns_student = {
    'Start Date': ['startdate', 'start date'],
    'End Date': ['enddate', 'end date'],
    'Status': ['status'],
    'Ip Address': ['ip address', 'ipaddress'],
    'Progress': ['progress'],
    'Duration': ['duration', 'duration..in.seconds', 'duration (in seconds)'],
    'District': ['district', 'please select your school district.'],
    'LASID': ['lasid', 'please enter your locally assigned student id number (lasid, or student lunch number).'],
    'Grade': ['grade', 'what grade are you in?'],
    'Gender': ['gender', 'what is your gender?', 'what is your gender? - selected choice'],
    'Race': ['race'],
    'Recorded Date': ['recorded date', 'recordeddate'],
    'Response Id': ['responseid', 'response id'],
    'Dese Id': ['deseid', 'dese id', 'school'],
}
# Same mapping for the teacher csv files (final column name -> header
# spellings merged into it); question ids are again matched by regex.
final_columns_teacher = {
    'Start Date': ['startdate', 'start date'],
    'End Date': ['enddate', 'end date'],
    'Status': ['status'],
    'Ip Address': ['ip address', 'ipaddress'],
    'Progress': ['progress'],
    'Duration': ['duration', 'duration..in.seconds', 'duration (in seconds)'],
    'District': ['district', 'please select your school district.'],
    'Recorded Date': ['recorded date', 'recordeddate'],
    'Response Id': ['responseid', 'response id'],
    'Dese Id': ['deseid', 'dese id', 'school'],
}
# Output flags read by the functions below. These defaults (quiet, not
# verbose) are overwritten from the -v / -q flags when run as a script.
argVerbose = False
argQuiet = True
class Sftp:
    """Small convenience wrapper around a ``pysftp.Connection``.

    Progress messages are printed unless the module-level ``argQuiet`` flag
    is set. Errors raised by pysftp or the filesystem propagate unchanged so
    callers see the original exception type and traceback.
    """

    def __init__(self, hostname, username, password, cnopts, port=22):
        """Store the connection settings; nothing is opened until connect()."""
        # None while disconnected; set by connect(), cleared by disconnect().
        self.connection = None
        self.hostname = hostname
        self.username = username
        self.password = password
        self.cnopts = cnopts
        self.port = port

    def connect(self):
        """Open the sftp connection and store it on ``self.connection``."""
        self.connection = pysftp.Connection(
            host=self.hostname,
            username=self.username,
            password=self.password,
            cnopts=self.cnopts,
            port=self.port,
        )
        # Only reached when the connection succeeded, so the message can no
        # longer be printed for a failed attempt.
        if not argQuiet:
            print(f"Connected to {self.hostname} as {self.username}.")

    def disconnect(self):
        """Close the sftp connection; a no-op when not connected."""
        if self.connection is None:
            return
        self.connection.close()
        self.connection = None
        if not argQuiet:
            print(f"Disconnected from host {self.hostname}")

    def listdir(self, remote_path):
        """Yield the names of the entries found in ``remote_path``."""
        yield from self.connection.listdir(remote_path)

    def listdir_attr(self, remote_path):
        """Yield the entries of ``remote_path`` together with their attributes."""
        yield from self.connection.listdir_attr(remote_path)

    def download(self, remote_path, target_local_path):
        """Download ``remote_path`` from the server to ``target_local_path``.

        The local target directory is created first if it does not exist.
        """
        if not argQuiet:
            print(
                f"downloading from {self.hostname} as {self.username} [(remote path : {remote_path});(local path: {target_local_path})]"
            )
        # A bare file name has no directory part; os.makedirs('') would raise,
        # so only create the directory when there is one.
        local_dir = os.path.dirname(target_local_path)
        if local_dir:
            os.makedirs(local_dir, exist_ok=True)
        self.connection.get(remote_path, target_local_path)
        if not argQuiet:
            print("download completed")

    def upload(self, source_local_path, remote_path):
        """Upload the local file ``source_local_path`` to ``remote_path``."""
        if not argQuiet:
            print(
                f"uploading to {self.hostname} as {self.username} [(remote path: {remote_path});(source local path: {source_local_path})]"
            )
        self.connection.put(source_local_path, remote_path)
        if not argQuiet:
            print("upload completed")
def prep_dir(folder=''):
    """Resolve the source-data directory and make sure its 'merged' child exists.

    ``folder`` is joined onto the current working directory. Returns the
    tuple ``(source_directory, merged_directory)``.
    """
    source_dir = os.path.join(os.getcwd(), folder)
    merged_dir = os.path.join(source_dir, 'merged')

    merged_missing = not os.path.exists(merged_dir)
    if merged_missing:
        if argVerbose:
            print(f'Creating directory {merged_dir}')
        os.mkdir(merged_dir)

    if argVerbose:
        print('Source data directory: ' + source_dir)
        print('Merged data directory: ' + merged_dir)
    return source_dir, merged_dir
def get_date():
    """Return today's date as text: full month name, day, year, dash-separated."""
    pattern = "%B-%d-%Y"
    today = datetime.date.today()
    return today.strftime(pattern)
def combine_cols(df, col, possibilities):
    """Fold every column whose lowercased name is in ``possibilities`` into ``col``.

    Blank (empty or whitespace-only) and missing cells of ``col`` are filled
    from each matching column in turn, and the matching columns are then
    dropped. If ``col`` does not exist yet it is created first.

    Args:
        df: the dataframe to repair.
        col: name of the final column.
        possibilities: lowercase header spellings to merge into ``col``.

    Returns:
        The dataframe with ``col`` filled in and the merged columns removed.
    """
    # if final column doesn't exist, create it
    if col not in df.columns:
        # Assigning a scalar adds an all-NaN column without any row
        # alignment, so it works with duplicated row labels and never adds
        # a row (concatenating a one-row frame along axis=1 did both wrong).
        df = df.assign(**{col: np.nan})
    # list to store replaced columns
    drops = []
    # snapshot the headers so the loop is unaffected by the assignments below
    for cl in list(df.columns):
        # we don't want to merge and drop our final column
        if cl == col:
            continue
        # str() keeps a non-text header from raising on .lower()
        if str(cl).lower() not in possibilities:
            continue
        if argVerbose: print(f'Replacing column {cl}')
        # treat blank cells as missing, then take the value from the variant
        df[col] = df[col].replace(r'^\s*$', np.nan, regex=True).fillna(df[cl])
        # and add it to the drop list
        drops.append(cl)
    # drop spent columns
    df = df.drop(columns=drops)
    if argVerbose: print(f'Dropped columns: {drops}')
    return df
def clean_cols_student(df):
    """Drop every student column that is neither a final column nor a question id.

    A column survives when its lowercased name equals a lowercased key of
    ``final_columns_student`` or when it matches the student question-id
    pattern. Returns the reduced dataframe.
    """
    wanted = {name.lower() for name in final_columns_student}
    is_question = re.compile("^s-[a-zA-Z]{4}-q[0-9][0-9]?$").match
    drops = [
        header
        for header in df.columns
        if not (header.lower() in wanted or is_question(header))
    ]
    df = df.drop(columns=drops)
    if argVerbose:
        print(f'Dropped columns: {drops}')
    return df
def clean_cols_teacher(df):
    """Drop every teacher column that is neither a final column nor a question id.

    A column survives when its lowercased name equals a lowercased key of
    ``final_columns_teacher`` or when it matches the teacher question-id
    pattern. Returns the reduced dataframe.
    """
    wanted = {name.lower() for name in final_columns_teacher}
    is_question = re.compile("^t-[a-zA-Z]{4}-q[0-9][0-9]?$").match
    drops = [
        header
        for header in df.columns
        if not (header.lower() in wanted or is_question(header))
    ]
    df = df.drop(columns=drops)
    if argVerbose:
        print(f'Dropped columns: {drops}')
    return df
def do_merge_student(cwd, mwd):
    """Merge every ``*student*.csv`` file in ``cwd`` into one csv inside ``mwd``.

    The files are concatenated, their columns repaired and cleaned, and an
    'Academic Year' column is added. The output file name is built from
    today's date and the optional project name, which is read from the
    module-level ``args`` namespace set up by the command-line entry point.

    Returns:
        The name (not the full path) of the merged file, or None when no
        student csv file was found.
    """
    # identify and merge student files
    if not argQuiet: print('---Merging Student Data---')
    # glob returns files in arbitrary order; sort so the row order of the
    # merged file is reproducible
    all_files = sorted(glob.glob(os.path.join(cwd, "*student*.csv")))
    if not argQuiet: print(f'Found {len(all_files)} Student CSV files')
    if not all_files:
        if not argQuiet: print('No files found. Skipping merge...')
        return None
    if not argQuiet: print('Merging...')
    files = [pd.read_csv(f, low_memory=False) for f in all_files]
    # count lines in read csv files
    lines = sum(fi.shape[0] for fi in files)
    # combine csv files; ignore_index gives the result unique row labels
    # (each file starts at 0 again), which later column-wise operations
    # that align on the index rely on
    df = pd.concat(files, axis=0, ignore_index=True)
    # combine related columns
    if not argQuiet: print('Repairing rows...')
    df = repair_student_columns(df)
    # clean out unnecessary columns
    if not argQuiet: print('Cleaning out columns...')
    df = clean_cols_student(df)
    # add academic year column
    if not argQuiet: print('Adding \'Academic Year\' column...')
    df = add_academic_year(df)
    # ensure line count matches what is expected
    if df.shape[0] != lines:
        print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}')
    # save merged file
    date = get_date()
    proj = '-' + args.project if args.project else ''
    fn = f'{date}{proj}-student-data-merged.csv'
    df.to_csv(os.path.join(mwd, fn), index=False)
    if not argQuiet: print('Student data merged successfully!')
    return fn
def do_merge_teacher(cwd, mwd):
    """Merge every ``*teacher*.csv`` file in ``cwd`` into one csv inside ``mwd``.

    The files are concatenated, their columns repaired and cleaned, and an
    'Academic Year' column is added. The output file name is built from
    today's date and the optional project name, which is read from the
    module-level ``args`` namespace set up by the command-line entry point.

    Returns:
        The name (not the full path) of the merged file, or None when no
        teacher csv file was found.
    """
    # identify and merge teacher files
    if not argQuiet: print('---Merging Teacher Data---')
    # glob returns files in arbitrary order; sort so the row order of the
    # merged file is reproducible
    all_files = sorted(glob.glob(os.path.join(cwd, "*teacher*.csv")))
    if not argQuiet: print(f'Found {len(all_files)} Teacher CSV files')
    if not all_files:
        if not argQuiet: print('No files found. Skipping merge...')
        return None
    if not argQuiet: print('Merging...')
    files = [pd.read_csv(f, low_memory=False) for f in all_files]
    # count lines in read csv files
    lines = sum(fi.shape[0] for fi in files)
    # combine csv files; ignore_index gives the result unique row labels
    # (each file starts at 0 again), which later column-wise operations
    # that align on the index rely on
    df = pd.concat(files, axis=0, ignore_index=True)
    # combine related columns
    if not argQuiet: print('Repairing columns...')
    df = repair_teacher_columns(df)
    # clean out unnecessary columns
    if not argQuiet: print('Cleaning out columns...')
    df = clean_cols_teacher(df)
    # add academic year column
    if not argQuiet: print('Adding \'Academic Year\' column...')
    df = add_academic_year(df)
    # ensure line count matches what is expected
    if df.shape[0] != lines:
        print(f'Warning: Line count mismatch: {lines} expected, but got {df.shape[0]}')
    # save merged file
    date = get_date()
    proj = '-' + args.project if args.project else ''
    fn = f'{date}{proj}-teacher-data-merged.csv'
    df.to_csv(os.path.join(mwd, fn), index=False)
    if not argQuiet: print('Teacher data merged successfully!')
    return fn
def repair_teacher_columns(df):
    """Merge differently named teacher columns into their final column names.

    Applies combine_cols once per entry of ``final_columns_teacher`` and
    returns the resulting dataframe.
    """
    for final_name, variants in final_columns_teacher.items():
        df = combine_cols(df, final_name, variants)
    return df
def repair_student_columns(df):
    """Merge differently named student columns, then fold in question variants.

    Applies combine_cols once per entry of ``final_columns_student``, then
    hands the frame to combine_variants and returns the result.
    """
    for final_name, variants in final_columns_student.items():
        df = combine_cols(df, final_name, variants)
    if not argQuiet:
        print('Combining Question Variants...')
    return combine_variants(df)
def combine_variants(df):
    """Fold student question variant columns (``...-1``) into their base columns.

    For every column named like ``s-xxxx-qN-1`` the blank or missing cells of
    the base column ``s-xxxx-qN`` are filled from the variant, and the variant
    column is removed. A variant whose base column does not exist simply
    becomes the base column.

    Returns:
        The dataframe without the variant columns.
    """
    # same letter case rules as the question pattern in clean_cols_student,
    # so no variant is skipped here only to be discarded by the cleanup
    variant_pattern = re.compile(r'^s-[a-zA-Z]{4}-q[0-9][0-9]?-1$')
    drops = []
    renames = {}
    # iterate over a snapshot: the frame is modified inside the loop
    for col in list(df.columns):
        if not isinstance(col, str) or variant_pattern.search(col) is None:
            continue
        # get non variant version (strip the trailing '-1')
        nonvar = col[:-2]
        if nonvar in df.columns:
            # combine into non variant
            df[nonvar] = df[nonvar].replace(r'^\s*$', np.nan, regex=True).fillna(df[col])
            # and add it to the drop list
            drops.append(col)
        else:
            # no base column to fill: the variant takes its place
            renames[col] = nonvar
    df = df.drop(columns=drops)
    if renames:
        df = df.rename(columns=renames)
    return df
def add_academic_year(df):
    """Add an 'Academic Year' column derived from the 'Recorded Date' column.

    Must run after the columns are merged, because only the column named
    'Recorded Date' is looked at. Returns the same dataframe.
    """
    recorded = df['Recorded Date'].tolist()
    df['Academic Year'] = [date_str_to_academic_year(value) for value in recorded]
    # returning df is not strictly needed, but matches the other helpers
    return df
def date_str_to_academic_year(str):
    """Convert a date string to an academic year label such as '2021-22'.

    Dates before July belong to the academic year that started the previous
    calendar year; dates from July on belong to the one starting that year.

    Args:
        str: the value to parse as a date (normally text from the
            'Recorded Date' column).

    Returns:
        ``'<start year>-<two-digit end year>'``, or ``'Undefined'`` when the
        value cannot be parsed as a date.
    """
    # get date from string
    try:
        date = parse(str).date()
    except (TypeError, ValueError, OverflowError):
        # TypeError: not a string (e.g. an empty cell read as NaN);
        # ValueError/OverflowError: text that is not a usable date (e.g. an
        # extra header row). Warn only once per run instead of once per row.
        if not getattr(date_str_to_academic_year, '_warned', False):
            date_str_to_academic_year._warned = True
            print('WARN: Found non-date value in \'Recorded Date\' column, \'Academic Year\' will contain \'Undefined\' for some rows')
        return 'Undefined'
    if date.month < 7:  # spring semester
        start_year = date.year - 1
    else:  # fall semester
        start_year = date.year
    # zero-padded two-digit end year in both branches ('2005-06', '1999-00')
    return f'{start_year}-{(start_year + 1) % 100:02d}'
if __name__ == '__main__':
    # Command-line entry point: parse the flags, optionally fetch the csv
    # files over sftp, merge teacher and/or student data, and (in remote
    # mode) upload the merged files and clean up the local copies.

    # parse flags
    parser = argparse.ArgumentParser(
        prog='merge-csv',
        description='Merges CSV Files containing student and teacher data',
        epilog='Usage: python merge-csv.py (-stq) (-d directory) (-r remote) (-p project)')
    parser.add_argument('-d', '--directory',
                        action='store',
                        help='directory for local csv , defaults to current directory')
    parser.add_argument('-t', '--teacher',
                        action='store_true',
                        dest='teacher',
                        help='merge teacher data')
    parser.add_argument('-s', '--student',
                        action='store_true',
                        dest='student',
                        help='merge student data')
    parser.add_argument('-q', '--quiet',
                        action='store_true',
                        dest='quiet',
                        help='run without output (besides errors and warnings)')
    parser.add_argument('-v', '--verbose',
                        action='store_true',
                        dest='verbose',
                        help='run with extra output information')
    parser.add_argument('-p', '--project',
                        action='store',
                        help='add a project name to the merged csv file name')
    parser.add_argument('-r', '--remote-url',
                        action='store',
                        dest='remote_url',
                        help='sftp url for remote merging')
    # args, argVerbose and argQuiet are module-level names read by the
    # functions above
    args = parser.parse_args()
    argVerbose = args.verbose
    argQuiet = args.quiet
    # quiet takes precedence over verbose
    if argQuiet:
        argVerbose = False

    # make sure -s or -t is set
    if not (args.student or args.teacher):
        if not argQuiet: print('Notice: Neither -s nor -t are specified. No merge will be performed.')

    if args.directory and not args.remote_url:
        c, m = prep_dir(args.directory)
    elif not args.directory:
        if not argQuiet: print('Notice: No directory specified. Defaulting to current directory.')
        c, m = prep_dir()

    sftp = None
    filelist = []
    try:
        # prepare sftp if flagged
        if args.remote_url:
            if not argQuiet: print('Remote destination set, fetching files...')
            parsed_url = urlparse(args.remote_url)
            cnopts = pysftp.CnOpts()
            # NOTE(security): hostkeys = None turns off host key
            # verification, so the identity of the server is not checked.
            cnopts.hostkeys = None
            sftp = Sftp(
                hostname=parsed_url.hostname,
                username=parsed_url.username,
                password=parsed_url.password,
                cnopts=cnopts,
                # honour a port given in the url, otherwise the sftp default
                port=parsed_url.port or 22,
            )
            sftp.connect()
            # current directory is used for remote file merging
            c, m = prep_dir()
            # prepare remote path
            path = args.directory or "/"
            # ensure trailing slash
            if not path.endswith("/"):
                path += "/"
            # get csv list from remote
            for file in sftp.listdir_attr(path):
                if file.filename.endswith(".csv"):
                    filelist.append(file.filename)
                    if not argQuiet: print(f'Fetching file {file.filename}...')
                    sftp.download(path + file.filename, os.path.join(c, file.filename))

        # perform merges; a merge returns None when it found no files
        merged = []
        if args.teacher:
            merged.append(do_merge_teacher(c, m))
        if args.student:
            merged.append(do_merge_student(c, m))
        # keep only the files that were really written, so the upload and
        # cleanup below also work with just -s, just -t, or no input files
        merged = [fn for fn in merged if fn]

        if args.remote_url:
            # upload merged files to remote
            if not argQuiet: print('Uploading merged data...')
            for fn in merged:
                sftp.upload(os.path.join(m, fn), path + 'merged/' + fn)
            # remove merged directory
            if not argQuiet: print('Cleaning up...')
            for fn in merged:
                os.remove(os.path.join(m, fn))
            # rmdir only works on an empty directory; leave it in place if
            # it holds files from earlier runs
            if not os.listdir(m):
                os.rmdir(m)
            # remove downloaded files
            for f in filelist:
                local_copy = os.path.join(c, f)
                if os.path.exists(local_copy):
                    os.remove(local_copy)
    finally:
        # close the connection even when fetching, merging or uploading failed
        if sftp is not None and sftp.connection is not None:
            sftp.disconnect()
    if not argQuiet: print('Done!')