-
Notifications
You must be signed in to change notification settings - Fork 10
/
index.js
107 lines (95 loc) · 2.75 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"use strict";
/*
Use the Ripple Account Monitor to receive
notifications about a particular Ripple account
as they come in, beginning at a point
specified by you in the configuration.
The Account Monitor binds to a local TCP socket
and, using ZeroMQ, sends notifications to a client
process for handling. It obtains notifications
by polling the account notifications endpoint of
a Ripple REST server specified in the configuration.
When the client disconnects from the ZeroMQ socket,
the Account Monitor halts its operation. Upon
reconnection of the client, the Account Monitor
resumes polling Ripple REST.
*/
var config = require(__dirname+"/config/config.js");
var request = require("request");
var zmq = require("zmq");
// Delay handed to setTimeout by getNextNotification before it polls again.
var TIMEOUT = config.get("timeout");

// Flipped by the socket monitor events below; getNotification and
// handleNewNotification consult it before doing any work.
var PULLER_CONNECTED = false;

// Push socket over which notifications are sent to the client process.
var pusher = zmq.socket("push");

// Record that a client has attached to the push socket.
function onClientAccepted() {
  console.log("client connected");
  PULLER_CONNECTED = true;
}

// Record that the client has gone away.
function onClientDisconnected() {
  console.log("client disconnected");
  PULLER_CONNECTED = false;
}

// Close the socket and terminate the process on Ctrl-C.
function onInterrupt() {
  pusher.close();
  console.log('closed socket:', config.get("socket"));
  process.exit(-1);
}

pusher.bindSync(config.get("socket"));
// monitor() is what makes the socket emit the "accept"/"disconnect" events.
pusher.monitor();
pusher.on("accept", onClientAccepted);
pusher.on("disconnect", onClientDisconnected);
process.on('SIGINT', onInterrupt);
/**
 * Run one step of the polling loop.
 *
 * Looks up the notification for the stored `lastTransactionHash`; if it
 * points at a successor (`next_hash`), fetches that successor and passes it
 * to handleNewNotification. Afterwards the loop is continued by invoking
 * `callback(callback)` - immediately once a new notification was handled,
 * or after TIMEOUT when there was an error or nothing new to process.
 *
 * @param {Function} callback - continuation for the loop; it is called with
 *   itself as its only argument (the script passes getNextNotification).
 */
function getNextNotification(callback){
  // Continue the loop after the configured delay instead of right away.
  function retryLater(){
    setTimeout(function(){
      callback(callback);
    }, TIMEOUT);
  }

  getNotification(config.get('lastTransactionHash'), function(err, notification){
    // getNotification may report neither an error nor a notification (e.g. an
    // unsuccessful response body). Dereferencing `notification` in that case
    // used to throw a TypeError, so a missing notification is treated like
    // "nothing new yet".
    if (err || !notification || !notification.next_hash) {
      retryLater();
      return;
    }

    getNotification(notification.next_hash, function(err, newNotification){
      // An empty result here must also back off; continuing immediately would
      // re-poll the server in a tight loop.
      if (err || !newNotification) {
        retryLater();
        return;
      }
      handleNewNotification(newNotification, function(){
        callback(callback);
      });
    });
  });
}
/**
 * Fetch the notification for one transaction hash from the configured
 * Ripple REST server.
 *
 * @param {string} hash - transaction hash appended to the notifications URL.
 * @param {Function} fn - node-style callback `fn(err, notification)`.
 *   Exactly one of the two arguments is non-null: on success `err` is null
 *   and `notification` is `body.notification`; otherwise `notification` is
 *   null and `err` is the string 'disconnected' (no client attached), the
 *   error reported by `request`, or an Error describing an unusable response.
 */
function getNotification(hash, fn) {
  // Without a connected client there is nobody to deliver to, so no request
  // is made at all.
  if (!PULLER_CONNECTED){
    fn('disconnected', null);
    return;
  }

  var rippleRestServerUrl = config.get("rippleRestServerUrl");
  var account = config.get('account');
  // NOTE: the configured server URL is expected to end with a slash.
  var url = rippleRestServerUrl+"v1/accounts/"+account+'/notifications/'+hash;

  request.get({ url: url, json: true }, function(err, resp, body){
    if (body && body.success && body.notification) {
      fn(null, body.notification);
      return;
    }
    // A completed request with an unsuccessful or empty body leaves `err`
    // null; always hand the caller an error so it never receives
    // (null, null) and mistakes the failure for a result.
    fn(err || new Error('Ripple REST response did not contain a notification'), null);
  });
}
/**
 * Deliver a notification to the connected client and remember its hash.
 *
 * When there is a notification and a client is connected, the notification
 * is serialized to JSON and sent over the push socket, its hash is stored
 * under the `lastTransactionHash` config key, and `done` is handed to
 * `config.save`. In every other case `done` is invoked directly with no
 * arguments and nothing is sent or stored.
 *
 * @param {Object} notification - notification to forward; may be falsy.
 * @param {Function} done - completion callback.
 */
function handleNewNotification(notification, done){
  // Nothing to forward, or nobody to forward it to.
  if (!notification || !PULLER_CONNECTED) {
    done();
    return;
  }

  pusher.send(JSON.stringify(notification));

  // Store the hash only after sending, so the next poll starts from it.
  config.set('lastTransactionHash', notification.hash);
  config.save(done);
}
// Start the polling loop. getNextNotification continues by invoking
// callback(callback), so it is passed to itself as the continuation.
getNextNotification(getNextNotification);