Faster admin data loader + rename School.school_hash

mciea-main
Gabe Farrell 2 years ago
parent 0965841566
commit b3e6efdb2e

@ -8,7 +8,7 @@ class School < ApplicationRecord
validates :name, presence: true
scope :alphabetic, -> { order(name: :asc) }
scope :school_hash, -> { all.map { |school| [school.dese_id, school] }.to_h }
scope :school_by_dese_id, -> { all.map { |school| [school.dese_id, school] }.to_h }
include FriendlyId
friendly_id :name, use: [:slugged]

@ -120,7 +120,7 @@ class Cleaner
end
# Memoized lookup table of schools, as built by School.school_by_dese_id.
#
# The table is built on the first call and the same object is handed back on
# every later call. Only the renamed School.school_by_dese_id is used; the
# leftover assignment from the pre-rename School.school_hash is dropped.
#
# @return [Hash] whatever School.school_by_dese_id returns
def schools
  @schools ||= School.school_by_dese_id
end
def genders

@ -6,11 +6,12 @@ module Dese
def self.load_data(filepath:)
admin_data_values = []
@memo = Hash.new
schools = School.school_by_dese_id
CSV.parse(File.read(filepath), headers: true) do |row|
score = likert_score(row:)
next unless valid_likert_score(likert_score: score)
admin_data_values << create_admin_data_value(row:, score:)
admin_data_values << create_admin_data_value(row:, score:, schools:)
end
AdminDataValue.import(admin_data_values.flatten.compact, batch_size: 1_000, on_duplicate_key_update: :all)
@ -40,21 +41,18 @@ module Dese
end
# these three methods do the memoization
# Memoized School lookup, cached in @memo under "school" + dese_id.
#
# @param dese_id [String] DESE id; it is concatenated into the cache key and
#   converted with +to_i+ for the query.
# @return [Object, nil] the result of School.find_by_dese_id. A nil result is
#   stored too, so an unknown id is only queried once.
def self.find_school(dese_id:)
  cache_key = "school" + dese_id
  @memo.fetch(cache_key) do
    @memo[cache_key] = School.find_by_dese_id(dese_id.to_i)
  end
end
# Memoized AdminDataItem lookup, cached in @memo under
# "admin" + admin_data_item_id.
#
# The cache key is built once and the lookup is expressed as a single
# Hash#fetch, replacing the duplicated guard clause.
#
# @param admin_data_item_id [String] id concatenated into the cache key and
#   passed to AdminDataItem.find_by_admin_data_item_id.
# @return [Object, nil] the result of the finder. A nil result is stored too,
#   so an unknown id is only queried once.
def self.find_admin_data_item(admin_data_item_id:)
  cache_key = "admin" + admin_data_item_id
  @memo.fetch(cache_key) do
    @memo[cache_key] = AdminDataItem.find_by_admin_data_item_id(admin_data_item_id)
  end
end
# Memoized AcademicYear lookup, cached in @memo under "year" + ay.
#
# The cache key is built once and the lookup is expressed as a single
# Hash#fetch, replacing the duplicated guard clause.
#
# @param ay [String] academic-year range string; concatenated into the cache
#   key and passed to AcademicYear.find_by_range.
# @return [Object, nil] the result of the finder. A nil result is stored too,
#   so an unknown range is only queried once.
def self.find_ay(ay:)
  cache_key = "year" + ay
  @memo.fetch(cache_key) do
    @memo[cache_key] = AcademicYear.find_by_range(ay)
  end
end
def self.create_admin_data_value(row:, score:)
school = find_school(dese_id: dese_id(row:))
def self.create_admin_data_value(row:, score:, schools:)
school = schools[dese_id(row:).to_i]
admin_data_item_id = admin_data_item(row:)
admin_data_item = find_admin_data_item(admin_data_item_id:)
academic_year = find_ay(ay: ay(row:))
@ -73,7 +71,7 @@ module Dese
likert_score: score,
academic_year:,
school:,
admin_data_item:,
admin_data_item:
)
end
end

