@@ -5,6 +5,7 @@ class JobBatch < Record
5
5
belongs_to :job , foreign_key : :job_id , optional : true
6
6
belongs_to :parent_job_batch , foreign_key : :parent_job_batch_id , class_name : "SolidQueue::JobBatch" , optional : true
7
7
has_many :jobs , foreign_key : :batch_id
8
+ has_many :children , foreign_key : :parent_job_batch_id , class_name : "SolidQueue::JobBatch"
8
9
9
10
serialize :on_finish_active_job , coder : JSON
10
11
serialize :on_success_active_job , coder : JSON
@@ -21,28 +22,33 @@ def current_batch_id
21
22
end
22
23
23
24
def enqueue ( attributes = { } )
24
- previous_batch_id = current_batch_id . presence || nil
25
-
26
25
job_batch = nil
27
26
transaction do
28
27
job_batch = create! ( batch_attributes ( attributes ) )
29
- ActiveSupport ::IsolatedExecutionState [ :current_batch_id ] = job_batch . id
30
- yield job_batch
28
+ wrap_in_batch_context ( job_batch . id ) do
29
+ yield job_batch
30
+ end
31
31
end
32
32
33
33
job_batch
34
- ensure
35
- ActiveSupport ::IsolatedExecutionState [ :current_batch_id ] = previous_batch_id
36
34
end
37
35
38
36
def dispatch_finished_batches
39
37
incomplete . order ( :id ) . pluck ( :id ) . each do |id |
40
38
transaction do
41
- where ( id : id ) . non_blocking_lock . each ( &:finish )
39
+ where ( id : id ) . includes ( :children , :jobs ) . non_blocking_lock . each ( &:finish )
42
40
end
43
41
end
44
42
end
45
43
44
+ def wrap_in_batch_context ( batch_id )
45
+ previous_batch_id = current_batch_id . presence || nil
46
+ ActiveSupport ::IsolatedExecutionState [ :current_batch_id ] = batch_id
47
+ yield
48
+ ensure
49
+ ActiveSupport ::IsolatedExecutionState [ :current_batch_id ] = previous_batch_id
50
+ end
51
+
46
52
private
47
53
48
54
def batch_attributes ( attributes )
@@ -62,6 +68,8 @@ def batch_attributes(attributes)
62
68
attributes [ :on_failure_active_job ] = as_active_job ( on_failure_klass ) . serialize
63
69
end
64
70
71
+ attributes [ :parent_job_batch_id ] = current_batch_id if current_batch_id . present?
72
+
65
73
attributes
66
74
end
67
75
@@ -74,16 +82,13 @@ def as_active_job(active_job_klass)
74
82
def enqueue ( attributes = { } )
75
83
raise "You cannot enqueue a batch that is already finished" if finished?
76
84
77
- previous_batch_id = self . class . current_batch_id . presence || nil
78
-
79
85
transaction do
80
- ActiveSupport ::IsolatedExecutionState [ :current_batch_id ] = id
81
- yield self
86
+ self . class . wrap_in_batch_context ( id ) do
87
+ yield self
88
+ end
82
89
end
83
90
84
91
self
85
- ensure
86
- ActiveSupport ::IsolatedExecutionState [ :current_batch_id ] = previous_batch_id
87
92
end
88
93
89
94
def finished?
@@ -110,6 +115,10 @@ def finish
110
115
return unless status . in? ( [ :finished , :failed ] )
111
116
end
112
117
118
+ children . find_each do |child |
119
+ return unless child . finished?
120
+ end
121
+
113
122
if on_finish_active_job . present?
114
123
perform_completion_job ( :on_finish_active_job , attrs )
115
124
end
@@ -118,7 +127,10 @@ def finish
118
127
perform_completion_job ( :on_success_active_job , attrs )
119
128
end
120
129
121
- update! ( { finished_at : Time . zone . now } . merge ( attrs ) )
130
+ transaction do
131
+ parent_job_batch . touch ( :changed_at , :last_changed_at ) if parent_job_batch_id . present?
132
+ update! ( { finished_at : Time . zone . now } . merge ( attrs ) )
133
+ end
122
134
end
123
135
124
136
private
@@ -133,7 +145,9 @@ def perform_completion_job(job_field, attrs)
133
145
active_job = ActiveJob ::Base . deserialize ( send ( job_field ) )
134
146
active_job . send ( :deserialize_arguments_if_needed )
135
147
active_job . arguments = [ self ] + Array . wrap ( active_job . arguments )
136
- ActiveJob . perform_all_later ( [ active_job ] )
148
+ self . class . wrap_in_batch_context ( id ) do
149
+ ActiveJob . perform_all_later ( [ active_job ] )
150
+ end
137
151
active_job . provider_job_id = Job . find_by ( active_job_id : active_job . job_id ) . id
138
152
attrs [ job_field ] = active_job . serialize
139
153
end
0 commit comments