You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
460 lines
16 KiB
460 lines
16 KiB
import pandas as pd
|
|
import os
|
|
import glob
|
|
import numpy as np
|
|
import datetime
|
|
import itertools as it
|
|
import argparse
|
|
import re
|
|
from urllib.parse import urlparse
|
|
from dateutil.parser import parse
|
|
from dateutil.relativedelta import relativedelta
|
|
import pysftp
|
|
|
|
#NOTE for now, each of the arrays should be all lowercase
#TODO eventually make them case agnostic
# all of the columns we want to extract from the csv file
# excluding the question ids (they are found using regex)
#
# Maps each canonical output column name to the list of lowercase header
# variants seen in the source CSVs. combine_cols() lowercases each actual
# header before checking membership in these lists, which is why the
# variants must be all-lowercase.
final_columns_student = {
    'Start Date': ['startdate', 'start date'],
    'End Date': ['enddate', 'end date'],
    'Status': ['status'],
    'Ip Address': ['ip address', 'ipaddress'],
    'Progress': ['progress'],
    # 'duration..in.seconds' is an R-style mangling of 'duration (in seconds)'
    'Duration': ['duration', 'duration..in.seconds', 'duration (in seconds)'],
    # some exports use the full survey question text as the header
    'District': ['district', 'please select your school district.'],
    'LASID': ['lasid', 'please enter your locally assigned student id number (lasid, or student lunch number).'],
    'Grade': ['grade', 'what grade are you in?'],
    'Gender': ['gender', 'what is your gender?', 'what is your gender? - selected choice'],
    'Race': ['race'],
    'Recorded Date': ['recorded date', 'recordeddate'],
    'Response Id': ['responseid', 'response id'],
    'Dese Id': ['deseid', 'dese id', 'school'],
}
|
|
|
|
# Same structure as final_columns_student, but for the teacher surveys:
# canonical output column name -> lowercase header variants found in the
# source CSVs (matched case-insensitively by combine_cols).
final_columns_teacher = {
    'Start Date': ['startdate', 'start date'],
    'End Date': ['enddate', 'end date'],
    'Status': ['status'],
    'Ip Address': ['ip address', 'ipaddress'],
    'Progress': ['progress'],
    'Duration': ['duration', 'duration..in.seconds', 'duration (in seconds)'],
    'District': ['district', 'please select your school district.'],
    'Recorded Date': ['recorded date', 'recordeddate'],
    'Response Id': ['responseid', 'response id'],
    'Dese Id': ['deseid', 'dese id', 'school'],
}
|
|
|
|
# Module-level verbosity flags. These defaults apply when the module is
# imported; the __main__ block overwrites them from the parsed CLI flags,
# with --quiet taking precedence over --verbose.
argVerbose = False
argQuiet = True
|
|
|
|
|
|
class Sftp:
    """Thin wrapper around a pysftp connection, used to fetch survey CSV
    files from a remote host and push merged results back."""

    def __init__(self, hostname, username, password, cnopts, port=22):
        """Constructor Method"""
        # Set connection object to None (initial value); the connection
        # itself is established lazily by connect().
        self.connection = None
        self.hostname = hostname
        self.username = username
        self.password = password
        self.cnopts = cnopts
        self.port = port

    def connect(self):
        """Connects to the sftp server and stores the sftp connection object"""
        # Let pysftp's own exceptions propagate unchanged; the original
        # `except ... raise Exception(err)` wrapper destroyed the real
        # exception type and traceback without adding information.
        self.connection = pysftp.Connection(
            host=self.hostname,
            username=self.username,
            password=self.password,
            cnopts=self.cnopts,
            port=self.port,
        )
        # Only announce success after the connection was actually made
        # (this used to run in a `finally`, so it printed even on failure).
        if not argQuiet: print(f"Connected to {self.hostname} as {self.username}.")

    def disconnect(self):
        """Closes the sftp connection"""
        self.connection.close()
        if not argQuiet: print(f"Disconnected from host {self.hostname}")

    def listdir(self, remote_path):
        """lists all the files and directories in the specified path and returns them"""
        for obj in self.connection.listdir(remote_path):
            yield obj

    def listdir_attr(self, remote_path):
        """lists all the files and directories (with their attributes) in the specified path and returns them"""
        for attr in self.connection.listdir_attr(remote_path):
            yield attr

    def download(self, remote_path, target_local_path):
        """
        Downloads the file from remote sftp server to local.
        Also, by default extracts the file to the specified target_local_path
        """
        if not argQuiet: print(
            f"downloading from {self.hostname} as {self.username} [(remote path : {remote_path});(local path: {target_local_path})]"
        )

        # Create the target directory if it does not exist. `path` is empty
        # when target_local_path is a bare filename, and makedirs('') would
        # raise FileNotFoundError, so guard on a non-empty path. exist_ok
        # avoids a race with a concurrently created directory.
        path, _ = os.path.split(target_local_path)
        if path:
            os.makedirs(path, exist_ok=True)

        # Download from remote sftp server to local
        self.connection.get(remote_path, target_local_path)
        if not argQuiet: print("download completed")

    def upload(self, source_local_path, remote_path):
        """
        Uploads the source files from local to the sftp server.
        """
        if not argQuiet: print(
            f"uploading to {self.hostname} as {self.username} [(remote path: {remote_path});(source local path: {source_local_path})]"
        )

        # Upload the file to the SFTP server
        self.connection.put(source_local_path, remote_path)
        if not argQuiet: print("upload completed")
