@@ -40,11 +40,13 @@ import { ClientWrapper } from "./client_wrapper";
40
40
import { logger } from "../logutils" ;
41
41
import { Messages } from "./utils/messages" ;
42
42
import { DatabaseDialectCodes } from "./database_dialect/database_dialect_codes" ;
43
- import { getWriter } from "./utils/utils" ;
43
+ import { getWriter , logTopology } from "./utils/utils" ;
44
44
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory" ;
45
45
import { DriverDialect } from "./driver_dialect/driver_dialect" ;
46
+ import { AllowedAndBlockedHosts } from "./AllowedAndBlockedHosts" ;
46
47
47
48
export class PluginService implements ErrorHandler , HostListProviderService {
49
+ private static readonly DEFAULT_HOST_AVAILABILITY_CACHE_EXPIRE_NANO = 5 * 60_000_000_000 ; // 5 minutes
48
50
private readonly _currentClient : AwsClient ;
49
51
private _currentHostInfo ?: HostInfo ;
50
52
private _hostListProvider ?: HostListProvider ;
@@ -59,6 +61,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
59
61
protected readonly sessionStateService : SessionStateService ;
60
62
protected static readonly hostAvailabilityExpiringCache : CacheMap < string , HostAvailability > = new CacheMap < string , HostAvailability > ( ) ;
61
63
readonly props : Map < string , any > ;
64
+ private allowedAndBlockedHosts : AllowedAndBlockedHosts | null = null ;
62
65
63
66
constructor (
64
67
container : PluginServiceManagerContainer ,
@@ -114,17 +117,29 @@ export class PluginService implements ErrorHandler, HostListProviderService {
114
117
this . _currentHostInfo = this . _initialConnectionHostInfo ;
115
118
116
119
if ( ! this . _currentHostInfo ) {
117
- if ( this . getHosts ( ) . length === 0 ) {
120
+ if ( this . getAllHosts ( ) . length === 0 ) {
118
121
throw new AwsWrapperError ( Messages . get ( "PluginService.hostListEmpty" ) ) ;
119
122
}
120
123
121
- const writerHost = getWriter ( this . getHosts ( ) ) ;
124
+ const writerHost = getWriter ( this . getAllHosts ( ) ) ;
125
+ if ( ! this . getHosts ( ) . some ( ( hostInfo : HostInfo ) => hostInfo . host === writerHost ?. host ) ) {
126
+ throw new AwsWrapperError (
127
+ Messages . get (
128
+ "PluginService.currentHostNotAllowed" ,
129
+ this . _currentHostInfo ? this . _currentHostInfo . host : "<null>" ,
130
+ logTopology ( this . hosts , "[PluginService.currentHostNotAllowed] " )
131
+ )
132
+ ) ;
133
+ }
134
+
122
135
if ( writerHost ) {
123
136
this . _currentHostInfo = writerHost ;
124
137
} else {
125
138
this . _currentHostInfo = this . getHosts ( ) [ 0 ] ;
126
139
}
127
140
}
141
+
142
+ logger . debug ( `Set current host to: ${ this . _currentHostInfo . host } ` ) ;
128
143
}
129
144
130
145
return this . _currentHostInfo ;
@@ -286,11 +301,64 @@ export class PluginService implements ErrorHandler, HostListProviderService {
286
301
}
287
302
}
288
303
289
- getHosts ( ) : HostInfo [ ] {
304
+ getAllHosts ( ) : HostInfo [ ] {
290
305
return this . hosts ;
291
306
}
292
307
293
- setAvailability ( hostAliases : Set < string > , availability : HostAvailability ) { }
308
+ getHosts ( ) : HostInfo [ ] {
309
+ const hostPermissions = this . allowedAndBlockedHosts ;
310
+ if ( ! hostPermissions ) {
311
+ return this . hosts ;
312
+ }
313
+
314
+ let hosts = this . hosts ;
315
+ const allowedHostIds = hostPermissions . getAllowedHostIds ( ) ;
316
+ const blockedHostIds = hostPermissions . getBlockedHostIds ( ) ;
317
+
318
+ if ( allowedHostIds && allowedHostIds . size > 0 ) {
319
+ hosts = hosts . filter ( ( host : HostInfo ) => allowedHostIds . has ( host . hostId ) ) ;
320
+ }
321
+
322
+ if ( blockedHostIds && blockedHostIds . size > 0 ) {
323
+ hosts = hosts . filter ( ( host : HostInfo ) => ! blockedHostIds . has ( host . hostId ) ) ;
324
+ }
325
+
326
+ return hosts ;
327
+ }
328
+
329
+ setAvailability ( hostAliases : Set < string > , availability : HostAvailability ) {
330
+ if ( hostAliases . size === 0 ) {
331
+ return ;
332
+ }
333
+
334
+ const hostsToChange = [
335
+ ...new Set (
336
+ this . getAllHosts ( ) . filter (
337
+ ( host : HostInfo ) => hostAliases . has ( host . asAlias ) || [ ...host . aliases ] . some ( ( hostAlias : string ) => hostAliases . has ( hostAlias ) )
338
+ )
339
+ )
340
+ ] ;
341
+
342
+ if ( hostsToChange . length === 0 ) {
343
+ logger . debug ( Messages . get ( "PluginService.hostsChangeListEmpty" ) ) ;
344
+ return ;
345
+ }
346
+
347
+ const changes = new Map < string , Set < HostChangeOptions > > ( ) ;
348
+ for ( const host of hostsToChange ) {
349
+ const currentAvailability = host . getAvailability ( ) ;
350
+ PluginService . hostAvailabilityExpiringCache . put ( host . url , availability , PluginService . DEFAULT_HOST_AVAILABILITY_CACHE_EXPIRE_NANO ) ;
351
+ if ( currentAvailability !== availability ) {
352
+ let hostChanges = new Set < HostChangeOptions > ( ) ;
353
+ if ( availability === HostAvailability . AVAILABLE ) {
354
+ hostChanges = new Set ( [ HostChangeOptions . WENT_UP , HostChangeOptions . HOST_CHANGED ] ) ;
355
+ } else {
356
+ hostChanges = new Set ( [ HostChangeOptions . WENT_DOWN , HostChangeOptions . HOST_CHANGED ] ) ;
357
+ }
358
+ changes . set ( host . url , hostChanges ) ;
359
+ }
360
+ }
361
+ }
294
362
295
363
updateConfigWithProperties ( props : Map < string , any > ) {
296
364
this . _currentClient . config = Object . fromEntries ( props . entries ( ) ) ;
@@ -527,4 +595,8 @@ export class PluginService implements ErrorHandler, HostListProviderService {
527
595
attachNoOpErrorListener ( clientWrapper : ClientWrapper | undefined ) : void {
528
596
this . getDialect ( ) . getErrorHandler ( ) . attachNoOpErrorListener ( clientWrapper ) ;
529
597
}
598
+
599
+ setAllowedAndBlockedHosts ( allowedAndBlockedHosts : AllowedAndBlockedHosts ) {
600
+ this . allowedAndBlockedHosts = allowedAndBlockedHosts ;
601
+ }
530
602
}
0 commit comments