Skip to content
This repository was archived by the owner on Dec 15, 2021. It is now read-only.

Commit bf6d654

Browse files
author
Ahmed Ammar
committed
Update to support logstash 2.0.
1 parent 63c4dc9 commit bf6d654

File tree

3 files changed

+44
-25
lines changed

3 files changed

+44
-25
lines changed

lib/logstash-input-dynamodb_jars.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
require_jar( 'com.amazonaws', 'aws-java-sdk-codedeploy', '1.10.11' )
1616
require_jar( 'com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10' )
1717
require_jar( 'com.amazonaws', 'aws-java-sdk-directconnect', '1.10.11' )
18-
require_jar( 'org.apache.httpcomponents', 'httpclient', '4.3.6' )
18+
require_jar( 'org.apache.httpcomponents', 'httpclient', '4.4.1' )
19+
require_jar( 'org.apache.httpcomponents', 'httpcore', '4.4.1' )
1920
require_jar( 'com.amazonaws', 'aws-java-sdk-sns', '1.10.11' )
2021
require_jar( 'com.amazonaws', 'aws-java-sdk-directory', '1.10.11' )
2122
require_jar( 'com.google.protobuf', 'protobuf-java', '2.6.1' )

lib/logstash/inputs/dynamodb.rb

+36-20
Original file line numberDiff line numberDiff line change
@@ -182,17 +182,23 @@ def register
182182

183183
public
184184
def run(logstash_queue)
185-
begin
186-
run_with_catch(logstash_queue)
187-
rescue LogStash::ShutdownSignal
188-
exit_threads
189-
until @queue.empty?
190-
@logger.info("Flushing rest of events in logstash queue")
191-
event = @queue.pop()
192-
queue_event(@parser.parse_stream(event), logstash_queue, @host)
193-
end # until [email protected]?
194-
end # begin
195-
end # def run(logstash_queue)
185+
$exit = false
186+
run_with_catch(logstash_queue)
187+
while !stop?
188+
Stud.stoppable_sleep(@interval) { stop? }
189+
end
190+
end
191+
192+
public
193+
def stop
194+
$exit = true
195+
exit_threads
196+
until @queue.empty?
197+
@logger.info("Flushing rest of events in logstash queue")
198+
event = @queue.pop()
199+
queue_event(@parser.parse_stream(event), logstash_queue, @host)
200+
end # until [email protected]?
201+
end
196202

197203
# Starts KCL app in a background thread
198204
# Starts parallel scan if need be in a background thread
@@ -278,12 +284,16 @@ def scan(logstash_queue)
278284
start_table_copy_thread
279285

280286
scan_queue = @logstash_writer.getQueue()
281-
while true
282-
event = scan_queue.take()
283-
if event.getEntry().nil? and event.getSize() == -1
284-
break
285-
end # if event.isEmpty()
286-
queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host)
287+
while !$exit
288+
if !scan_queue.empty?
289+
event = scan_queue.take()
290+
if event.getEntry().nil? and event.getSize() == -1
291+
break
292+
end # if event.isEmpty()
293+
queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host)
294+
else
295+
sleep(1)
296+
end
287297
end # while true
288298
end
289299

@@ -292,14 +302,20 @@ def stream(logstash_queue)
292302
@logger.info("Starting stream...")
293303
start_kcl_thread
294304

295-
while true
296-
event = @queue.pop()
297-
queue_event(@parser.parse_stream(event), logstash_queue, @host)
305+
while !$exit
306+
if !@queue.empty?
307+
event = @queue.pop()
308+
queue_event(@parser.parse_stream(event), logstash_queue, @host)
309+
else
310+
sleep(1)
311+
end
298312
end # while true
299313
end
300314

301315
private
302316
def exit_threads
317+
@worker.shutdown()
318+
303319
unless @dynamodb_scan_thread.nil?
304320
@dynamodb_scan_thread.exit
305321
end # unless @dynamodb_scan_thread.nil?

logstash-input-dynamodb.gemspec

+6-4
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ Gem::Specification.new do |s|
1919
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }
2020

2121
# Gem dependencies
22-
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
23-
s.add_runtime_dependency "logstash-codec-json"
22+
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
23+
s.add_runtime_dependency 'logstash-codec-json'
24+
s.add_runtime_dependency 'stud', '>= 0.0.22'
2425
s.add_runtime_dependency "activesupport-json_encoder"
26+
s.add_development_dependency 'logstash-devutils', '>= 0.0.16'
2527
# Jar dependencies
2628
s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticbeanstalk', '1.10.11'"
2729
s.requirements << "jar 'com.amazonaws:aws-java-sdk-ses', '1.10.11' "
@@ -37,7 +39,8 @@ Gem::Specification.new do |s|
3739
s.requirements << "jar 'com.amazonaws:aws-java-sdk-codedeploy', '1.10.11'"
3840
s.requirements << "jar 'com.amazonaws:aws-java-sdk-dynamodb', '1.10.10'"
3941
s.requirements << "jar 'com.amazonaws:aws-java-sdk-directconnect', '1.10.11'"
40-
s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.3.6'"
42+
s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.4.1'"
43+
s.requirements << "jar 'org.apache.httpcomponents:httpcore', '4.4.1'"
4144
s.requirements << "jar 'com.amazonaws:aws-java-sdk-sns', '1.10.11'"
4245
s.requirements << "jar 'com.amazonaws:aws-java-sdk-directory', '1.10.11'"
4346
s.requirements << "jar 'com.google.protobuf:protobuf-java', '2.6.1'"
@@ -95,6 +98,5 @@ Gem::Specification.new do |s|
9598
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudwatch', '1.10.8'"
9699
s.add_runtime_dependency 'jar-dependencies'
97100
# Development dependencies
98-
s.add_development_dependency "logstash-devutils"
99101
s.add_development_dependency "mocha"
100102
end

0 commit comments

Comments
 (0)