-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathhft.py
36 lines (27 loc) · 1.12 KB
/
hft.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/env python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from helpers import *
sc = SparkContext("local[2]", "NetworkWordCount")
st = StreamingContext(sc, 1)
setup_checkpoint(st)
portfolio = { u'MSFT': Stock('MSFT', 1, 150.06, None, 0.0), u'APPL': Stock('APPL', 4, 70.23, None, 0.0), u'GOOG': Stock('GOOG', 2, 104.55, None, 0.0) }
def actualizar_portfolio(stocks):
actualizaciones = stocks.filter(lambda s: s.simbolo in portfolio).collect()
al_menos_una_actualizacion = False
for a in actualizaciones:
al_menos_una_actualizacion = True
actual = portfolio[a.simbolo]
nuevo = actual._replace( \
ultimo_precio = a.ultimo_precio, \
returns = (a.ultimo_precio - actual.precio_compra) / actual.precio_compra)
portfolio[a.simbolo] = nuevo
if al_menos_una_actualizacion:
print map(lambda s: list(s), portfolio.values())
stocks = st.socketTextStream("localhost", 9999) \
.map(parse_stock) \
.foreachRDD(actualizar_portfolio)
# stocks.pprint()
# stocks.reduceByKey(lambda a,b: a + b)
st.start()
st.awaitTermination()