Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "fix: fixed failing queries using aggregation pipeline (#26132)" #1

Open
wants to merge 1 commit into
base: release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ public Flux<T> findAll() {
.flatMapMany(principal -> {
Query query = new Query(notDeleted());
return mongoOperations.find(
query.cursorBatchSize(10000),
entityInformation.getJavaType(),
entityInformation.getCollectionName());
query, entityInformation.getJavaType(), entityInformation.getCollectionName());
Comment on lines 163 to +164
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Removing cursorBatchSize(10000) could cause performance issues with large result sets. Consider keeping the batch size configuration to prevent potential memory issues.

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.appsmith.server.repositories.ce.CustomNewActionRepositoryCEImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.stereotype.Component;
Expand All @@ -15,8 +14,7 @@ public class CustomNewActionRepositoryImpl extends CustomNewActionRepositoryCEIm
public CustomNewActionRepositoryImpl(
ReactiveMongoOperations mongoOperations,
MongoConverter mongoConverter,
CacheableRepositoryHelper cacheableRepositoryHelper,
MongoTemplate mongoTemplate) {
super(mongoOperations, mongoConverter, cacheableRepositoryHelper, mongoTemplate);
CacheableRepositoryHelper cacheableRepositoryHelper) {
super(mongoOperations, mongoConverter, cacheableRepositoryHelper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.appsmith.server.repositories.ce.CustomNewPageRepositoryCEImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.stereotype.Component;
Expand All @@ -14,8 +13,7 @@ public class CustomNewPageRepositoryImpl extends CustomNewPageRepositoryCEImpl i
public CustomNewPageRepositoryImpl(
ReactiveMongoOperations mongoOperations,
MongoConverter mongoConverter,
CacheableRepositoryHelper cacheableRepositoryHelper,
MongoTemplate mongoTemplate) {
super(mongoOperations, mongoConverter, cacheableRepositoryHelper, mongoTemplate);
CacheableRepositoryHelper cacheableRepositoryHelper) {
super(mongoOperations, mongoConverter, cacheableRepositoryHelper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.repository.Meta;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -167,7 +166,7 @@ public Mono<T> findById(String id, List<String> projectionFieldNames, Optional<A

return mongoOperations
.query(this.genericDomain)
.matching(query.cursorBatchSize(10000))
.matching(query)
.one()
.flatMap(obj -> setUserPermissionsInObject(obj, permissionGroups));
});
Expand Down Expand Up @@ -332,7 +331,6 @@ protected Mono<T> queryOne(List<Criteria> criterias, List<String> projectionFiel
});
}

@Meta(cursorBatchSize = 10000)
protected Mono<T> queryOne(
List<Criteria> criterias, List<String> projectionFieldNames, Optional<AclPermission> permission) {
Mono<Set<String>> permissionGroupsMono = getCurrentUserPermissionGroupsIfRequired(permission);
Expand Down Expand Up @@ -541,7 +539,7 @@ public Flux<T> queryAllWithPermissionGroups(
sortOptional.ifPresent(sort -> query.with(sort));
return mongoOperations
.query(this.genericDomain)
.matching(query.cursorBatchSize(10000))
.matching(query)
.all()
.flatMap(obj -> setUserPermissionsInObject(obj, permissionGroups));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Flux<NewAction> findAllNonJsActionsByNameAndPageIdsAndViewMode(

Mono<List<BulkWriteResult>> bulkUpdate(List<NewAction> newActions);

Mono<List<BulkWriteResult>> publishActions(String applicationId, AclPermission permission);
Mono<UpdateResult> publishActions(String applicationId, AclPermission permission);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Return type change from Mono<List> to Mono> means we lose granular information about individual write operations. UpdateResult only provides aggregate success/failure, while BulkWriteResult provided details for each operation.


Mono<UpdateResult> archiveDeletedUnpublishedActions(String applicationId, AclPermission permission);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@
import org.bson.Document;
import org.bson.types.ObjectId;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.Fields;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.GroupOperation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.aggregation.ProjectionOperation;
Expand Down Expand Up @@ -55,15 +52,11 @@
public class CustomNewActionRepositoryCEImpl extends BaseAppsmithRepositoryImpl<NewAction>
implements CustomNewActionRepositoryCE {

private final MongoTemplate mongoTemplate;

public CustomNewActionRepositoryCEImpl(
ReactiveMongoOperations mongoOperations,
MongoConverter mongoConverter,
CacheableRepositoryHelper cacheableRepositoryHelper,
MongoTemplate mongoTemplate) {
CacheableRepositoryHelper cacheableRepositoryHelper) {
super(mongoOperations, mongoConverter, cacheableRepositoryHelper);
this.mongoTemplate = mongoTemplate;
}

@Override
Expand Down Expand Up @@ -578,33 +571,16 @@ public Flux<NewAction> findByDefaultApplicationId(String defaultApplicationId, O
}

@Override
public Mono<List<BulkWriteResult>> publishActions(String applicationId, AclPermission permission) {
public Mono<UpdateResult> publishActions(String applicationId, AclPermission permission) {
Criteria applicationIdCriteria =
where(fieldName(QNewAction.newAction.applicationId)).is(applicationId);
// using aggregation update instead of regular update here
// it's required to set a field to a value of another field from the same domain
AggregationUpdate aggregationUpdate = AggregationUpdate.update()
.set(fieldName(QNewAction.newAction.publishedAction))
.toValue("$" + fieldName(QNewAction.newAction.unpublishedAction));

Mono<Set<String>> permissionGroupsMono =
getCurrentUserPermissionGroupsIfRequired(Optional.ofNullable(permission));

return permissionGroupsMono.flatMap(permissionGroups -> {
AggregationOperation matchAggregationWithPermission = null;
if (permission == null) {
matchAggregationWithPermission = Aggregation.match(new Criteria().andOperator(notDeleted()));
} else {
matchAggregationWithPermission = Aggregation.match(
new Criteria().andOperator(notDeleted(), userAcl(permissionGroups, permission)));
}
AggregationOperation matchAggregation = Aggregation.match(applicationIdCriteria);
AggregationOperation wholeProjection = Aggregation.project(NewAction.class);
AggregationOperation addFieldsOperation = Aggregation.addFields()
.addField(fieldName(QNewAction.newAction.publishedAction))
.withValueOf(Fields.field(fieldName(QNewAction.newAction.unpublishedAction)))
.build();
Aggregation combinedAggregation = Aggregation.newAggregation(
matchAggregation, matchAggregationWithPermission, wholeProjection, addFieldsOperation);
AggregationResults<NewAction> updatedResults =
mongoTemplate.aggregate(combinedAggregation, NewAction.class, NewAction.class);
return bulkUpdate(updatedResults.getMappedResults());
});
return updateByCriteria(List.of(applicationIdCriteria), aggregationUpdate, permission);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.appsmith.server.acl.AclPermission;
import com.appsmith.server.domains.NewPage;
import com.appsmith.server.repositories.AppsmithRepository;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.result.UpdateResult;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -43,7 +43,5 @@ Mono<NewPage> findByGitSyncIdAndDefaultApplicationId(
Mono<NewPage> findByGitSyncIdAndDefaultApplicationId(
String defaultApplicationId, String gitSyncId, Optional<AclPermission> permission);

Mono<List<BulkWriteResult>> publishPages(Collection<String> pageIds, AclPermission permission);

Mono<List<BulkWriteResult>> bulkUpdate(List<NewPage> newPages);
Mono<UpdateResult> publishPages(Collection<String> pageIds, AclPermission permission);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Changing from BulkWriteResult to UpdateResult means losing per-document update status information. Ensure this doesn't break error handling for partial page publish failures.

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,32 @@
import com.appsmith.server.dtos.PageDTO;
import com.appsmith.server.repositories.BaseAppsmithRepositoryImpl;
import com.appsmith.server.repositories.CacheableRepositoryHelper;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.Fields;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.springframework.data.mongodb.core.query.Criteria.where;

@Slf4j
public class CustomNewPageRepositoryCEImpl extends BaseAppsmithRepositoryImpl<NewPage>
implements CustomNewPageRepositoryCE {

private final MongoTemplate mongoTemplate;

public CustomNewPageRepositoryCEImpl(
ReactiveMongoOperations mongoOperations,
MongoConverter mongoConverter,
CacheableRepositoryHelper cacheableRepositoryHelper,
MongoTemplate mongoTemplate) {
CacheableRepositoryHelper cacheableRepositoryHelper) {
super(mongoOperations, mongoConverter, cacheableRepositoryHelper);
this.mongoTemplate = mongoTemplate;
}

@Override
Expand Down Expand Up @@ -267,55 +251,14 @@ public Mono<NewPage> findByGitSyncIdAndDefaultApplicationId(
}

@Override
public Mono<List<BulkWriteResult>> publishPages(Collection<String> pageIds, AclPermission permission) {
public Mono<UpdateResult> publishPages(Collection<String> pageIds, AclPermission permission) {
Criteria applicationIdCriteria = where(fieldName(QNewPage.newPage.id)).in(pageIds);
// using aggregation update instead of regular update here
// it's required to set a field to a value of another field from the same domain
AggregationUpdate aggregationUpdate = AggregationUpdate.update()
.set(fieldName(QNewPage.newPage.publishedPage))
.toValue("$" + fieldName(QNewPage.newPage.unpublishedPage));
Comment on lines +258 to +260
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Using $-prefixed field references in aggregation updates can be fragile if field names change. Consider using a more robust field reference mechanism.


Mono<Set<String>> permissionGroupsMono =
getCurrentUserPermissionGroupsIfRequired(Optional.ofNullable(permission));

return permissionGroupsMono.flatMap(permissionGroups -> {
AggregationOperation matchAggregationWithPermission = null;
if (permission == null) {
matchAggregationWithPermission = Aggregation.match(new Criteria().andOperator(notDeleted()));
} else {
matchAggregationWithPermission = Aggregation.match(
new Criteria().andOperator(notDeleted(), userAcl(permissionGroups, permission)));
}
AggregationOperation matchAggregation = Aggregation.match(applicationIdCriteria);
AggregationOperation wholeProjection = Aggregation.project(NewPage.class);
AggregationOperation addFieldsOperation = Aggregation.addFields()
.addField(fieldName(QNewPage.newPage.publishedPage))
.withValueOf(Fields.field(fieldName(QNewPage.newPage.unpublishedPage)))
.build();
Aggregation combinedAggregation = Aggregation.newAggregation(
matchAggregation, matchAggregationWithPermission, wholeProjection, addFieldsOperation);
AggregationResults<NewPage> updatedResults =
mongoTemplate.aggregate(combinedAggregation, NewPage.class, NewPage.class);
return bulkUpdate(updatedResults.getMappedResults());
});
}

@Override
public Mono<List<BulkWriteResult>> bulkUpdate(List<NewPage> newPages) {
if (CollectionUtils.isEmpty(newPages)) {
return Mono.just(Collections.emptyList());
}

// convert the list of new pages to a list of DBObjects
List<WriteModel<Document>> dbObjects = newPages.stream()
.map(newPage -> {
assert newPage.getId() != null;
Document document = new Document();
mongoOperations.getConverter().write(newPage, document);
document.remove("_id");
return (WriteModel<Document>) new UpdateOneModel<Document>(
new Document("_id", new ObjectId(newPage.getId())), new Document("$set", document));
})
.collect(Collectors.toList());

return mongoOperations
.getCollection(mongoOperations.getCollectionName(NewPage.class))
.flatMapMany(documentMongoCollection -> documentMongoCollection.bulkWrite(dbObjects))
.collectList();
return updateByCriteria(List.of(applicationIdCriteria), aggregationUpdate, permission);
}
Comment on lines +254 to 263
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Moving from bulk writes to individual updates could significantly impact performance when publishing multiple pages simultaneously. Consider keeping bulk operations or documenting why they were problematic.

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,12 @@
import com.appsmith.server.domains.NewAction;
import com.appsmith.server.repositories.BaseRepository;
import com.appsmith.server.repositories.CustomNewActionRepository;
import org.springframework.data.mongodb.repository.Meta;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface NewActionRepositoryCE extends BaseRepository<NewAction, String>, CustomNewActionRepository {

@Meta(cursorBatchSize = 10000)
Flux<NewAction> findByApplicationId(String applicationId);

@Meta(cursorBatchSize = 10000)
Flux<NewAction> findAllByIdIn(Iterable<String> ids);

Mono<Long> countByDeletedAtNull();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import com.appsmith.server.solutions.PagePermission;
import com.appsmith.server.solutions.WorkspacePermission;
import com.google.common.base.Strings;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.result.UpdateResult;
import jakarta.annotation.Nullable;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -1135,7 +1134,7 @@ public Mono<Application> publish(String applicationId, boolean isPublishedManual
if (isPublishedManually) {
application.setLastDeployedAt(Instant.now());
}
Mono<List<BulkWriteResult>> publishPagesMono =
Mono<UpdateResult> publishPagesMono =
newPageService.publishPages(editedPageIds, pagePermission.getEditPermission());
Comment on lines +1137 to 1138
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Reverting from BulkWriteResult to UpdateResult may reduce performance by not using MongoDB's bulk operation capabilities. Consider keeping bulk operations if possible.


// Archive the deleted pages and save the application changes and then return the pages so that
Expand All @@ -1145,7 +1144,7 @@ public Mono<Application> publish(String applicationId, boolean isPublishedManual
})
.cache(); // caching as we'll need this to send analytics attributes after publishing the app

Mono<List<BulkWriteResult>> publishActionsMono =
Mono<UpdateResult> publishActionsMono =
newActionService.publishActions(applicationId, actionPermission.getEditPermission());

// this is a map of pluginType to count of actions for that pluginType, required for analytics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.appsmith.server.dtos.ce.ImportedActionAndCollectionMapsDTO;
import com.appsmith.server.helpers.ce.ImportApplicationPermissionProvider;
import com.appsmith.server.services.CrudService;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.result.UpdateResult;
import org.springframework.data.domain.Sort;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -134,7 +134,7 @@ Mono<ImportedActionAndCollectionMapsDTO> updateActionsWithImportedCollectionIds(
ImportActionCollectionResultDTO importActionCollectionResultDTO,
ImportActionResultDTO importActionResultDTO);

Mono<List<BulkWriteResult>> publishActions(String applicationId, AclPermission permission);
Mono<UpdateResult> publishActions(String applicationId, AclPermission permission);

Flux<PluginTypeAndCountDTO> countActionsByPluginType(String applicationId);
}
Loading