diff --git a/.env.selfhost.example b/.env.selfhost.example index 0ee4dcb..93e0f25 100644 --- a/.env.selfhost.example +++ b/.env.selfhost.example @@ -25,8 +25,6 @@ GOOGLE_CLIENT_SECRET="" AWS_DEFAULT_REGION="us-east-1" AWS_SECRET_KEY="" AWS_ACCESS_KEY="" - - DOCKER_OUTPUT=1 API_RATE_LIMIT=1 diff --git a/apps/web/prisma/migrations/20250311195754_added_rate_limit_settings/migration.sql b/apps/web/prisma/migrations/20250311195754_added_rate_limit_settings/migration.sql new file mode 100644 index 0000000..4739746 --- /dev/null +++ b/apps/web/prisma/migrations/20250311195754_added_rate_limit_settings/migration.sql @@ -0,0 +1,5 @@ +-- CreateEnum +CREATE TYPE "RateLimitPolicy" AS ENUM ('CANCEL', 'DELAY'); + +-- AlterTable +ALTER TABLE "Team" ADD COLUMN "rateLimitStrategy" "RateLimitPolicy" NOT NULL DEFAULT 'CANCEL'; diff --git a/apps/web/prisma/migrations/20250322114438_add_rate_limiting/migration.sql b/apps/web/prisma/migrations/20250322114438_add_rate_limiting/migration.sql new file mode 100644 index 0000000..b334ba0 --- /dev/null +++ b/apps/web/prisma/migrations/20250322114438_add_rate_limiting/migration.sql @@ -0,0 +1,19 @@ +/* + Warnings: + + - You are about to drop the column `rateLimitStrategy` on the `Team` table. All the data in the column will be lost. + +*/ +-- CreateEnum +CREATE TYPE "RateLimitType" AS ENUM ('EMAIL', 'DOMAIN'); + +-- CreateEnum +CREATE TYPE "RateLimitAction" AS ENUM ('DELAY', 'CANCEL'); + +-- AlterTable +ALTER TABLE "Team" DROP COLUMN "rateLimitStrategy", +ADD COLUMN "rateLimitAction" "RateLimitAction" NOT NULL DEFAULT 'DELAY', +ADD COLUMN "rateLimitType" "RateLimitType" NOT NULL DEFAULT 'EMAIL'; + +-- DropEnum +DROP TYPE "RateLimitPolicy"; diff --git a/apps/web/prisma/schema.prisma b/apps/web/prisma/schema.prisma index 44c6d6f..194528a 100644 --- a/apps/web/prisma/schema.prisma +++ b/apps/web/prisma/schema.prisma @@ -103,6 +103,18 @@ model Team { campaigns Campaign[] templates Template[] dailyEmailUsages DailyEmailUsage[] + rateLimitType RateLimitType @default(EMAIL) + rateLimitAction RateLimitAction @default(DELAY) +} + +enum RateLimitType { + EMAIL // Rate limit per email address + DOMAIN // Rate limit per domain +} + +enum RateLimitAction { + DELAY // Queue emails and send them later + CANCEL // Drop emails that exceed the rate limit } enum Role { diff --git a/apps/web/src/app/(dashboard)/emails/email-status-badge.tsx b/apps/web/src/app/(dashboard)/emails/email-status-badge.tsx index 93adcfa..0e20a72 100644 --- a/apps/web/src/app/(dashboard)/emails/email-status-badge.tsx +++ b/apps/web/src/app/(dashboard)/emails/email-status-badge.tsx @@ -1,5 +1,6 @@ import { EmailStatus } from "@prisma/client"; +// ADD RATE_LIMITED STATUS LATER export const EmailStatusBadge: React.FC<{ status: EmailStatus }> = ({ status, }) => { diff --git a/apps/web/src/env.js b/apps/web/src/env.js index 5c9f475..42bea19 100644 --- a/apps/web/src/env.js +++ b/apps/web/src/env.js @@ -95,6 +95,8 @@ export const env = createEnv({ S3_COMPATIBLE_SECRET_KEY: process.env.S3_COMPATIBLE_SECRET_KEY, S3_COMPATIBLE_API_URL: process.env.S3_COMPATIBLE_API_URL, S3_COMPATIBLE_PUBLIC_URL: process.env.S3_COMPATIBLE_PUBLIC_URL, + RATE_LIMIT_BY: process.env.RATE_LIMIT_BY, + SEND_RATE_LIMITTED_WITH_DELAY: SEND_RATE_LIMITTED_WITH_DELAY }, /** * Run `build` or `dev` with `SKIP_ENV_VALIDATION` to skip env validation. This is especially diff --git a/apps/web/src/server/public-api/api/emails/send-email.ts b/apps/web/src/server/public-api/api/emails/send-email.ts index 4d67eaf..f74d59c 100644 --- a/apps/web/src/server/public-api/api/emails/send-email.ts +++ b/apps/web/src/server/public-api/api/emails/send-email.ts @@ -73,7 +73,7 @@ function send(app: PublicAPIApp) { apiKeyId: team.apiKeyId, }); - return c.json({ emailId: email?.id }); + return c.json({ emailId: email.withinLimitEmail?.id }); }); } diff --git a/apps/web/src/server/service/email-service.ts b/apps/web/src/server/service/email-service.ts index a2d9104..40b7cdd 100644 --- a/apps/web/src/server/service/email-service.ts +++ b/apps/web/src/server/service/email-service.ts @@ -4,6 +4,10 @@ import { UnsendApiError } from "~/server/public-api/api-error"; import { EmailQueueService } from "./email-queue-service"; import { validateDomainFromEmail } from "./domain-service"; import { EmailRenderer } from "@unsend/email-editor/src/renderer"; +import { RedisRateLimiter } from "./rate-limitter"; +import { RateLimitAction, RateLimitType } from "@prisma/client"; + +const rateLimiter = new RedisRateLimiter(process.env.REDIS_URL, 1, 10); async function checkIfValidEmail(emailId: string) { const email = await db.email.findUnique({ @@ -67,7 +71,18 @@ export async function sendEmail( let subject = subjectFromApiCall; let html = htmlFromApiCall; - const domain = await validateDomainFromEmail(from, teamId); + const team = await db.team.findUnique({ + where: { id: teamId }, + }); + + if (!team) { + throw new Error(`Team with ID ${teamId} not found`); + } + + const [domain, rateLimitInfo] = await Promise.all([ + validateDomainFromEmail(from, teamId), + emailRateLimiter(to, team.rateLimitType, teamId), + ]); if (templateId) { const template = await db.template.findUnique({ @@ -78,7 +93,7 @@ export async function sendEmail( const jsonContent = JSON.parse(template.content || "{}"); const renderer = new EmailRenderer(jsonContent); - subject = replaceVariables(template.subject || "", variables || {}); + subject = replaceVariables(template.subject || "", variables || {}) ?? ''; // {{}} for link replacements const modifiedVariables = { @@ -104,55 +119,135 @@ export async function sendEmail( ? Math.max(0, scheduledAtDate.getTime() - Date.now()) : undefined; - const email = await db.email.create({ - data: { - to: Array.isArray(to) ? to : [to], - from, - subject: subject as string, - replyTo: replyTo - ? Array.isArray(replyTo) - ? replyTo - : [replyTo] - : undefined, - cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined, - bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined, - text, - html, - teamId, - domainId: domain.id, - attachments: attachments ? JSON.stringify(attachments) : undefined, - scheduledAt: scheduledAtDate, - latestStatus: scheduledAtDate ? "SCHEDULED" : "QUEUED", - apiId: apiKeyId, - }, - }); + let withinLimitEmail; + let rateLimittedEmail = { id: '' }; try { + withinLimitEmail = await db.email.create({ + data: { + to: rateLimitInfo.withinLimits, + from, + subject: subject ?? '', + replyTo: replyTo + ? Array.isArray(replyTo) + ? replyTo + : [replyTo] + : undefined, + cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined, + bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined, + text, + html, + teamId, + domainId: domain.id, + attachments: attachments ? JSON.stringify(attachments) : undefined, + scheduledAt: scheduledAtDate, + latestStatus: scheduledAtDate ? "SCHEDULED" : "QUEUED", + }, + }); + + // Create rate limited email always if there are rate-limited recipients + if (rateLimitInfo.rateLimited.length > 0) { + rateLimittedEmail = await db.email.create({ + data: { + to: rateLimitInfo.rateLimited, + from, + subject: subject ?? '', + replyTo: replyTo + ? Array.isArray(replyTo) + ? replyTo + : [replyTo] + : undefined, + cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined, + bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined, + text, + html, + teamId, + domainId: domain.id, + attachments: attachments ? JSON.stringify(attachments) : undefined, + scheduledAt: scheduledAtDate, + // If SEND_RATE_LIMITTED_WITH_DELAY is false, mark as RATE_LIMITED + latestStatus: team?.rateLimitAction === RateLimitAction.DELAY + ? (scheduledAtDate ? "SCHEDULED" : "QUEUED") + : "FAILED", + }, + }); + + if (!teamId) { + await db.emailEvent.create({ + data: { + emailId: rateLimittedEmail.id, + status: "FAILED", + data: { + message: "Email rate-limited and not queued for retry", + }, + }, + }); + } + } + await EmailQueueService.queueEmail( - email.id, + withinLimitEmail.id, domain.region, true, undefined, delay ); + + if (rateLimittedEmail?.id && team?.rateLimitAction === RateLimitAction.DELAY) { + const retryDelay = 1 * 1000; + await EmailQueueService.queueEmail( + rateLimittedEmail.id, + domain.region, + true, + undefined, + delay ? delay + retryDelay : retryDelay + ); + } + + return { + withinLimitEmail, + rateLimittedEmail, + rateLimitStatus: rateLimitInfo.rateLimited.length > 0 + ? (team?.rateLimitAction === RateLimitAction.DELAY ? true : false) + : undefined + }; + } catch (error: any) { - await db.emailEvent.create({ - data: { - emailId: email.id, - status: "FAILED", + // If any error occurs, mark both emails as failed + if (withinLimitEmail?.id) { + await db.emailEvent.create({ data: { - error: error.toString(), + emailId: withinLimitEmail.id, + status: "FAILED", + data: { + error: error.toString(), + }, }, - }, - }); - await db.email.update({ - where: { id: email.id }, - data: { latestStatus: "FAILED" }, - }); + }); + await db.email.update({ + where: { id: withinLimitEmail.id }, + data: { latestStatus: "FAILED" }, + }); + } + + if (rateLimittedEmail?.id) { + await db.emailEvent.create({ + data: { + emailId: rateLimittedEmail.id, + status: "FAILED", + data: { + error: error.toString(), + }, + }, + }); + await db.email.update({ + where: { id: rateLimittedEmail.id }, + data: { latestStatus: "FAILED" }, + }); + } + throw error; } - - return email; } export async function updateEmail( @@ -213,3 +308,73 @@ export async function cancelEmail(emailId: string) { }, }); } + +async function emailRateLimiter(to: string | string[], rateLimitBy: RateLimitType, teamId: number): Promise<{rateLimited: string[], withinLimits: string[]}> { + let identifiers: string | string[]; + + switch (rateLimitBy) { + case 'DOMAIN': + if (typeof to === 'string') { + const domain = to.split('@')[1]; + if (!domain) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "Invalid email address format for rate limiting by domain.", + }); + } + identifiers = [domain]; + } else { + identifiers = to.map(email => { + const domain = email.split('@')[1]; + if (!domain) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "Invalid email address format for rate limiting by domain.", + }); + } + + return domain; + }); + } + break; + + case 'EMAIL': + identifiers = Array.isArray(to) ? to : [to]; + break; + + default: + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "Invalid rate limiting strategy", + }); + } + + return await processRateLimits(rateLimitBy, identifiers, teamId); +} + +async function processRateLimits( + rateLimitBy: RateLimitType, + identifiers: string[], + teamId: number +) { + const rateLimited: string[] = []; + const withinLimits: string[] = []; + + await Promise.all( + identifiers.map(async (identifier) => { + try { + const key = rateLimiter.getRateLimitKey(`${teamId}:${rateLimitBy}:${identifier}`); + await rateLimiter.checkRateLimit(key); + withinLimits.push(identifier); + } catch (error) { + console.error(`Rate limiting failed for ${identifier}`); + rateLimited.push(identifier); + } + }) + ); + + console.log(`Rate-limited: ${rateLimited.join(", ")}`); + console.log(`Within limit: ${withinLimits.join(", ")}`); + + return { rateLimited, withinLimits } +} diff --git a/apps/web/src/server/service/rate-limitter.ts b/apps/web/src/server/service/rate-limitter.ts new file mode 100644 index 0000000..617f159 --- /dev/null +++ b/apps/web/src/server/service/rate-limitter.ts @@ -0,0 +1,64 @@ +import { Redis } from "ioredis"; +import { env } from "~/env"; +import { UnsendApiError } from "../public-api/api-error"; + +export class RedisRateLimiter { + private redis: Redis; + private windowSize: number; + private maxRequests: number; + + constructor( + redisUrl: string = env.REDIS_URL ?? 'localhost:6379', + windowSize: number = 1, + maxRequests: number = env.API_RATE_LIMIT + ) { + this.redis = new Redis(redisUrl); + this.windowSize = windowSize; + this.maxRequests = maxRequests; + } + + public getRateLimitKey(token: string): string { + return `rate_limit:${token}`; + } + + private async cleanupOldEntries(key: string): Promise { + const now = Math.floor(Date.now() / 1000); + await this.redis.zremrangebyscore(key, 0, now - this.windowSize); + } + + private async addCurrentRequest(key: string): Promise { + const now = Math.floor(Date.now() / 1000); + await this.redis.zadd(key, now, `${now}-${Math.random()}`); + await this.redis.expire(key, this.windowSize * 2); + } + + public async checkRateLimit(token: string): Promise { + const key = this.getRateLimitKey(token); + + await this.cleanupOldEntries(key); + await this.addCurrentRequest(key); + + const requestCount = await this.redis.zcard(key); + + if (requestCount > this.maxRequests) { + throw new UnsendApiError({ + code: "RATE_LIMITED", + message: `Rate limit exceeded, ${this.maxRequests} requests per second`, + }); + } + } + + async getRateLimitInfo(token: string) { + const key = this.getRateLimitKey(token); + const now = Math.floor(Date.now() / 1000); + + await this.cleanupOldEntries(key); + const currentUsage = await this.redis.zcard(key); + + return { + limit: this.maxRequests, + remaining: Math.max(0, this.maxRequests - currentUsage), + reset: now + this.windowSize, + }; + } + } \ No newline at end of file