Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delete tg post video & update FinalRip & store relative path #10

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ cython_debug/

/deploy/docker/

/tests/task.json
/task.json
/conf/store.json
/store.json
/conf/store.json
/tests/store.json
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ or you can use docker to run the project, see [docker-compose.yml](./deploy/dock

#### RSS Config:

supports hot reloading, which means you can update your config without needing to restart the service.
supports hot reloading, which means you can update your config without needing to restart the service.

you should provide the compatible params and scripts in the [params](./conf/params) and [scripts](./conf/scripts) folder.

Expand Down
2 changes: 1 addition & 1 deletion animepipeline/bt/qb.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def get_downloaded_path(self, torrent_hash: str) -> Optional[Path]:
if torrent[0].state in self.COMPLETE_STATES:
file_list: List[Tuple[str, int]] = [(file["name"], file["size"]) for file in torrent[0].files]
file_list.sort(key=lambda x: x[1], reverse=True)
return self.download_path / Path(file_list[0][0])
return Path(file_list[0][0])

else:
return None
Expand Down
10 changes: 10 additions & 0 deletions animepipeline/config/rss.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class BaseConfig(BaseModel):
uploader: str
script: str
param: str
slice: Optional[bool] = True
timeout: Optional[int] = 20


class NyaaConfig(BaseModel):
Expand All @@ -21,6 +23,8 @@ class NyaaConfig(BaseModel):
uploader: Optional[str] = None
script: Optional[str] = None
param: Optional[str] = None
slice: Optional[bool] = None
timeout: Optional[int] = None


class RSSConfig(BaseModel):
Expand Down Expand Up @@ -108,6 +112,12 @@ def _gen_dict(folder_path: Union[Path, str]) -> Dict[str, str]:
if item.uploader is None:
item.uploader = config.base.uploader

if item.slice is None:
item.slice = config.base.slice

if item.timeout is None:
item.timeout = config.base.timeout

if item.script is None:
item.script = config.base.script
else:
Expand Down
3 changes: 0 additions & 3 deletions animepipeline/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ class FinalRipConfig(BaseModel):

class TelegramConfig(BaseModel):
enable: bool
local_mode: bool
base_url: AnyUrl
base_file_url: AnyUrl
bot_token: str
channel_id: Union[str, int]

Expand Down
16 changes: 12 additions & 4 deletions animepipeline/encode/finalrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import mimetypes
import time
from pathlib import Path
from typing import Union
from typing import Optional, Union

import httpx
from httpx import AsyncClient
Expand All @@ -30,7 +30,7 @@

class FinalRipClient:
def __init__(self, config: FinalRipConfig):
self.client = AsyncClient(base_url=str(config.url), headers={"token": str(config.token)})
self.client = AsyncClient(base_url=str(config.url), headers={"token": str(config.token)}, timeout=30)

async def ping(self) -> PingResponse:
try:
Expand Down Expand Up @@ -184,15 +184,23 @@ def _upload_file() -> None:
if not new_task_response.success:
logger.error(f"Error creating task: {new_task_response.error.message}") # type: ignore

async def start_task(self, video_key: str, encode_param: str, script: str) -> None:
async def start_task(
self, video_key: str, encode_param: str, script: str, slice: Optional[bool] = True, timeout: Optional[int] = 20
) -> None:
"""
start encode task

:param video_key: video_key of the task
:param encode_param: encode param
:param script: encode script
:param slice: cut video into clips or not
:param timeout: clip timeout, default 20 minutes
"""
resp = await self._start_task(StartTaskRequest(video_key=video_key, encode_param=encode_param, script=script))
resp = await self._start_task(
StartTaskRequest(
video_key=video_key, encode_param=encode_param, script=script, slice=slice, timeout=timeout
)
)
if not resp.success:
logger.warning(f"Failed to start finalrip task: {resp.error.message}") # type: ignore

Expand Down
2 changes: 2 additions & 0 deletions animepipeline/encode/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class StartTaskRequest(BaseModel):
encode_param: str
script: str
video_key: str
slice: Optional[bool] = None
timeout: Optional[int] = None


class StartTaskResponse(BaseModel):
Expand Down
80 changes: 41 additions & 39 deletions animepipeline/loop.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
from pathlib import Path
from typing import Any, Callable, Coroutine, List
from typing import Any, Callable, Coroutine, List, Optional

from loguru import logger
from pydantic import DirectoryPath

from animepipeline.bt import QBittorrentManager
from animepipeline.config import NyaaConfig, RSSConfig, ServerConfig
Expand All @@ -15,33 +16,42 @@


class TaskInfo(TorrentInfo):
download_path: DirectoryPath
uploader: str
script: str
param: str
script_content: str
param_content: str
slice: Optional[bool] = True
timeout: Optional[int] = 20


def build_task_info(torrent_info: TorrentInfo, nyaa_config: NyaaConfig, rss_config: RSSConfig) -> TaskInfo:
def build_task_info(
torrent_info: TorrentInfo, nyaa_config: NyaaConfig, rss_config: RSSConfig, server_config: ServerConfig
) -> TaskInfo:
"""
Build TaskInfo from TorrentInfo, NyaaConfig and RSSConfig

:param torrent_info: TorrentInfo
:param nyaa_config: NyaaConfig
:param rss_config: RSSConfig
:param server_config: ServerConfig
:return: TaskInfo
"""
if nyaa_config.script not in rss_config.scripts:
raise ValueError(f"script not found: {nyaa_config.script}")
if nyaa_config.param not in rss_config.params:
raise ValueError(f"param not found: {nyaa_config.param}")

script = rss_config.scripts[nyaa_config.script]
param = rss_config.params[nyaa_config.param]
script_content = rss_config.scripts[nyaa_config.script]
param_content = rss_config.params[nyaa_config.param]

return TaskInfo(
**torrent_info.model_dump(),
download_path=server_config.qbittorrent.download_path,
uploader=nyaa_config.uploader,
script=script,
param=param,
script_content=script_content,
param_content=param_content,
slice=nyaa_config.slice,
timeout=nyaa_config.timeout,
)


Expand Down Expand Up @@ -94,7 +104,12 @@ async def start(self) -> None:
torrent_info_list = parse_nyaa(cfg)

for torrent_info in torrent_info_list:
task_info = build_task_info(torrent_info, cfg, self.rss_config)
task_info = build_task_info(
torrent_info=torrent_info,
nyaa_config=cfg,
rss_config=self.rss_config,
server_config=self.server_config,
)

await self.task_executor.submit_task(torrent_info.hash, self.pipeline, task_info)

Expand All @@ -107,7 +122,7 @@ def add_pipeline_task(self) -> None:
"""
self.pipeline_tasks.append(self.pipeline_bt)
self.pipeline_tasks.append(self.pipeline_finalrip)
self.pipeline_tasks.append(self.pipeline_tg)
self.pipeline_tasks.append(self.pipeline_post)

async def pipeline(self, task_info: TaskInfo) -> None:
# init task status
Expand Down Expand Up @@ -168,7 +183,7 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
logger.info(f'Start FinalRip Encode for "{task_info.name}" EP {task_info.episode}')
# start finalrip task

bt_downloaded_path = Path(task_status.bt_downloaded_path)
bt_downloaded_path = Path(task_info.download_path) / task_status.bt_downloaded_path

while not await self.finalrip_client.check_task_exist(bt_downloaded_path.name):
try:
Expand All @@ -181,33 +196,21 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
try:
await self.finalrip_client.start_task(
video_key=bt_downloaded_path.name,
encode_param=task_info.param,
script=task_info.script,
encode_param=task_info.param_content,
script=task_info.script_content,
slice=task_info.slice,
timeout=task_info.timeout,
)
logger.info(f'FinalRip Task Started for "{task_info.name}" EP {task_info.episode}')
except Exception as e:
logger.error(f"Failed to start finalrip task: {e}")

# wait video cut done
await asyncio.sleep(10)
await asyncio.sleep(30)

# check task progress
while not await self.finalrip_client.check_task_completed(bt_downloaded_path.name):
# retry merge if all clips are done but merge failed?
if await self.finalrip_client.check_task_all_clips_done(bt_downloaded_path.name):
# wait 30s before retry merge
await asyncio.sleep(30)
# check again
if await self.finalrip_client.check_task_completed(bt_downloaded_path.name):
break

try:
await self.finalrip_client.retry_merge(bt_downloaded_path.name)
logger.info(f'Retry Merge Clips for "{task_info.name}" EP {task_info.episode}')
except Exception as e:
logger.error(f'Failed to retry merge clips for "{task_info.name}" EP {task_info.episode}: {e}')

await asyncio.sleep(10)
await asyncio.sleep(30)

# download temp file to bt_downloaded_path's parent directory
temp_saved_path: Path = bt_downloaded_path.parent / (bt_downloaded_path.name + "-encoded.mkv")
Expand All @@ -220,7 +223,7 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
path=temp_saved_path, episode=task_info.episode, name=task_info.name, uploader=task_info.uploader
)
)
finalrip_downloaded_path = bt_downloaded_path.parent / gen_name
finalrip_downloaded_path = Path(task_info.download_path) / gen_name
except Exception as e:
logger.error(f"Failed to generate file name: {e}")
raise e
Expand All @@ -234,32 +237,31 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
logger.info(f'FinalRip Encode Done for "{finalrip_downloaded_path.name}"')

# update task status
task_status.finalrip_downloaded_path = str(finalrip_downloaded_path)
task_status.finalrip_downloaded_path = gen_name
await self.json_store.update_task(task_info.hash, task_status)

async def pipeline_tg(self, task_info: TaskInfo) -> None:
async def pipeline_post(self, task_info: TaskInfo) -> None:
task_status = await self.json_store.get_task(task_info.hash)

if self.tg_channel_sender is None:
logger.info("Telegram Channel Sender is not enabled. Skip upload.")
return

# check tg
if task_status.tg_uploaded:
if task_status.tg_posted:
return

if task_status.finalrip_downloaded_path is None:
logger.error("FinalRip download path is None! finalrip download task not finished?")
raise ValueError("FinalRip download path is None! finalrip download task not finished?")

logger.info(f'Start Telegram Channel Upload for "{task_info.name}" EP {task_info.episode}')
logger.info(f'Post to Telegram Channel for "{task_info.name}" EP {task_info.episode}')

finalrip_downloaded_path = Path(task_status.finalrip_downloaded_path)
finalrip_downloaded_path = Path(task_info.download_path) / task_status.finalrip_downloaded_path

await self.tg_channel_sender.send_video(
video_path=finalrip_downloaded_path,
caption=f"{task_info.translation} | EP {task_info.episode} | {finalrip_downloaded_path.name}",
await self.tg_channel_sender.send_text(
text=f"{task_info.translation} | EP {task_info.episode} | {finalrip_downloaded_path.name}",
)

task_status.tg_uploaded = True
task_status.tg_posted = True
await self.json_store.update_task(task_info.hash, task_status)
51 changes: 11 additions & 40 deletions animepipeline/post/tg.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from pathlib import Path
from typing import Optional, Union

import telegram.error
import telegram
from loguru import logger
from telegram import Bot
from tenacity import retry, stop_after_attempt, wait_random

from animepipeline.config import TelegramConfig

Expand All @@ -16,47 +14,20 @@ class TGChannelSender:
"""

def __init__(self, config: TelegramConfig) -> None:
if config.local_mode:
self.bot = Bot(
token=config.bot_token,
base_url=str(config.base_url),
base_file_url=str(config.base_file_url),
local_mode=True,
)
else:
self.bot = Bot(token=config.bot_token)

self.bot = Bot(token=config.bot_token)
self.channel_id = config.channel_id

async def send_video(self, video_path: Union[Path, str], caption: Optional[str] = None) -> None:
@retry(wait=wait_random(min=3, max=5), stop=stop_after_attempt(5))
async def send_text(self, text: str) -> None:
"""
Send video to the channel.
Send text to the channel.

:param video_path:
:param caption: the caption of the video
:param text: The text to send.
"""
video_path = Path(video_path)
video_name = video_path.name
if not video_path.exists():
raise FileNotFoundError(f"Video file not found: {video_path}")

if caption is None:
caption = video_name

with open(video_path, "rb") as f:
video_file = f.read()

try:
await self.bot.send_video(
chat_id=self.channel_id,
video=video_file,
filename=video_name,
caption=caption,
read_timeout=6000,
write_timeout=6000,
pool_timeout=6000,
)
await self.bot.send_message(chat_id=self.channel_id, text=text)
except telegram.error.NetworkError as e:
logger.error(f"Network error: {e}, video path: {video_path}, video_caption: {caption}")
logger.error(f"Network error: {e}, text: {text}")
raise e
except Exception as e:
logger.error(f"Unknown Error sending video: {e}, video path: {video_path}, video_caption: {caption}")
logger.error(f"Unknown Error sending text: {e}, text: {text}")
2 changes: 1 addition & 1 deletion animepipeline/store/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TaskStatus(BaseModel):
done: bool = False
bt_downloaded_path: Optional[str] = None
finalrip_downloaded_path: Optional[str] = None
tg_uploaded: bool = False
tg_posted: bool = False
ex_status_dict: Optional[Dict[str, Any]] = None


Expand Down
2 changes: 1 addition & 1 deletion conf/params/default.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ffmpeg -i - -vcodec libx265 -crf 16
ffmpeg -i - -vcodec libx264 -preset ultrafast
Loading