Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync acknowledgment #749

Merged
merged 4 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/backend/src/delivery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ describe('Delivery', () => {
return (_: any, __: any, ___: any) => {};
},
getIdEnsName: async (ensName: string) => ensName,
syncAcknoledgment: async (
syncAcknowledge: async (
conversationId: string,
ensName: string,
lastMessagePull: string,
) => Promise<void>,
};
Expand Down
69 changes: 59 additions & 10 deletions packages/backend/src/delivery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export default () => {
next(e);
}
});

//TODO remove after storage refactoring
router.post(
'/messages/:ensName/syncAcknoledgment/:last_message_pull',
async (req, res, next) => {
Expand Down Expand Up @@ -148,17 +148,66 @@ export default () => {
contactEnsName,
);

if (req.app.locals.db) {
const db: IDatabase = req.app.locals.db;
const db: IDatabase = req.app.locals.db;

await db.syncAcknowledge(
conversationId,
Number.parseInt(req.params.last_message_pull),
);
}),
);

res.json();
} catch (e) {
next(e);
}
},
);
router.post(
'/messages/:ensName/syncAcknowledgment/:last_message_pull',
async (req, res, next) => {
const hasValidParams = validateSchema(
syncAcknoledgmentParamsSchema,
req.params,
);

const hasValidBody = validateSchema(
syncAcknoledgmentBodySchema,
req.body,
);

// eslint-disable-next-line max-len
//Express transform number inputs into strings. So we have to check if a string used as last_message_pull can be converted to a number later on.
const isLastMessagePullNumber = !isNaN(
Number.parseInt(req.params.last_message_pull),
);

if (!hasValidParams || !isLastMessagePullNumber || !hasValidBody) {
return res.send(400);
}

try {
const ensName = await req.app.locals.db.getIdEnsName(
req.params.ensName,
);

db.syncAcknoledgment(
conversationId,
ensName,
req.params.last_message_pull,
await Promise.all(
req.body.acknoledgments.map(async (ack: Acknoledgment) => {
const contactEnsName =
await await req.app.locals.db.getIdEnsName(
ack.contactAddress,
);
} else {
throw Error('db not connected');
}
const conversationId = getConversationId(
ensName,
contactEnsName,
);

const db: IDatabase = req.app.locals.db;

await db.syncAcknowledge(
conversationId,
Number.parseInt(req.params.last_message_pull),
);
}),
);

Expand Down
9 changes: 4 additions & 5 deletions packages/backend/src/persistance/getDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import { UserStorage } from '@dm3-org/dm3-lib-storage';
import { createClient } from 'redis';
import { getAliasChain, getIdEnsName } from './getIdEnsName';
import Messages from './messages';
import { syncAcknoledgment } from './messages/syncAcknoledgment';
import Notification from './notification';
import Pending from './pending';
import Session from './session';
import Storage from './storage';
import { syncAcknowledge } from './messages/syncAcknowledge';

export enum RedisPrefix {
Conversation = 'conversation:',
Expand Down Expand Up @@ -81,7 +81,7 @@ export async function getDatabase(_redis?: Redis): Promise<IDatabase> {
deletePending: Pending.deletePending(redis),
getIdEnsName: getIdEnsName(redis),
getAliasChain: getAliasChain(redis),
syncAcknoledgment: syncAcknoledgment(redis),
syncAcknowledge: syncAcknowledge(redis),
//Notification
getUsersNotificationChannels:
Notification.getUsersNotificationChannels(redis),
Expand Down Expand Up @@ -137,10 +137,9 @@ export interface IDatabase {
deletePending: (ensName: string) => Promise<void>;
getIdEnsName: (ensName: string) => Promise<string>;
getAliasChain: (ensName: string) => Promise<string[]>;
syncAcknoledgment: (
syncAcknowledge: (
conversationId: string,
ensName: string,
lastMessagePull: string,
syncTime: number,
) => Promise<void>;
getUsersNotificationChannels: (
ensName: string,
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/persistance/messages/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { createMessage } from './createMessage';
import { deleteExpiredMessages } from './deleteExpiredMessages';
import { getMessages } from './getMessages';
import { syncAcknoledgment } from './syncAcknoledgment';
import { syncAcknowledge } from './syncAcknowledge';
import { getIncomingMessages } from './getIncomingMessages';
export default {
createMessage,
deleteExpiredMessages,
getMessages,
syncAcknoledgment,
syncAcknowledge,
getIncomingMessages,
};
31 changes: 0 additions & 31 deletions packages/backend/src/persistance/messages/syncAcknoledgment.ts

This file was deleted.

150 changes: 150 additions & 0 deletions packages/backend/src/persistance/messages/syncAcknowledge.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { EncryptionEnvelop } from '@dm3-org/dm3-lib-messaging';
import winston from 'winston';
import { IDatabase, Redis, getDatabase, getRedisClient } from '../getDatabase';

const SENDER_ADDRESS = '0x25A643B6e52864d0eD816F1E43c0CF49C83B8292';
const RECEIVER_ADDRESS = '0xDd36ae7F9a8E34FACf1e110c6e9d37D0dc917855';

global.logger = winston.createLogger({
transports: [new winston.transports.Console()],
});

describe('Sync Acknowledge', () => {
let redisClient: Redis;
let db: IDatabase;
const logger = winston.createLogger({
transports: [new winston.transports.Console()],
});

beforeEach(async () => {
redisClient = await getRedisClient();
db = await getDatabase(redisClient);
await redisClient.flushDb();
});

afterEach(async () => {
await redisClient.flushDb();
await redisClient.disconnect();
});

it('Removes acknowledged messages from DS', async () => {
const envelop: EncryptionEnvelop = {
message: '',
metadata: {
deliveryInformation: {
to: RECEIVER_ADDRESS,
from: SENDER_ADDRESS,
},
signature: '',
encryptedMessageHash: '',
version: '',
encryptionScheme: 'x25519-chacha20-poly1305',
},
};

const conversionId = SENDER_ADDRESS + RECEIVER_ADDRESS;

const priorCreateMessages = await db.getIncomingMessages(
RECEIVER_ADDRESS,
10,
);

expect(priorCreateMessages.length).toBe(0);

await db.createMessage(conversionId, envelop, 200);

const afterCreateMessages = await db.getIncomingMessages(
RECEIVER_ADDRESS,
10,
);

expect(afterCreateMessages.length).toBe(1);

await db.syncAcknowledge(conversionId, 300);

const afterSyncAcknowledge = await db.getIncomingMessages(
RECEIVER_ADDRESS,
10,
);

expect(afterSyncAcknowledge.length).toBe(0);
});
it('Keeps messages on the DS that have been created after the sync ', async () => {
const envelop1: EncryptionEnvelop = {
message: '',
metadata: {
deliveryInformation: {
to: RECEIVER_ADDRESS,
from: SENDER_ADDRESS,
},
signature: '',
encryptedMessageHash: '',
version: '',
encryptionScheme: 'x25519-chacha20-poly1305',
},
};
const envelop2: EncryptionEnvelop = {
message: 'foo',
metadata: {
deliveryInformation: {
to: RECEIVER_ADDRESS,
from: SENDER_ADDRESS,
},
signature: '',
encryptedMessageHash: '',
version: '',
encryptionScheme: 'x25519-chacha20-poly1305',
},
};
const envelop3: EncryptionEnvelop = {
message: 'bar',
metadata: {
deliveryInformation: {
to: RECEIVER_ADDRESS,
from: SENDER_ADDRESS,
},
signature: '',
encryptedMessageHash: '',
version: '',
encryptionScheme: 'x25519-chacha20-poly1305',
},
};

const conversionId = SENDER_ADDRESS + RECEIVER_ADDRESS;

const priorCreateMessages = await db.getIncomingMessages(
RECEIVER_ADDRESS,
10,
);

expect(priorCreateMessages.length).toBe(0);

await db.createMessage(conversionId, envelop1, 200);
await db.createMessage(conversionId, envelop2, 301);
await db.createMessage(conversionId, envelop3, 302);

const afterCreateMessages = await db.getIncomingMessages(
RECEIVER_ADDRESS,
10,
);

expect(afterCreateMessages.length).toBe(3);

await db.syncAcknowledge(conversionId, 300);

let afterSyncAcknowledge = await db.getIncomingMessages(
RECEIVER_ADDRESS,
10,
);

expect(afterSyncAcknowledge.length).toBe(2);

await db.syncAcknowledge(conversionId, 303);

afterSyncAcknowledge = await db.getIncomingMessages(
RECEIVER_ADDRESS,
10,
);
expect(afterSyncAcknowledge.length).toBe(0);
});
});
10 changes: 10 additions & 0 deletions packages/backend/src/persistance/messages/syncAcknowledge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Redis, RedisPrefix } from '../getDatabase';
export function syncAcknowledge(redis: Redis) {
return async (conversationId: string, syncTime: number) => {
await redis.zRemRangeByScore(
RedisPrefix.Conversation + conversationId,
0,
syncTime,
);
};
}
30 changes: 26 additions & 4 deletions packages/lib/delivery-api/src/messaging-http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,28 @@ import { checkAccount, getAxiosConfig } from './utils';

const DELIVERY_PATH = process.env.REACT_APP_BACKEND + '/delivery';

//TOOD REMOVE AFTER STORAGE REFACTOR
export async function syncAcknoledgment(
provider: ethers.providers.JsonRpcProvider,
account: Account,
acknoledgments: Acknoledgment[],
token: string,
lastMessagePull: number,
): Promise<void> {
const { profile } = checkAccount(account);

const url = `${DELIVERY_PATH}/messages/${normalizeEnsName(
account!.ensName,
)}/syncAcknoledgment/${lastMessagePull}`;

return getDeliveryServiceClient(
profile,
provider,
async (url: string) => (await axios.get(url)).data,
).post(url, { acknoledgments }, getAxiosConfig(token));
}
export type SyncAcknoledgment = typeof syncAcknoledgment;

/**
* let the delivery service know that messages have been stored
* and can be deleted on the delivery service
Expand All @@ -20,26 +42,26 @@ const DELIVERY_PATH = process.env.REACT_APP_BACKEND + '/delivery';
* @param token The auth token
* @param lastMessagePull Timestamp of the last message pull
*/
export async function syncAcknoledgment(
export async function syncAcknowledgment(
provider: ethers.providers.JsonRpcProvider,
account: Account,
acknoledgments: Acknoledgment[],
token: string,
lastMessagePull: number,
lastSyncTime: number,
): Promise<void> {
const { profile } = checkAccount(account);

const url = `${DELIVERY_PATH}/messages/${normalizeEnsName(
account!.ensName,
)}/syncAcknoledgment/${lastMessagePull}`;
)}/syncAcknowledgment/${lastSyncTime}`;

return getDeliveryServiceClient(
profile,
provider,
async (url: string) => (await axios.get(url)).data,
).post(url, { acknoledgments }, getAxiosConfig(token));
}
export type SyncAcknoledgment = typeof syncAcknoledgment;
export type SyncAcknowledgment = typeof syncAcknoledgment;

/**
* returns the bufferd message which were send form contactEnsName
Expand Down
Loading