Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: failover2 #363

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

feat: failover2 #363

wants to merge 5 commits into from

Conversation

sophia-bq
Copy link
Contributor

Summary

Description

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@sophia-bq sophia-bq added the wip Pull requests that are a work in progress label Jan 3, 2025
@sophia-bq sophia-bq requested a review from sergiyvamz January 3, 2025 18:07
@sophia-bq sophia-bq force-pushed the feat/failover2 branch 2 times, most recently from c4a4778 to 871729d Compare January 7, 2025 04:05
@sophia-bq sophia-bq force-pushed the feat/failover2 branch 3 times, most recently from 4a749c6 to f86dc16 Compare January 9, 2025 19:02
fix: integration tests

apply review comments

test: include rw integration tests

fix: remove wrapper properties
@sophia-bq sophia-bq force-pushed the feat/failover2 branch 2 times, most recently from 94cee75 to 64dcca3 Compare January 29, 2025 22:23
Comment on lines +90 to +96
let monitor: ClusterTopologyMonitor = MonitoringRdsHostListProvider.monitors.get(
this.clusterId,
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
);
if (!monitor) {
monitor = this.initMonitor();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

computeIfAbsent will check the map and create the item for the given key if item doesn't already exist. I don't think we need to call get here, then call computeIfAbsent in initMonitor. This implementation will check the map twice.

You could just call computeIfAbsent directly here

    const monitor: ClusterTopologyMonitor = MonitoringRdsHostListProvider.monitors.computeIfAbsent(
      this.clusterId,
      (x) =>
        new ClusterTopologyMonitorImpl(
          this.clusterId,
          MonitoringRdsHostListProvider.topologyCache,
          this.initialHost,
          this.properties,
          this.pluginService,
          this,
          WrapperProperties.CLUSTER_TOPOLOGY_REFRESH_RATE_MS.get(this.properties),
          WrapperProperties.CLUSTER_TOPOLOGY_HIGH_REFRESH_RATE_MS.get(this.properties)
        ),
      MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
    );

async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<boolean> {
const hostListProvider: HostListProvider = this.getHostListProvider();
if (!this.isBlockingHostListProvider(hostListProvider)) {
throw new AwsWrapperError(Messages.get("PluginService.requiredMonitoringRdsHostListProvider"), typeof hostListProvider);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update the error message to use BlockingHostListProvider instead of MonitoringRdsHostListProvider

Comment on lines +186 to +189
const updatedHostList: HostInfo[] = await (hostListProvider as MonitoringRdsHostListProvider).forceMonitoringRefresh(
shouldVerifyWriter,
timeoutMs
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to cast it.

Suggested change
const updatedHostList: HostInfo[] = await (hostListProvider as MonitoringRdsHostListProvider).forceMonitoringRefresh(
shouldVerifyWriter,
timeoutMs
);
const updatedHostList: HostInfo[] = await hostListProvider.forceMonitoringRefresh(shouldVerifyWriter, timeoutMs);

If you really need to cast it, use the interface not the implementation.

);
}

async updateTopology() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer used in the code, could remove this and relevant unit tests

Comment on lines +142 to +150
// Call was initiated by Failover2 Plugin, does not require additional processing.
if (props.has(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME)) {
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);
}

// Failover is not enabled, does not require additional processing.
if (!this.enableFailoverSetting || !WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.get(props)) {
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Call was initiated by Failover2 Plugin, does not require additional processing.
if (props.has(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME)) {
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);
}
// Failover is not enabled, does not require additional processing.
if (!this.enableFailoverSetting || !WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.get(props)) {
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);
}
if (
// Call was initiated by Failover2 Plugin, does not require additional processing.
props.has(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME) ||
// Failover is not enabled, does not require additional processing.
!this.enableFailoverSetting ||
!WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.get(props)
) {
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);
}

Can we merge the checks to reduce code duplication

await this.failoverReader();
}

if (this._isInTransaction || this.pluginService.isInTransaction()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this._isInTransaction is set via this.pluginService.isInTransaction(), and when we use it we check this.pluginService.isInTransaction() anyway. Not sure if it is worth having this local variable.

Comment on lines +412 to +414
logger.error(Messages.get("Failover.unableToConnectToWriter"));
this.failoverWriterFailedCounter.inc();
throw new FailoverFailedError(Messages.get("Failover.unableToConnectToWriter"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we refactor this logic into a method to reduce duplicated code

  private logAndThrowError(errorMessage: string) {
    logger.error(errorMessage);
    this.failoverWriterFailedCounter.inc();
    throw new FailoverFailedError(errorMessage);
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants