Skip to content

Commit

Permalink
Merge pull request #23 from sean1832/0.3
Browse files Browse the repository at this point in the history
feat: implement pinterest search api calling
  • Loading branch information
sean1832 authored Nov 23, 2024
2 parents 18a9f9f + 0b13ac3 commit 4053beb
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 22 deletions.
4 changes: 2 additions & 2 deletions pinterest_dl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.2.0"
__version__ = "0.3.0"
__description__ = "An unofficial Pinterest image downloader"

from typing import Literal
Expand All @@ -12,7 +12,7 @@ class PinterestDL(_ScraperBase):
"""

@staticmethod
def with_api(timeout: float = 5, verbose: bool = False) -> "_ScraperAPI":
def with_api(timeout: float = 10, verbose: bool = False) -> "_ScraperAPI":
"""Scrape pinterest using unofficial API. This is faster than but may be less reliable.
Args:
Expand Down
40 changes: 39 additions & 1 deletion pinterest_dl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def get_parser() -> argparse.ArgumentParser:
scrape_cmd.add_argument("-c", "--cookies", type=str, help="Path to cookies file. Use this to scrape private boards.")
scrape_cmd.add_argument("-l", "--limit", type=int, default=100, help="Max number of image to scrape (default: 100)")
scrape_cmd.add_argument("-r", "--resolution", type=str, help="Minimum resolution to keep (e.g. 512x512).")
scrape_cmd.add_argument("--timeout", type=int, default=3, help="Timeout in seconds for requests (default: 3)")
scrape_cmd.add_argument("--timeout", type=int, default=10, help="Timeout in seconds for requests (default: 10)")
scrape_cmd.add_argument("--json", action="store_true", help="Write urls to json file")
scrape_cmd.add_argument("--verbose", action="store_true", help="Print verbose output")
scrape_cmd.add_argument("--dry-run", action="store_true", help="Run without download")
Expand All @@ -59,6 +59,22 @@ def get_parser() -> argparse.ArgumentParser:
scrape_cmd.add_argument("--incognito", action="store_true", help="Incognito mode (only for chrome/firefox)")
scrape_cmd.add_argument("--headful", action="store_true", help="Run in headful mode with browser window (only for chrome/firefox)")

# search command
search_cmd = cmd.add_parser("search", help="Search images from Pinterest")
search_cmd.add_argument("query", help="Search query")
search_cmd.add_argument("output", help="Output directory")
search_cmd.add_argument("-c", "--cookies", type=str, help="Path to cookies file. Use this to scrape private boards.")
search_cmd.add_argument("-l", "--limit", type=int, default=100, help="Max number of image to scrape (default: 100)")
search_cmd.add_argument("-r", "--resolution", type=str, help="Minimum resolution to keep (e.g. 512x512).")
search_cmd.add_argument("--timeout", type=int, default=10, help="Timeout in seconds for requests (default: 10)")
search_cmd.add_argument("--json", action="store_true", help="Write urls to json file")
search_cmd.add_argument("--verbose", action="store_true", help="Print verbose output")
search_cmd.add_argument("--dry-run", action="store_true", help="Run without download")

search_cmd.add_argument("--client", default="api", choices=["api", "chrome", "firefox"], help="Client to use for scraping. Chrome/Firefox is slower but more reliable.")
search_cmd.add_argument("--incognito", action="store_true", help="Incognito mode (only for chrome/firefox)")
search_cmd.add_argument("--headful", action="store_true", help="Run in headful mode with browser window (only for chrome/firefox)")

# download command
download_cmd = cmd.add_parser("download", help="Download images")
download_cmd.add_argument("input", help="Input json file containing image urls")
Expand Down Expand Up @@ -139,6 +155,28 @@ def main() -> None:
add_captions=True,
)

print("\nDone.")
elif args.cmd == "search":
if args.client in ["chrome", "firefox"]:
raise NotImplementedError("Search is currently not available for browser clients.")
else:
if args.incognito or args.headful:
print(
"Warning: Incognito and headful mode is only available for Chrome/Firefox."
)

