Skip to content

Commit 44ae7fa

Browse files
committed
Bug Fix
- Support one token -> multi sockets - Tempt to fix concurrency income request causing same temp filename
1 parent 436af5c commit 44ae7fa

File tree

4 files changed

+244
-151
lines changed

4 files changed

+244
-151
lines changed

bin/tcp_server.dart

+189-137
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* @Author : Linloir
33
* @Date : 2022-10-06 15:44:16
4-
* @LastEditTime : 2022-10-14 10:26:00
4+
* @LastEditTime : 2022-10-17 22:56:11
55
* @Description :
66
*/
77

@@ -16,162 +16,214 @@ import 'package:tcp_server/tcpcontroller/response.dart';
1616

1717
void main(List<String> arguments) async {
1818
//Set port
19-
var port = arguments.isEmpty ? 20706 : int.tryParse(arguments[0]) ?? 20706;
19+
var address = arguments.isEmpty ? '127.0.0.1' : arguments[0];
20+
//Set address
21+
var port = arguments.length < 2 ? 20706 : int.tryParse(arguments[1]) ?? 20706;
2022

2123
//Create nessesary working directories
2224
await Directory('${Directory.current.path}/.tmp').create();
2325
await Directory('${Directory.current.path}/.data').create();
2426
await Directory('${Directory.current.path}/.data/files').create();
2527

2628
await DataBaseHelper().initialize();
27-
var tokenMap = <int, TCPController>{};
28-
var controllerMap = <TCPController, Future<int>>{};
29-
var listenSocket = await ServerSocket.bind('127.0.0.1', port);
29+
Map<int, List<TCPController>> tokenMap = {};
30+
Map<TCPController, Future<int>> controllerMap = {};
31+
var listenSocket = await ServerSocket.bind(address, port);
3032
listenSocket.listen(
3133
(socket) {
3234
var controller = TCPController(socket: socket);
33-
controller.requestStreamBroadcast.listen((request) async {
34-
print('[L] ${request.toJSON}');
35-
if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) {
36-
if(controllerMap[controller] == null) {
37-
controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))();
38-
}
39-
request.tokenID = await controllerMap[controller];
40-
var tokenResponse = TCPResponse(
41-
type: ResponseType.token,
42-
status: ResponseStatus.ok,
43-
body: {
44-
"tokenid": request.tokenID
45-
}
46-
);
47-
controller.outStream.add(tokenResponse);
35+
controller.responseStreamBroadcast.listen(
36+
null,
37+
onError: (_) {
38+
print('[L] [EXCEPTION]-----------------------');
39+
print('[L] TCP Controller ran into exception');
40+
print('[L] Remote: ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
41+
var token = controllerMap[controller];
42+
controllerMap.remove(controller);
43+
tokenMap[token]?.remove(controller);
4844
}
49-
tokenMap[request.tokenID!] = tokenMap[request.tokenID!] ?? controller;
50-
switch(request.requestType) {
51-
case RequestType.checkState: {
52-
var response = await onCheckState(request, socket);
53-
controller.outStream.add(response);
54-
break;
55-
}
56-
case RequestType.register: {
57-
var response = await onRegister(request, socket);
58-
controller.outStream.add(response);
59-
break;
60-
}
61-
case RequestType.login: {
62-
var response = await onLogin(request, socket);
63-
controller.outStream.add(response);
64-
break;
65-
}
66-
case RequestType.logout: {
67-
var response = await onLogout(request, socket);
68-
controller.outStream.add(response);
69-
break;
70-
}
71-
case RequestType.profile: {
72-
var response = await onFetchProfile(request, socket);
73-
controller.outStream.add(response);
74-
break;
75-
}
76-
case RequestType.modifyProfile: {
77-
var response = await onModifyProfile(request, socket);
78-
controller.outStream.add(response);
79-
break;
45+
);
46+
controller.requestStreamBroadcast.listen(
47+
(request) async {
48+
print('[L] [INCOMING ]-----------------------');
49+
print('[L] Incoming from ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
50+
print('[L] Message: ${request.toJSON}');
51+
if(!(await DataBaseHelper().isTokenValid(tokenid: request.tokenID))) {
52+
if(controllerMap[controller] == null) {
53+
controllerMap[controller] = (() async => (await DataBaseHelper().createToken()))();
54+
}
55+
request.tokenID = await controllerMap[controller];
56+
var tokenResponse = TCPResponse(
57+
type: ResponseType.token,
58+
status: ResponseStatus.ok,
59+
body: {
60+
"tokenid": request.tokenID
61+
}
62+
);
63+
controller.outStream.add(tokenResponse);
8064
}
81-
case RequestType.modifyPassword: {
82-
var response = await onModifyPassword(request, socket);
83-
controller.outStream.add(response);
84-
break;
65+
tokenMap[request.tokenID!] ??= [];
66+
if(!tokenMap[request.tokenID]!.contains(controller)) {
67+
tokenMap[request.tokenID]!.add(controller);
8568
}
86-
case RequestType.sendMessage: {
87-
//Forword Message
88-
var message = Message.fromJSONObject(request.body);
89-
await DataBaseHelper().setFetchHistoryFor(
90-
tokenID: request.tokenID,
91-
newTimeStamp: message.timestamp
92-
);
93-
var originUserID = message.senderID;
94-
var onlineDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: originUserID);
95-
for(var device in onlineDevices) {
96-
if(device == request.tokenID) {
97-
continue;
98-
}
99-
var targetController = tokenMap[device];
100-
var forwardResponse = TCPResponse(
101-
type: ResponseType.forwardMessage,
102-
status: ResponseStatus.ok,
103-
body: message.jsonObject
104-
);
105-
targetController?.outStream.add(forwardResponse);
106-
//Update Fetch Histories
107-
await DataBaseHelper().setFetchHistoryFor(
108-
tokenID: device,
109-
newTimeStamp: message.timestamp
110-
);
69+
switch(request.requestType) {
70+
case RequestType.checkState: {
71+
var response = await onCheckState(request, socket);
72+
controller.outStream.add(response);
73+
break;
11174
}
112-
var targetUserID = message.receiverID;
113-
var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID);
114-
for(var device in targetDevices) {
115-
//Forward to socket
116-
var targetController = tokenMap[device];
117-
var forwardResponse = TCPResponse(
118-
type: ResponseType.forwardMessage,
119-
status: ResponseStatus.ok,
120-
body: message.jsonObject
121-
);
122-
targetController?.outStream.add(forwardResponse);
123-
//Update Fetch Histories
75+
case RequestType.register: {
76+
var response = await onRegister(request, socket);
77+
controller.outStream.add(response);
78+
break;
79+
}
80+
case RequestType.login: {
81+
var response = await onLogin(request, socket);
82+
controller.outStream.add(response);
83+
break;
84+
}
85+
case RequestType.logout: {
86+
var response = await onLogout(request, socket);
87+
controller.outStream.add(response);
88+
break;
89+
}
90+
case RequestType.profile: {
91+
var response = await onFetchProfile(request, socket);
92+
controller.outStream.add(response);
93+
break;
94+
}
95+
case RequestType.modifyProfile: {
96+
var response = await onModifyProfile(request, socket);
97+
controller.outStream.add(response);
98+
break;
99+
}
100+
case RequestType.modifyPassword: {
101+
var response = await onModifyPassword(request, socket);
102+
controller.outStream.add(response);
103+
break;
104+
}
105+
case RequestType.sendMessage: {
106+
//Forword Message
107+
var message = Message.fromJSONObject(request.body);
124108
await DataBaseHelper().setFetchHistoryFor(
125-
tokenID: device,
109+
tokenID: request.tokenID,
126110
newTimeStamp: message.timestamp
127111
);
112+
var originUserID = message.senderID;
113+
var onlineDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: originUserID);
114+
for(var device in onlineDevices) {
115+
if(device == request.tokenID) {
116+
continue;
117+
}
118+
var targetControllers = tokenMap[device] ?? [];
119+
var forwardResponse = TCPResponse(
120+
type: ResponseType.forwardMessage,
121+
status: ResponseStatus.ok,
122+
body: message.jsonObject
123+
);
124+
for(var controller in targetControllers) {
125+
try {
126+
controller.outStream.add(forwardResponse);
127+
} catch(e) {
128+
print(e);
129+
}
130+
}
131+
//Update Fetch Histories
132+
await DataBaseHelper().setFetchHistoryFor(
133+
tokenID: device,
134+
newTimeStamp: message.timestamp
135+
);
136+
}
137+
var targetUserID = message.receiverID;
138+
var targetDevices = await DataBaseHelper().fetchTokenIDsViaUserID(userID: targetUserID);
139+
for(var device in targetDevices) {
140+
//Forward to socket
141+
var targetControllers = tokenMap[device] ?? [];
142+
var forwardResponse = TCPResponse(
143+
type: ResponseType.forwardMessage,
144+
status: ResponseStatus.ok,
145+
body: message.jsonObject
146+
);
147+
for(int i = targetControllers.length - 1; i >= 0; i--) {
148+
var controller = targetControllers[i];
149+
try{
150+
print('[L] [MSGFOWARD]-----------------------');
151+
print('[L] Forwarding message to ${controller.socket.remoteAddress}:${controller.socket.remotePort}');
152+
controller.outStream.add(forwardResponse);
153+
} catch(e) {
154+
print('[E] [EXCEPTION]-----------------------');
155+
var token = controllerMap[controller];
156+
controllerMap.remove(controller);
157+
tokenMap[token]?.remove(controller);
158+
}
159+
}
160+
//Update Fetch Histories
161+
await DataBaseHelper().setFetchHistoryFor(
162+
tokenID: device,
163+
newTimeStamp: message.timestamp
164+
);
165+
}
166+
var response = await onSendMessage(request, socket);
167+
controller.outStream.add(response);
168+
break;
169+
}
170+
case RequestType.fetchMessage: {
171+
var response = await onFetchMessage(request, socket);
172+
controller.outStream.add(response);
173+
break;
174+
}
175+
case RequestType.findFile: {
176+
var response = await onFindFile(request, socket);
177+
controller.outStream.add(response);
178+
break;
179+
}
180+
case RequestType.fetchFile: {
181+
var response = await onFetchFile(request, socket);
182+
controller.outStream.add(response);
183+
break;
184+
}
185+
case RequestType.searchUser: {
186+
var response = await onSearchUser(request, socket);
187+
controller.outStream.add(response);
188+
break;
189+
}
190+
case RequestType.addContact: {
191+
var response = await onAddContact(request, socket);
192+
controller.outStream.add(response);
193+
break;
194+
}
195+
case RequestType.fetchContact: {
196+
var response = await onFetchContact(request, socket);
197+
controller.outStream.add(response);
198+
break;
199+
}
200+
case RequestType.unknown: {
201+
var response = await onUnknownRequest(request, socket);
202+
controller.outStream.add(response);
203+
break;
204+
}
205+
default: {
206+
print('[E] Drop out of switch case');
128207
}
129-
var response = await onSendMessage(request, socket);
130-
controller.outStream.add(response);
131-
break;
132-
}
133-
case RequestType.fetchMessage: {
134-
var response = await onFetchMessage(request, socket);
135-
controller.outStream.add(response);
136-
break;
137-
}
138-
case RequestType.findFile: {
139-
var response = await onFindFile(request, socket);
140-
controller.outStream.add(response);
141-
break;
142-
}
143-
case RequestType.fetchFile: {
144-
var response = await onFetchFile(request, socket);
145-
controller.outStream.add(response);
146-
break;
147-
}
148-
case RequestType.searchUser: {
149-
var response = await onSearchUser(request, socket);
150-
controller.outStream.add(response);
151-
break;
152-
}
153-
case RequestType.addContact: {
154-
var response = await onAddContact(request, socket);
155-
controller.outStream.add(response);
156-
break;
157-
}
158-
case RequestType.fetchContact: {
159-
var response = await onFetchContact(request, socket);
160-
controller.outStream.add(response);
161-
break;
162-
}
163-
case RequestType.unknown: {
164-
var response = await onUnknownRequest(request, socket);
165-
controller.outStream.add(response);
166-
break;
167-
}
168-
default: {
169-
print('[E] Drop out of switch case');
170208
}
209+
//Clear temp file
210+
if(request.payload?.existsSync() ?? false) {
211+
request.payload?.delete();
212+
}
213+
},
214+
onError: (e) {
215+
print(e);
216+
var token = controllerMap[controller];
217+
controllerMap.remove(controller);
218+
tokenMap[token]?.remove(controller);
219+
},
220+
onDone: () {
221+
var token = controllerMap[controller];
222+
controllerMap.remove(controller);
223+
tokenMap[token]?.remove(controller);
171224
}
172-
//Clear temp file
173-
request.payload?.delete();
174-
});
225+
);
175226
},
227+
cancelOnError: true
176228
);
177229
}

lib/database.dart

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* @Author : Linloir
33
* @Date : 2022-10-06 16:15:01
4-
* @LastEditTime : 2022-10-14 12:13:23
4+
* @LastEditTime : 2022-10-15 11:26:03
55
* @Description :
66
*/
77

@@ -502,6 +502,16 @@ class DataBaseHelper {
502502
var filePath = '${Directory.current.path}/.data/files/$fileMd5';
503503
await tempFile.copy(filePath);
504504
try {
505+
var sameFile = await _database.query(
506+
'files',
507+
where: 'filemd5 = ?',
508+
whereArgs: [
509+
fileMd5
510+
]
511+
);
512+
if(sameFile.isNotEmpty) {
513+
return;
514+
}
505515
await _database.insert(
506516
'files',
507517
{
@@ -533,7 +543,7 @@ class DataBaseHelper {
533543
}) async {
534544
var queryResult = await _database.query(
535545
'msgfiles natural join files',
536-
where: 'msgfile.msgmd5 = ?',
546+
where: 'msgfiles.msgmd5 = ?',
537547
whereArgs: [
538548
msgMd5
539549
]

0 commit comments

Comments
 (0)