Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23747 Support timeouts for RO transactions #4902

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,37 @@
* Ignite transaction options.
*/
public class TransactionOptions {
/** Transaction timeout. */
/** Transaction timeout. 0 means 'use default timeout'. */
private long timeoutMillis = 0;

/** Read-only transaction. */
private boolean readOnly = false;

/**
* Returns transaction timeout, in milliseconds.
* Returns transaction timeout, in milliseconds. 0 means 'use default timeout'.
*
* @return Transaction timeout, in milliseconds.
*/
public long timeoutMillis() {
return timeoutMillis;
}

// TODO: remove a note that timeouts are not supported for RW after IGNITE-15936 is implemented.
/**
* Sets transaction timeout, in milliseconds.
*
* @param timeoutMillis Transaction timeout, in milliseconds.
* @param timeoutMillis Transaction timeout, in milliseconds. Cannot be negative; 0 means 'use default timeout'.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* <ul>
* <li>For RO transactions, the default timeout is configured via ignite.transaction.timeout configuration property.</li>
* <li>For RW transactions, timeouts are not supported yet.</li>
* </ul>
* @return {@code this} for chaining.
*/
public TransactionOptions timeoutMillis(long timeoutMillis) {
sanpwc marked this conversation as resolved.
Show resolved Hide resolved
if (timeoutMillis < 0) {
throw new IllegalArgumentException("Negative timeoutMillis: " + timeoutMillis);
}

this.timeoutMillis = timeoutMillis;

return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.tx;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class TransactionOptionsTest {
@Test
void readOnlyIsFalseByDefault() {
assertThat(new TransactionOptions().readOnly(), is(false));
}

@Test
void readOnlyStatusIsSet() {
var options = new TransactionOptions();

options.readOnly(true);

assertThat(options.readOnly(), is(true));
}

@Test
void readOnlySetterReturnsSameObject() {
var options = new TransactionOptions();

TransactionOptions afterSetting = options.readOnly(true);

assertSame(options, afterSetting);
}

@Test
void timeoutIsZeroByDefault() {
assertThat(new TransactionOptions().timeoutMillis(), is(0L));
}

@Test
void timeoutIsSet() {
var options = new TransactionOptions();

options.timeoutMillis(3333);

assertThat(options.timeoutMillis(), is(3333L));
}

@Test
void timeoutSetterReturnsSameObject() {
var options = new TransactionOptions();

TransactionOptions afterSetting = options.timeoutMillis(3333);

assertSame(options, afterSetting);
}

@Test
void positiveTimeoutIsAllowed() {
assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0));
}

@Test
void zeroTimeoutIsAllowed() {
assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0));
}

@Test
void negativeTimeoutIsRejected() {
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> new TransactionOptions().timeoutMillis(-1));

