1
- import { ReplaySubject , BehaviorSubject } from 'rxjs' ;
1
+ import { ReplaySubject , BehaviorSubject , Subject } from 'rxjs' ;
2
2
import { distinctUntilChanged , map } from 'rxjs/operators' ;
3
3
import equal from 'fast-deep-equal' ;
4
4
import Database from './database/database.js' ;
@@ -10,6 +10,7 @@ import stripHexPrefix from 'strip-hex-prefix';
10
10
import { hexToDec } from 'hex2dec' ;
11
11
import EventSyncer from './eventSyncer' ;
12
12
import LogSyncer from './logSyncer' ;
13
+ import hash from 'object-hash' ;
13
14
14
15
export default class Subspace {
15
16
@@ -30,19 +31,13 @@ export default class Subspace {
30
31
this . networkId = undefined ;
31
32
this . isWebsocketProvider = options . disableSubscriptions ? false : ! ! provider . on ;
32
33
34
+ // Stats
35
+ this . latestBlockNumber = undefined ;
36
+ this . latestGasPrice = undefined ;
37
+ this . latestBlock = undefined ;
38
+ this . latest10Blocks = [ ] ;
33
39
34
- // TODO: part of manager
35
- this . latestBlockNumber = undefined ;
36
- this . latestGasPrice = undefined ;
37
- this . latestBlock = undefined ;
38
- this . latest10Blocks = [ ] ;
39
-
40
-
41
- // TODO: part of manager
42
- this . blockNumberObservable = null ;
43
- this . gasPriceObservable = null ;
44
- this . blockObservable = null ;
45
- this . blockTimeObservable = null ;
40
+ this . subjects = { } ;
46
41
47
42
this . newBlocksSubscription = null ;
48
43
this . intervalTracker = null ;
@@ -77,7 +72,7 @@ this.blockTimeObservable = null;
77
72
this . latest10Blocks = await Promise . all ( this . latest10Blocks ) ;
78
73
}
79
74
80
- // TODO: part of manager
75
+ // Initial stats
81
76
this . latestBlockNumber = block . number ;
82
77
this . latestGasPrice = gasPrice ;
83
78
this . latestBlock = block ;
@@ -149,9 +144,13 @@ this.blockTimeObservable = null;
149
144
}
150
145
151
146
// TODO: get contract abi/address instead
152
- trackEvent ( contractInstance , eventName , filterConditionsOrCb ) {
147
+ trackEvent ( contractInstance , eventName , filterConditions ) {
148
+ const subjectHash = hash ( { address : contractInstance . options . address , networkId : this . networkId , eventName, filterConditions} ) ;
149
+
150
+ if ( this . subjects [ subjectHash ] ) return this . subjects [ subjectHash ] ;
151
+
153
152
let deleteFrom = this . latestBlockNumber - this . options . refreshLastNBlocks ;
154
- let returnSub = this . eventSyncer . track ( contractInstance , eventName , filterConditionsOrCb , deleteFrom , this . networkId ) ;
153
+ let returnSub = this . eventSyncer . track ( contractInstance , eventName , filterConditions , deleteFrom , this . networkId ) ;
155
154
156
155
returnSub . map = ( prop ) => {
157
156
return returnSub . pipe ( map ( ( x ) => {
@@ -168,6 +167,8 @@ this.blockTimeObservable = null;
168
167
} ) )
169
168
}
170
169
170
+ this . subjects [ subjectHash ] = returnSub ;
171
+
171
172
return returnSub ;
172
173
}
173
174
@@ -181,7 +182,14 @@ this.blockTimeObservable = null;
181
182
182
183
trackLogs ( options , inputsABI ) {
183
184
if ( ! this . isWebsocketProvider ) console . warn ( "This method only works with websockets" ) ;
184
- return this . logSyncer . track ( options , inputsABI , this . latestBlockNumber - this . options . refreshLastNBlocks , this . networkId ) ;
185
+
186
+ const subjectHash = hash ( { inputsABI, options} ) ;
187
+
188
+ if ( this . subjects [ subjectHash ] ) return this . subjects [ subjectHash ] ;
189
+
190
+ this . subjects [ subjectHash ] = this . logSyncer . track ( options , inputsABI , this . latestBlockNumber - this . options . refreshLastNBlocks , this . networkId ) ;
191
+
192
+ return this . subjects [ subjectHash ] ;
185
193
}
186
194
187
195
_initNewBlocksSubscription ( ) {
@@ -209,30 +217,34 @@ this.blockTimeObservable = null;
209
217
} , this . options . callInterval ) ;
210
218
}
211
219
212
- // TODO: should save value in database?
213
220
trackProperty ( contractInstance , propName , methodArgs = [ ] , callArgs = { } ) {
214
- const sub = new ReplaySubject ( ) ;
221
+ const subjectHash = hash ( { address : contractInstance . options . address , networkId : this . networkId , propName, methodArgs, callArgs} ) ;
222
+
223
+ if ( this . subjects [ subjectHash ] ) return this . subjects [ subjectHash ] ;
224
+
225
+ const subject = new Subject ( ) ;
215
226
216
227
if ( ! Array . isArray ( methodArgs ) ) {
217
228
methodArgs = [ methodArgs ]
218
229
}
219
230
220
- const method = contractInstance . methods [ propName ] . apply ( contractInstance . methods [ propName ] , methodArgs )
231
+ const method = contractInstance . methods [ propName ] . apply ( contractInstance . methods [ propName ] , methodArgs ) ;
232
+
221
233
const callContractMethod = ( ) => {
222
234
method . call . apply ( method . call , [ callArgs , ( err , result ) => {
223
235
if ( err ) {
224
- sub . error ( err ) ;
236
+ subject . error ( err ) ;
225
237
return ;
226
238
}
227
- sub . next ( result ) ;
239
+ subject . next ( result ) ;
228
240
} ] ) ;
229
241
} ;
230
242
231
243
callContractMethod ( ) ;
232
244
233
245
this . callables . push ( callContractMethod ) ;
234
246
235
- let returnSub = sub . pipe ( distinctUntilChanged ( ( a , b ) => equal ( a , b ) ) ) ;
247
+ const returnSub = subject . pipe ( distinctUntilChanged ( ( a , b ) => equal ( a , b ) ) ) ;
236
248
237
249
returnSub . map = ( prop ) => {
238
250
return returnSub . pipe ( map ( ( x ) => {
@@ -249,16 +261,23 @@ this.blockTimeObservable = null;
249
261
} ) )
250
262
}
251
263
264
+ this . subjects [ subjectHash ] = returnSub ;
265
+
252
266
return returnSub ;
253
267
}
254
268
255
269
_addDistinctCallable ( trackAttribute , cbBuilder , subject , subjectArg = undefined ) {
256
- if ( this [ trackAttribute ] ) return this [ trackAttribute ] . pipe ( distinctUntilChanged ( ( a , b ) => equal ( a , b ) ) ) ;
257
- this [ trackAttribute ] = new subject ( subjectArg ) ;
258
- const cb = cbBuilder ( this [ trackAttribute ] ) ;
270
+ if ( this . subjects [ trackAttribute ] ) return this . subjects [ trackAttribute ] ;
271
+
272
+ const sub = new subject ( subjectArg ) ;
273
+
274
+ const cb = cbBuilder ( sub ) ;
259
275
cb ( ) ;
260
276
this . callables . push ( cb ) ;
261
- return this [ trackAttribute ] . pipe ( distinctUntilChanged ( ( a , b ) => equal ( a , b ) ) ) ;
277
+
278
+ this . subjects [ trackAttribute ] = sub . pipe ( distinctUntilChanged ( ( a , b ) => equal ( a , b ) ) ) ;
279
+
280
+ return this . subjects [ trackAttribute ] ;
262
281
}
263
282
264
283
trackBlock ( ) {
0 commit comments