|
|
|
|
|
|
# prepare csv and merged csv directories
def prep_dir(folder=''):
    """Resolve the working directory and make sure a 'merged' output
    subdirectory exists inside it.

    folder may be relative (joined onto the current working directory) or
    absolute (os.path.join discards the cwd prefix for absolute paths).

    Returns a (cwd, mwd) tuple: the source data directory and the merged
    output directory.
    """
    cwd = os.path.join(os.getcwd(), folder)
    mwd = os.path.join(cwd, 'merged')
    if not os.path.exists(mwd):
        if argVerbose: print(f'Creating directory {mwd}')
        # makedirs also creates cwd itself when it is missing; plain
        # os.mkdir would raise FileNotFoundError in that case.
        os.makedirs(mwd)
    if argVerbose: print('Source data directory: ' + cwd)
    if argVerbose: print('Merged data directory: ' + mwd)
    return cwd, mwd
|
|
|
|
|
|
# get current date in Month-XX-YYYY format
def get_date():
    """Return today's date formatted like 'January-05-2024'."""
    today = datetime.date.today()
    return today.strftime("%B-%d-%Y")
|
|
|
|
|
|
# in dataframe df, merges any column in possibilities into the final column col
def combine_cols(df, col, possibilities):
    """Merge every column of df whose lowercased name appears in
    possibilities into a single column named col, then drop the merged-in
    columns.

    Values already present in col win; empty or whitespace-only cells are
    treated as missing and filled from the variant columns.
    Returns the modified dataframe.
    """
    # If the final column doesn't exist, create it as all-missing.
    # (Plain assignment replaces the original pd.concat of a 1-row frame
    # on axis=1, which fails to align against the duplicate row indices
    # produced upstream by concatenating multiple CSVs.)
    if col not in df.columns:
        df[col] = np.nan
    # list to store replaced columns
    drops = []
    # for every column possibility that does exist...
    for cl in df.columns:
        if cl.lower() in possibilities:
            # we don't want to merge and drop our final column into itself
            if cl == col:
                continue
            if argVerbose: print(f'Replacing column {cl}')
            # blank cells count as missing, then fill from the variant column
            df[col] = df[col].replace(r'^\s*$', np.nan, regex=True).fillna(df[cl])
            # and add it to the drop list
            drops.append(cl)
    # drop spent columns
    df = df.drop(columns=drops)
    if argVerbose: print(f'Dropped columns: {drops}')
    return df
|
|
|
|
|
|
# removes unused columns from student data
def clean_cols_student(df):
    """Drop every column that is neither a canonical student metadata
    column (a key of final_columns_student, compared case-insensitively)
    nor a student question id of the form 's-xxxx-qN' / 's-xxxx-qNN'."""
    keep = [name.lower() for name in final_columns_student]
    question_pattern = re.compile("^s-[a-zA-Z]{4}-q[0-9][0-9]?$")
    drops = [col for col in df.columns
             if col.lower() not in keep and not question_pattern.match(col)]
    df = df.drop(columns=drops)
    if argVerbose: print(f'Dropped columns: {drops}')
    return df
