# app.py
import io
import time
import requests
from selenium import webdriver
from PIL import Image
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
class ImageScraper:
    """Scrape Google Images through headless Chrome and fetch the results.

    Each instance owns one Selenium Chrome driver for its whole lifetime;
    call ``close_connection`` when finished so the browser is shut down.
    """

    # Seconds ``requests`` may wait on a connection or a stalled read before
    # a download is abandoned; without a timeout one dead host hangs the run.
    _DOWNLOAD_TIMEOUT = 30

    # Consecutive scroll passes that may yield no new thumbnail before the
    # search is treated as exhausted.
    _MAX_STALLED_PASSES = 3

    def __init__(self):
        """Start a Chrome session configured by the default options."""
        # No driver path is passed, so Selenium has to locate chromedriver
        # on its own.
        self.driver = webdriver.Chrome(options=self.__get_default_chrome_options())

    def get_image_urls(self, query: str, max_urls: int, sleep_between_interactions: int = 1):
        """Collect image URLs from a Google Images search.

        Args:
            query: Search terms. They are inserted into the search URL
                unchanged, so callers should URL-encode special characters.
            max_urls: Stop as soon as at least this many distinct URLs have
                been collected.
            sleep_between_interactions: Seconds to pause after every scroll
                and every thumbnail click.

        Returns:
            A set of URL strings. It may hold more than ``max_urls`` entries
            (a single click can add several URLs) and fewer when the search
            stops producing new thumbnails.
        """
        search_url = "https://www.google.com/search?safe=off&site=&tbm=isch&source=hp&q={q}&oq={q}&gs_l=img"
        self.driver.get(search_url.format(q=query))

        image_urls = set()
        results_start = 0
        stalled_passes = 0
        while len(image_urls) < max_urls:
            self.__scroll_to_end(sleep_between_interactions)
            thumbnail_results = self.driver.find_elements(By.CSS_SELECTOR, "img.Q4LuWd")
            number_results = len(thumbnail_results)
            print("Found: {0} search results. Extracting links from {1}:{0}".format(number_results, results_start))

            # Only visit the thumbnails that appeared since the last pass.
            for img in thumbnail_results[results_start:number_results]:
                self.__click_and_wait(img, sleep_between_interactions)
                self.__add_image_urls_to_set(image_urls)
                if len(image_urls) >= max_urls:
                    print("Found: {} image links, done!".format(len(image_urls)))
                    break
            else:
                # Reached only when the batch ran out before the target was met.
                print("Found: {} image links, looking for more ...".format(len(image_urls)))

                if number_results > results_start:
                    stalled_passes = 0
                else:
                    # Nothing new showed up; give the page a few more passes,
                    # then stop instead of spinning forever.
                    stalled_passes += 1
                    if stalled_passes >= self._MAX_STALLED_PASSES:
                        print("No new search results, stopping with {} image links.".format(len(image_urls)))
                        break

                # find_elements returns an empty list when the button is
                # absent, so a missing button does not raise.
                if self.driver.find_elements(By.CSS_SELECTOR, ".mye4qd"):
                    print("loading more...")
                    self.driver.execute_script("document.querySelector('.mye4qd').click();")

            # move the result startpoint further down
            results_start = number_results
        return image_urls

    def get_in_memory_image(self, url: str, format: str):
        """Download an image and re-encode it without touching the disk.

        Args:
            url: Address of the image to download.
            format: Format name handed to ``Image.save`` (for example
                ``'jpeg'``).

        Returns:
            The re-encoded image as ``bytes``, or ``None`` when the download
            or the conversion fails. Failures are printed, not raised.
        """
        image_content = self.__download_image_content(url)
        if image_content is None:
            # The download helper has already reported the failure.
            return None
        try:
            pil_image = Image.open(io.BytesIO(image_content)).convert('RGB')
            in_mem_file = io.BytesIO()
            pil_image.save(in_mem_file, format=format)
            return in_mem_file.getvalue()
        except Exception as e:
            # Best-effort boundary: one undecodable image must not abort a batch.
            print("Could not get image data: {}".format(e))
            return None

    def close_connection(self):
        """Quit the Chrome driver owned by this scraper."""
        self.driver.quit()

    def __download_image_content(self, url):
        """Return the raw response body for ``url``, or ``None`` on failure."""
        try:
            return requests.get(url, timeout=self._DOWNLOAD_TIMEOUT).content
        except Exception as e:
            # Best-effort boundary: report the failure and let the caller skip it.
            print("ERROR - Could not download {} - {}".format(url, e))
            return None

    def __scroll_to_end(self, sleep_time):
        """Scroll to the bottom of the page, then pause ``sleep_time`` seconds."""
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(sleep_time)

    def __click_and_wait(self, img, wait_time):
        """Click ``img`` and pause ``wait_time`` seconds; ignore failed clicks."""
        try:
            img.click()
            time.sleep(wait_time)
        except Exception:
            # Deliberate best effort: a thumbnail that cannot be clicked is skipped.
            return

    def __add_image_urls_to_set(self, image_urls: set):
        """Add the ``src`` of every ``img.n3VNCb`` element that contains 'http'.

        ``image_urls`` is mutated in place.
        """
        for actual_image in self.driver.find_elements(By.CSS_SELECTOR, 'img.n3VNCb'):
            # Read the attribute once so the check and the stored value agree.
            src = actual_image.get_attribute('src')
            if src and 'http' in src:
                image_urls.add(src)

    def __get_default_chrome_options(self):
        """Build the ``ChromeOptions`` used for every scraping session."""
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])
        # remove this line to see what the browser is doing
        chrome_options.add_argument('--headless')
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        # NOTE(review): presumably meant to stop Chrome from loading images
        # itself — confirm this preference key is honoured by the target Chrome.
        chrome_options.add_experimental_option(
            "prefs", {"profile.default_content_settings": {"images": 2}}
        )
        return chrome_options
def handler(event, context=None):
    """Scrape images for the query in ``event`` and return them in a response dict.

    Args:
        event: Mapping with the keys ``'query'`` (search terms) and
            ``'count'`` (number of image URLs to collect).
        context: Unused. NOTE(review): the ``(event, context)`` signature
            looks like an AWS Lambda handler — confirm against the deployment.

    Returns:
        A dict with ``'statusCode'`` 200, ``'body'`` holding the list of
        JPEG-encoded images as ``bytes``, and response ``'headers'``.
    """
    scr = ImageScraper()
    try:
        urls = scr.get_image_urls(query=event['query'], max_urls=event['count'], sleep_between_interactions=1)
        files = []
        for url in urls:
            img_obj = scr.get_in_memory_image(url, 'jpeg')
            # A failed download or conversion comes back as None; keep only
            # the images that were actually produced.
            if img_obj is not None:
                files.append(img_obj)
    finally:
        # Always shut the browser down, even when scraping raised.
        scr.close_connection()

    # Report how many images were really loaded, not how many were requested.
    print("Successfully loaded {} images and file names {}.".format(len(files), files))
    # NOTE(review): the body is a list of raw bytes while Content-Type says
    # JSON; bytes are not JSON-serialisable — confirm what the consumer expects.
    return {
        'statusCode': 200,
        'body': files,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        }
    }
def main():
    """Invoke ``handler`` once with a small sample event and return its response."""
    return handler({'query': 'ant', 'count': 3})
# Run the sample scrape only when this file is executed as a script,
# never as a side effect of importing it.
if __name__ == '__main__':
    main()