-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebapp.js
106 lines (88 loc) · 2.86 KB
/
webapp.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/* Logging */
// Replace winston's Console transport with one configured by the options below.
var logger = require('winston');
var consoleTransportOptions = {'timestamp': true, 'colorize': true};
logger.remove(logger.transports.Console);
logger.add(logger.transports.Console, consoleTransportOptions);
/* Web server */
var express = require('express');
var cors = require('cors');
var app = express();
var requestClient = require('request');

// Register the cors middleware application-wide, before any route is declared.
app.use(cors());

/**
 * Listen callback: logs the host and port reported by the bound server.
 */
function onListening() {
  var bound = server.address();
  logger.log("info", "Server is listening at http://%s:%s", bound.address, bound.port);
}

// The HTTP port is fixed at 3000.
var server = app.listen(3000, onListening);
// API

/**
 * Completion callback for the Kafka send: a failure is logged and nothing
 * else happens, so it never affects the HTTP response.
 */
function onKafkaSendDone(err, result) {
  if (err) {
    logger.log("error", err);
  }
}

/**
 * GET /test handler.
 * Delegates to authorize(); once that calls back, the auth body is logged,
 * published to the "node-test" Kafka topic and returned to the client with
 * the status code of the auth response.
 */
function handleTestRequest(request, response) {
  logger.log("info", "GET /test request");
  authorize(request, response, function onAuthorized(err, res, body) {
    var serializedBody = JSON.stringify(body);
    logger.log("info", serializedBody);

    var payloads = [{
      topic: "node-test",
      messages: 'Info message returned from example-service with object ' + serializedBody
    }];
    // Async kafka message sending: the response below does not wait for it.
    producer.send(payloads, onKafkaSendDone);

    response.set('Content-Type', 'application/json');
    response.statusCode = res.statusCode;
    return response.send(body);
  });
}

app.get('/test', handleTestRequest);
/**
 * Authorize method
 * Use this method to authorize requests using the
 * auth service
 *
 * Forwards the caller's Authorization header to
 * GET http://<AUTH_SERVER_IP>:<AUTH_SERVER_PORT>/auth/authorize.
 *
 * - Auth service answers 200 or 201: `callback(err, res, body)` is invoked and
 *   the response is left untouched for the caller to complete.
 * - Auth service answers with an error status (>= 400): the response is ended
 *   here with that same status and the auth service's body.
 * - Any other outcome (no response, unexpected status): the response is ended
 *   here with status 500.
 * - AUTH_SERVER_IP / AUTH_SERVER_PORT missing: the response is ended with
 *   status 500 and no request is made.
 *
 * @param {Object} request - incoming request; only `headers.authorization` is read.
 * @param {Object} response - outgoing response; ended by this function on every failure path.
 * @param {Function} callback - called as `callback(err, res, body)` on successful authorization only.
 */
function authorize(request, response, callback) {
  var server_ip = process.env.AUTH_SERVER_IP;
  var server_port = process.env.AUTH_SERVER_PORT;
  if (server_ip == null || server_port == null) {
    logger.log("error", "Missing auth server configuration.");
    logger.log("error", "ip:" + server_ip);
    logger.log("error", "port:" + server_port);
    response.statusCode = 500;
    return response.end("Missing auth server configuration.");
  }

  var options = {
    // Built from the values validated above rather than re-reading the environment.
    url: 'http://' + server_ip + ':' + server_port + '/auth/authorize',
    method: 'GET',
    headers: {
      'Authorization': request.headers.authorization
    }
  };

  requestClient(options, function (err, res, body) {
    if (res && (res.statusCode === 200 || res.statusCode === 201)) {
      return callback(err, res, body);
    }

    if (res) {
      logger.log("info", "Access denied:" + res.statusCode);
    } else {
      logger.log("info", "Access denied with unknown response");
    }
    if (err) {
      // Keep the underlying failure visible instead of dropping it.
      logger.log("error", err);
    }

    // Pass through the auth service's own error status (e.g. 401/403); anything
    // else, including "no response at all", is reported as a server error.
    response.statusCode = (res && res.statusCode >= 400) ? res.statusCode : 500;

    // Assigning `response.body` sends nothing, so the body is written via end().
    // end() only accepts strings and Buffers; other values are omitted.
    var payload = (typeof body === 'string' || Buffer.isBuffer(body)) ? body : undefined;
    return response.end(payload);
  });
}
/**
 * Kafka init
 * Use this Kafka config in all services
 *
 * Reads KAFKA_IP / KAFKA_PORT from the environment, builds a kafka-node
 * client for "<ip>:<port>" and a producer with requireAcks set to 1.
 */
var kafka_ip = process.env.KAFKA_IP;
var kafka_port = process.env.KAFKA_PORT;
if (kafka_ip == null || kafka_port == null) {
  // Only reported: the client below is still constructed with whatever was read.
  logger.log("error", "Missing kafka server configuration.");
  logger.log("error", "ip:" + kafka_ip);
  logger.log("error", "port:" + kafka_port);
}
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client(kafka_ip + ':' + kafka_port);
var producer = new Producer(client, { requireAcks: 1 });
producer.on('ready', function () {
  logger.log("info", "Kafka producer is ready");
});
// An 'error' event emitted with no listener attached is thrown by the
// EventEmitter and would take the whole process down, so log it instead.
producer.on('error', function (err) {
  logger.log("error", err);
});