PinterestDL.with_api(timeout=args.timeout, verbose=args.verbose).with_cookies(
args.cookies
).search_and_download(
args.query,
args.output,
args.limit,
min_resolution=parse_resolution(args.resolution) if args.resolution else (0, 0),
json_output=construct_json_output(args.output) if args.json else None,
dry_run=args.dry_run,
add_captions=True,
)

print("\nDone.")
elif args.cmd == "download":
# prepare image url data
Expand Down
15 changes: 5 additions & 10 deletions pinterest_dl/data_model/pinterest_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import pyexiv2
from PIL import Image

from pinterest_dl.low_level.api.pinterest_response import PinResponse


class PinterestImage:
def __init__(
Expand Down Expand Up @@ -80,18 +78,15 @@ def from_dict(data: Dict[str, Any]) -> "PinterestImage":
return PinterestImage(data["src"], data["alt"], data["origin"], data["fallback_urls"])

@staticmethod
def from_response(response: PinResponse, resolution: Tuple[int, int]) -> List["PinterestImage"]:
data_raws = response.resource_response["data"]

if data_raws is None or not data_raws:
response.dump_at("no_data.json")
raise ValueError(f"No data found in response. {response.request_url}")
def from_response(response_data: list, resolution: Tuple[int, int]) -> List["PinterestImage"]:
if response_data is None or not response_data:
raise ValueError("No data found in response.")

if not isinstance(data_raws, list):
if not isinstance(response_data, list):
raise ValueError("Invalid response data")

images_data = []
for data_raw in data_raws:
for data_raw in response_data:
try:
image = data_raw["images"]["orig"]
if int(image["width"]) < resolution[0] or int(image["height"]) < resolution[1]:
Expand Down
4 changes: 4 additions & 0 deletions pinterest_dl/low_level/api/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ class Endpoint:

GET_BOARD_FEED_RESOURCE = f"{_BASE}/resource/BoardFeedResource/get/"
"""Get board feed. This can be used to get board images. Requires `board_id`."""

GET_SEARCH_RESOURCE = f"{_BASE}/resource/BaseSearchResource/get/"
"""Get search results. This can be used to search images based text queries."""

53 changes: 48 additions & 5 deletions pinterest_dl/low_level/api/pinterest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ class PinterestAPI:
)

