Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StatelessSession insertAll in batch does not do batching #2136

Merged
merged 2 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,8 @@ default <T> Uni<T> get(Class<T> entityClass, Object id, LockModeType lockModeTyp
Uni<Void> insert(Object entity);

/**
* Insert multiple rows.
* Insert multiple rows, using the number of the
* given entities as the batch size.
*
* @param entities new transient instances
*
Expand Down Expand Up @@ -1817,7 +1818,8 @@ default <T> Uni<T> get(Class<T> entityClass, Object id, LockModeType lockModeTyp
Uni<Void> delete(Object entity);

/**
* Delete multiple rows.
* Delete multiple rows, using the number of the
* given entities as the batch size.
*
* @param entities detached entity instances
*
Expand Down Expand Up @@ -1855,7 +1857,8 @@ default <T> Uni<T> get(Class<T> entityClass, Object id, LockModeType lockModeTyp
Uni<Void> update(Object entity);

/**
* Update multiple rows.
* Update multiple rows, using the number of the
* given entities as the batch size.
*
* @param entities detached entity instances
*
Expand Down Expand Up @@ -1915,7 +1918,8 @@ default <T> Uni<T> get(Class<T> entityClass, Object id, LockModeType lockModeTyp
Uni<Void> refresh(Object entity);

/**
* Refresh the entity instance state from the database.
* Refresh the entity instance state from the database, using the number of the
* given entities as the batch size.
*
* @param entities The entities to be refreshed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public Uni<Void> insert(Object entity) {

@Override
public Uni<Void> insertAll(Object... entities) {
return uni( () -> delegate.reactiveInsertAll( entities ) );
return uni( () -> delegate.reactiveInsertAll( entities.length, entities ) );
}

@Override
Expand All @@ -158,12 +158,12 @@ public Uni<Void> delete(Object entity) {

@Override
public Uni<Void> deleteAll(Object... entities) {
return uni( () -> delegate.reactiveDeleteAll( entities ) );
return uni( () -> delegate.reactiveDeleteAll( entities.length, entities ) );
}

@Override
public Uni<Void> deleteAll(int batchSize, Object... entities) {
return uni( () -> delegate.reactiveDeleteAll( entities ) );
return uni( () -> delegate.reactiveDeleteAll( batchSize, entities ) );
}

@Override
Expand All @@ -178,7 +178,7 @@ public Uni<Void> update(Object entity) {

@Override
public Uni<Void> updateAll(Object... entities) {
return uni( () -> delegate.reactiveUpdateAll( entities ) );
return uni( () -> delegate.reactiveUpdateAll( entities.length, entities ) );
}

@Override
Expand Down Expand Up @@ -208,7 +208,7 @@ public Uni<Void> upsert(String entityName, Object entity) {

@Override
public Uni<Void> refreshAll(Object... entities) {
return uni( () -> delegate.reactiveRefreshAll( entities ) );
return uni( () -> delegate.reactiveRefreshAll( entities.length, entities ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public class ReactiveStatelessSessionImpl extends StatelessSessionImpl implement

private final ReactiveConnection reactiveConnection;

private final ReactiveStatelessSession batchingHelperSession;
private final ReactiveStatelessSessionImpl batchingHelperSession;

private final PersistenceContext persistenceContext;

Expand All @@ -150,10 +150,9 @@ private ReactiveStatelessSessionImpl(
PersistenceContext persistenceContext) {
super( factory, options );
this.persistenceContext = persistenceContext;
Integer batchSize = getConfiguredJdbcBatchSize();
reactiveConnection = batchSize == null || batchSize < 2
? connection
: new BatchingConnection( connection, batchSize );
// Setting batch size to 0 because `StatelessSession` does not consider
// the value of `hibernate.jdbc.batch_size`
reactiveConnection = new BatchingConnection( connection, 0 );
batchingHelperSession = this;
influencers = new LoadQueryInfluencers( factory );
}
Expand Down Expand Up @@ -551,9 +550,12 @@ public CompletionStage<Void> reactiveInsertAll(Object... entities) {

@Override
public CompletionStage<Void> reactiveInsertAll(int batchSize, Object... entities) {
final Integer jdbcBatchSize = batchingHelperSession.getJdbcBatchSize();
batchingHelperSession.setJdbcBatchSize( batchSize );
final ReactiveConnection connection = batchingConnection( batchSize );
return loop( entities, batchingHelperSession::reactiveInsert )
.thenCompose( v -> connection.executeBatch() );
.thenCompose( v -> connection.executeBatch() )
.whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) );
}

@Override
Expand All @@ -564,9 +566,12 @@ public CompletionStage<Void> reactiveUpdateAll(Object... entities) {

@Override
public CompletionStage<Void> reactiveUpdateAll(int batchSize, Object... entities) {
final Integer jdbcBatchSize = batchingHelperSession.getJdbcBatchSize();
batchingHelperSession.setJdbcBatchSize( batchSize );
final ReactiveConnection connection = batchingConnection( batchSize );
return loop( entities, batchingHelperSession::reactiveUpdate )
.thenCompose( v -> connection.executeBatch() );
.thenCompose( v -> connection.executeBatch() )
.whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) );
}

@Override
Expand All @@ -577,9 +582,11 @@ public CompletionStage<Void> reactiveDeleteAll(Object... entities) {

@Override
public CompletionStage<Void> reactiveDeleteAll(int batchSize, Object... entities) {
final Integer jdbcBatchSize = batchingHelperSession.getJdbcBatchSize();
batchingHelperSession.setJdbcBatchSize( batchSize );
final ReactiveConnection connection = batchingConnection( batchSize );
return loop( entities, batchingHelperSession::reactiveDelete )
.thenCompose( v -> connection.executeBatch() );
return loop( entities, batchingHelperSession::reactiveDelete ).thenCompose( v -> connection.executeBatch() )
.whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) );
}


Expand All @@ -591,9 +598,12 @@ public CompletionStage<Void> reactiveRefreshAll(Object... entities) {

@Override
public CompletionStage<Void> reactiveRefreshAll(int batchSize, Object... entities) {
final Integer jdbcBatchSize = batchingHelperSession.getJdbcBatchSize();
batchingHelperSession.setJdbcBatchSize( batchSize );
final ReactiveConnection connection = batchingConnection( batchSize );
return loop( entities, batchingHelperSession::reactiveRefresh )
.thenCompose( v -> connection.executeBatch() );
.thenCompose( v -> connection.executeBatch() )
.whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) );
}

private ReactiveConnection batchingConnection(int batchSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1836,7 +1836,8 @@ default <T> CompletionStage<T> get(Class<T> entityClass, Object id, LockModeType
CompletionStage<Void> insert(Object entity);

/**
* Insert multiple rows.
* Insert multiple rows, using the number of the
* given entities as the batch size.
*
* @param entities new transient instances
*
Expand Down Expand Up @@ -1874,7 +1875,8 @@ default <T> CompletionStage<T> get(Class<T> entityClass, Object id, LockModeType
CompletionStage<Void> delete(Object entity);

/**
* Delete multiple rows.
* Delete multiple rows, using the number of the
* given entities as the batch size.
*
* @param entities detached entity instances
*
Expand Down Expand Up @@ -1912,7 +1914,8 @@ default <T> CompletionStage<T> get(Class<T> entityClass, Object id, LockModeType
CompletionStage<Void> update(Object entity);

/**
* Update multiple rows.
* Update multiple rows, using the number of the
* given entities as the batch size.
*
* @param entities a detached entity instance
*
Expand Down Expand Up @@ -1950,7 +1953,8 @@ default <T> CompletionStage<T> get(Class<T> entityClass, Object id, LockModeType
CompletionStage<Void> refresh(Object entity);

/**
* Refresh the entity instance state from the database.
* Refresh the entity instance state from the database, using the number of the
* given entities as the batch size.
*
* @param entities The entities to be refreshed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public CompletionStage<Void> insert(Object entity) {

@Override
public CompletionStage<Void> insert(Object... entities) {
return delegate.reactiveInsertAll( entities );
return delegate.reactiveInsertAll( entities.length, entities );
}

@Override
Expand All @@ -87,7 +87,7 @@ public CompletionStage<Void> delete(Object entity) {

@Override
public CompletionStage<Void> delete(Object... entities) {
return delegate.reactiveDeleteAll( entities );
return delegate.reactiveDeleteAll( entities.length, entities );
}

@Override
Expand All @@ -107,7 +107,7 @@ public CompletionStage<Void> update(Object entity) {

@Override
public CompletionStage<Void> update(Object... entities) {
return delegate.reactiveUpdateAll( entities );
return delegate.reactiveUpdateAll( entities.length, entities );
}

@Override
Expand All @@ -127,7 +127,7 @@ public CompletionStage<Void> refresh(Object entity) {

@Override
public CompletionStage<Void> refresh(Object... entities) {
return delegate.reactiveRefreshAll( entities );
return delegate.reactiveRefreshAll( entities.length, entities );
}

@Override
Expand Down
Loading
Loading