Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/email rate limiting #94

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .env.selfhost.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ GITHUB_SECRET="<your-github-client-secret>"
AWS_DEFAULT_REGION="us-east-1"
AWS_SECRET_KEY="<your-aws-secret-key>"
AWS_ACCESS_KEY="<your-aws-access-key>"



DOCKER_OUTPUT=1
API_RATE_LIMIT=1
Expand Down
1 change: 1 addition & 0 deletions apps/web/src/app/(dashboard)/emails/email-status-badge.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EmailStatus } from "@prisma/client";

// ADD RATE_LIMITED STATUS LATER
export const EmailStatusBadge: React.FC<{ status: EmailStatus }> = ({
status,
}) => {
Expand Down
4 changes: 4 additions & 0 deletions apps/web/src/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export const env = createEnv({
S3_COMPATIBLE_SECRET_KEY: z.string().optional(),
S3_COMPATIBLE_API_URL: z.string().optional(),
S3_COMPATIBLE_PUBLIC_URL: z.string().optional(),
RATE_LIMIT_BY: z.enum(['DOMAIN', "RECIEVER"]).optional(),
SEND_RATE_LIMITTED_WITH_DELAY: z.boolean().optional().default(false)
},

/**
Expand Down Expand Up @@ -98,6 +100,8 @@ export const env = createEnv({
S3_COMPATIBLE_SECRET_KEY: process.env.S3_COMPATIBLE_SECRET_KEY,
S3_COMPATIBLE_API_URL: process.env.S3_COMPATIBLE_API_URL,
S3_COMPATIBLE_PUBLIC_URL: process.env.S3_COMPATIBLE_PUBLIC_URL,
RATE_LIMIT_BY: process.env.RATE_LIMIT_BY,
SEND_RATE_LIMITTED_WITH_DELAY: SEND_RATE_LIMITTED_WITH_DELAY
},
/**
* Run `build` or `dev` with `SKIP_ENV_VALIDATION` to skip env validation. This is especially
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/server/public-api/api/emails/send-email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ function send(app: PublicAPIApp) {
teamId: team.id,
});

return c.json({ emailId: email?.id });
return c.json({ emailId: email.withinLimitEmail?.id });
});
}

Expand Down
230 changes: 194 additions & 36 deletions apps/web/src/server/service/email-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { db } from "../db";
import { UnsendApiError } from "~/server/public-api/api-error";
import { EmailQueueService } from "./email-queue-service";
import { validateDomainFromEmail } from "./domain-service";
import { RedisRateLimiter } from "./rate-limitter";
import { env } from "~/env";

const rateLimiter = new RedisRateLimiter(process.env.REDIS_URL, 1, 10);

async function checkIfValidEmail(emailId: string) {
const email = await db.email.findUnique({
Expand Down Expand Up @@ -50,61 +54,145 @@ export async function sendEmail(
scheduledAt,
} = emailContent;

const domain = await validateDomainFromEmail(from, teamId);
const [domain, rateLimitInfo] = await Promise.all([
await validateDomainFromEmail(from, teamId),
await emailRateLimiter(to, "DOMAIN", teamId)
])

const scheduledAtDate = scheduledAt ? new Date(scheduledAt) : undefined;
const delay = scheduledAtDate
? Math.max(0, scheduledAtDate.getTime() - Date.now())
: undefined;

const email = await db.email.create({
data: {
to: Array.isArray(to) ? to : [to],
from,
subject,
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
text,
html,
teamId,
domainId: domain.id,
attachments: attachments ? JSON.stringify(attachments) : undefined,
scheduledAt: scheduledAtDate,
latestStatus: scheduledAtDate ? "SCHEDULED" : "QUEUED",
},
});
let withinLimitEmail;
let rateLimittedEmail = { id: '' };

try {
withinLimitEmail = await db.email.create({
data: {
to: rateLimitInfo.withinLimits,
from,
subject,
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
text,
html,
teamId,
domainId: domain.id,
attachments: attachments ? JSON.stringify(attachments) : undefined,
scheduledAt: scheduledAtDate,
latestStatus: scheduledAtDate ? "SCHEDULED" : "QUEUED",
},
});

// Create rate limited email always if there are rate-limited recipients
if (rateLimitInfo.rateLimited.length > 0) {
rateLimittedEmail = await db.email.create({
data: {
to: rateLimitInfo.rateLimited,
from,
subject,
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
text,
html,
teamId,
domainId: domain.id,
attachments: attachments ? JSON.stringify(attachments) : undefined,
scheduledAt: scheduledAtDate,
// If SEND_RATE_LIMITTED_WITH_DELAY is false, mark as RATE_LIMITED
latestStatus: env.SEND_RATE_LIMITTED_WITH_DELAY
? (scheduledAtDate ? "SCHEDULED" : "QUEUED")
: "FAILED",
},
});

if (!env.SEND_RATE_LIMITTED_WITH_DELAY) {
await db.emailEvent.create({
data: {
emailId: rateLimittedEmail.id,
status: "FAILED",
data: {
message: "Email rate-limited and not queued for retry",
},
},
});
}
}

await EmailQueueService.queueEmail(
email.id,
withinLimitEmail.id,
domain.region,
true,
undefined,
delay
);

if (rateLimittedEmail?.id && env.SEND_RATE_LIMITTED_WITH_DELAY) {
const retryDelay = 1 * 1000;
await EmailQueueService.queueEmail(
rateLimittedEmail.id,
domain.region,
true,
undefined,
delay ? delay + retryDelay : retryDelay
);
}

return {
withinLimitEmail,
rateLimittedEmail,
rateLimitStatus: rateLimitInfo.rateLimited.length > 0
? (env.SEND_RATE_LIMITTED_WITH_DELAY ? true : false)
: undefined
};

} catch (error: any) {
await db.emailEvent.create({
data: {
emailId: email.id,
status: "FAILED",
// If any error occurs, mark both emails as failed
if (withinLimitEmail?.id) {
await db.emailEvent.create({
data: {
error: error.toString(),
emailId: withinLimitEmail.id,
status: "FAILED",
data: {
error: error.toString(),
},
},
},
});
await db.email.update({
where: { id: email.id },
data: { latestStatus: "FAILED" },
});
});
await db.email.update({
where: { id: withinLimitEmail.id },
data: { latestStatus: "FAILED" },
});
}

if (rateLimittedEmail?.id) {
await db.emailEvent.create({
data: {
emailId: rateLimittedEmail.id,
status: "FAILED",
data: {
error: error.toString(),
},
},
});
await db.email.update({
where: { id: rateLimittedEmail.id },
data: { latestStatus: "FAILED" },
});
}

throw error;
}

return email;
}

export async function updateEmail(
Expand Down Expand Up @@ -165,3 +253,73 @@ export async function cancelEmail(emailId: string) {
},
});
}

async function emailRateLimiter(to: string | string[], rateLimitBy: 'DOMAIN' | 'RECIEVER', teamId: number): Promise<{rateLimited: string[], withinLimits: string[]}> {
let identifiers: string | string[];

switch (rateLimitBy) {
case 'DOMAIN':
if (typeof to === 'string') {
const domain = to.split('@')[1];
if (!domain) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Invalid email address format for rate limiting by domain.",
});
}
identifiers = [domain];
} else {
identifiers = to.map(email => {
const domain = email.split('@')[1];
if (!domain) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Invalid email address format for rate limiting by domain.",
});
}

return domain;
});
}
break;

case 'RECIEVER':
identifiers = Array.isArray(to) ? to : [to];
break;

default:
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Invalid rate limiting strategy",
});
}

return await processRateLimits(rateLimitBy, identifiers, teamId);
}

async function processRateLimits(
rateLimitBy: 'DOMAIN' | 'RECIEVER',
identifiers: string[],
teamId: number
) {
const rateLimited: string[] = [];
const withinLimits: string[] = [];

await Promise.all(
identifiers.map(async (identifier) => {
try {
const key = rateLimiter.getRateLimitKey(`${teamId}:${rateLimitBy}:${identifier}`);
await rateLimiter.checkRateLimit(key);
withinLimits.push(identifier);
} catch (error) {
console.error(`Rate limiting failed for ${identifier}`);
rateLimited.push(identifier);
}
})
);

console.log(`Rate-limited: ${rateLimited.join(", ")}`);
console.log(`Within limit: ${withinLimits.join(", ")}`);

return { rateLimited, withinLimits }
}
64 changes: 64 additions & 0 deletions apps/web/src/server/service/rate-limitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Redis } from "ioredis";
import { env } from "~/env";
import { UnsendApiError } from "../public-api/api-error";

export class RedisRateLimiter {
private redis: Redis;
private windowSize: number;
private maxRequests: number;

constructor(
redisUrl: string = env.REDIS_URL ?? 'localhost:6379',
windowSize: number = 1,
maxRequests: number = env.API_RATE_LIMIT
) {
this.redis = new Redis(redisUrl);
this.windowSize = windowSize;
this.maxRequests = maxRequests;
}

public getRateLimitKey(token: string): string {
return `rate_limit:${token}`;
}

private async cleanupOldEntries(key: string): Promise<void> {
const now = Math.floor(Date.now() / 1000);
await this.redis.zremrangebyscore(key, 0, now - this.windowSize);
}

private async addCurrentRequest(key: string): Promise<void> {
const now = Math.floor(Date.now() / 1000);
await this.redis.zadd(key, now, `${now}-${Math.random()}`);
await this.redis.expire(key, this.windowSize * 2);
}

public async checkRateLimit(token: string): Promise<void> {
const key = this.getRateLimitKey(token);

await this.cleanupOldEntries(key);
await this.addCurrentRequest(key);

const requestCount = await this.redis.zcard(key);

if (requestCount > this.maxRequests) {
throw new UnsendApiError({
code: "RATE_LIMITED",
message: `Rate limit exceeded, ${this.maxRequests} requests per second`,
});
}
}

async getRateLimitInfo(token: string) {
const key = this.getRateLimitKey(token);
const now = Math.floor(Date.now() / 1000);

await this.cleanupOldEntries(key);
const currentUsage = await this.redis.zcard(key);

return {
limit: this.maxRequests,
remaining: Math.max(0, this.maxRequests - currentUsage),
reset: now + this.windowSize,
};
}
}