def __init__(
self, url: str, cookies: Optional[PinterestCookieJar] = None, timeout: float = 5
self,
url: str,
cookies: Optional[PinterestCookieJar] = None,
timeout: float = 5,
) -> None:
"""Pinterest API client.
Expand All @@ -32,6 +35,7 @@ def __init__(
self.pin_id = self._parse_pin_id(self.url)
except ValueError:
self.pin_id = None
self.query = self._parse_search_query(self.url)

try:
self.username, self.boardname = self._parse_board_url(self.url)
Expand Down Expand Up @@ -130,10 +134,7 @@ def get_board(self) -> PinResponse:
return PinResponse(request_url, response_raw.json())

def get_board_feed(self, board_id: str, num: int, bookmark: List[str]) -> PinResponse:
if num < 1:
raise ValueError("Number of images must be greater than 0")
if num > 50:
raise ValueError("Number of images must not exceed 50 per request")
self._validate_num(num)

board_url = f"/{self.username}/{self.boardname}/"

Expand All @@ -160,6 +161,40 @@ def get_board_feed(self, board_id: str, num: int, bookmark: List[str]) -> PinRes

return PinResponse(request_url, response_raw.json())

def get_search(self, num: int, bookmark: List[str]) -> PinResponse:
if not self.query:
raise ValueError("Invalid Pinterest search URL")
self._validate_num(num)

source_url = f"/search/pins/?q={self.query}rs=typed"

endpoint = self.endpoint.GET_SEARCH_RESOURCE
options = {
"appliedProductFilters": "---",
"auto_correction_disabled": False,
"bookmarks": bookmark,
"page_size": num,
"query": self.query,
"redux_normalize_feed": True,
"rs": "typed", # is user typed or not
"scope": "pins",
"source_url": source_url,
}

try:
request_url = self._req_builder.build_get(endpoint, options, source_url)
response_raw = self._session.get(request_url, timeout=self.timeout)
except requests.exceptions.RequestException as e:
raise requests.RequestException(f"Failed to request search: {e}")

return PinResponse(request_url, response_raw.json())

def _validate_num(self, num: int) -> None:
if num < 1:
raise ValueError("Number of images must be greater than 0")
if num > 50:
raise ValueError("Number of images must not exceed 50 per request")

@staticmethod
def _get_default_cookies(url: str) -> dict:
try:
Expand All @@ -175,6 +210,14 @@ def _parse_pin_id(url: str) -> str:
raise ValueError(f"Invalid Pinterest URL: {url}")
return result.group(1)

@staticmethod
def _parse_search_query(url: str) -> str:
# /search/pins/?q={query}%26rs=typed
result = re.search(r"/search/pins/\?q=([A-Za-z0-9%]+)&rs=typed", url)
if not result:
raise ValueError(f"Invalid Pinterest search URL: {url}")
return result.group(1)

@staticmethod
def _parse_board_url(url: str) -> Tuple[str, str]:
"""Parse Pinterest board URL to username and boardname.
Expand Down
2 changes: 2 additions & 0 deletions pinterest_dl/low_level/ops/bookmark_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def clear(self) -> None:
self.bookmarks.clear()

def get(self) -> List[str]:
    """Return the bookmarks after the `last` offset.

    If there are fewer bookmarks than the offset, all of them are returned.
    Always returns a new list so callers cannot mutate internal state
    (the slice path already copies; the early-return path previously
    leaked the internal list).
    """
    if len(self.bookmarks) < self.last:
        return list(self.bookmarks)
    return self.bookmarks[self.last :]