|
|
|
|
|
|
# removes unused columns from teacher data
def clean_cols_teacher(df):
    """Drop every column that is neither a canonical teacher metadata
    column (a key of final_columns_teacher, compared case-insensitively)
    nor a teacher question id of the form 't-xxxx-qN' / 't-xxxx-qNN'."""
    keep = [name.lower() for name in final_columns_teacher]
    question_pattern = re.compile("^t-[a-zA-Z]{4}-q[0-9][0-9]?$")
    drops = [col for col in df.columns
             if col.lower() not in keep and not question_pattern.match(col)]
    df = df.drop(columns=drops)
    if argVerbose: print(f'Dropped columns: {drops}')
    return df
|
|
|
|
|
|
# performs all merging operations for student data
def do_merge_student(cwd, mwd):
    """Find every '*student*.csv' in cwd, concatenate them, repair and
    clean the columns, tag academic years, and write the result into mwd.

    Returns the merged file's name, or None when no input files exist.
    Relies on the module-level `args` and `argQuiet` set by __main__.
    """
    # identify and merge student files
    if not argQuiet: print('---Merging Student Data---')
    csv_paths = glob.glob(os.path.join(cwd, "*student*.csv"))
    if not argQuiet: print(f'Found {len(csv_paths)} Student CSV files')
    if not csv_paths:
        if not argQuiet: print('No files found. Skipping merge...')
        return
    if not argQuiet: print('Merging...')
    frames = [pd.read_csv(p, low_memory=False) for p in csv_paths]
    # total row count across the inputs, used as a sanity check below
    expected_rows = sum(frame.shape[0] for frame in frames)
    # combine csv files
    df = pd.concat(frames, axis=0)
    # combine related columns
    if not argQuiet: print('Repairing rows...')
    df = repair_student_columns(df)
    # clean out unnecessary columns
    if not argQuiet: print('Cleaning out columns...')
    df = clean_cols_student(df)
    # add academic year column
    if not argQuiet: print('Adding \'Academic Year\' column...')
    df = add_academic_year(df)
    # ensure line count matches what is expected
    if df.shape[0] != expected_rows:
        print(f'Warning: Line count mismatch: {expected_rows} expected, but got {df.shape[0]}')
    # save merged file
    date = get_date()
    proj = '-' + args.project if args.project else ''
    fn = f'{date}{proj}-student-data-merged.csv'
    df.to_csv(os.path.join(mwd, fn), index=False)
    if not argQuiet: print('Student data merged successfully!')
    return fn
|
|
|
|
|
|
# performs all merging operations for teacher data
def do_merge_teacher(cwd, mwd):
    """Find every '*teacher*.csv' in cwd, concatenate them, repair and
    clean the columns, tag academic years, and write the result into mwd.

    Returns the merged file's name, or None when no input files exist.
    Relies on the module-level `args` and `argQuiet` set by __main__.
    """
    # identify and merge teacher files
    if not argQuiet: print('---Merging Teacher Data---')
    csv_paths = glob.glob(os.path.join(cwd, "*teacher*.csv"))
    if not argQuiet: print(f'Found {len(csv_paths)} Teacher CSV files')
    if not csv_paths:
        if not argQuiet: print('No files found. Skipping merge...')
        return
    if not argQuiet: print('Merging...')
    frames = [pd.read_csv(p, low_memory=False) for p in csv_paths]
    # total row count across the inputs, used as a sanity check below
    expected_rows = sum(frame.shape[0] for frame in frames)
    # combine csv files
    df = pd.concat(frames, axis=0)
    # combine related columns
    if not argQuiet: print('Repairing columns...')
    df = repair_teacher_columns(df)
    # clean out unnecessary columns
    if not argQuiet: print('Cleaning out columns...')
    df = clean_cols_teacher(df)
    # add academic year column
    if not argQuiet: print('Adding \'Academic Year\' column...')
    df = add_academic_year(df)
    # ensure line count matches what is expected
    if df.shape[0] != expected_rows:
        print(f'Warning: Line count mismatch: {expected_rows} expected, but got {df.shape[0]}')
    # save merged file
    date = get_date()
    proj = '-' + args.project if args.project else ''
    fn = f'{date}{proj}-teacher-data-merged.csv'
    df.to_csv(os.path.join(mwd, fn), index=False)
    if not argQuiet: print('Teacher data merged successfully!')
    return fn
