Skip to content

Commit

Permalink
ISSUE-1199 Update DistributedLinagoraSecondaryBlobStoreTest to add ev…
Browse files Browse the repository at this point in the history
…entdeadletters test
  • Loading branch information
hung phan authored and chibenwa committed Oct 28, 2024
1 parent 7fd1e05 commit 774c7d9
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.james.events.EventBusId;
import org.apache.james.events.EventBusName;
import org.apache.james.events.EventDeadLetters;
import org.apache.james.events.EventSerializer;
import org.apache.james.events.NamingStrategy;
import org.apache.james.events.RabbitMQEventBus;
import org.apache.james.events.RetryBackoffConfiguration;
Expand All @@ -22,6 +23,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.multibindings.ProvidesIntoSet;
import com.google.inject.name.Names;
import com.linagora.tmail.james.jmap.EmailAddressContactInjectKeys;
Expand All @@ -36,6 +38,10 @@ public class DistributedEmailAddressContactEventModule extends AbstractModule {
@Override
protected void configure() {
bind(EventBusId.class).annotatedWith(Names.named(EmailAddressContactInjectKeys.AUTOCOMPLETE)).toInstance(EventBusId.random());

Multibinder.newSetBinder(binder(), EventSerializer.class)
.addBinding()
.to(TmailJmapEventSerializer.class);
}

@ProvidesIntoSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.james.events.EventBusId;
import org.apache.james.events.EventBusName;
import org.apache.james.events.EventDeadLetters;
import org.apache.james.events.EventSerializer;
import org.apache.james.events.NamingStrategy;
import org.apache.james.events.RabbitMQEventBus;
import org.apache.james.events.RetryBackoffConfiguration;
Expand Down Expand Up @@ -41,6 +42,9 @@ protected void configure() {
bind(TmailEventSerializer.class).in(Scopes.SINGLETON);
bind(EventBusId.class).annotatedWith(Names.named(TmailInjectNameConstants.TMAIL_EVENT_BUS_INJECT_NAME)).toInstance(EventBusId.random());

Multibinder.newSetBinder(binder(), EventSerializer.class)
.addBinding()
.to(TmailEventSerializer.class);
Multibinder.newSetBinder(binder(), TmailReactiveGroupEventListener.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.requestSpecification;
import static io.restassured.RestAssured.with;
import static io.restassured.http.ContentType.JSON;
import static org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration.UPLOAD_RETRY_EXCEPTION_PREDICATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.awaitility.Durations.FIVE_SECONDS;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
import static org.awaitility.Durations.ONE_MINUTE;
import static org.awaitility.Durations.TEN_SECONDS;
Expand All @@ -30,6 +33,8 @@
import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO;
import org.apache.james.core.Domain;
import org.apache.james.core.Username;
import org.apache.james.events.EventDeadLetters;
import org.apache.james.events.Group;
import org.apache.james.jmap.JMAPUrls;
import org.apache.james.jmap.JmapGuiceProbe;
import org.apache.james.jmap.http.UserCredential;
Expand All @@ -39,6 +44,10 @@
import org.apache.james.modules.MailboxProbeImpl;
import org.apache.james.utils.DataProbeImpl;
import org.apache.james.utils.GuiceProbe;
import org.apache.james.utils.WebAdminGuiceProbe;
import org.apache.james.webadmin.WebAdminUtils;
import org.apache.james.webadmin.routes.EventDeadLettersRoutes;
import org.apache.james.webadmin.routes.TasksRoutes;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -51,7 +60,10 @@
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Named;
import com.linagora.tmail.blob.guice.BlobStoreConfiguration;
import com.linagora.tmail.blob.secondaryblobstore.FailedBlobEvents;
import com.linagora.tmail.blob.secondaryblobstore.FailedBlobOperationListener;
import com.linagora.tmail.blob.secondaryblobstore.SecondaryBlobStoreDAO;
import com.linagora.tmail.common.event.TmailInjectNameConstants;
import com.linagora.tmail.encrypted.MailboxConfiguration;
import com.linagora.tmail.james.app.CassandraExtension;
import com.linagora.tmail.james.app.DistributedJamesConfiguration;
Expand All @@ -73,8 +85,6 @@

@Tag(BasicFeature.TAG)
class DistributedLinagoraSecondaryBlobStoreTest {
public static final int FIVE_SECONDS = 5000;

public static final ConditionFactory calmlyAwait = Awaitility.with()
.pollInterval(ONE_HUNDRED_MILLISECONDS)
.and()
Expand All @@ -85,12 +95,15 @@ class DistributedLinagoraSecondaryBlobStoreTest {
static class BlobStoreProbe implements GuiceProbe {
private final S3BlobStoreDAO primaryBlobStoreDAO;
private final S3BlobStoreDAO secondaryBlobStoreDAO;
private final EventDeadLetters eventDeadLetters;

@Inject
public BlobStoreProbe(@Named(MAYBE_SECONDARY_BLOBSTORE) BlobStoreDAO blobStoreDAO) {
public BlobStoreProbe(@Named(MAYBE_SECONDARY_BLOBSTORE) BlobStoreDAO blobStoreDAO,
@Named(TmailInjectNameConstants.TMAIL_EVENT_BUS_INJECT_NAME) EventDeadLetters eventDeadLetters) {
SecondaryBlobStoreDAO secondaryBlobStoreDAO = (SecondaryBlobStoreDAO) blobStoreDAO;
this.primaryBlobStoreDAO = (S3BlobStoreDAO) secondaryBlobStoreDAO.getFirstBlobStoreDAO();
this.secondaryBlobStoreDAO = (S3BlobStoreDAO) secondaryBlobStoreDAO.getSecondBlobStoreDAO();
this.eventDeadLetters = eventDeadLetters;
}

public S3BlobStoreDAO getPrimaryBlobStoreDAO() {
Expand All @@ -100,6 +113,10 @@ public S3BlobStoreDAO getPrimaryBlobStoreDAO() {
public S3BlobStoreDAO getSecondaryBlobStoreDAO() {
return secondaryBlobStoreDAO;
}

public EventDeadLetters getEventDeadLetters() {
return eventDeadLetters;
}
}

static final Domain DOMAIN = Domain.of("domain.tld");
Expand Down Expand Up @@ -162,7 +179,7 @@ static void afterAll() {
}

@BeforeEach
void setUp(GuiceJamesServer server) throws Exception {
void beforeEach(GuiceJamesServer server) throws Exception {
prepareBlobStore(server);

server.getProbe(DataProbeImpl.class)
Expand All @@ -171,6 +188,10 @@ void setUp(GuiceJamesServer server) throws Exception {
.addUser(BOB.asString(), BOB_PASSWORD)
.addUser(ANDRE.asString(), ANDRE_PASSWORD);

MailboxProbeImpl mailboxProbe = server.getProbe(MailboxProbeImpl.class);
mailboxProbe.createMailbox(MailboxPath.inbox(BOB));
mailboxProbe.createMailbox(MailboxPath.inbox(ANDRE));

UserCredential userCredential = new UserCredential(BOB, BOB_PASSWORD);
PreemptiveBasicAuthScheme authScheme = new PreemptiveBasicAuthScheme();
authScheme.setUserName(userCredential.username().asString());
Expand All @@ -186,11 +207,6 @@ void setUp(GuiceJamesServer server) throws Exception {
.setAuth(authScheme)
.addHeader(ACCEPT.toString(), ACCEPT_RFC8621_VERSION_HEADER)
.build();

MailboxProbeImpl mailboxProbe = server.getProbe(MailboxProbeImpl.class);
mailboxProbe.createMailbox(MailboxPath.inbox(BOB));
mailboxProbe.createMailbox(MailboxPath.inbox(ANDRE));

EncryptHelper.uploadPublicKey(ACCOUNT_ID, requestSpecification);
}

Expand Down Expand Up @@ -229,7 +245,7 @@ void sendEmailShouldResultingInSavingDataToBothObjectStorages(GuiceJamesServer s
}

@Test
void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDown(GuiceJamesServer server) throws Exception {
void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDownForShortTime(GuiceJamesServer server) throws Exception {
secondaryS3.pause();

given()
Expand All @@ -248,12 +264,79 @@ void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecon

BlobStoreProbe blobStoreProbe = server.getProbe(BlobStoreProbe.class);
BucketName bucketName = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBuckets()).collectList().block().getFirst();
List<BlobId> blobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
calmlyAwait.atMost(ONE_MINUTE)
.untilAsserted(() -> {
List<BlobId> blobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
assertThat(blobIds2).hasSameSizeAs(blobIds);
assertThat(blobIds2).hasSameElementsAs(blobIds);
});
}

@Test
void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDownForLongTime(GuiceJamesServer server) throws Exception {
secondaryS3.pause();

given()
.body(LinagoraEmailSendMethodContract$.MODULE$.bobSendsAMailToAndre(server))
.when()
.post()
.then()
.statusCode(HttpStatus.SC_OK)
.contentType(JSON)
.extract()
.body()
.asString();

checkIfAllEventDeadLettersArePersisted(server);
secondaryS3.unpause();

WebAdminGuiceProbe webAdminGuiceProbe = server.getProbe(WebAdminGuiceProbe.class);
requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminGuiceProbe.getWebAdminPort())
.build();

// trigger reprocessing event dead letters
String taskId = with()
.queryParam("action", "reDeliver")
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + new FailedBlobOperationListener.FailedBlobOperationListenerGroup().asString())
.then()
.extract()
.jsonPath()
.get("taskId");

with()
.basePath(TasksRoutes.BASE)
.queryParam("timeout", "1m")
.get(taskId + "/await");

BlobStoreProbe blobStoreProbe = server.getProbe(BlobStoreProbe.class);
BucketName bucketName = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBuckets()).collectList().block().getFirst();
List<BlobId> expectedBlobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
calmlyAwait.atMost(TEN_SECONDS)
.untilAsserted(() -> {
List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
assertThat(blobIds2).hasSameSizeAs(expectedBlobIds);
assertThat(blobIds2).hasSameElementsAs(expectedBlobIds);
});
}

private void checkIfAllEventDeadLettersArePersisted(GuiceJamesServer server) {
BlobStoreProbe blobStoreProbe = server.getProbe(BlobStoreProbe.class);
Group group = new FailedBlobOperationListener.FailedBlobOperationListenerGroup();
calmlyAwait.atMost(ONE_MINUTE)
.untilAsserted(() -> {
assertThatCode(() -> {
BucketName bucketName = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBuckets()).collectList().block().getFirst();
List<BlobId> expectedBlobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
List<EventDeadLetters.InsertionId> insertionIds = blobStoreProbe.getEventDeadLetters().failedIds(group)
.collectList().block();
List<FailedBlobEvents.BlobAddition> events = insertionIds.stream()
.map(insertionId -> blobStoreProbe.getEventDeadLetters().failedEvent(group, insertionId).block())
.map(FailedBlobEvents.BlobAddition.class::cast)
.toList();
assertThat(bucketName).isEqualTo(events.getFirst().bucketName());
assertThat(events.stream().map(FailedBlobEvents.BlobAddition::blobId)).hasSameElementsAs(expectedBlobIds);
}).doesNotThrowAnyException();
});
}
}

0 comments on commit 774c7d9

Please sign in to comment.