1
1
import asyncio
2
2
from pathlib import Path
3
- from typing import Any , Callable , Coroutine , List
3
+ from typing import Any , Callable , Coroutine , List , Optional
4
4
5
5
from loguru import logger
6
+ from pydantic import DirectoryPath
6
7
7
8
from animepipeline .bt import QBittorrentManager
8
9
from animepipeline .config import NyaaConfig , RSSConfig , ServerConfig
15
16
16
17
17
18
class TaskInfo (TorrentInfo ):
19
+ download_path : DirectoryPath
18
20
uploader : str
19
- script : str
20
- param : str
21
+ script_content : str
22
+ param_content : str
23
+ slice : Optional [bool ] = True
24
+ timeout : Optional [int ] = 20
21
25
22
26
23
- def build_task_info (torrent_info : TorrentInfo , nyaa_config : NyaaConfig , rss_config : RSSConfig ) -> TaskInfo :
27
+ def build_task_info (
28
+ torrent_info : TorrentInfo , nyaa_config : NyaaConfig , rss_config : RSSConfig , server_config : ServerConfig
29
+ ) -> TaskInfo :
24
30
"""
25
31
Build TaskInfo from TorrentInfo, NyaaConfig and RSSConfig
26
32
27
33
:param torrent_info: TorrentInfo
28
34
:param nyaa_config: NyaaConfig
29
35
:param rss_config: RSSConfig
36
+ :param server_config: ServerConfig
30
37
:return: TaskInfo
31
38
"""
32
39
if nyaa_config .script not in rss_config .scripts :
33
40
raise ValueError (f"script not found: { nyaa_config .script } " )
34
41
if nyaa_config .param not in rss_config .params :
35
42
raise ValueError (f"param not found: { nyaa_config .param } " )
36
43
37
- script = rss_config .scripts [nyaa_config .script ]
38
- param = rss_config .params [nyaa_config .param ]
44
+ script_content = rss_config .scripts [nyaa_config .script ]
45
+ param_content = rss_config .params [nyaa_config .param ]
39
46
40
47
return TaskInfo (
41
48
** torrent_info .model_dump (),
49
+ download_path = server_config .qbittorrent .download_path ,
42
50
uploader = nyaa_config .uploader ,
43
- script = script ,
44
- param = param ,
51
+ script_content = script_content ,
52
+ param_content = param_content ,
53
+ slice = nyaa_config .slice ,
54
+ timeout = nyaa_config .timeout ,
45
55
)
46
56
47
57
@@ -94,7 +104,12 @@ async def start(self) -> None:
94
104
torrent_info_list = parse_nyaa (cfg )
95
105
96
106
for torrent_info in torrent_info_list :
97
- task_info = build_task_info (torrent_info , cfg , self .rss_config )
107
+ task_info = build_task_info (
108
+ torrent_info = torrent_info ,
109
+ nyaa_config = cfg ,
110
+ rss_config = self .rss_config ,
111
+ server_config = self .server_config ,
112
+ )
98
113
99
114
await self .task_executor .submit_task (torrent_info .hash , self .pipeline , task_info )
100
115
@@ -107,7 +122,7 @@ def add_pipeline_task(self) -> None:
107
122
"""
108
123
self .pipeline_tasks .append (self .pipeline_bt )
109
124
self .pipeline_tasks .append (self .pipeline_finalrip )
110
- self .pipeline_tasks .append (self .pipeline_tg )
125
+ self .pipeline_tasks .append (self .pipeline_post )
111
126
112
127
async def pipeline (self , task_info : TaskInfo ) -> None :
113
128
# init task status
@@ -168,7 +183,7 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
168
183
logger .info (f'Start FinalRip Encode for "{ task_info .name } " EP { task_info .episode } ' )
169
184
# start finalrip task
170
185
171
- bt_downloaded_path = Path (task_status .bt_downloaded_path )
186
+ bt_downloaded_path = Path (task_info . download_path ) / task_status .bt_downloaded_path
172
187
173
188
while not await self .finalrip_client .check_task_exist (bt_downloaded_path .name ):
174
189
try :
@@ -181,33 +196,21 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
181
196
try :
182
197
await self .finalrip_client .start_task (
183
198
video_key = bt_downloaded_path .name ,
184
- encode_param = task_info .param ,
185
- script = task_info .script ,
199
+ encode_param = task_info .param_content ,
200
+ script = task_info .script_content ,
201
+ slice = task_info .slice ,
202
+ timeout = task_info .timeout ,
186
203
)
187
204
logger .info (f'FinalRip Task Started for "{ task_info .name } " EP { task_info .episode } ' )
188
205
except Exception as e :
189
206
logger .error (f"Failed to start finalrip task: { e } " )
190
207
191
208
# wait video cut done
192
- await asyncio .sleep (10 )
209
+ await asyncio .sleep (30 )
193
210
194
211
# check task progress
195
212
while not await self .finalrip_client .check_task_completed (bt_downloaded_path .name ):
196
- # retry merge if all clips are done but merge failed?
197
- if await self .finalrip_client .check_task_all_clips_done (bt_downloaded_path .name ):
198
- # wait 30s before retry merge
199
- await asyncio .sleep (30 )
200
- # check again
201
- if await self .finalrip_client .check_task_completed (bt_downloaded_path .name ):
202
- break
203
-
204
- try :
205
- await self .finalrip_client .retry_merge (bt_downloaded_path .name )
206
- logger .info (f'Retry Merge Clips for "{ task_info .name } " EP { task_info .episode } ' )
207
- except Exception as e :
208
- logger .error (f'Failed to retry merge clips for "{ task_info .name } " EP { task_info .episode } : { e } ' )
209
-
210
- await asyncio .sleep (10 )
213
+ await asyncio .sleep (30 )
211
214
212
215
# download temp file to bt_downloaded_path's parent directory
213
216
temp_saved_path : Path = bt_downloaded_path .parent / (bt_downloaded_path .name + "-encoded.mkv" )
@@ -220,7 +223,7 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
220
223
path = temp_saved_path , episode = task_info .episode , name = task_info .name , uploader = task_info .uploader
221
224
)
222
225
)
223
- finalrip_downloaded_path = bt_downloaded_path . parent / gen_name
226
+ finalrip_downloaded_path = Path ( task_info . download_path ) / gen_name
224
227
except Exception as e :
225
228
logger .error (f"Failed to generate file name: { e } " )
226
229
raise e
@@ -234,32 +237,31 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
234
237
logger .info (f'FinalRip Encode Done for "{ finalrip_downloaded_path .name } "' )
235
238
236
239
# update task status
237
- task_status .finalrip_downloaded_path = str ( finalrip_downloaded_path )
240
+ task_status .finalrip_downloaded_path = gen_name
238
241
await self .json_store .update_task (task_info .hash , task_status )
239
242
240
- async def pipeline_tg (self , task_info : TaskInfo ) -> None :
243
+ async def pipeline_post (self , task_info : TaskInfo ) -> None :
241
244
task_status = await self .json_store .get_task (task_info .hash )
242
245
243
246
if self .tg_channel_sender is None :
244
247
logger .info ("Telegram Channel Sender is not enabled. Skip upload." )
245
248
return
246
249
247
250
# check tg
248
- if task_status .tg_uploaded :
251
+ if task_status .tg_posted :
249
252
return
250
253
251
254
if task_status .finalrip_downloaded_path is None :
252
255
logger .error ("FinalRip download path is None! finalrip download task not finished?" )
253
256
raise ValueError ("FinalRip download path is None! finalrip download task not finished?" )
254
257
255
- logger .info (f'Start Telegram Channel Upload for "{ task_info .name } " EP { task_info .episode } ' )
258
+ logger .info (f'Post to Telegram Channel for "{ task_info .name } " EP { task_info .episode } ' )
256
259
257
- finalrip_downloaded_path = Path (task_status .finalrip_downloaded_path )
260
+ finalrip_downloaded_path = Path (task_info . download_path ) / task_status .finalrip_downloaded_path
258
261
259
- await self .tg_channel_sender .send_video (
260
- video_path = finalrip_downloaded_path ,
261
- caption = f"{ task_info .translation } | EP { task_info .episode } | { finalrip_downloaded_path .name } " ,
262
+ await self .tg_channel_sender .send_text (
263
+ text = f"{ task_info .translation } | EP { task_info .episode } | { finalrip_downloaded_path .name } " ,
262
264
)
263
265
264
- task_status .tg_uploaded = True
266
+ task_status .tg_posted = True
265
267
await self .json_store .update_task (task_info .hash , task_status )
0 commit comments