|
|
|
|
|
|
# merges teacher columns that may have mismatched names
def repair_teacher_columns(df):
    """Fold every known header alias of each teacher metadata column
    into its canonical column name (see final_columns_teacher)."""
    for canonical, aliases in final_columns_teacher.items():
        df = combine_cols(df, canonical, aliases)
    return df
|
|
|
|
|
|
# merges student columns that may have mismatched names,
# and combines question variants
def repair_student_columns(df):
    """Fold aliased student metadata columns into their canonical names
    (see final_columns_student), then merge question variant columns
    such as 's-xxxx-q1-1' into their base question columns."""
    for canonical, aliases in final_columns_student.items():
        df = combine_cols(df, canonical, aliases)
    if not argQuiet: print('Combining Question Variants...')
    return combine_variants(df)
|
|
|
|
|
|
# combines question variants into non-variant columns
def combine_variants(df):
    """Merge first-variant question columns ('s-xxxx-qN-1') into their
    base columns ('s-xxxx-qN') and drop the variant columns.

    Blank or whitespace-only cells in the base column are treated as
    missing and filled from the variant. Returns the modified dataframe.
    """
    # compiled once instead of re.search per column
    variant_pattern = re.compile(r'^s-[a-z]{4}-q[0-9][0-9]?-1$')
    drops = []
    # iterate a snapshot of the columns, since we may insert new ones below
    for col in list(df.columns):
        if variant_pattern.match(col) is None:
            continue
        # get non variant version by stripping the trailing '-1'
        nonvar = col[:-2]
        # robustness: a file may contain only the variant form; previously
        # a missing base column raised KeyError
        if nonvar not in df.columns:
            df[nonvar] = np.nan
        # combine into non variant
        df[nonvar] = df[nonvar].replace(r'^\s*$', np.nan, regex=True).fillna(df[col])
        # and add it to the drop list
        drops.append(col)
    df = df.drop(columns=drops)
    return df
|
|
|
|
# take the dates in 'Recorded Date' and use them to add
# a column for academic year
# note: must be used after the columns are merged because this only
# looks for the column 'Recorded Date'
def add_academic_year(df):
    """Derive an 'Academic Year' column from each row's 'Recorded Date'
    value via date_str_to_academic_year."""
    dates = df['Recorded Date'].tolist()
    df['Academic Year'] = [date_str_to_academic_year(d) for d in dates]
    # probably unnecessary to return df here, but this is the convention so far
    return df
|
|
|
|
|
|
def date_str_to_academic_year(str):
    """Convert a date string into an academic-year label like '2021-22'.

    January-June counts toward the academic year that began the previous
    fall; July-December starts a new academic year. Returns 'Undefined'
    when the value cannot be parsed as a date.

    NOTE: the parameter name shadows the builtin `str`; kept for
    interface compatibility with existing callers.
    """
    # get date from string
    try:
        date = parse(str).date()
    except (TypeError, ValueError, OverflowError):
        # TypeError covers non-string values such as NaN; ValueError
        # (dateutil's ParserError subclasses it) and OverflowError cover
        # strings dateutil cannot parse -- previously these crashed.
        # I would like this to only print once if the merged csv will have Undefined, but whatever
        print('WARN: Found non-date value in \'Recorded Date\' column, \'Academic Year\' will contain \'Undefined\' for some rows')
        return 'Undefined'
    # I wanted to use dates to calculate the nextyear and lastyear values, but LEAP YEARS !!!!
    if date.month < 7:  # spring semester
        lastyear = date.year - 1
        return f'{lastyear}-{date.strftime("%y")}'
    else:  # fall semester
        # zero-pad the two-digit end year so e.g. fall 2005 yields
        # '2005-06', matching the spring branch's %y formatting
        nextyear = (date.year + 1) % 100
        return f'{date.year}-{nextyear:02d}'
