Skip to content

Commit 5ba1c27

Browse files
committed
Handle on_failure and on_success
* on_failure fires the first time any of the jobs fail, even once * on_success only fires if all jobs work (after retries) * remove unused job_id
1 parent 4251685 commit 5ba1c27

File tree

4 files changed

+51
-17
lines changed

4 files changed

+51
-17
lines changed

app/models/solid_queue/job_batch.rb

+41-12
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ class JobBatch < Record
77

88
serialize :on_finish_active_job, coder: JSON
99
serialize :on_success_active_job, coder: JSON
10+
serialize :on_failure_active_job, coder: JSON
1011

1112
scope :incomplete, -> {
1213
where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago)
1314
}
15+
scope :finished, -> { where.not(finished_at: nil) }
1416

1517
class << self
1618
def current_batch_id
@@ -45,6 +47,7 @@ def dispatch_finished_batches
4547
def batch_attributes(attributes)
4648
on_finish_klass = attributes.delete(:on_finish)
4749
on_success_klass = attributes.delete(:on_success)
50+
on_failure_klass = attributes.delete(:on_failure)
4851

4952
if on_finish_klass.present?
5053
attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize
@@ -54,6 +57,10 @@ def batch_attributes(attributes)
5457
attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize
5558
end
5659

60+
if on_failure_klass.present?
61+
attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize
62+
end
63+
5764
attributes
5865
end
5966

@@ -69,29 +76,51 @@ def finished?
6976
def finish
7077
return if finished?
7178
reset_changed_at
72-
jobs.find_each do |next_job|
73-
# FIXME: If it's failed but is going to retry, how do we know?
74-
# Because we need to know if we will determine what the failed execution means
75-
# FIXME: use "success" vs "finish" vs "discard" `completion_type` to determine
76-
# how to analyze each job
77-
return unless next_job.finished?
78-
end
7979

80+
all_jobs_succeeded = true
8081
attrs = {}
82+
jobs.find_each do |next_job|
83+
# SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished,
84+
# and there is no record of the failure.
85+
# GoodJob would report a discard as an error. It's possible we should do that in the future?
86+
if fire_failure_job?(next_job)
87+
perform_completion_job(:on_failure_active_job, attrs)
88+
update!(attrs)
89+
end
90+
91+
status = next_job.status
92+
all_jobs_succeeded = all_jobs_succeeded && status != :failed
93+
return unless status.in?([ :finished, :failed ])
94+
end
8195

8296
if on_finish_active_job.present?
83-
active_job = ActiveJob::Base.deserialize(on_finish_active_job)
84-
active_job.send(:deserialize_arguments_if_needed)
85-
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
86-
ActiveJob.perform_all_later([ active_job ])
87-
attrs[:job] = Job.find_by(active_job_id: active_job.job_id)
97+
perform_completion_job(:on_finish_active_job, attrs)
98+
end
99+
100+
if on_success_active_job.present? && all_jobs_succeeded
101+
perform_completion_job(:on_success_active_job, attrs)
88102
end
89103

90104
update!({ finished_at: Time.zone.now }.merge(attrs))
91105
end
92106

93107
private
94108

109+
def fire_failure_job?(job)
110+
return false if on_failure_active_job.blank? || job.failed_execution.blank?
111+
job = ActiveJob::Base.deserialize(on_failure_active_job)
112+
job.provider_job_id.blank?
113+
end
114+
115+
def perform_completion_job(job_field, attrs)
116+
active_job = ActiveJob::Base.deserialize(send(job_field))
117+
active_job.send(:deserialize_arguments_if_needed)
118+
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
119+
ActiveJob.perform_all_later([ active_job ])
120+
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
121+
attrs[job_field] = active_job.serialize
122+
end
123+
95124
def reset_changed_at
96125
if changed_at.blank? && last_changed_at.present?
97126
update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again

db/migrate/20240131013203_create_solid_queue_batch_table.rb

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1]
22
def change
33
create_table :solid_queue_job_batches do |t|
4-
t.references :job, index: { unique: true }
54
t.text :on_finish_active_job
65
t.text :on_success_active_job
6+
t.text :on_failure_active_job
77
t.datetime :finished_at
88
t.datetime :changed_at
99
t.datetime :last_changed_at
@@ -16,6 +16,5 @@ def change
1616

1717
add_reference :solid_queue_jobs, :batch, index: true
1818
add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade
19-
add_foreign_key :solid_queue_job_batches, :solid_queue_jobs, column: :job_id
2019
end
2120
end

test/dummy/db/schema.rb

+1-3
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,16 @@
4747
end
4848

4949
create_table "solid_queue_job_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
50-
t.bigint "job_id"
5150
t.text "on_finish_active_job"
5251
t.text "on_success_active_job"
52+
t.text "on_failure_active_job"
5353
t.datetime "finished_at"
5454
t.datetime "changed_at"
5555
t.datetime "last_changed_at"
5656
t.datetime "created_at", null: false
5757
t.datetime "updated_at", null: false
5858
t.index ["changed_at"], name: "index_solid_queue_job_batches_on_changed_at"
5959
t.index ["finished_at"], name: "index_solid_queue_job_batches_on_finished_at"
60-
t.index ["job_id"], name: "index_solid_queue_job_batches_on_job_id", unique: true
6160
t.index ["last_changed_at"], name: "index_solid_queue_job_batches_on_last_changed_at"
6261
end
6362

@@ -160,7 +159,6 @@
160159
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
161160
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
162161
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
163-
add_foreign_key "solid_queue_job_batches", "solid_queue_jobs", column: "job_id"
164162
add_foreign_key "solid_queue_jobs", "solid_queue_job_batches", column: "batch_id", on_delete: :cascade
165163
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
166164
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade

test/test_helpers/jobs_test_helper.rb

+8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ def wait_for_jobs_to_finish_for(timeout = 1.second, except: [])
99
end
1010
end
1111

12+
def wait_for_job_batches_to_finish_for(timeout = 1.second)
13+
wait_while_with_timeout(timeout) do
14+
skip_active_record_query_cache do
15+
SolidQueue::JobBatch.where(finished_at: nil).any?
16+
end
17+
end
18+
end
19+
1220
def assert_no_unfinished_jobs
1321
skip_active_record_query_cache do
1422
assert SolidQueue::Job.where(finished_at: nil).none?

0 commit comments

Comments
 (0)