From 6fac759ec2b8753035c80c93d823d2a80389bbac Mon Sep 17 00:00:00 2001 From: Gabe Farrell Date: Mon, 22 Apr 2024 14:46:37 -0400 Subject: [PATCH] Faster admin data loader + rename School.school_hash --- app/models/school.rb | 4 +- app/services/cleaner.rb | 2 +- app/services/dese/loader.rb | 22 +++--- app/services/survey_responses_data_loader.rb | 69 ++++++++++++----- lib/tasks/data.rake | 80 ++++++++++++-------- spec/services/cleaner_spec.rb | 14 ++-- spec/services/survey_item_values_spec.rb | 12 +-- 7 files changed, 124 insertions(+), 79 deletions(-) diff --git a/app/models/school.rb b/app/models/school.rb index 641d2af4..72b5a82c 100644 --- a/app/models/school.rb +++ b/app/models/school.rb @@ -8,7 +8,7 @@ class School < ApplicationRecord validates :name, presence: true scope :alphabetic, -> { order(name: :asc) } - scope :school_hash, -> { all.map { |school| [school.dese_id, school] }.to_h } + scope :school_by_dese_id, -> { all.map { |school| [school.dese_id, school] }.to_h } include FriendlyId friendly_id :name, use: [:slugged] @@ -16,7 +16,7 @@ class School < ApplicationRecord def self.find_by_district_code_and_school_code(district_code, school_code) School .joins(:district) - .where(districts: { qualtrics_code: district_code }) + .where(districts: {qualtrics_code: district_code}) .find_by_qualtrics_code(school_code) end diff --git a/app/services/cleaner.rb b/app/services/cleaner.rb index 9264ec33..dba39c05 100644 --- a/app/services/cleaner.rb +++ b/app/services/cleaner.rb @@ -120,7 +120,7 @@ class Cleaner end def schools - @schools ||= School.school_hash + @schools ||= School.school_by_dese_id end def genders diff --git a/app/services/dese/loader.rb b/app/services/dese/loader.rb index 8f2f76bc..9ad73337 100644 --- a/app/services/dese/loader.rb +++ b/app/services/dese/loader.rb @@ -4,11 +4,12 @@ module Dese def self.load_data(filepath:) admin_data_values = [] @memo = Hash.new + schools = School.school_by_dese_id CSV.parse(File.read(filepath), headers: true) do |row| score = likert_score(row:) next unless valid_likert_score(likert_score: score) - admin_data_values << create_admin_data_value(row:, score:) + admin_data_values << create_admin_data_value(row:, score:, schools:) end AdminDataValue.import(admin_data_values.flatten.compact, batch_size: 1_000, on_duplicate_key_update: :all) @@ -38,21 +39,18 @@ module Dese end # these three methods do the memoization - def self.find_school(dese_id:) - return @memo["school"+dese_id] if @memo.key? "school"+dese_id - @memo["school"+dese_id] ||= School.find_by_dese_id(dese_id.to_i) - end def self.find_admin_data_item(admin_data_item_id:) - return @memo["admin"+admin_data_item_id] if @memo.key? "admin"+admin_data_item_id - @memo["admin"+admin_data_item_id] ||= AdminDataItem.find_by_admin_data_item_id(admin_data_item_id) + return @memo["admin" + admin_data_item_id] if @memo.key?("admin" + admin_data_item_id) + @memo["admin" + admin_data_item_id] ||= AdminDataItem.find_by_admin_data_item_id(admin_data_item_id) end + def self.find_ay(ay:) - return @memo["year"+ay] if @memo.key? "year"+ay - @memo["year"+ay] ||= AcademicYear.find_by_range(ay) + return @memo["year" + ay] if @memo.key?("year" + ay) + @memo["year" + ay] ||= AcademicYear.find_by_range(ay) end - def self.create_admin_data_value(row:, score:) - school = find_school(dese_id: dese_id(row:)) + def self.create_admin_data_value(row:, score:, schools:) + school = schools[dese_id(row:).to_i] admin_data_item_id = admin_data_item(row:) admin_data_item = find_admin_data_item(admin_data_item_id:) academic_year = find_ay(ay: ay(row:)) @@ -71,7 +69,7 @@ module Dese likert_score: score, academic_year:, school:, - admin_data_item:, + admin_data_item: ) end end diff --git a/app/services/survey_responses_data_loader.rb b/app/services/survey_responses_data_loader.rb index f5f27a1b..0b0d0acc 100644 --- a/app/services/survey_responses_data_loader.rb +++ b/app/services/survey_responses_data_loader.rb @@ -11,7 +11,12 @@ class SurveyResponsesDataLoader survey_item_responses = CSV.parse(lines.join, headers:).map do |row| process_row(row: SurveyItemValues.new(row:, headers: headers_array, survey_items: all_survey_items, schools:)) end - SurveyItemResponse.import survey_item_responses.compact.flatten, batch_size: 500, on_duplicate_key_update: :all + + SurveyItemResponse.import( + survey_item_responses.compact.flatten, + batch_size: 500, + on_duplicate_key_update: :all + ) end end end @@ -28,24 +33,25 @@ class SurveyResponsesDataLoader next unless line.present? CSV.parse(line, headers:).map do |row| - survey_item_responses << process_row(row: SurveyItemValues.new(row:, headers: headers_array, survey_items: all_survey_items, schools:)) + survey_item_responses << + process_row(row: SurveyItemValues.new(row:, headers: headers_array, survey_items: all_survey_items, schools:)) end row_count += 1 next unless row_count == 500 - SurveyItemResponse.import survey_item_responses.compact.flatten, batch_size: 500, on_duplicate_key_update: :all + SurveyItemResponse.import(survey_item_responses.compact.flatten, batch_size: 500, on_duplicate_key_update: :all) survey_item_responses = [] row_count = 0 end - SurveyItemResponse.import survey_item_responses.compact.flatten, batch_size: 500, on_duplicate_key_update: :all + SurveyItemResponse.import(survey_item_responses.compact.flatten, batch_size: 500, on_duplicate_key_update: :all) end private def schools - @schools = School.school_hash + @schools = School.school_by_dese_id end def genders @@ -78,19 +84,26 @@ class SurveyResponsesDataLoader def process_survey_items(row:) student = Student.find_or_create_by(response_id: row.response_id, lasid: row.lasid) student.races.delete_all - tmp_races = row.races.map do |race| races[race] end + tmp_races = row.races.map do |race| + races[race] + end + student.races += tmp_races - row.survey_items.map do |survey_item| - likert_score = row.likert_score(survey_item_id: survey_item.survey_item_id) || next + row + .survey_items + .map do |survey_item| + likert_score = row.likert_score(survey_item_id: survey_item.survey_item_id) || next + + unless likert_score.valid_likert_score? + puts("Response ID: #{row.response_id}, Likert score: #{likert_score} rejected") unless likert_score == "NA" + next + end - unless likert_score.valid_likert_score? - puts "Response ID: #{row.response_id}, Likert score: #{likert_score} rejected" unless likert_score == "NA" - next + response = row.survey_item_response(survey_item:) + create_or_update_response(survey_item_response: response, likert_score:, row:, survey_item:, student:) end - response = row.survey_item_response(survey_item:) - create_or_update_response(survey_item_response: response, likert_score:, row:, survey_item:, student:) - end.compact + .compact end def create_or_update_response(survey_item_response:, likert_score:, row:, survey_item:, student:) @@ -111,8 +124,20 @@ class SurveyResponsesDataLoader survey_item_response.student = student survey_item_response else - SurveyItemResponse.new(response_id: row.response_id, academic_year: row.academic_year, school: row.school, survey_item:, - likert_score:, grade:, gender:, recorded_date: row.recorded_date, income:, ell:, sped:, student:) + SurveyItemResponse.new( + response_id: row.response_id, + academic_year: row.academic_year, + school: row.school, + survey_item:, + likert_score:, + grade:, + gender:, + recorded_date: row.recorded_date, + income:, + ell:, + sped:, + student: + ) end end @@ -121,16 +146,18 @@ class SurveyResponsesDataLoader end def get_survey_item_ids_from_headers(headers:) - CSV.parse(headers).first - .filter(&:present?) - .filter { |header| header.start_with? "t-", "s-" } + CSV + .parse(headers) + .first + .filter(&:present?) + .filter { |header| header.start_with?("t-", "s-") } end end module StringMonkeyPatches def valid_likert_score? - to_i.between? 1, 5 + to_i.between?(1, 5) end end -String.include StringMonkeyPatches +String.include(StringMonkeyPatches) diff --git a/lib/tasks/data.rake b/lib/tasks/data.rake index 44ed6790..d2f45024 100644 --- a/lib/tasks/data.rake +++ b/lib/tasks/data.rake @@ -1,80 +1,100 @@ require "csv" -namespace :data do - desc "load survey responses" - task load_survey_responses: :environment do +namespace(:data) do + desc("load survey responses") + task(load_survey_responses: :environment) do survey_item_response_count = SurveyItemResponse.count student_count = Student.count path = "/data/survey_responses/clean/" Sftp::Directory.open(path:) do |file| SurveyResponsesDataLoader.new.from_file(file:) end - puts "=====================> Completed loading #{SurveyItemResponse.count} survey responses" + + puts( + "=====================> Completed loading #{SurveyItemResponse.count - survey_item_response_count} survey responses. #{SurveyItemResponse.count} total responses in the database" + ) Rails.cache.clear end - desc "load survey responses from a specific directory" - task load_survey_responses_from_path: :environment do + desc("load survey responses from a specific directory") + task(load_survey_responses_from_path: :environment) do survey_item_response_count = SurveyItemResponse.count student_count = Student.count - path = "#{ENV['SFTP_PATH']}" + path = "#{ENV["SFTP_PATH"]}" Sftp::Directory.open(path:) do |file| SurveyResponsesDataLoader.new.from_file(file:) end - puts "=====================> Completed loading #{SurveyItemResponse.count - survey_item_response_count} survey responses. #{SurveyItemResponse.count} total responses in the database" + + puts( + "=====================> Completed loading #{SurveyItemResponse.count - survey_item_response_count} survey responses. #{SurveyItemResponse.count} total responses in the database" + ) Rails.cache.clear end - desc "reset response rate values" - task reset_response_rates: :environment do - puts "Resetting response rates" + desc("reset response rate values") + task(reset_response_rates: :environment) do + puts("Resetting response rates") ResponseRateLoader.reset Rails.cache.clear - puts "=====================> Completed loading #{ResponseRate.count} survey responses" + puts("=====================> Completed loading #{ResponseRate.count} survey responses") end - desc "reset race score calculations" - task reset_race_scores: :environment do - puts "Resetting race scores" + desc("reset race score calculations") + task(reset_race_scores: :environment) do + puts("Resetting race scores") RaceScoreLoader.reset(fast_processing: false) Rails.cache.clear - puts "=====================> Completed loading #{RaceScore.count} survey responses" + puts("=====================> Completed loading #{RaceScore.count} survey responses") end - desc "load admin_data" - task load_admin_data: :environment do + desc("load admin_data") + task(load_admin_data: :environment) do original_count = AdminDataValue.count + pool_size = 2 jobs = Queue.new Dir.glob(Rails.root.join("data", "admin_data", "dese", "*.csv")).each { |filepath| jobs << filepath } - while filepath = jobs.pop(true) - puts "=====================> Loading data from csv at path: #{filepath}" - Dese::Loader.load_data filepath: + workers = pool_size.times.map do + Thread.new do + + while filepath = jobs.pop(true) + puts("=====================> Loading data from csv at path: #{filepath}") + Dese::Loader.load_data(filepath:) + end + + rescue ThreadError + end end - rescue ThreadError - puts "=====================> Completed loading #{AdminDataValue.count - original_count} admin data values" + + workers.each(&:join) + + puts("=====================> Completed loading #{AdminDataValue.count - original_count} admin data values") end - desc "reset all cache counters" - task reset_cache_counters: :environment do - puts "=====================> Resetting Category counters" + desc("reset all cache counters") + task(reset_cache_counters: :environment) do + puts("=====================> Resetting Category counters") Category.all.each do |category| Category.reset_counters(category.id, :subcategories) end - puts "=====================> Resetting Subcategory counters" + + puts("=====================> Resetting Subcategory counters") Subcategory.all.each do |subcategory| Subcategory.reset_counters(subcategory.id, :measures) end - puts "=====================> Resetting Measure counters" + + puts("=====================> Resetting Measure counters") Measure.all.each do |measure| Measure.reset_counters(measure.id, :scales) end - puts "=====================> Resetting Scale counters" + + puts("=====================> Resetting Scale counters") Scale.all.each do |scale| Scale.reset_counters(scale.id, :survey_items) end - puts "=====================> Resetting SurveyItem counters" + + puts("=====================> Resetting SurveyItem counters") SurveyItem.all.each do |survey_item| SurveyItem.reset_counters(survey_item.id, :survey_item_responses) end diff --git a/spec/services/cleaner_spec.rb b/spec/services/cleaner_spec.rb index 92b19a7c..e177e1b4 100644 --- a/spec/services/cleaner_spec.rb +++ b/spec/services/cleaner_spec.rb @@ -153,7 +153,7 @@ RSpec.describe Cleaner do survey_items = SurveyItem.where(survey_item_id: standard_survey_items) data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: standard_survey_items, survey_items:, - schools: School.school_hash)] + schools: School.school_by_dese_id)] filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename( headers: standard_survey_items, data:, filepath: nil ) @@ -165,7 +165,7 @@ RSpec.describe Cleaner do survey_items = SurveyItem.where(survey_item_id: short_form_survey_items) data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: short_form_survey_items, survey_items:, - schools: School.school_hash)] + schools: School.school_by_dese_id)] filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename( headers: short_form_survey_items, data:, filepath: nil ) @@ -178,7 +178,7 @@ RSpec.describe Cleaner do survey_items = SurveyItem.where(survey_item_id: early_education_survey_items) data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: early_education_survey_items, survey_items:, - schools: School.school_hash)] + schools: School.school_by_dese_id)] filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename( headers: early_education_survey_items, data:, filepath: nil ) @@ -190,7 +190,7 @@ RSpec.describe Cleaner do survey_items = SurveyItem.where(survey_item_id: teacher_survey_items) data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: teacher_survey_items, survey_items:, - schools: School.school_hash)] + schools: School.school_by_dese_id)] filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename( headers: teacher_survey_items, data:, filepath: nil ) @@ -202,9 +202,9 @@ RSpec.describe Cleaner do it "adds all districts to the filename" do survey_items = SurveyItem.where(survey_item_id: teacher_survey_items) - data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: teacher_survey_items, survey_items:, schools: School.school_hash), + data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: teacher_survey_items, survey_items:, schools: School.school_by_dese_id), SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "222_222" }, - headers: teacher_survey_items, survey_items:, schools: School.school_hash)] + headers: teacher_survey_items, survey_items:, schools: School.school_by_dese_id)] filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename( headers: teacher_survey_items, data:, filepath: nil ) @@ -217,7 +217,7 @@ RSpec.describe Cleaner do survey_items = SurveyItem.where(survey_item_id: early_education_survey_items) data = [SurveyItemValues.new(row: { "Recorded Date" => recorded_date, "Dese ID" => "1_740_505" }, headers: early_education_survey_items, survey_items:, - schools: School.school_hash)] + schools: School.school_by_dese_id)] filename = Cleaner.new(input_filepath:, output_filepath:, log_filepath:).filename( headers: early_education_survey_items, data:, filepath: "/data/survey_responses/maynard early ed_ part a.2022-23.csv" ) diff --git a/spec/services/survey_item_values_spec.rb b/spec/services/survey_item_values_spec.rb index 1882c755..e5cf625f 100644 --- a/spec/services/survey_item_values_spec.rb +++ b/spec/services/survey_item_values_spec.rb @@ -35,7 +35,7 @@ RSpec.describe SurveyItemValues, type: :model do let(:attleboro_respondents) do create(:respondent, school: attleboro, academic_year: ay_2022_23, nine: 40, ten: 40, eleven: 40, twelve: 40) end - let(:schools) { School.school_hash } + let(:schools) { School.school_by_dese_id } let(:recorded_date) { "2023-04-01T12:12:12" } let(:ay_2022_23) do create(:academic_year, range: "2022-23") @@ -789,7 +789,7 @@ RSpec.describe SurveyItemValues, type: :model do it "returns false" do headers = %w[s-sbel-q5 s-phys-q2 grade RecordedDate DeseID] values = SurveyItemValues.new(row: { "grade" => "2", "RecordedDate" => recorded_date, "DeseID" => "1234" }, headers:, survey_items:, - schools: School.school_hash) + schools: School.school_by_dese_id) expect(values.valid_grade?).to eq false end end @@ -800,13 +800,13 @@ RSpec.describe SurveyItemValues, type: :model do it "returns true for student questions" do headers = %w[s-sbel-q5 s-phys-q1 s-phys-q2 RecordedDate] values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "s-sbel-q5" => "1", "s-phys-q1" => "", "s-phys-q2" => "5" }, headers:, survey_items:, - schools: School.school_hash) + schools: School.school_by_dese_id) expect(values.valid_sd?).to eq true end it "returns true for teacher questions" do headers = %w[t-sbel-q5 t-phys-q2] values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "t-sbel-q5" => "1", "t-phys-q2" => "5" }, headers:, survey_items:, - schools: School.school_hash) + schools: School.school_by_dese_id) expect(values.valid_sd?).to eq true end end @@ -815,13 +815,13 @@ RSpec.describe SurveyItemValues, type: :model do it "returns false for student questions" do headers = %w[s-sbel-q5 s-phys-q1 s-phys-q2 RecordedDate] values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "s-sbel-q5" => "1", "s-phys-q2" => "1" }, headers:, survey_items:, - schools: School.school_hash) + schools: School.school_by_dese_id) expect(values.valid_sd?).to eq false end it "returns false for teacher questions" do headers = %w[t-sbel-q5 t-phys-q1 t-phys-q2 RecordedDate] values = SurveyItemValues.new(row: { "RecordedDate" => recorded_date, "Dese ID" => "1234", "t-sbel-q5" => "1", "t-phys-q2" => "1" }, headers:, survey_items:, - schools: School.school_hash) + schools: School.school_by_dese_id) expect(values.valid_sd?).to eq false end end