|
|
|
|
|
|
if __name__ == '__main__':

    # parse flags
    parser = argparse.ArgumentParser(
        prog='merge-csv',
        description='Merges CSV Files containing student and teacher data',
        epilog='Usage: python merge-csv.py (-stq) (-d directory) (-r remote) (-p project)')
    parser.add_argument('-d', '--directory',
                        action='store',
                        help='directory for local csv , defaults to current directory')
    parser.add_argument('-t', '--teacher',
                        action='store_true',
                        dest='teacher',
                        help='merge teacher data')
    parser.add_argument('-s', '--student',
                        action='store_true',
                        dest='student',
                        help='merge student data')
    parser.add_argument('-q', '--quiet',
                        action='store_true',
                        dest='quiet',
                        help='run without output (besides errors and warnings)')
    parser.add_argument('-v', '--verbose',
                        action='store_true',
                        dest='verbose',
                        help='run with extra output information')
    parser.add_argument('-p', '--project',
                        action='store',
                        help='add a project name to the merged csv file name')
    parser.add_argument('-r', '--remote-url',
                        action='store',
                        dest='remote_url',
                        help='sftp url for remote merging')
    args = parser.parse_args()

    argVerbose = args.verbose
    argQuiet = args.quiet

    # quiet takes precedence over verbose
    if argQuiet:
        argVerbose = False

    # make sure -s or -t is set
    if not (args.student or args.teacher):
        if not argQuiet: print('Notice: Neither -s nor -t are specified. No merge will be performed.')

    if args.directory and not args.remote_url:
        c, m = prep_dir(args.directory)
    elif not args.directory:
        if not argQuiet: print('Notice: No directory specified. Defaulting to current directory.')
        c, m = prep_dir()

    # prepare sftp if flagged
    if args.remote_url:
        if not argQuiet: print(f'Remote destination set, fetching files...')
        # credentials are taken from the sftp:// URL itself
        parsed_url = urlparse(args.remote_url)
        cnopts = pysftp.CnOpts()
        # NOTE(review): host-key checking is disabled here, which is
        # insecure on untrusted networks -- consider loading known_hosts
        cnopts.hostkeys = None
        sftp = Sftp(
            hostname=parsed_url.hostname,
            username=parsed_url.username,
            password=parsed_url.password,
            cnopts=cnopts,
        )
        sftp.connect()
        # current directory is used for remote file merging
        c, m = prep_dir()

        # prepare remote path (-d is the REMOTE directory in sftp mode)
        path = args.directory
        if not path:
            path = "/"
        # ensure trailing slash
        if not path.endswith("/"): path += "/"

        filelist = []
        # get csv list from remote
        for file in sftp.listdir_attr(path):
            if file.filename.endswith(".csv"):
                filelist.append(file.filename)
                if not argQuiet: print(f'Fetching file {file.filename}...')
                sftp.download(path + file.filename, c + file.filename)

    # perform merges; each helper returns the merged file name, or None
    # when no matching CSV files were found
    tmd = None
    smd = None
    if args.teacher:
        tmd = do_merge_teacher(c, m)
    if args.student:
        smd = do_merge_student(c, m)

    if args.remote_url:
        # upload tmd and smd to remote
        if not argQuiet: print('Uploading merged data...')
        # only upload/remove files that were actually produced; previously
        # this raised NameError when only one of -s/-t was given, and
        # TypeError when a merge found no input files
        if tmd:
            sftp.upload(m + '/' + tmd, path + 'merged/' + tmd)
        if smd:
            sftp.upload(m + '/' + smd, path + 'merged/' + smd)
        # remove merged directory
        if not argQuiet: print('Cleaning up...')
        if tmd:
            os.remove(m + '/' + tmd)
        if smd:
            os.remove(m + '/' + smd)
        os.rmdir(m)
        # remove downloaded files
        for f in filelist:
            if os.path.exists(f):
                os.remove(f)
        sftp.disconnect()

    if not argQuiet: print('Done!')
|