-
Notifications
You must be signed in to change notification settings - Fork 68
Component Implementation
The StreamFlow delegation architecture makes integrating your existing Spouts and Bolts easy. The delegation architecture allows you to develop your Spouts and Bolts using the same native Storm libraries you already use making the transition of your current Storm components over to StreamFlow a relatively simple process. Although StreamFlow does not require you to import a StreamFlow specific API, there are a few simple libraries that can enhance your experience working with StreamFlow.
The first optional dependency is Google Guice, a dependency injection framework, which StreamFlow uses internally for injection of component properties. As you will see in the examples, the Guice @Inject
annotation allow you to inject the user provided component properties into your Spouts and Bolts.
To include support for Guice in your framework project, add the following dependency to your pom.xml
.
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
<scope>provided</scope>
</dependency>
Once the above dependency is added to the pom.xml
, the @Inject
annotations in your component implementation will be available.
Note: The Guice dependency injection is optional as all StreamFlow properties are also added to the
Map config
object passed to theprepare()
andinitialize()
methods
The second optional dependency is the SLF4J logging framework which StreamFlow utilizes for capturing log output data. To alleviate the need to configure and manage your log files manually, StreamFlow can inject a preconfigured org.slf4j.Logger
object for your convenience. The Logger is configured to output the log data using the settings specified in the Logger
section of the streamflow.yml
application configuration file. Using this configuration, StreamFlow will output the log data from your components to the directory specified by the logger.baseDir
property using the pattern specified by the logger.formatPattern
property. In the sample component implementation provided in the following sections, you will see sample code to inject the Logger
and capture some text using the logging instance.
Important: While optional, the primary benefit of using the StreamFlow provided logger is that all log data collected by the logger is visible in the StreamFlow UI by selecting your topology in the topology builder and clicking the "View Log" button.
To include support for SLF4J in your framework project, add the following dependencies to your pom.xml
.
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
This section will walk through a sample component implementation using a mock framework.yml and the associated component source code as reference. The example below is taken from the Twitter Framework included in the StreamFlow project. Some methods have been removed for the sake of brevity, however, the full source code is available if you would like to examine things in greater detail.
name: twitter-framework
label: Twitter Framework
version: ${project.version}
description: Spouts and Bolts supporting Twitter functionality
components:
- name: twitter-sample-spout
label: Twitter Sample Spout
type: storm-spout
description: Utilizes Twitter Streaming API to stream of 1% Twitter data for analysis. Twitter OAuth credentials for you application are required for use.
mainClass: streamflow.spout.twitter.TwitterSampleSpout
icon: icons/twitter.png
properties:
- name: oauth-consumer-key
label: OAuth Consumer Key
type: text
description: Twitter OAuth Consumer Key
defaultValue:
required: true
- name: oauth-consumer-secret
label: OAuth Consumer Secret
type: text
description: Twitter OAuth Consumer Secret
defaultValue:
required: true
- name: oauth-access-token
label: OAuth Access Token
type: text
description: Twitter OAuth Access Token
defaultValue:
required: true
- name: oauth-access-token-secret
label: OAuth Access Token Secret
type: text
description: Twitter OAuth Access Token Secret
defaultValue:
required: true
outputs:
- key: default
description: Twitter Status
serializations:
- typeClass: twitter4j.Status
1. public class TwitterSampleSpout extends BaseRichSpout {
2.
3. private SpoutOutputCollector collector;
4.
5. private Logger logger;
6.
7. private String consumerKey;
8. private String consumerSecret;
9. private String accessToken;
10. private String accessTokenSecret;
11.
12. private final LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(100000);
13. private TwitterStream twitterStream;
14.
15.
16. @Inject
17. public void setConsumerKey(@Named("oauth-consumer-key") String consumerKey) {
18. this.consumerKey = consumerKey;
19. }
20.
21. @Inject
22. public void setConsumerSecret(@Named("oauth-consumer-secret") String consumerSecret) {
23. this.consumerSecret = consumerSecret;
24. }
25.
26. @Inject
27. public void setAccessToken(@Named("oauth-access-token") String accessToken) {
28. this.accessToken = accessToken;
29. }
30.
31. @Inject
32. public void setAccessTokenSecret(@Named("oauth-access-token-secret") String accessTokenSecret) {
33. this.accessTokenSecret = accessTokenSecret;
34. }
35.
36. @Inject
37. public void setLogger(Logger logger){
38. this.logger = logger;
39. }
40.
41. @Override
42. public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
43. this.collector = collector;
44.
45. logger.info("Twitter Sampler Started: Consumer Key = " + consumerKey
46. + ", Consumer Secret = " + consumerSecret + ", Access Token = " + accessToken
47. + ", Access Token Secret = " + accessTokenSecret);
48.
49. // Build the twitter config to authenticate the requests
50. ConfigurationBuilder twitterConfig = new ConfigurationBuilder()
51. .setOAuthConsumerKey(consumerKey)
52. .setOAuthConsumerSecret(consumerSecret)
53. .setOAuthAccessToken(accessToken)
54. .setOAuthAccessTokenSecret(accessTokenSecret)
55.
56. // Status listener which handle the status events and add them to the queue
57. StatusListener listener = new StatusListener() {
58. @Override
59. public void onStatus(Status status) {
60. queue.offer(status);
61. }
62.
63. // ADDITIONAL METHODS OMITTED FOR SIMPLICITY
64. };
65.
66. TwitterStreamFactory twitterFactory = new TwitterStreamFactory(twitterConfig.build());
67. twitterStream = twitterFactory.getInstance();
68. twitterStream.addListener(listener);
69. twitterStream.sample();
70. }
71.
72. @Override
73. public void nextTuple() {
74. Status status = queue.poll();
75.
76. if (status == null) {
77. Utils.sleep(50);
78. } else {
79. // Emit the twitter status as a JSON String
80. collector.emit(new Values(status));
81. }
82. }
83.
84. @Override
85. public void close() {
86. twitterStream.shutdown();
87. }
88.
89. @Override
90. public void declareOutputFields(OutputFieldsDeclarer declarer) {
91. declarer.declare(new Fields("tweet"));
92. }
93. }
The purpose of the Twitter Sample Spout is to consume a steady stream of Twitter data using the Twitter streaming API. The code takes advantage of the Twitter4J open source library which simplifies the communication with Twitter's web services. In order to use the Twitter Sample Spout or any of the Twitter services in general, it requires registration on the Twitter Application Management website which provides the necessary OAuth credentials.
In the above example, the framework.yml
configuration defines four properties which are necessary for authentication with Twitter: "processing-url" and "processing-pipeline" for the component. These properties were added to the
config as they were determined to be values that could be changed at runtime.
Typically properties are added to allow users to make modifications to the
behavior of the Spout or Bolt without needing to change internal constants and
recompile the code. Line 11 of the sample framework.json file
references a mainClass for the implementation of the Bolt. The class above
shows the concrete implementation of one of the Storm Bolt base classes along
with the injected properties.
Lines 25 and 30 of the sample Bolt implementation shows the use of the @Named annotation to inject the values of the "processing-url" and "processing-pipeline" properties during runtime. When a topology using this Bolt is deployed, the instance variables will be populated with the current values after the constructor is called, but before any calls the inherited Bolt methods (e.g. prepare(), execute()).
Lines 15 and 20 demonstrate the use of two special @Named injected properties: "http.proxy.host" and "http.proxy.port". As the names imply, these properties are populated with the configured HTTP proxy host and port values during runtime if specified in the Storm environment. If no proxy is configured in the environment, the proxyHost and proxyPort will have a value null and -1 respectively. The proxy host and proxy port were added as special properties as they were used frequently by many different plugins and is centrally configured in the jetstream.yml configuration file.
This example demonstrates how values from the dynamically built jobs in the JetStream UI can be provided to the Spouts and Bolts during runtime. This feature is critical in allowing topologies to be built dynamically in the JetStream UI while allowing configurations to be modified and redeployed to the cluster without modifying the original Spout or Bolt code.