Commit 403a28f

buffer: fix emit error of race condition (#4450)
Backported from 13a5199. After 95438b2 (#4342), there is a section in `write_step_by_step()` where chunks are not locked. `write_step_by_step()` must keep each chunk locked until it is passed to the block. Otherwise, a race condition can occur and cause an emit error (IOError).

Example warning messages of the emit error:

[warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location=...
[warn]: #0 send an error event stream to @error: error_class=IOError error="closed stream" location=...

Signed-off-by: Daijiro Fukuda <[email protected]>
1 parent 1ffa714 commit 403a28f
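For context, the difference comes down to where the chunk's Monitor lock is released. Below is a minimal standalone Ruby sketch (a hypothetical Chunk class and method names, not fluentd's actual API) contrasting a per-chunk synchronize block, which releases the lock before the chunks are handed to the caller's block, with a pattern that takes the lock via mon_enter and only releases it in an ensure after the block has run:

# Minimal sketch: hypothetical Chunk class, not fluentd's implementation.
require "monitor"

class Chunk
  include MonitorMixin # provides mon_enter / mon_exit / synchronize

  def initialize
    super
    @closed = false
  end

  def write(data)
    raise IOError, "closed stream" if @closed
    # ... append data to the chunk ...
  end

  def close
    @closed = true
  end
end

# Racy pattern (before the fix): the lock is dropped at the end of each
# synchronize block, so another thread may close or purge the chunk before
# the caller's block runs, which surfaces as "IOError: closed stream".
def write_chunks_racy(chunks, &block)
  chunks.each do |chunk|
    chunk.synchronize { chunk.write("payload") }
  end
  chunks.each { |chunk| block.call(chunk) }
end

# Locked pattern (analogous to this commit): each chunk stays locked from
# mon_enter until after it has been passed to the block; the locks are
# always released in ensure, even when an error is raised.
def write_chunks_locked(chunks, &block)
  locked = []
  chunks.each do |chunk|
    chunk.mon_enter
    locked << chunk
    chunk.write("payload")
  end
  locked.each { |chunk| block.call(chunk) }
ensure
  locked.each { |chunk| chunk.mon_exit }
end

The fluentd diff below applies the same idea inside write_step_by_step: chunk.mon_enter replaces the chunk.synchronize do ... end wrapper, the retry path calls chunk.mon_exit rescue nil before retrying, and a new ensure releases the locks of all modified_chunks.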

File tree

2 files changed: +134 -68 lines changed

lib/fluent/plugin/buffer.rb

+75 -68
@@ -764,94 +764,95 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
       while writing_splits_index < splits.size
         chunk = get_next_chunk.call
         errors = []
+        # The chunk must be locked until being passed to &block.
+        chunk.mon_enter
         modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}
-        chunk.synchronize do
-          raise ShouldRetry unless chunk.writable?
-          staged_chunk_used = true if chunk.staged?
-
-          original_bytesize = committed_bytesize = chunk.bytesize
-          begin
-            while writing_splits_index < splits.size
-              split = splits[writing_splits_index]
-              formatted_split = format ? format.call(split) : nil

-              if split.size == 1 # Check BufferChunkOverflowError
-                determined_bytesize = nil
-                if @compress != :text
-                  determined_bytesize = nil
-                elsif formatted_split
-                  determined_bytesize = formatted_split.bytesize
-                elsif split.first.respond_to?(:bytesize)
-                  determined_bytesize = split.first.bytesize
-                end
+        raise ShouldRetry unless chunk.writable?
+        staged_chunk_used = true if chunk.staged?

-                if determined_bytesize && determined_bytesize > @chunk_limit_size
-                  # It is a obvious case that BufferChunkOverflowError should be raised here.
-                  # But if it raises here, already processed 'split' or
-                  # the proceeding 'split' will be lost completely.
-                  # So it is a last resort to delay raising such a exception
-                  errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                  writing_splits_index += 1
-                  next
-                end
+        original_bytesize = committed_bytesize = chunk.bytesize
+        begin
+          while writing_splits_index < splits.size
+            split = splits[writing_splits_index]
+            formatted_split = format ? format.call(split) : nil

-                if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
-                  # The split will (might) cause size over so keep already processed
-                  # 'split' content here (allow performance regression a bit).
-                  chunk.commit
-                  committed_bytesize = chunk.bytesize
-                end
+            if split.size == 1 # Check BufferChunkOverflowError
+              determined_bytesize = nil
+              if @compress != :text
+                determined_bytesize = nil
+              elsif formatted_split
+                determined_bytesize = formatted_split.bytesize
+              elsif split.first.respond_to?(:bytesize)
+                determined_bytesize = split.first.bytesize
               end

-              if format
-                chunk.concat(formatted_split, split.size)
-              else
-                chunk.append(split, compress: @compress)
+              if determined_bytesize && determined_bytesize > @chunk_limit_size
+                # It is a obvious case that BufferChunkOverflowError should be raised here.
+                # But if it raises here, already processed 'split' or
+                # the proceeding 'split' will be lost completely.
+                # So it is a last resort to delay raising such a exception
+                errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                writing_splits_index += 1
+                next
              end
-              adding_bytes = chunk.bytesize - committed_bytesize

-              if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
-                chunk.rollback
+              if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
+                # The split will (might) cause size over so keep already processed
+                # 'split' content here (allow performance regression a bit).
+                chunk.commit
                 committed_bytesize = chunk.bytesize
+              end
+            end

-                if split.size == 1 # Check BufferChunkOverflowError again
-                  if adding_bytes > @chunk_limit_size
-                    errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                    writing_splits_index += 1
-                    next
-                  else
-                    # As already processed content is kept after rollback, then unstaged chunk should be queued.
-                    # After that, re-process current split again.
-                    # New chunk should be allocated, to do it, modify @stage and so on.
-                    synchronize { @stage.delete(modified_metadata) }
-                    staged_chunk_used = false
-                    chunk.unstaged!
-                    break
-                  end
-                end
+            if format
+              chunk.concat(formatted_split, split.size)
+            else
+              chunk.append(split, compress: @compress)
+            end
+            adding_bytes = chunk.bytesize - committed_bytesize

-                if chunk_size_full?(chunk) || split.size == 1
-                  enqueue_chunk_before_retry = true
+            if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
+              chunk.rollback
+              committed_bytesize = chunk.bytesize
+
+              if split.size == 1 # Check BufferChunkOverflowError again
+                if adding_bytes > @chunk_limit_size
+                  errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                  writing_splits_index += 1
+                  next
                 else
-                  splits_count *= 10
+                  # As already processed content is kept after rollback, then unstaged chunk should be queued.
+                  # After that, re-process current split again.
+                  # New chunk should be allocated, to do it, modify @stage and so on.
+                  synchronize { @stage.delete(modified_metadata) }
+                  staged_chunk_used = false
+                  chunk.unstaged!
+                  break
                 end
+              end

-                raise ShouldRetry
+              if chunk_size_full?(chunk) || split.size == 1
+                enqueue_chunk_before_retry = true
+              else
+                splits_count *= 10
               end

-              writing_splits_index += 1
+              raise ShouldRetry
+            end

-              if chunk_size_full?(chunk)
-                break
-              end
+            writing_splits_index += 1
+
+            if chunk_size_full?(chunk)
+              break
             end
-          rescue
-            chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it
-            raise
           end
-
-          modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
+        rescue
+          chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it
+          raise
         end
+
+        modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
       end
       modified_chunks.each do |data|
         block.call(data[:chunk], data[:adding_bytesize], data[:errors])
@@ -863,9 +864,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           if chunk.unstaged?
             chunk.purge rescue nil
           end
+          chunk.mon_exit rescue nil
         end
         enqueue_chunk(metadata) if enqueue_chunk_before_retry
         retry
+    ensure
+      modified_chunks.each do |data|
+        chunk = data[:chunk]
+        chunk.mon_exit
+      end
     end

     STATS_KEYS = [

test/plugin/test_buffer.rb

+59
@@ -901,6 +901,65 @@ def create_chunk_es(metadata, es)

       assert_equal 2, purge_count
     end
+
+    # https://github.com/fluent/fluentd/issues/4446
+    test "#write_step_by_step keeps chunks kept in locked in entire #write process" do
+      assert_equal 8 * 1024 * 1024, @p.chunk_limit_size
+      assert_equal 0.95, @p.chunk_full_threshold
+
+      mon_enter_counts_by_chunk = {}
+      mon_exit_counts_by_chunk = {}
+
+      stub.proxy(@p).generate_chunk(anything) do |chunk|
+        stub(chunk).mon_enter do
+          enter_count = 1 + mon_enter_counts_by_chunk.fetch(chunk, 0)
+          exit_count = mon_exit_counts_by_chunk.fetch(chunk, 0)
+          mon_enter_counts_by_chunk[chunk] = enter_count
+
+          # Assert that chunk is passed to &block of write_step_by_step before exiting the lock.
+          # (i.e. The lock count must be 2 greater than the exit count).
+          # Since ShouldRetry occurs once, the staged chunk takes the lock 3 times when calling the block.
+          if chunk.staged?
+            lock_in_block = enter_count == 3
+            assert_equal(enter_count - 2, exit_count) if lock_in_block
+          else
+            lock_in_block = enter_count == 2
+            assert_equal(enter_count - 2, exit_count) if lock_in_block
+          end
+        end
+        stub(chunk).mon_exit do
+          exit_count = 1 + mon_exit_counts_by_chunk.fetch(chunk, 0)
+          mon_exit_counts_by_chunk[chunk] = exit_count
+        end
+        chunk
+      end
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+      small_row = "x" * 1024 * 400
+      big_row = "x" * 1024 * 1024 * 8 # just `chunk_size_limit`, it does't cause BufferOverFlowError.
+
+      # Write 42 events in 1 event stream, last one is for triggering `ShouldRetry`
+      @p.write({m => [small_row] * 40 + [big_row] + ["x"]})
+
+      # Above event strem will be splitted twice by `Buffer#write_step_by_step`
+      #
+      # 1. `write_once`: 42 [events] * 1 [stream]
+      # 2. `write_step_by_step`: 4 [events]* 10 [streams] + 2 [events] * 1 [stream]
+      # 3. `write_step_by_step` (by `ShouldRetry`): 1 [event] * 42 [streams]
+      #
+      # Example of staged chunk lock behavior:
+      #
+      # 1. mon_enter in write_step_by_step
+      # 2. ShouldRetry occurs
+      # 3. mon_exit in write_step_by_step
+      # 4. mon_enter again in write_step_by_step (retry)
+      # 5. passed to &block of write_step_by_step
+      # 6. mon_enter in the block (write)
+      # 7. mon_exit in write_step_by_step
+      # 8. mon_exit in write
+
+      assert_equal(mon_enter_counts_by_chunk.values, mon_exit_counts_by_chunk.values)
+    end
   end

   sub_test_case 'standard format with configuration for test with lower chunk limit size' do
