From 6b75af4c7de397a03384cbbe42ff9e73c2c9f414 Mon Sep 17 00:00:00 2001 From: rebuilt Date: Wed, 17 Sep 2025 21:57:34 -0700 Subject: [PATCH] fix: make survey data uploads threaded --- app/services/survey_responses_data_loader.rb | 37 ++++++++++++++------ 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/app/services/survey_responses_data_loader.rb b/app/services/survey_responses_data_loader.rb index a86bda9d..e54bab1e 100644 --- a/app/services/survey_responses_data_loader.rb +++ b/app/services/survey_responses_data_loader.rb @@ -28,7 +28,6 @@ class SurveyResponsesDataLoader headers_array = CSV.parse(headers).first all_survey_items = survey_items(headers:) - survey_item_responses = [] batch_size = 1000 lines = [] @@ -38,24 +37,42 @@ class SurveyResponsesDataLoader lines << line end + slices = [] lines.each_slice(batch_size) do |slice| - slice.each do |line| - CSV.parse(line, headers:).map do |row| - row = process_row(row: SurveyItemValues.new(row:, headers: headers_array, - survey_items: all_survey_items, schools:, academic_years:)) + slices << slice + end - survey_item_responses.concat(row) unless row.nil? - end + pool_size = 4 + jobs = Queue.new + slices.each { |slice| jobs << slice } + + workers = pool_size.times.map do + Thread.new do + while slice = jobs.pop(true) + + slice.each do |line| + survey_item_responses = [] + CSV.parse(line, headers:).map do |row| + row = process_row(row: SurveyItemValues.new(row:, headers: headers_array, + survey_items: all_survey_items, schools:, academic_years:)) - survey_item_responses = survey_item_responses.compact.flatten + survey_item_responses.concat(row) unless row.nil? + end - SurveyItemResponse.import(survey_item_responses, batch_size:, on_duplicate_key_update: :all) + survey_item_responses = survey_item_responses.compact.flatten - survey_item_responses = [] + SurveyItemResponse.import(survey_item_responses, batch_size:, on_duplicate_key_update: :all) + + survey_item_responses = [] + end + end + rescue ThreadError end end + workers.each(&:join) + GC.start end