@ -12,8 +12,12 @@ class SurveyResponsesDataLoader
survey_item_responses = CSV.parse(lines.join, headers:).map do |row|
process_row(row: SurveyItemValues.new(row:, headers: headers_array, survey_items: all_survey_items, schools:))
end
SurveyItemResponse.import survey_item_responses.compact.flatten, batch_size: BATCH_SIZE,
SurveyItemResponse.import(
survey_item_responses.compact.flatten,
batch_size: BATCH_SIZE,
on_duplicate_key_update: :all
)
end
end
end
@ -30,27 +34,33 @@ class SurveyResponsesDataLoader
next unless line.present?
CSV.parse(line, headers:).map do |row|
survey_item_responses << process_row(row: SurveyItemValues.new(row:, headers: headers_array,
survey_items: all_survey_items, schools:))
survey_item_responses <<
process_row(row: SurveyItemValues.new(row:, headers: headers_array, survey_items: all_survey_items, schools:))
end
row_count += 1
next unless row_count == BATCH_SIZE
SurveyItemResponse.import survey_item_responses.compact.flatten, batch_size: BATCH_SIZE,
SurveyItemResponse.import(
survey_item_responses.compact.flatten,
batch_size: BATCH_SIZE,
on_duplicate_key_update: :all
)
survey_item_responses = []
row_count = 0
end
SurveyItemResponse.import survey_item_responses.compact.flatten, batch_size: BATCH_SIZE,
SurveyItemResponse.import(
survey_item_responses.compact.flatten,
batch_size: BATCH_SIZE,
on_duplicate_key_update: :all
)
end
private
# Memoized lookup table of schools, as built by School.school_by_dese_id.
#
# The previous body used plain assignment, so every call rebuilt the whole
# table, and this method is invoked once per parsed row through the
# `schools:` shorthand argument. `||=` builds it once per loader instance.
#
# @return [Hash] whatever School.school_by_dese_id returns
def schools
  @schools ||= School.school_by_dese_id
end
def genders
@ -83,19 +93,27 @@ class SurveyResponsesDataLoader
# Resets the student's races from the row and maps the row's survey items to
# survey item responses.
#
# NOTE(review): this span comes from a rendered diff in which the pre-change
# and post-change forms of several statements sit back to back (the two
# `tmp_races = ...` assignments, the two `.map do |survey_item|` openers, the
# two `puts` lines and the two `.compact` tails). Confirm against the real
# file which form is current before changing any of it.
def process_survey_items(row:)
# Find or create the Student for this response_id / lasid pair.
student = Student.find_or_create_by(response_id: row.response_id, lasid: row.lasid)
# Clear the student's current races; they are re-added from the row below.
student.races.delete_all
# Translate each race key on the row through the `races` lookup.
tmp_races = row.races.map { |race| races[race] }
tmp_races = row.races.map do |race|
races[race]
end
student.races += tmp_races
row.survey_items.map do |survey_item|
row
.survey_items
.map do |survey_item|
# Skip survey items for which the row yields no likert score.
likert_score = row.likert_score(survey_item_id: survey_item.survey_item_id) || next
# Scores failing valid_likert_score? are skipped; the rejection is printed
# unless the score is the literal "NA".
unless likert_score.valid_likert_score?
puts "Response ID: #{row.response_id}, Likert score: #{likert_score} rejected" unless likert_score == "NA"
puts("Response ID: #{row.response_id}, Likert score: #{likert_score} rejected") unless likert_score == "NA"
next
end
response = row.survey_item_response(survey_item:)
create_or_update_response(survey_item_response: response, likert_score:, row:, survey_item:, student:)
end.compact
end
.compact
end
def create_or_update_response(survey_item_response:, likert_score:, row:, survey_item:, student:)
@ -116,8 +134,20 @@ class SurveyResponsesDataLoader
survey_item_response.student = student
survey_item_response
else
SurveyItemResponse.new(response_id: row.response_id, academic_year: row.academic_year, school: row.school, survey_item:,
likert_score:, grade:, gender:, recorded_date: row.recorded_date, income:, ell:, sped:, student:)
SurveyItemResponse.new(
response_id: row.response_id,
academic_year: row.academic_year,
school: row.school,
survey_item:,
likert_score:,
grade:,
gender:,
recorded_date: row.recorded_date,
income:,
ell:,
sped:,
student:
)
end
end
@ -126,16 +156,18 @@ class SurveyResponsesDataLoader
end
# Extracts the survey item ids from a CSV header line.
#
# Only the first parsed row is considered. Blank cells (nil or "") are
# dropped, and only names starting with "t-" or "s-" are kept. The line is
# parsed once and filtered in a single pass. An input that yields no row
# (e.g. an empty string) returns [] instead of raising.
#
# @param headers [String] CSV-formatted header line
# @return [Array<String>] header names beginning with "t-" or "s-", in
#   their original order
def get_survey_item_ids_from_headers(headers:)
  header_cells = Array(CSV.parse(headers).first)
  # Safe navigation skips nil cells; "" simply fails the prefix check.
  header_cells.select { |header| header&.start_with?("t-", "s-") }
