Spark.py
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from geopy.geocoders import Nominatim
from textblob import TextBlob
from elasticsearch import Elasticsearch
from datetime import datetime

# Socket the tweet producer writes to and Spark Streaming reads from
TCP_IP = 'localhost'
TCP_PORT = 9001
def processTweet(tweet):
    # For each tweet received on the socket:
    # (i) run sentiment analysis,
    # (ii) look up the place where the tweet was generated (using geopy or googlemaps),
    # (iii) index the resulting document in Elasticsearch.
    es = Elasticsearch(cloud_id="", http_auth=('elastic', ''))  # fill in your Elastic Cloud ID and password
    tweetData = tweet.split("::")  # records arrive as "rawLocation::text::created_at"
    if len(tweetData) > 2:
        rawLocation = tweetData[0]
        text = tweetData[1]
        created_at = tweetData[2]
        doc = {}

        # (i) Sentiment analysis on "text" with TextBlob
        print("\n\n=========================\ntweet: ", text)
        print("\nRaw location from tweet status: ", rawLocation)
        t = TextBlob(text)
        print("\n Sentiment : ", t.sentiment)
        print("\n Polarity : ", t.sentiment.polarity)
        doc["Polarity"] = t.sentiment.polarity
        doc["Subjectivity"] = t.sentiment.subjectivity

        # (ii) Geolocation (state, country, lat, lon, etc.) from rawLocation via Nominatim
        geolocator = Nominatim(user_agent="bts")
        location = geolocator.geocode(rawLocation, exactly_one=True, addressdetails=True)
        if location is not None:
            print("\nLocation: ", location.raw)
            print("\nLatitude", location.latitude)
            print("\nLongitude", location.longitude)
        doc["location"] = {"lat": location.latitude if location is not None else 0.0,
                           "lon": location.longitude if location is not None else 0.0}
        doc["Address"] = location.raw['address'] if location is not None else {}

        # Map the polarity score to a sentiment label
        if t.sentiment.polarity < 0:
            sentiment = "negative"
        elif t.sentiment.polarity == 0.0:
            sentiment = "neutral"
        else:
            sentiment = "positive"
        doc["message"] = text
        doc["sentiment"] = sentiment
        # created_at is expected to look like "Sat May 01 12:34:56 2021"
        doc["date"] = datetime.strptime(created_at, '%a %b %d %H:%M:%S %Y')

        # (iii) Post the document to the Elasticsearch index
        es.index(index="bb-index", body=doc)
        print(doc)


# PySpark driver
# Create the Spark configuration
conf = SparkConf()
conf.setAppName('TwitterApp')
conf.setMaster('local[2]')
# Create the Spark context with the above configuration
sc = SparkContext(conf=conf)
# Create the Streaming context from the Spark context with a 4-second batch interval
ssc = StreamingContext(sc, 4)
ssc.checkpoint("checkpoint_TwitterApp")
# Read data from port 9001 (TCP_PORT) and process every tweet in each micro-batch
dataStream = ssc.socketTextStream(TCP_IP, TCP_PORT)
dataStream.foreachRDD(lambda rdd: rdd.foreach(processTweet))
ssc.start()
ssc.awaitTermination()
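
Spark.py only consumes the stream: socketTextStream connects as a client to TCP_PORT and expects newline-delimited records in the "rawLocation::text::created_at" format that processTweet splits on. For local testing, a minimal sketch of a matching producer is shown below. The file name test_producer.py, the sample records, and the listening-server design are assumptions for illustration, not part of this repository.

# test_producer.py (hypothetical helper, not part of the original repo)
# Listens on localhost:9001 and, once Spark Streaming connects, sends a few
# "rawLocation::text::created_at" records so the streaming job has data to process.
import socket
import time
from datetime import datetime

TCP_IP = 'localhost'
TCP_PORT = 9001

# Sample records; created_at matches the '%a %b %d %H:%M:%S %Y' format Spark.py parses.
samples = [
    ("Barcelona, Spain", "I love this city!"),
    ("London", "The weather is terrible today."),
]

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((TCP_IP, TCP_PORT))
server.listen(1)
print("Waiting for Spark Streaming to connect on port", TCP_PORT)
conn, addr = server.accept()

for rawLocation, text in samples:
    created_at = datetime.now().strftime('%a %b %d %H:%M:%S %Y')
    record = "{}::{}::{}\n".format(rawLocation, text, created_at)
    conn.sendall(record.encode('utf-8'))
    time.sleep(1)  # spread the records across micro-batches

conn.close()
server.close()

Under this assumed setup the producer is started first, then the streaming job (for example with spark-submit Spark.py), since socketTextStream opens the connection from the Spark side.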