Commit

feat(episode): implement concurrent URL fetching with rate limiting (#39)

* docs: add macOS installation instructions

* fix: handle currently airing anime episodes correctly

- Add IsAiring field to track airing status of anime
- Improve GetAnimeDataByID to fetch and handle airing status
- Fix completion logic for currently airing shows

* chore: Sprintf -> Errorf

* feat(episode): implement concurrent URL fetching with rate limiting

- Add parallel processing of source URLs with goroutines
- Implement rate limiting (50ms) to prevent API overload
- Add timeout handling (10s) for request completion
- Maintain result order using ordered collection
- Add error handling and partial result recovery
- Improve logging for better debugging
justchokingaround authored Feb 15, 2025
1 parent 4e6d124 commit 4358e9d
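
The bullet points above describe a standard Go fan-out: one goroutine per source URL, a shared ticker to space the requests out, an index-addressed slice so results keep their original order, and a deadline on collection so a slow provider cannot stall everything. Below is a minimal, self-contained sketch of that shape. It is illustrative only; the fetch helper, the example URLs, and the exact error handling are placeholders, and the 50ms and 10s values simply mirror the commit message. The repository's actual implementation is in the internal/episode_url.go diff further down.

package main

import (
	"fmt"
	"net/http"
	"time"
)

// fetch stands in for the real per-URL work in the commit
// (decoding the provider ID and extracting stream links).
func fetch(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	return resp.Status, nil
}

func main() {
	urls := []string{"https://example.com/a", "https://example.com/b", "https://example.com/c"}

	type result struct {
		index int
		value string
		err   error
	}

	results := make(chan result, len(urls))
	ordered := make([]string, len(urls)) // index-addressed, so source order is preserved

	// One tick every 50ms; every goroutine blocks on the same channel,
	// so request starts are spaced out even though all goroutines launch at once.
	limiter := time.NewTicker(50 * time.Millisecond)
	defer limiter.Stop()

	for i, u := range urls {
		go func(idx int, u string) {
			<-limiter.C
			v, err := fetch(u)
			results <- result{index: idx, value: v, err: err}
		}(i, u)
	}

	// Collect with a deadline; whatever arrived before the timeout is kept.
	timeout := time.After(10 * time.Second)
	received := 0
	for received < len(urls) {
		select {
		case r := <-results:
			received++
			if r.err != nil {
				fmt.Printf("url %d failed: %v\n", r.index, r.err)
				continue
			}
			ordered[r.index] = r.value
		case <-timeout:
			fmt.Printf("timeout reached with %d/%d responses\n", received, len(urls))
			received = len(urls) // stop waiting, keep whatever partial results we have
		}
	}

	fmt.Println(ordered)
}

Note one difference: this sketch counts failed fetches toward completion, while the committed loop below counts only successes and leans on the 10 second timeout to finish when a provider errors out.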
Showing 4 changed files with 139 additions and 33 deletions.
2 changes: 1 addition & 1 deletion internal/anilist.go
@@ -59,7 +59,7 @@ func GetAnimeMapPreview(animeList AnimeList) map[string]RofiSelectPreview {
populateMap := func(entries []Entry) {
for _, entry := range entries {
// Only include entries with a non-empty English title
Log(fmt.Sprintf("AnimeNameLanguage: ", userCurdConfig.AnimeNameLanguage), logFile)
Log(fmt.Errorf("AnimeNameLanguage: %v", userCurdConfig.AnimeNameLanguage), logFile)
if entry.Media.Title.English != "" && userCurdConfig.AnimeNameLanguage == "english" {
animeMap[strconv.Itoa(entry.Media.ID)] = RofiSelectPreview{
Title: entry.Media.Title.English,
10 changes: 5 additions & 5 deletions internal/aniskip.go
@@ -10,8 +10,8 @@ import (

// skipTimesResponse struct to hold the response from the AniSkip API
type skipTimesResponse struct {
Found bool `json:"found"`
Results []skipResult `json:"results"`
Found bool `json:"found"`
Results []skipResult `json:"results"`
}

// skipResult struct to hold individual skip result data
@@ -32,7 +32,7 @@ func GetAniSkipData(animeMalId int, episode int) (string, error) {

resp, err := http.Get(url)
if err != nil {
Log(fmt.Sprintf("error fetching data from AniSkip API: %w", err), logFile)
Log(fmt.Errorf("error fetching data from AniSkip API: %w", err), logFile)
return "", fmt.Errorf("error fetching data from AniSkip API: %w", err)
}
defer resp.Body.Close()
@@ -44,7 +44,7 @@ func GetAniSkipData(animeMalId int, episode int) (string, error) {

body, err := io.ReadAll(resp.Body)
if err != nil {
Log(fmt.Sprintf("failed to read response body: %w", err), logFile)
Log(fmt.Errorf("failed to read response body %w", err), logFile)
return "", fmt.Errorf("failed to read response body %w", err)
}

@@ -100,4 +100,4 @@ func GetAndParseAniSkipData(animeMalId int, episode int, timePrecision int, anim
return err
}
return ParseAniSkipResponse(responseText, anime, timePrecision)
}
}
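
A note on the Sprintf -> Errorf change in the hunks above (and in internal/anilist.go): only fmt.Errorf understands the %w verb. fmt.Sprintf treats %w as an unknown verb and emits a %!w(...) artifact, and the result is a plain string rather than an error that can be unwrapped with errors.Is or errors.As. A small standalone illustration, using io.EOF purely as a convenient sentinel error:

package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	base := io.EOF

	// Sprintf does not implement %w: the output contains a %!w(...) artifact
	// and no error wrapping takes place.
	s := fmt.Sprintf("read failed: %w", base)
	fmt.Println(s)

	// Errorf with %w returns an error that wraps base, so callers can still
	// match the underlying cause.
	err := fmt.Errorf("read failed: %w", base)
	fmt.Println(errors.Is(err, io.EOF)) // true
}

For logging, either form produces readable text; the wrapping only matters when the error value is returned and later inspected.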
148 changes: 127 additions & 21 deletions internal/episode_url.go
@@ -8,10 +8,10 @@ import (
"net/url"
"regexp"
"strings"
"time"
"unicode"
)


type allanimeResponse struct {
Data struct {
Episode struct {
@@ -94,7 +94,7 @@ func extractLinks(provider_id string) map[string]interface{} {
return videoData
}

// Get anime episode url respective to given config
// Get anime episode url respective to given config
// If the link is found, it returns a list of links. Otherwise, it returns an error.
//
// Parameters:
@@ -145,35 +145,141 @@ func GetEpisodeURL(config CurdConfig, id string, epNo int) ([]string, error) {
return nil, err
}

responseStr := string(body)

// Unmarshal the JSON data into the struct
var response allanimeResponse
err = json.Unmarshal([]byte(responseStr), &response)
err = json.Unmarshal(body, &response)
if err != nil {
Log(fmt.Sprint("Error parsing JSON: ", err), logFile)
return nil, err
}

var allinks []string // This will be returned
type result struct {
index int
links []string
err error
}

// Iterate through the SourceUrls and print each URL
// Pre-count valid URLs and create slice to preserve order
validURLs := make([]string, 0)
for _, url := range response.Data.Episode.SourceUrls {
if len(url.SourceUrl) > 2 && unicode.IsDigit(rune(url.SourceUrl[2])) { // Source Url 3rd letter is a number (it stars as --32f23k31jk)
decodedProviderID := decodeProviderID(url.SourceUrl[2:]) // Decode the source url to get the provider id
extractedLinks := extractLinks(decodedProviderID) // Extract the links using provider id
if linksInterface, ok := extractedLinks["links"].([]interface{}); ok {
for _, linkInterface := range linksInterface {
if linkMap, ok := linkInterface.(map[string]interface{}); ok {
if link, ok := linkMap["link"].(string); ok {
allinks = append(allinks, link) // Add all extracted links into allinks
}
}
if len(url.SourceUrl) > 2 && unicode.IsDigit(rune(url.SourceUrl[2])) {
validURLs = append(validURLs, url.SourceUrl)
}
}

if len(validURLs) == 0 {
return nil, fmt.Errorf("no valid source URLs found in response")
}

// Create channels for results and a slice to store ordered results
results := make(chan result, len(validURLs))
orderedResults := make([][]string, len(validURLs))

// Create rate limiter
rateLimiter := time.NewTicker(50 * time.Millisecond)
defer rateLimiter.Stop()

// Launch goroutines
for i, sourceUrl := range validURLs {
go func(idx int, url string) {
<-rateLimiter.C // Rate limit the requests

decodedProviderID := decodeProviderID(url[2:])
Log(fmt.Sprintf("Processing URL %d/%d with provider ID: %s", idx+1, len(validURLs), decodedProviderID), logFile)

extractedLinks := extractLinks(decodedProviderID)

if extractedLinks == nil {
results <- result{
index: idx,
err: fmt.Errorf("failed to extract links for provider %s", decodedProviderID),
}
return
}

linksInterface, ok := extractedLinks["links"].([]interface{})
if !ok {
results <- result{
index: idx,
err: fmt.Errorf("links field is not []interface{} for provider %s", decodedProviderID),
}
return
}

var links []string
for _, linkInterface := range linksInterface {
linkMap, ok := linkInterface.(map[string]interface{})
if !ok {
Log(fmt.Sprintf("Warning: invalid link format for provider %s", decodedProviderID), logFile)
continue
}

link, ok := linkMap["link"].(string)
if !ok {
Log(fmt.Sprintf("Warning: link field is not string for provider %s", decodedProviderID), logFile)
continue
}

links = append(links, link)
}

results <- result{
index: idx,
links: links,
}
}(i, sourceUrl)
}

// Collect results with timeout
timeout := time.After(10 * time.Second)
var collectedErrors []error
successCount := 0

// Collect results maintaining order
for successCount < len(validURLs) {
select {
case res := <-results:
if res.err != nil {
Log(fmt.Sprintf("Error processing URL %d: %v", res.index+1, res.err), logFile)
collectedErrors = append(collectedErrors, fmt.Errorf("URL %d: %w", res.index+1, res.err))
} else {
Log("Links field is not of the expected type []interface{}", logFile)
orderedResults[res.index] = res.links
successCount++
Log(fmt.Sprintf("Successfully processed URL %d/%d", res.index+1, len(validURLs)), logFile)
}
case <-timeout:
if successCount > 0 {
Log(fmt.Sprintf("Timeout reached with %d/%d successful results", successCount, len(validURLs)), logFile)
// Flatten available results
return flattenResults(orderedResults), nil
}
return nil, fmt.Errorf("timeout waiting for results after %d successful responses", successCount)
}
}

return allinks, nil
}
// If we have any errors but also some successes, log errors but continue
if len(collectedErrors) > 0 {
Log(fmt.Sprintf("Completed with %d errors: %v", len(collectedErrors), collectedErrors), logFile)
}

// Flatten and return results
allLinks := flattenResults(orderedResults)
if len(allLinks) == 0 {
return nil, fmt.Errorf("no valid links found from %d URLs: %v", len(validURLs), collectedErrors)
}

return allLinks, nil
}

// converts the ordered slice of link slices into a single slice
func flattenResults(results [][]string) []string {
var totalLen int
for _, r := range results {
totalLen += len(r)
}

allLinks := make([]string, 0, totalLen)
for _, links := range results {
allLinks = append(allLinks, links...)
}
return allLinks
}
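
The link extraction in the new goroutine body relies on Go's comma-ok type assertions to walk a generic map[string]interface{} payload without panicking when a field is missing or has an unexpected type, which is what the Warning log lines above guard against. A short standalone sketch of that idiom; the JSON payload here is invented for illustration and is not the provider's actual response shape:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical payload: one well-formed entry and one with a non-string link.
	raw := []byte(`{"links": [{"link": "https://example.com/v.m3u8"}, {"link": 42}]}`)

	var data map[string]interface{}
	if err := json.Unmarshal(raw, &data); err != nil {
		panic(err)
	}

	linksInterface, ok := data["links"].([]interface{})
	if !ok {
		fmt.Println("links field missing or not a list")
		return
	}

	for _, li := range linksInterface {
		linkMap, ok := li.(map[string]interface{})
		if !ok {
			continue // entry is not a JSON object, skip it
		}
		link, ok := linkMap["link"].(string)
		if !ok {
			continue // link field absent or not a string, skip it
		}
		fmt.Println("found link:", link)
	}
}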
12 changes: 6 additions & 6 deletions internal/localTracking.go
@@ -234,7 +234,7 @@ func LocalUpdateAnime(databaseFile string, anilistID int, allanimeID string, wat
CurdOut(fmt.Sprintf("Error writing record: %v", err))
}
}

return nil
}

@@ -259,7 +259,7 @@ func WatchUntracked(userCurdConfig *CurdConfig, logFile string) {
userInput, err := GetUserInputFromRofi("Enter the anime name")
if err != nil {
Log("Error getting user input: "+err.Error(), logFile)
ExitCurd(fmt.Errorf("Error getting user input: "+err.Error()))
ExitCurd(fmt.Errorf("Error getting user input: " + err.Error()))
}
query = userInput
} else {
@@ -298,7 +298,7 @@ func WatchUntracked(userCurdConfig *CurdConfig, logFile string) {
userInput, err := GetUserInputFromRofi("Enter the episode number")
if err != nil {
Log("Error getting episode number: "+err.Error(), logFile)
ExitCurd(fmt.Errorf("Error getting episode number: "+err.Error()))
ExitCurd(fmt.Errorf("Error getting episode number: " + err.Error()))
}
episodeNumber, err = strconv.Atoi(userInput)
if err != nil {
@@ -311,7 +311,7 @@ func WatchUntracked(userCurdConfig *CurdConfig, logFile string) {
}

anime.Ep.Number = episodeNumber

for {
// Get episode link
link, err := GetEpisodeURL(*userCurdConfig, anime.AllanimeId, anime.Ep.Number)
Expand All @@ -335,8 +335,8 @@ func WatchUntracked(userCurdConfig *CurdConfig, logFile string) {

anime.Ep.Player.SocketPath = mpvSocketPath
anime.Ep.Started = false
Log(fmt.Sprintf("Started mpvsocketpath ", anime.Ep.Player.SocketPath), logFile)

Log(fmt.Sprintf("Started mpv with socket path: %s", anime.Ep.Player.SocketPath), logFile)

// Get video duration
go func() {
