diff --git a/app/controllers/oai_controller.rb b/app/controllers/oai_controller.rb index f27879ad..c5967fe4 100644 --- a/app/controllers/oai_controller.rb +++ b/app/controllers/oai_controller.rb @@ -155,23 +155,14 @@ def normalized_dumps(set, from_date, until_date) end # get all dumps in this stream, optionally between two dates; error if none - dumps = filter_dumps(streams, from_date, until_date) + dumps = streams.flat_map do |stream| + stream.current_dumps(from_date: from_date, until_date: until_date) + end.sort_by(&:created_at) raise OaiConcern::NoRecordsMatch if dumps.empty? dumps end - def filter_dumps(streams, from_date, until_date) - # get candidate dumps (the current full dump and its deltas for each stream) - dumps = streams.flat_map(&:current_dumps).sort_by(&:created_at) - - # filter candidate dumps (by from date and until date) - dumps = dumps.select { |dump| dump.created_at >= Time.zone.parse(from_date).beginning_of_day } if from_date.present? - dumps = dumps.select { |dump| dump.created_at <= Time.zone.parse(until_date).end_of_day } if until_date.present? 
- - dumps - end - # Wrap the provided Nokogiri::XML::Builder block in an OAI-PMH response # See http://www.openarchives.org/OAI/openarchivesprotocol.html#XMLResponse def build_oai_response(xml, params) diff --git a/app/jobs/generate_delta_dump_job.rb b/app/jobs/generate_delta_dump_job.rb index f59b0239..e7c29843 100644 --- a/app/jobs/generate_delta_dump_job.rb +++ b/app/jobs/generate_delta_dump_job.rb @@ -1,92 +1,30 @@ # frozen_string_literal: true ## -# Background job to create a delta dump download for a resource (organization) -class GenerateDeltaDumpJob < ApplicationJob - with_job_tracking - +# Background job to create delta (changes/deletes) files for an organization +class GenerateDeltaDumpJob < GenerateDumpJob def self.enqueue_all - Organization.find_each { |org| GenerateDeltaDumpJob.perform_later(org) } + Organization.find_each { |org| perform_later(org) } end - # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity - def perform(organization) - now = Time.zone.now - full_dump = organization.default_stream.current_full_dump - return unless full_dump - - from = full_dump.last_delta_dump_at - - uploads = organization.default_stream.uploads.active.where(created_at: from...now) - - return unless uploads.any? 
- - uploads.where.not(status: 'processed').each do |upload| - ExtractMarcRecordMetadataJob.perform_now(upload) - end - - progress.total = uploads.sum(&:marc_records_count) - - delta_dump = full_dump.deltas.create(stream_id: full_dump.stream_id) - base_name = "#{organization.slug}-#{Time.zone.today}-delta" - writer = MarcRecordWriterService.new(base_name) - oai_file_counter = 0 - - begin - NormalizedMarcRecordReader.new(uploads).each_slice(Settings.oai_max_page_size) do |records| - oai_writer = OaiMarcRecordWriterService.new(base_name) - records.each do |record| - if record.status == 'delete' - writer.write_delete(record) - oai_writer.write_delete(record) - else - writer.write_marc_record(record) - oai_writer.write_marc_record(record) - end - end - oai_writer.finalize - delta_dump.public_send(:oai_xml).attach(io: File.open(oai_writer.oai_file), - filename: human_readable_filename(base_name, :oai_xml, oai_file_counter)) - - oai_file_counter += 1 - progress.increment(records.length) - ensure - oai_writer.close - oai_writer.unlink - end - - writer.finalize - - writer.files.each do |as, file| - delta_dump.public_send(as).attach(io: File.open(file), - filename: human_readable_filename(base_name, as)) - end + private - delta_dump.save! 
- full_dump.update(last_delta_dump_at: now) - ensure - writer.close - writer.unlink - end + def full_dump + @organization.default_stream.current_full_dump end - # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity - private + def dump + @dump ||= full_dump.deltas.create(stream_id: full_dump.stream_id) + end - def human_readable_filename(base_name, file_type, counter = nil) - as = case file_type - when :deletes - 'deletes.del.txt' - when :marc21 - 'marc21.mrc.gz' - when :marcxml - 'marcxml.xml.gz' - when :oai_xml - "oai-#{format('%010d', counter)}.xml.gz" - else - "#{file_type}.gz" - end + # Only process uploads added to the stream since the last delta dump + def uploads + @organization.default_stream.uploads + .active + .where(created_at: full_dump.last_delta_dump_at...Time.zone.now) + end - "#{base_name}-#{as}" + def base_name + "#{@organization.slug}-#{Time.zone.today}-delta" end end diff --git a/app/jobs/generate_dump_job.rb b/app/jobs/generate_dump_job.rb new file mode 100644 index 00000000..11b89c1b --- /dev/null +++ b/app/jobs/generate_dump_job.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +## +# Background job to create downloadable files representing an organization's stream +class GenerateDumpJob < ApplicationJob + with_job_tracking + + def perform(organization) + @organization = organization + return unless full_dump && uploads.any? + + # ensure all uploads have been processed before starting + uploads.where.not(status: 'processed').each do |upload| + ExtractMarcRecordMetadataJob.perform_now(upload) + end + + write_files + + dump.save! 
+ full_dump.update(last_delta_dump_at: Time.zone.now) + end + + private + + # Write all MARC records to tempfiles using the configured writers + # rubocop:disable Metrics/AbcSize + # rubocop:disable Metrics/CyclomaticComplexity + def write_files + progress.total = uploads.sum(&:marc_records_count) + + NormalizedMarcRecordReader.new(uploads).each_slice(100) do |records| + records.each { |record| write_record(record) } + progress.increment(records.length) + end + + writers.each(&:finalize) + writers.each { |writer| writer.attach_files_to_dump(dump, base_name) } + ensure + writers.each(&:close) + writers.each(&:unlink) + end + # rubocop:enable Metrics/AbcSize + # rubocop:enable Metrics/CyclomaticComplexity + + # Write a single MARC record/delete using each writer + def write_record(record) + if record.status == 'delete' + writers.each { |writer| writer.write_delete(record) } + else + writers.each { |writer| writer.write_marc_record(record) } + end + end + + # Services that support #write_marc_record and #write_delete + def writers + @writers ||= [ + MarcRecordWriterService.new(base_name), + OaiMarcRecordWriterService.new(base_name) + ] + end +end diff --git a/app/jobs/generate_full_dump_job.rb b/app/jobs/generate_full_dump_job.rb index 112181e6..07420ced 100644 --- a/app/jobs/generate_full_dump_job.rb +++ b/app/jobs/generate_full_dump_job.rb @@ -1,93 +1,46 @@ # frozen_string_literal: true ## -# Background job to create a full dump download for a resource (organization) -class GenerateFullDumpJob < ApplicationJob - with_job_tracking +# Background job to create full stream files for an organization
class GenerateFullDumpJob < GenerateDumpJob + after_perform do |job| + GenerateDeltaDumpJob.perform_later(*job.arguments) + end + # Only process organizations with changes since last full dump def self.enqueue_all - Organization.find_each do |org| + organizations = Organization.select do |org| full_dump = org.default_stream.normalized_dumps.last - next if full_dump && - org.default_stream.uploads.where(updated_at: full_dump.last_full_dump_at...Time.zone.now).none? + full_dump.nil? || + org.default_stream.uploads.where(updated_at: full_dump.last_full_dump_at...Time.zone.now).any? - - GenerateFullDumpJob.perform_later(org) - end - end - - # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity - def perform(organization) - now = Time.zone.now - uploads = Upload.active.where(stream: organization.default_stream) - - uploads.where.not(status: 'processed').each do |upload| - ExtractMarcRecordMetadataJob.perform_now(upload) end - progress.total = uploads.sum(&:marc_records_count) - - full_dump = organization.default_stream.normalized_dumps.build(last_full_dump_at: now, last_delta_dump_at: now) - - base_name = "#{organization.slug}-#{Time.zone.today}-full" - writer = MarcRecordWriterService.new(base_name) - oai_file_counter = 0 - - begin - NormalizedMarcRecordReader.new(uploads).each_slice(Settings.oai_max_page_size) do |records| - oai_writer = OaiMarcRecordWriterService.new(base_name) - records.each do |record| - # In a full dump, we can omit the deletes - next if record.status == 'delete' - - writer.write_marc_record(record) - oai_writer.write_marc_record(record) - end - - if oai_writer.bytes_written? - oai_writer.finalize - full_dump.public_send(:oai_xml).attach(io: File.open(oai_writer.oai_file), - filename: human_readable_filename( - base_name, :oai_xml, oai_file_counter - )) - end + organizations.each { |org| perform_later(org) } + end - oai_file_counter += 1 - progress.increment(records.length) - ensure - oai_writer.finalize - oai_writer.close - oai_writer.unlink - end + private - writer.finalize + # Skip deletes when writing a full dump + def write_record(record) + return if record.status == 'delete' - writer.files.each do |as, file| - full_dump.public_send(as).attach(io: File.open(file), filename: human_readable_filename(base_name, as)) - end + writers.each { |writer| writer.write_marc_record(record) } + end - full_dump.save! 
+ def full_dump + dump - end - GenerateDeltaDumpJob.perform_later(organization) - ensure - writer.close - writer.unlink - end + def dump + @dump ||= @organization.default_stream.normalized_dumps .create(last_full_dump_at: Time.zone.now, last_delta_dump_at: Time.zone.now) end - # rubocop:enable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity - def human_readable_filename(base_name, file_type, counter = nil) - as = case file_type - when :deletes - 'deletes.del.txt' - when :marc21 - 'marc21.mrc.gz' - when :marcxml - 'marcxml.xml.gz' - when :oai_xml - "oai-#{format('%010d', counter)}.xml.gz" - else - "#{file_type}.gz" - end + def uploads + @organization.default_stream.uploads.active end - "#{base_name}-#{as}" + def base_name + "#{@organization.slug}-#{Time.zone.today}-full" end end diff --git a/app/models/stream.rb b/app/models/stream.rb index 13cf28fc..5334d87c 100644 --- a/app/models/stream.rb +++ b/app/models/stream.rb @@ -78,9 +78,15 @@ def current_full_dump normalized_dumps.full_dumps.create(last_delta_dump_at: Time.zone.at(0)) end - # the current full dump and its associated deltas - def current_dumps - [current_full_dump, *current_full_dump.deltas] + # the current full dump and its associated deltas, optionally filtered using + # a start and end date range query + def current_dumps(from_date: nil, until_date: nil) + dumps = normalized_dumps.where(id: current_full_dump.id) .or(normalized_dumps.where(full_dump_id: current_full_dump.id)) .order(created_at: :asc) + dumps = dumps.where('created_at >= ?', Time.zone.parse(from_date).beginning_of_day) if from_date + dumps = dumps.where('created_at <= ?', Time.zone.parse(until_date).end_of_day) if until_date + dumps end # If no datetime is provided then assume we want the previous DefaultStreamHistory diff --git a/app/services/marc_record_writer_service.rb b/app/services/marc_record_writer_service.rb index 31dc9a58..6b9829e6 100644 --- a/app/services/marc_record_writer_service.rb +++ b/app/services/marc_record_writer_service.rb @@ -36,6 +36,12 @@ def 
unlink @opened_files.each(&:unlink) end + def attach_files_to_dump(dump, base_name) + files.each do |file_type, file| + dump.public_send(file_type).attach(io: File.open(file), filename: human_readable_filename(base_name, file_type)) + end + end + private def write_marc21_record(record) @@ -83,4 +89,19 @@ def split_marc(marc) raise e end + + def human_readable_filename(base_name, file_type) + file_name = case file_type + when :deletes + 'deletes.del.txt' + when :marc21 + 'marc21.mrc.gz' + when :marcxml + 'marcxml.xml.gz' + else + "#{file_type}.gz" + end + + "#{base_name}-#{file_name}" + end end diff --git a/app/services/oai_marc_record_writer_service.rb b/app/services/oai_marc_record_writer_service.rb index dd9a1dec..7dfc822b 100644 --- a/app/services/oai_marc_record_writer_service.rb +++ b/app/services/oai_marc_record_writer_service.rb @@ -1,15 +1,21 @@ # frozen_string_literal: true -# Utility class for serializing MARC records to files +# Utility class for serializing MARC records to OAI-XML files class OaiMarcRecordWriterService - attr_reader :base_name + attr_reader :base_name, :files def initialize(base_name = nil) @base_name = base_name + @files = [] + @records_written = 0 + @oai_writer = OAIPMHWriter.new(Zlib::GzipWriter.new(temp_file)) end def write_marc_record(record) + next_file if @records_written == Settings.oai_max_page_size + - oai_writer.write(record.augmented_marc, record.oai_id, record.stream.id, record.upload.created_at) + @oai_writer.write(record.augmented_marc, record.oai_id, record.stream.id, record.upload.created_at) + @records_written += 1 rescue StandardError => e error = "Error writing MARC OAI file #{record.oai_id}: #{e}" Rails.logger.info(error) @@ -17,7 +23,10 @@ def write_delete(record) + next_file if @records_written == Settings.oai_max_page_size + - oai_writer.write_delete(record.oai_id, record.stream.id, record.upload.created_at) + @oai_writer.write_delete(record.oai_id, record.stream.id, record.upload.created_at) + @records_written += 1 end def finalize @@ -25,25 +34,35 @@ def finalize end def close - @oai_file&.close + @files.each(&:close) end def unlink - @oai_file&.unlink + 
@files.each(&:unlink) end - def oai_file - @oai_file ||= Tempfile.new("#{base_name}-oai_xml", binmode: true) + def attach_files_to_dump(dump, base_name) + files.each_with_index do |file, counter| + dump.public_send(:oai_xml).attach(io: File.open(file), filename: human_readable_filename(base_name, counter)) + end end - def bytes_written? - @oai_writer&.bytes_written? + private + + def next_file + @oai_writer&.close + @oai_writer = OAIPMHWriter.new(Zlib::GzipWriter.new(temp_file)) + @records_written = 0 end - private + def temp_file + Tempfile.new("#{base_name}-oai_xml", binmode: true).tap do |file| + @files << file + end + end - def oai_writer - @oai_writer ||= OAIPMHWriter.new(Zlib::GzipWriter.new(oai_file)) + def human_readable_filename(base_name, counter = nil) + "#{base_name}-oai-#{format('%010d', counter)}.xml.gz" end # Special logic for writing OAI-PMH-style record responses diff --git a/spec/services/oai_marc_record_writer_service_spec.rb b/spec/services/oai_marc_record_writer_service_spec.rb new file mode 100644 index 00000000..0d1291c5 --- /dev/null +++ b/spec/services/oai_marc_record_writer_service_spec.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe OaiMarcRecordWriterService do + subject(:service) { described_class.new('test') } + + it 'tracks the number of records written' + it 'opens a new file when the count written exceeds the max page size' + it 'can write a marc records as oai xml' + it 'can write a delete record as oai xml' + it 'can attach all files to a dump' + it 'generates human readable filenames' +end