-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrepository.py
195 lines (164 loc) · 6.83 KB
/
repository.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import secrets
import string
from fastapi import HTTPException
from sqlalchemy import select, update, delete
from database import new_session, LinkOrm
from schemas import SLinkAdd, SLinkResponse
from datetime import datetime, timedelta
from typing import Optional
import asyncio
import logging
from urllib.parse import unquote
logger = logging.getLogger(__name__)
def normalize_url(url: str) -> str:
return unquote(url).lower().strip()
class LinkRepository:
@staticmethod
def generate_short_code(length: int = 8) -> str:
"""
Генерация короткого кода для URL.
"""
chars = string.ascii_letters + string.digits
return ''.join(secrets.choice(chars) for _ in range(length))
@classmethod
async def add_one(cls, data: SLinkAdd, user_id: Optional[int] = None) -> SLinkResponse:
"""
Добавление новой ссылки в БД.
"""
async with new_session() as session:
try:
normalized_url = normalize_url(str(data.original_url))
if data.custom_alias:
existing_link = await cls.find_by_short_code(data.custom_alias)
if existing_link:
raise HTTPException(
status_code=400,
detail="Пользовательский алиас уже занят."
)
short_code = data.custom_alias
else:
while True:
short_code = cls.generate_short_code()
existing_link = await cls.find_by_short_code(short_code)
if not existing_link:
break
expires_at = data.expires_at if data.expires_at else datetime.utcnow() + timedelta(days=30)
link = LinkOrm(
original_url=normalized_url,
short_code=short_code,
user_id=user_id,
expires_at=expires_at,
)
session.add(link)
await session.flush()
await session.commit()
logger.debug(f"Link created: {link}")
return SLinkResponse(
id=link.id,
original_url=link.original_url,
short_code=link.short_code,
created_at=link.created_at,
expires_at=link.expires_at,
user_id=link.user_id,
click_count=link.click_count,
short_url=None,
)
except HTTPException as e:
raise e
except Exception as e:
logger.error(f"Error adding link: {e}")
await session.rollback()
raise HTTPException(status_code=500, detail="Internal Server Error")
@classmethod
async def find_by_short_code(cls, short_code: str) -> LinkOrm:
"""
Поиск по короткому коду.
"""
async with new_session() as session:
query = select(LinkOrm).where(LinkOrm.short_code == short_code)
result = await session.execute(query)
return result.scalars().first()
@classmethod
async def find_by_original_url(cls, original_url: str) -> Optional[SLinkResponse]:
"""
Поиск по оригинальному URL.
"""
async with new_session() as session:
normalized_url = normalize_url(original_url)
logger.debug(f"Normalized URL: {normalized_url}")
query = select(LinkOrm).where(LinkOrm.original_url == normalized_url)
result = await session.execute(query)
link = result.scalars().first()
if link:
logger.debug(f"Found link: {link.original_url}")
else:
logger.debug("Link not found")
if not link:
return None
return SLinkResponse(
id=link.id,
original_url=link.original_url,
short_code=link.short_code,
created_at=link.created_at,
expires_at=link.expires_at,
user_id=link.user_id,
click_count=link.click_count,
short_url=f"http://127.0.0.1:8000/links/{link.short_code}",
)
@classmethod
async def delete_by_short_code(cls, short_code: str, user_id: int):
"""
Удаление ссылки по короткому коду.
"""
async with new_session() as session:
query = delete(LinkOrm).where(
(LinkOrm.short_code == short_code) & (LinkOrm.user_id == user_id)
)
await session.execute(query)
await session.commit()
@classmethod
async def update_original_url(cls, short_code: str, new_url: str, user_id: int) -> LinkOrm:
"""
Обновление оригинального URL.
"""
async with new_session() as session:
try:
normalized_url = normalize_url(new_url)
query = update(LinkOrm).where(
(LinkOrm.short_code == short_code) & (LinkOrm.user_id == user_id)
).values(original_url=normalized_url)
await session.execute(query)
await session.commit()
updated_link = await cls.find_by_short_code(short_code)
if not updated_link:
logger.error(f"Failed to fetch updated link: short_code={short_code}")
return None
return updated_link
except Exception as e:
logger.error(f"Error updating link in database: {e}")
await session.rollback()
return None
@classmethod
async def increment_click_count(cls, link_id: int):
"""
Счетчик переходов по ссылке.
"""
async with new_session() as session:
query = update(LinkOrm).where(LinkOrm.id == link_id).values(click_count=LinkOrm.click_count + 1)
await session.execute(query)
await session.commit()
async def delete_expired_links():
"""
Фоновая задача для удаления истекших ссылок.
"""
while True:
async with new_session() as session:
try:
query = delete(LinkOrm).where(LinkOrm.expires_at < datetime.utcnow())
await session.execute(query)
await session.commit()
logger.info("Expired links deleted")
except Exception as e:
logger.error(f"Error deleting expired links: {e}")
await session.rollback()
await asyncio.sleep(1800)