def get_all(self) -> List[str]:
Expand Down
123 changes: 119 additions & 4 deletions pinterest_dl/scrapers/scraper_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,98 @@ def scrape_and_download(
downloaded_imgs = self.download_images(scraped_imgs, output_dir, self.verbose)

valid_indices = []


if add_captions:
self.add_captions(downloaded_imgs, valid_indices, self.verbose)

return downloaded_imgs

def search(
    self,
    query: str,
    api: PinterestAPI,
    limit: int,
    min_resolution: Tuple[int, int],
    delay: float,
    bookmarks: BookmarkManager,
) -> List[PinterestImage]:
    """Page through Pinterest search results until `limit` images are
    collected or the API signals the end of the feed."""
    collected: List[PinterestImage] = []
    remaining = limit
    round_idx = 0

    with tqdm(total=limit, desc="Scraping Search", disable=self.verbose) as progress:
        while remaining > 0:
            # Pinterest caps each request at 50 results.
            fetch_n = min(50, remaining)
            batch, bookmarks = self._search_images(
                api, fetch_n, bookmarks, min_resolution, query
            )

            collected.extend(batch)
            remaining -= len(batch)
            progress.update(len(batch))

            # The API marks feed exhaustion with a "-end-" bookmark.
            if "-end-" in bookmarks.get():
                break

            if self.verbose:
                for img in batch:
                    print(f"[Batch {round_idx}] ({img.src})")
                print(f"[Batch {round_idx}] bookmarks: {bookmarks.get()}")

            time.sleep(delay)
            remaining = self._handle_missing_images(
                api, fetch_n, remaining, bookmarks, min_resolution, collected, progress, delay
            )
            round_idx += 1

    return collected

def search_and_download(
self,
query: str,
output_dir: Union[str, Path],
limit: int,
min_resolution: Tuple[int, int] = (0, 0),
json_output: Optional[Union[str, Path]] = None,
dry_run: bool = False,
add_captions: bool = False,
) -> Optional[List[PinterestImage]]:
"""Search for images on Pinterest and download them.
Args:
url (str): Pinterest URL to scrape.
output_dir (Union[str, Path]): Directory to store downloaded images.
limit (int): Maximum number of images to scrape.
min_resolution (Tuple[int, int]): Minimum resolution for pruning. (width, height). (0, 0) to download all images.
json_output (Optional[Union[str, Path]]): Path to save scraped data as JSON.
dry_run (bool): Only scrape URLs without downloading images.
add_captions (bool): Add captions to downloaded images.
Returns:
Optional[List[PinterestImage]]: List of downloaded PinterestImage objects.
"""
url = f"https://www.pinterest.com/search/pins/?q={query}&rs=typed"

api = PinterestAPI(url, self.cookies, timeout=self.timeout)
bookmarks = BookmarkManager(2)

scraped_imgs = self.search(query, api, limit, min_resolution, 0.2, bookmarks)

if json_output:
output_path = Path(json_output)
imgs_dict = [img.to_dict() for img in scraped_imgs]
io.write_json(imgs_dict, output_path, indent=4)

if dry_run:
# if self.verbose:
# print("Scraped data (dry run):", imgs_dict)
return None

downloaded_imgs = self.download_images(scraped_imgs, output_dir, self.verbose)

valid_indices = []

if add_captions:
self.add_captions(downloaded_imgs, valid_indices, self.verbose)

Expand Down Expand Up @@ -151,7 +242,8 @@ def _scrape_pins(

if "-end-" in bookmarks.get():
break

if self.verbose:
print(f"bookmarks: {bookmarks.get()}")
time.sleep(delay)
remains = self._handle_missing_images(
api, batch_size, remains, bookmarks, min_resolution, images, pbar, delay
Expand Down Expand Up @@ -221,7 +313,29 @@ def _get_images(
if not board_id
else api.get_board_feed(board_id, batch_size, bookmarks.get())
)
current_img_batch = PinterestImage.from_response(response, min_resolution)

# parse response data
response_data = response.resource_response.get("data", [])

current_img_batch = PinterestImage.from_response(response_data, min_resolution)
bookmarks.add_all(response.get_bookmarks())
return current_img_batch, bookmarks

def _search_images(
    self,
    api: PinterestAPI,
    batch_size: int,
    bookmarks: BookmarkManager,
    min_resolution: Tuple[int, int],
    query: str,
) -> Tuple[List[PinterestImage], BookmarkManager]:
    """Fetch one batch of images from the Pinterest search API.

    Args:
        api: API client already initialized with the search URL (it holds the query).
        batch_size: Number of images to request (max 50 per request).
        bookmarks: Pagination state; its current bookmarks are sent with the request.
        min_resolution: (width, height) filter applied when parsing the response.
        query: Unused — the query is baked into `api`; kept for signature
            symmetry with `_get_images`.  # NOTE(review): consider removing.

    Returns:
        Tuple of (parsed images for this batch, updated bookmark manager).
    """
    response = api.get_search(batch_size, bookmarks.get())

    # Search responses nest the pin list under data.results (board/pin feeds
    # put it directly under data). Guard against "data": null in the JSON.
    response_data = (response.resource_response.get("data") or {}).get("results", [])

    current_img_batch = PinterestImage.from_response(response_data, min_resolution)
    bookmarks.add_all(response.get_bookmarks())
    return current_img_batch, bookmarks

Expand All @@ -245,7 +359,8 @@ def _handle_missing_images(
if not board_id
else api.get_board_feed(board_id, difference, bookmarks.get())
)
additional_images = PinterestImage.from_response(next_response, min_resolution)
next_response_data = next_response.resource_response.get("data", [])
additional_images = PinterestImage.from_response(next_response_data, min_resolution)
images.extend(additional_images)
bookmarks.add_all(next_response.get_bookmarks())
remains -= len(additional_images)
Expand Down

0 comments on commit 4053beb

Please sign in to comment.