end
end
# Mixin that adds likert-score validation to strings.
module StringMonkeyPatches
  # Whether the string's integer value (+to_i+) lies between 1 and 5
  # inclusive. Non-numeric text converts to 0 and is therefore invalid.
  #
  # @return [Boolean]
  def valid_likert_score?
    (1..5).cover?(to_i)
  end
end
String.include StringMonkeyPatches
String.include(StringMonkeyPatches)

@ -1,63 +1,84 @@
namespace :data do
desc "load survey responses"
task load_survey_responses: :environment do
namespace(:data) do
desc("load survey responses")
task(load_survey_responses: :environment) do
survey_item_response_count = SurveyItemResponse.count
student_count = Student.count
path = "/data/survey_responses/clean/"
Sftp::Directory.open(path:) do |file|
SurveyResponsesDataLoader.new.from_file(file:)
end
puts "=====================> Completed loading #{SurveyItemResponse.count - survey_item_response_count} survey responses. #{SurveyItemResponse.count} total responses in the database"
puts(
"=====================> Completed loading #{SurveyItemResponse.count - survey_item_response_count} survey responses. #{SurveyItemResponse.count} total responses in the database"
)
Rails.cache.clear
end
desc "load survey responses from a specific directory"
task load_survey_responses_from_path: :environment do
desc("load survey responses from a specific directory")
task(load_survey_responses_from_path: :environment) do
survey_item_response_count = SurveyItemResponse.count
student_count = Student.count
path = "#{ENV['SFTP_PATH']}"
path = "#{ENV["SFTP_PATH"]}"
Sftp::Directory.open(path:) do |file|
SurveyResponsesDataLoader.new.from_file(file:)
end
puts "=====================> Completed loading #{SurveyItemResponse.count - survey_item_response_count} survey responses. #{SurveyItemResponse.count} total responses in the database"
puts(
"=====================> Completed loading #{SurveyItemResponse.count - survey_item_response_count} survey responses. #{SurveyItemResponse.count} total responses in the database"
)
Rails.cache.clear
end
desc "load admin_data"
task load_admin_data: :environment do
desc("load admin_data")
task(load_admin_data: :environment) do
original_count = AdminDataValue.count
pool_size = 2
jobs = Queue.new
Dir.glob(Rails.root.join("data", "admin_data", "dese", "*.csv")).each { |filepath| jobs << filepath }
Dir.glob(Rails.root.join("data", "admin_data", "out_of_state", "*.csv")).each { |filepath| jobs << filepath }
workers = pool_size.times.map do
Thread.new do
while filepath = jobs.pop(true)
puts "=====================> Loading data from csv at path: #{filepath}"
Dese::Loader.load_data filepath:
puts("=====================> Loading data from csv at path: #{filepath}")
Dese::Loader.load_data(filepath:)
end
rescue ThreadError
puts "=====================> Completed loading #{AdminDataValue.count - original_count} admin data values"
end
end
workers.each(&:join)
desc "reset all cache counters"
task reset_cache_counters: :environment do
puts "=====================> Resetting Category counters"
puts("=====================> Completed loading #{AdminDataValue.count - original_count} admin data values")
end
desc("reset all cache counters")
task(reset_cache_counters: :environment) do
puts("=====================> Resetting Category counters")
Category.all.each do |category|
Category.reset_counters(category.id, :subcategories)
end
puts "=====================> Resetting Subcategory counters"
puts("=====================> Resetting Subcategory counters")
Subcategory.all.each do |subcategory|
Subcategory.reset_counters(subcategory.id, :measures)
end
puts "=====================> Resetting Measure counters"
puts("=====================> Resetting Measure counters")
Measure.all.each do |measure|
Measure.reset_counters(measure.id, :scales)
end
puts "=====================> Resetting Scale counters"
puts("=====================> Resetting Scale counters")
Scale.all.each do |scale|
Scale.reset_counters(scale.id, :survey_items)
end
puts "=====================> Resetting SurveyItem counters"
puts("=====================> Resetting SurveyItem counters")
SurveyItem.all.each do |survey_item|
SurveyItem.reset_counters(survey_item.id, :survey_item_responses)
end