assertThat(ex.getMessage(), is("Negative timeoutMillis: -1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
Expand Down Expand Up @@ -130,7 +129,7 @@ public class ClientHandlerModule implements IgniteComponent {

@TestOnly
@SuppressWarnings("unused")
private volatile ChannelHandler handler;
private volatile ClientInboundMessageHandler handler;

/**
* Constructor.
Expand Down Expand Up @@ -396,4 +395,9 @@ private ClientInboundMessageHandler createInboundMessageHandler(ClientConnectorV
partitionOperationsExecutor
);
}

@TestOnly
public ClientInboundMessageHandler handler() {
return handler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
* Handles messages from thin clients.
Expand Down Expand Up @@ -1139,4 +1140,9 @@ private static Set<AuthenticationEvent> authenticationEventsToSubscribe() {
AuthenticationEvent.USER_REMOVED
);
}

@TestOnly
public ClientResourceRegistry resources() {
return resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.lang.CancelHandle;
Expand Down Expand Up @@ -186,8 +187,8 @@ public HybridTimestampTracker getTimestampTracker() {
}

private static SqlProperties createProperties(
JdbcStatementType stmtType,
boolean multiStatement,
JdbcStatementType stmtType,
boolean multiStatement,
ZoneId timeZoneId,
long queryTimeoutMillis
) {
Expand Down Expand Up @@ -452,7 +453,7 @@ ZoneId timeZoneId() {
* @return Transaction associated with the current connection.
*/
InternalTransaction getOrStartTransaction(HybridTimestampTracker timestampProvider) {
return tx == null ? tx = txManager.begin(timestampProvider, false) : tx;
return tx == null ? tx = txManager.beginExplicitRw(timestampProvider, InternalTxOptions.defaults()) : tx;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
Expand Down Expand Up @@ -439,32 +440,53 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
if (tx == null) {
// Implicit transactions do not use an observation timestamp because RW never depends on it, and implicit RO is always direct.
// The direct transaction uses a current timestamp on the primary replica by definition.
tx = startTx(out, txManager, null, true, readOnly);
tx = startImplicitTx(out, txManager, null, readOnly);
}

return tx;
}

/**
* Start a transaction.
* Starts an explicit transaction.
*
* @param out Packer.
* @param txManager Ignite transactions.
* @param currentTs Current observation timestamp or {@code null} if it is not defined.
* @param implicit Implicit transaction flag.
* @param readOnly Read only flag.
* @param options Transaction options.
* @return Transaction.
*/
public static InternalTransaction startTx(
public static InternalTransaction startExplicitTx(
ClientMessagePacker out,
TxManager txManager,
@Nullable HybridTimestamp currentTs,
boolean readOnly,
InternalTxOptions options
) {
return txManager.beginExplicit(
HybridTimestampTracker.clientTracker(currentTs, ts -> {}),
readOnly,
options
);
}

/**
* Starts an implicit transaction.
*
* @param out Packer.
* @param txManager Ignite transactions.
* @param currentTs Current observation timestamp or {@code null} if it is not defined.
* @param readOnly Read only flag.
* @return Transaction.
*/
public static InternalTransaction startImplicitTx(
ClientMessagePacker out,
TxManager txManager,
@Nullable HybridTimestamp currentTs,
boolean implicit,
boolean readOnly
) {
return txManager.begin(
HybridTimestampTracker.clientTracker(currentTs, implicit ? out::meta : ts -> {}),
implicit,
return txManager.beginImplicit(
HybridTimestampTracker.clientTracker(currentTs, out::meta),
readOnly
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.ignite.client.handler.requests.tx;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.startTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.startExplicitTx;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
Expand All @@ -27,6 +27,7 @@
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.TxManager;
import org.jetbrains.annotations.Nullable;

Expand All @@ -52,12 +53,20 @@ public class ClientTransactionBeginRequest {
ClientHandlerMetricSource metrics
) throws IgniteInternalCheckedException {
boolean readOnly = in.unpackBoolean();
long timeoutMillis = in.unpackLong();

// Timestamp makes sense only for read-only transactions.
HybridTimestamp observableTs = readOnly ? HybridTimestamp.nullableHybridTimestamp(in.unpackLong()) : null;
HybridTimestamp observableTs = null;
if (readOnly) {
// Timestamp makes sense only for read-only transactions.
observableTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
}

InternalTxOptions txOptions = InternalTxOptions.builder()
.timeoutMillis(timeoutMillis)
.build();

// NOTE: we don't use beginAsync here because it is synchronous anyway.
var tx = startTx(out, txManager, observableTs, false, readOnly);
var tx = startExplicitTx(out, txManager, observableTs, readOnly, txOptions);

if (readOnly) {
// For read-only tx, override observable timestamp that we send to the client:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -133,7 +132,7 @@ public void contextClosedDuringBatchQuery() throws Exception {
CountDownLatch registryCloseLatch = new CountDownLatch(1);
long connectionId = acquireConnectionId();

when(txManager.begin(any(), eq(false))).thenAnswer(v -> {
when(txManager.beginExplicitRw(any(), any())).thenAnswer(v -> {
registryCloseLatch.countDown();
assertThat(startTxLatch.await(timeout, TimeUnit.SECONDS), is(true));

Expand Down Expand Up @@ -161,13 +160,13 @@ public void explicitTxRollbackOnCloseRegistry() {
InternalTransaction tx = mock(InternalTransaction.class);

when(tx.rollbackAsync()).thenReturn(nullCompletedFuture());
when(txManager.begin(any(), eq(false))).thenReturn(tx);
when(txManager.beginExplicitRw(any(), any())).thenReturn(tx);

long connectionId = acquireConnectionId();

await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("x", "UPDATE 1")));

verify(txManager).begin(any(), eq(false));
verify(txManager).beginExplicitRw(any(), any());
verify(tx, times(0)).rollbackAsync();

resourceRegistry.close();
Expand All @@ -183,32 +182,32 @@ public void singleTxUsedForMultipleOperations() {
InternalTransaction tx = mock(InternalTransaction.class);
when(tx.commitAsync()).thenReturn(nullCompletedFuture());
when(tx.rollbackAsync()).thenReturn(nullCompletedFuture());
when(txManager.begin(any(), eq(false))).thenReturn(tx);
when(txManager.beginExplicitRw(any(), any())).thenReturn(tx);

long connectionId = acquireConnectionId();
verify(txManager, times(0)).begin(any(), eq(false));
verify(txManager, times(0)).beginExplicitRw(any(), any());

String schema = "schema";
JdbcStatementType type = JdbcStatementType.SELECT_STATEMENT_TYPE;

await(eventHandler.queryAsync(
connectionId, createExecuteRequest(schema, "SELECT 1", type)
));
verify(txManager, times(1)).begin(any(), eq(false));
verify(txManager, times(1)).beginExplicitRw(any(), any());
await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("schema", "UPDATE 1", "UPDATE 2")));
verify(txManager, times(1)).begin(any(), eq(false));
verify(txManager, times(1)).beginExplicitRw(any(), any());

await(eventHandler.finishTxAsync(connectionId, false));
verify(tx).rollbackAsync();

await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("schema", "UPDATE 1", "UPDATE 2")));
verify(txManager, times(2)).begin(any(), eq(false));
verify(txManager, times(2)).beginExplicitRw(any(), any());
await(eventHandler.queryAsync(
connectionId, createExecuteRequest(schema, "SELECT 2", type)
));
verify(txManager, times(2)).begin(any(), eq(false));
verify(txManager, times(2)).beginExplicitRw(any(), any());
await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("schema", "UPDATE 3", "UPDATE 4")));
verify(txManager, times(2)).begin(any(), eq(false));
verify(txManager, times(2)).beginExplicitRw(any(), any());

await(eventHandler.finishTxAsync(connectionId, true));
verify(tx).commitAsync();
Expand All @@ -223,7 +222,7 @@ void simpleQueryCancellation() {

long connectionId = acquireConnectionId();

JdbcQueryExecuteRequest executeRequest = createExecuteRequest("schema", "SELECT 1", JdbcStatementType.SELECT_STATEMENT_TYPE);
JdbcQueryExecuteRequest executeRequest = createExecuteRequest("schema", "SELECT 1", JdbcStatementType.SELECT_STATEMENT_TYPE);

CompletableFuture<? extends Response> resultFuture = eventHandler.queryAsync(connectionId, executeRequest);

Expand Down
Loading