@ -153,7 +153,7 @@ RSpec.describe Cleaner do
survey_items = SurveyItem.where(survey_item_id: standard_survey_items)
data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: standard_survey_items, survey_items:,
schools: School.school_hash)]
schools: School.school_by_dese_id)]
filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename(
headers: standard_survey_items, data:, filepath: nil
)
@ -165,7 +165,7 @@ RSpec.describe Cleaner do
survey_items = SurveyItem.where(survey_item_id: short_form_survey_items)
data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: short_form_survey_items, survey_items:,
schools: School.school_hash)]
schools: School.school_by_dese_id)]
filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename(
headers: short_form_survey_items, data:, filepath: nil
)
@ -178,7 +178,7 @@ RSpec.describe Cleaner do
survey_items = SurveyItem.where(survey_item_id: early_education_survey_items)
data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: early_education_survey_items, survey_items:,
schools: School.school_hash)]
schools: School.school_by_dese_id)]
filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename(
headers: early_education_survey_items, data:, filepath: nil
)
@ -190,7 +190,7 @@ RSpec.describe Cleaner do
survey_items = SurveyItem.where(survey_item_id: teacher_survey_items)
data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: teacher_survey_items, survey_items:,
schools: School.school_hash)]
schools: School.school_by_dese_id)]
filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename(
headers: teacher_survey_items, data:, filepath: nil
)
@ -202,9 +202,9 @@ RSpec.describe Cleaner do
it "adds all districts to the filename" do
survey_items = SurveyItem.where(survey_item_id: teacher_survey_items)
data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: teacher_survey_items, survey_items:, schools: School.school_hash),
data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: teacher_survey_items, survey_items:, schools: School.school_by_dese_id),
SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "222_222" },
headers: teacher_survey_items, survey_items:, schools: School.school_hash)]
headers: teacher_survey_items, survey_items:, schools: School.school_by_dese_id)]
filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename(
headers: teacher_survey_items, data:, filepath: nil
)
@ -217,7 +217,7 @@ RSpec.describe Cleaner do
survey_items = SurveyItem.where(survey_item_id: early_education_survey_items)
data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: early_education_survey_items, survey_items:,
schools: School.school_hash)]
schools: School.school_by_dese_id)]
filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename(
headers: early_education_survey_items, data:, filepath: "/data/survey_responses/maynard early ed_ part a.2022-23.csv"
)

@ -35,7 +35,7 @@ RSpec.describe SurveyItemValues, type: :model do
let(:attleboro_respondents) do
create(:respondent, school: attleboro, academic_year: ay_2022_23, nine: 40, ten: 40, eleven: 40, twelve: 40)
end
let(:schools) { School.school_hash }
let(:schools) { School.school_by_dese_id }
let(:recorded_date) { "2023-04-01T12:12:12" }
let(:ay_2022_23) do
create(:academic_year, range: "2022-23")
@ -789,7 +789,7 @@ RSpec.describe SurveyItemValues, type: :model do
it "returns false" do
headers = %w[s-sbel-q5 s-phys-q2 grade RecordedDate DeseID]
values = SurveyItemValues.new(row: { "grade" => "2", "RecordedDate" => recorded_date, "DeseID" => "1234" }, headers:, survey_items:,
schools: School.school_hash)
schools: School.school_by_dese_id)
expect(values.valid_grade?).to eq false
end
end
@ -800,13 +800,13 @@ RSpec.describe SurveyItemValues, type: :model do
it "returns true for student questions" do
headers = %w[s-sbel-q5 s-phys-q1 s-phys-q2 RecordedDate]
values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "s-sbel-q5" => "1", "s-phys-q1" => "", "s-phys-q2" => "5" }, headers:, survey_items:,
schools: School.school_hash)
schools: School.school_by_dese_id)
expect(values.valid_sd?).to eq true
end
it "returns true for teacher questions" do
headers = %w[t-sbel-q5 t-phys-q2]
values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "t-sbel-q5" => "1", "t-phys-q2" => "5" }, headers:, survey_items:,
schools: School.school_hash)
schools: School.school_by_dese_id)
expect(values.valid_sd?).to eq true
end
end
@ -815,13 +815,13 @@ RSpec.describe SurveyItemValues, type: :model do
it "returns false for student questions" do
headers = %w[s-sbel-q5 s-phys-q1 s-phys-q2 RecordedDate]
values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "s-sbel-q5" => "1", "s-phys-q2" => "1" }, headers:, survey_items:,
schools: School.school_hash)
schools: School.school_by_dese_id)
expect(values.valid_sd?).to eq false
end
it "returns false for teacher questions" do
headers = %w[t-sbel-q5 t-phys-q1 t-phys-q2 RecordedDate]
values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "t-sbel-q5" => "1", "t-phys-q2" => "1" }, headers:, survey_items:,
schools: School.school_hash)
schools: School.school_by_dese_id)
expect(values.valid_sd?).to eq false
end
end

Loading…
Cancel
Save