import inspect
import logging
import math
import re
import traceback
from urllib.parse import unquote, urlparse
import goose3
import lxml
import lxml.etree
from dateutil.tz import tzutc
from goose3 import Goose
from goose3.crawler import Crawler
from goose3.extractors.publishdate import TIMEZONE_INFO
from goose3.text import get_encodings_from_content
import config
import utils_text
from multiple_tlds import is_multiple_tlds
# import trafilatura # never use; ← it has a dependency conflict with another package over the required version of `charset-normalizer`
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
CHARS_IN_DOMAINS_BREAK_BEFORE = "."
CHARS_IN_DOMAINS_BREAK_AFTER = "-"
# NONBREAKING_HYPHEN = config.settings["SYMBOLS"]["NONBREAKING_HYPHEN"]
EMPTY_STRING = ""
# def monkeypatched_publish_date_to_utc(self):
# try:
# publish_datetime = dateutil.parser.parse(
# self.article.publish_date, tzinfos=TIMEZONE_INFO
# )
# if publish_datetime.tzinfo:
# return publish_datetime.astimezone(tzutc())
# else:
# return publish_datetime
# except (ValueError, OverflowError):
# logger.warning(
# f"Publish date {self.article.publish_date} could not be resolved to UTC (monkeypatched_publish_date_to_utc)"
# )
# return None
# Crawler._publish_date_to_utc = monkeypatched_publish_date_to_utc
def add_singular_plural(number, unit, force_int=False):
if force_int:
if number == 0 or number == 0.0:
return f"zero {unit}s"
elif number == 1 or number == 1.0:
return f"1 {unit}"
else:
return f"{int(math.ceil(number))} {unit}s"
if number == 0 or number == 0.0:
return f"0 {unit}s"
elif number == 1 or number == 1.0:
return f"1 {unit}"
else:
x = ""
if unit in ["hour"]:
x = f"{get_frac(number, 'fourths')} {unit}s"
elif unit in ["day", "week", "month", "year"]:
x = f"{get_frac(number, 'halves')} {unit}s"
else:
x = f"{int(math.ceil(number))} {unit}s"
if x[:2] == "1 ":
return x[:-1]
else:
return x
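# Example usage (illustrative; get_frac() below supplies the fraction glyphs):
#   add_singular_plural(0, "minute", force_int=True)   -> "zero minutes"
#   add_singular_plural(3.7, "minute", force_int=True) -> "4 minutes"
#   add_singular_plural(2.5, "hour")                    -> "2½ hours"
#   add_singular_plural(1.1, "day")                     -> "1 day"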
def create_domains_slug(hostname_dict: dict, log_prefix=""):
if not hostname_dict["minus_www"]:
return
log_prefix_local = log_prefix + "create_domains_slug: "
# TODO: try using urlparse and netloc to extract domain, and see which is faster/more accurate. maybe log when results of both methods differ
domains_lowercase = hostname_dict["minus_www"].lower()
domains_all_as_list = domains_lowercase.split(".")
if len(domains_all_as_list) < 2:
return
domains_for_hn_search = f"{domains_all_as_list[-2]}.{domains_all_as_list[-1]}"
index = -2
while (len(domains_all_as_list) + index > 0) and is_multiple_tlds(
domains_for_hn_search
):
index -= 1
domains_for_hn_search = domains_all_as_list[index] + "." + domains_for_hn_search
if index <= -4:
logger.info(
log_prefix_local
+ f"very long domain name constructed for search: {domains_for_hn_search} ~Tim~"
)
domains_for_search_as_list = domains_for_hn_search.split(".")
longest_domain_component = len(max(domains_for_search_as_list, key=len))
# entire hostname is short enough to stay on one line
for_display_addl_class = ""
CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS = config.settings["SLUGS"][
"CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS"
]
if len(domains_for_hn_search) < CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS:
domains_for_display = domains_for_hn_search
for_display_addl_class = " nowrap"
# hostname is composed of 3 domains, so group the secondary domain with the shorter of the other components
elif domains_for_hn_search.count(".") == 2 and longest_domain_component <= (
CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS - 4
):
        # compare component lengths so the middle domain is grouped with the shorter outer component
        if len(domains_for_search_as_list[0]) < len(domains_for_search_as_list[2]):
domains_for_display = f"{domains_for_search_as_list[0]}.{domains_for_search_as_list[1]}​.{domains_for_search_as_list[2]}"
else:
domains_for_display = f"{domains_for_search_as_list[0]}​.{domains_for_search_as_list[1]}.{domains_for_search_as_list[2]}"
# insert ZeroWidthSpaces in long domains of hostname to aid in line breaking
else:
domains_for_search_as_list = split_domain_on_chars(domains_for_hn_search)
for i in range(len(domains_for_search_as_list)):
# skip over periods and hyphens in the list
if "​" in domains_for_search_as_list[i]:
continue
# if this component is too long...
if (
len(domains_for_search_as_list[i])
>= CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS
):
orig = domains_for_search_as_list[i]
logger.info(
log_prefix_local
+ f"long domain component {orig} ({len(orig)} chars) in url {hostname_dict['full']}"
)
parts = []
while len(orig) >= CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS:
if len(orig) < int(CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS * 1.5):
index_after_split = len(orig) // 2
parts.append(f"{orig[:index_after_split]}​")
parts.append(orig[index_after_split:])
orig = ""
else:
DOUBLE_OBLIQUE_HYPHEN = config.settings["SYMBOLS"][
"DOUBLE_OBLIQUE_HYPHEN"
]
parts.append(
f"{orig[:CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS]}{DOUBLE_OBLIQUE_HYPHEN}​"
)
orig = orig[CHAR_LENGTH_FOR_LINEWRAPPING_DOMAINS:]
if orig:
parts.append(orig)
domains_for_search_as_list[i] = "".join(parts)
domains_for_display = "".join(domains_for_search_as_list)
hostname_slug = f"<a class='domains-for-search{for_display_addl_class}' href='https://news.ycombinator.com/from?site={domains_for_hn_search}'>({domains_for_display})</a>"
hostname_dict["for_hn_search"] = domains_for_hn_search
hostname_dict["for_display"] = domains_for_display
hostname_dict["for_display_addl_class"] = for_display_addl_class
hostname_dict["slug"] = hostname_slug
def get_frac(number, precision):
fractions_halves = {
0.0: "",
0.5: "½",
}
fractions_fourths = {0.0: "", 0.25: "¼", 0.5: "½", 0.75: "¾"}
fractions_to_use = None
if precision == "halves":
fractions_to_use = fractions_halves
elif precision == "fourths":
fractions_to_use = fractions_fourths
else:
fractions_to_use = fractions_halves
whole_part = int(number)
frac_part = number - whole_part
min_diff = 1
best_fraction = 0
for each_fraction in fractions_to_use.keys():
cur_diff = abs(frac_part - each_fraction)
if cur_diff < min_diff:
min_diff = cur_diff
best_fraction = each_fraction
return f"{whole_part}{fractions_to_use[best_fraction]}"
def get_domains_from_url_via_urllib(url: str, log_prefix=""):
log_prefix_local = log_prefix + "get_domains_from_url_via_urllib: "
if not url:
        return None, None  # keep the two-tuple shape returned on success
parsed_url = urlparse(url)
hostname_full = parsed_url.netloc
if hostname_full.endswith(":443"):
hostname_full = hostname_full[:-4]
logger.info(
log_prefix_local + f"removed ':443' from end of hostname_full for url {url}"
)
match = re.search(r"^w{2,3}\d?\.", hostname_full)
if match and len(match.group()) >= 3:
hostname_minus_www = hostname_full.replace(match.group(), "", 1)
else:
hostname_minus_www = hostname_full
return hostname_full, hostname_minus_www
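# Example (illustrative):
#   get_domains_from_url_via_urllib("https://www.example.com:443/page")
#     -> ("www.example.com", "example.com")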
def get_domains_from_url(url: str, log_prefix=""):
if not url:
return None, None
log_prefix_local = log_prefix + "get_domains_from_url: "
original_url = url
# lowercase scheme, if present
if "://" in url:
orig_scheme = url.split("://")[0]
new_scheme = orig_scheme.lower()
url = url.replace(orig_scheme, new_scheme, 1)
# remove scheme
match = re.match(r"^https?://", url)
if match:
url = url.replace(match.group(), "", 1)
# url decode, if applicable
url = unquote(url)
# remove:
# - first forward slash and on
# - query marker (i.e., question mark) and on
# - port number symbol and on
# - URI fragment indicator and on
special_characters = ["/", "?", ":", "#"]
for each in special_characters:
url = url.split(each)[0]
hostname_full = url
# remove the www or r"[w]{2,3}\d" subdomain, if present
if url.startswith("www."):
hostname_minus_www = hostname_full[4:]
else:
match = re.match(r"w{2,3}\d\.", hostname_full)
if match:
hostname_minus_www = hostname_full.replace(match.group(), "", 1)
else:
hostname_minus_www = hostname_full
hostname_full_via_urllib, hostname_minus_www_via_urllib = (
get_domains_from_url_via_urllib(url=original_url, log_prefix=log_prefix)
)
if hostname_full_via_urllib != hostname_full:
logger.info(
log_prefix_local + f"{hostname_full=}, {hostname_full_via_urllib=} ~Tim~"
)
if hostname_minus_www_via_urllib != hostname_minus_www:
logger.info(
log_prefix_local
+ f"{hostname_minus_www=}, {hostname_minus_www_via_urllib=} ~Tim~"
)
return hostname_full, hostname_minus_www
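# Example (illustrative; the urllib-based parse is computed only to log any
# disagreement with the string-based parse):
#   get_domains_from_url("https://www.example.com/path?q=1#frag")
#     -> ("www.example.com", "example.com")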
def get_filename_details_from_url(full_url):
if not full_url:
return None
# delete rightmost question mark and everything after it
end_index = full_url.rfind("?")
if end_index != -1:
full_url = full_url[0:end_index]
# delete rightmost percent-encoded question mark and everything after it
end_index = full_url.rfind("%3F")
if end_index != -1:
full_url = full_url[0:end_index]
# # delete rightmost open square bracket and everything after it
# end_index = full_url.rfind("[")
# if end_index != -1:
# full_url = full_url[0:end_index]
# # delete rightmost percent-encoded open square bracket and everything after it
# end_index = full_url.rfind("%5B")
# if end_index != -1:
# full_url = full_url[0:end_index]
    # delete the rightmost forward slash and everything before it
start_index = full_url.rfind("/")
if start_index != -1:
full_url = full_url[(start_index + 1) :]
    # delete the rightmost percent-encoded forward slash and everything before it
start_index = full_url.rfind("%2F")
if start_index != -1:
full_url = full_url[(start_index + 1) :]
# filename is whatever is left of the original URL after the preceding deletions
filename = full_url
# try to determine file extension
last_dot_index = filename.rfind(".")
if last_dot_index == -1:
basename = filename
extension = ""
else:
basename = filename[:last_dot_index]
extension = filename[(last_dot_index + 1) :]
# bundle up our results
filename_details = {
"filename": filename,
"base_name": basename,
"file_extension": extension,
}
return filename_details
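# Example (illustrative):
#   get_filename_details_from_url("https://example.com/docs/report.v2.pdf?dl=1")
#     -> {"filename": "report.v2.pdf",
#         "base_name": "report.v2",
#         "file_extension": "pdf"}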
def get_reading_time_via_goose(page_source=None, log_prefix=""):
log_prefix += "grt_via_g: "
try:
if not page_source:
logger.error(log_prefix + "page_source required")
return None
reading_time = None
g = Goose()
try:
article = g.extract(raw_html=page_source)
except lxml.etree.ParserError as exc:
logger.error(log_prefix + f"lxml.etree.ParserError: {exc}")
return None
if article:
reading_time = (
utils_text.word_count(article.cleaned_text)
// config.reading_speed_words_per_minute
)
if reading_time:
reading_time = max(reading_time, 1)
logger.info(
log_prefix
+ f"{utils_text.add_singular_plural(reading_time, 'minute', force_int=True)}"
)
return reading_time
else:
logger.info(log_prefix + "could not determine reading time")
return None
except Exception as exc:
short_exc_name = exc.__class__.__name__
exc_name = exc.__class__.__module__ + "." + short_exc_name
exc_msg = str(exc)
exc_slug = f"{exc_name}: {exc_msg}"
logger.error(log_prefix + "unexpected exception: " + exc_slug)
tb_str = traceback.format_exc()
logger.error(log_prefix + tb_str)
return None
get_reading_time = get_reading_time_via_goose
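# Rough arithmetic behind the estimate (hedged: the result depends on goose3's
# extraction and on config.reading_speed_words_per_minute): a page whose cleaned
# text is about 500 words at 250 words per minute yields 500 // 250 = 2, logged
# as "2 minutes".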
def get_text_between(
left_pattern: str,
right_pattern: str,
text: str,
okay_to_elide_right_pattern=False,
force_lowercase=False,
):
left_index = text.find(left_pattern)
if left_index == -1:
return None
right_index = text.find(
right_pattern, left_index + len(left_pattern)
) # note: lazy find
if right_index == -1:
if okay_to_elide_right_pattern:
return text[slice(left_index + len(left_pattern), len(text))]
else:
return None
# check for zero-length string between left_pattern and right_pattern
if left_index + len(left_pattern) == right_index:
return config.EMPTY_STRING
result = text[slice(left_index + len(left_pattern), right_index)]
if not result:
return None
if force_lowercase:
return result.lower()
else:
return result
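# Examples (illustrative):
#   get_text_between("<title>", "</title>", "<title>Hello</title>")  -> "Hello"
#   get_text_between("<title>", "</title>", "<title></title>")       -> config.EMPTY_STRING
#   get_text_between("id=", "&", "id=42", okay_to_elide_right_pattern=True) -> "42"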
def insert_possible_line_breaks(orig_title):
words_by_spaces = orig_title.split(" ")
# we will break each "word" down further, if possible or necessary
break_after_these = "/-"
break_before_these = "\\"
LINE_BREAK_HYPHEN = "⸗"
for i in range(len(words_by_spaces)):
intraword_tokens = []
cur_word = ""
for char in words_by_spaces[i]:
if char in break_after_these:
intraword_tokens.append(cur_word)
cur_word = char
cur_word += "​"
intraword_tokens.append(cur_word)
cur_word = ""
elif char in break_before_these:
intraword_tokens.append(cur_word)
cur_word = "​"
cur_word += char
intraword_tokens.append(cur_word)
cur_word = ""
else:
cur_word += char
if cur_word:
intraword_tokens.append(cur_word)
for j in range(len(intraword_tokens)):
if "​" in intraword_tokens[j]:
continue
# check if hyphenation is needed
MAX_ALLOWED_WORD_LENGTH = config.settings["SLUGS"]["MAX_SUBSTRING_LENGTH"]
if len(intraword_tokens[j]) > MAX_ALLOWED_WORD_LENGTH:
t = intraword_tokens[j]
t_conv = []
while len(t) >= MAX_ALLOWED_WORD_LENGTH:
if len(t) <= 1.5 * MAX_ALLOWED_WORD_LENGTH:
len_to_use = len(t) // 2
t_conv.append(t[slice(len_to_use)])
t_conv.append(f"{LINE_BREAK_HYPHEN}​")
t_conv.append(t[slice(len_to_use, len(t))])
# t_conv.append("-​")
t = ""
elif len(t) <= 2 * MAX_ALLOWED_WORD_LENGTH:
len_to_use = len(t) // 3
twice_len_to_use = int(2 * len_to_use)
t_conv.append(t[slice(len_to_use)])
t_conv.append(f"{LINE_BREAK_HYPHEN}​")
t_conv.append(t[slice(len_to_use, twice_len_to_use)])
t_conv.append(f"{LINE_BREAK_HYPHEN}​")
t_conv.append(t[slice(twice_len_to_use, len(t))])
# t_conv.append("-​")
t = ""
else:
t_conv.append(t[:MAX_ALLOWED_WORD_LENGTH])
t_conv.append(f"{LINE_BREAK_HYPHEN}​")
                        t = t[MAX_ALLOWED_WORD_LENGTH:]  # drop exactly the prefix just appended
intraword_tokens[j] = "".join(t_conv)
words_by_spaces[i] = "".join(intraword_tokens)
return " ".join(words_by_spaces)
def parse_content_type_from_raw_header(
content_type_header: str, log_prefix="", context=None
):
log_prefix_local = log_prefix + "parse_content_type_from_raw_header: "
if not content_type_header:
return None
    # default the log slugs so they are defined even when no context is passed
    url_slug = ""
    response_object_creator_slug = ""
    if context:
        if "url" in context:
            url_slug = f"for url={context['url']} "
        if "response_object_creator" in context:
            response_object_creator_slug = f"via {context['response_object_creator']} "
if isinstance(content_type_header, list) or "," in content_type_header:
logger.info(
log_prefix_local
+ f"interesting {content_type_header=}, {type(content_type_header)=} {response_object_creator_slug}{url_slug}~Tim~"
)
ct_set = set()
if isinstance(content_type_header, list):
for each in content_type_header:
ct_set.update(re.split("[;, ]+", each))
elif isinstance(content_type_header, str):
ct_set.update(re.split("[;, ]+", content_type_header))
else:
logger.info(
log_prefix_local
+ f"unexpected type={str(type(content_type_header))} for content_type_header={str(content_type_header)} {response_object_creator_slug}{url_slug}~Tim~"
)
ct_set = {x.strip() for x in ct_set}
ct_set = {x for x in ct_set if x}
    # keep only tokens that look like MIME types (contain a slash); drop charset
    # declarations and any other stray tokens
    for each in ct_set.copy():
        if "/" not in each:
            ct_set.remove(each)
if len(ct_set) == 1:
srct = ct_set.pop().lower()
logger.info(log_prefix_local + f"srct='{srct}' {url_slug}")
return srct
elif not ct_set:
logger.info(
log_prefix_local + f"no content-type found in http header {url_slug}~Tim~"
)
return None
else:
logger.info(
log_prefix_local
+ f"multiple content-types found in http header {ct_set=} {url_slug}~Tim~"
)
return ct_set.pop().lower()
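# Example (illustrative; charset parameters and stray tokens are discarded):
#   parse_content_type_from_raw_header("text/html; charset=utf-8")  -> "text/html"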
def sanitize(s: str):
s = s.lower()
allowed_chars = "abcdefghijklmnopqrstuvwxyz 0123456789"
sanitized = ""
for char in s:
if char in allowed_chars:
sanitized += char
sanitized = re.sub(r"\s+", " ", sanitized)
return sanitized.strip()
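# Example (illustrative):
#   sanitize("Hello,   World! 2024")  -> "hello world 2024"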
def split_domain_on_chars(domain_string: str):
result = []
cur_part = ""
for char in domain_string:
if char in CHARS_IN_DOMAINS_BREAK_BEFORE:
result.append(cur_part)
result.append(f"​{char}")
cur_part = ""
elif char in CHARS_IN_DOMAINS_BREAK_AFTER:
result.append(cur_part)
result.append(f"{char}​")
cur_part = ""
else:
cur_part += char
if cur_part:
result.append(cur_part)
return result
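# Example (illustrative):
#   split_domain_on_chars("sub.example-site.com")
#     -> ["sub", "\u200b.", "example", "-\u200b", "site", "\u200b.", "com"]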
def word_count(text):
    # split on any whitespace run so repeated spaces and newlines do not inflate the count
    return len(text.split())
def heal_localhost_url(localhost_url: str, real_hostname: str):
if not localhost_url:
return None
parsed_url = urlparse(localhost_url)
if parsed_url.scheme:
scheme_part = unquote(parsed_url.scheme)
if scheme_part:
scheme_part = scheme_part + "://"
else:
scheme_part = ""
else:
scheme_part = ""
if parsed_url.hostname:
hostname_part = unquote(parsed_url.hostname)
if not hostname_part:
hostname_part = ""
else:
hostname_part = ""
port_number = parsed_url.port
if port_number:
port_number = ":" + str(port_number)
else:
port_number = ""
if parsed_url.path:
path_part = unquote(parsed_url.path)
if not path_part:
path_part = "/"
else:
path_part = "/"
if parsed_url.params:
params_part = unquote(parsed_url.params)
if params_part:
params_part = ";" + params_part
else:
params_part = ""
else:
params_part = ""
if parsed_url.query:
query_part = unquote(parsed_url.query)
if query_part:
query_part = "?" + query_part
else:
query_part = ""
else:
query_part = ""
if parsed_url.fragment:
fragment_part = unquote(parsed_url.fragment)
if fragment_part:
fragment_part = "#" + fragment_part
else:
fragment_part = ""
else:
fragment_part = ""
https_scheme_string = "https://"
possibly_healed_url = (
https_scheme_string
+ real_hostname
+ path_part
+ params_part
+ query_part
+ fragment_part
)
return possibly_healed_url
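# Example (illustrative; the scheme, hostname, and port parsed from the localhost
# URL are intentionally discarded in favor of "https://" plus real_hostname):
#   heal_localhost_url("http://localhost:3000/a/b.jpeg;p?q=1#f", "example.org")
#     -> "https://example.org/a/b.jpeg;p?q=1#f"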
# eliminate some goose3 errors I occasionally get
def monkeypatched_extract_3_1_19(self):
return {
"description": self.get_meta_description(),
"keywords": self.get_meta_keywords(),
"lang": self.get_meta_lang(),
"favicon": self.get_favicon(),
"canonical": "www.example.com",
"domain": "example.com",
"encoding": self.get_meta_encoding(),
}
# def get_meta_encoding(self):
# """Parse the meta encoding"""
# encoding = get_encodings_from_content(self.article.raw_html)
# return encoding[0] if encoding else None
def monkeypatched_get_meta_encoding_3_1_19(self):
"""Parse the meta encoding"""
encoding = get_encodings_from_content(self.article.raw_html)
    # replace empty, "none", or "null" encodings with "utf-8"
disallowed_encodings = ["", "none", "null"]
encoding = [x if x not in disallowed_encodings else "utf-8" for x in encoding]
if encoding:
res = encoding[0]
else:
res = None
for each in [
"",
"none",
"null",
]:
if res == each:
logger.info(
f"monkeypatched_get_meta_encoding_3_1_19: defaulting to 'utf-8' for {self.article.final_url} since original was '{each}' ~Tim~"
)
res = "utf-8"
return res
metas_extractor = goose3.extractors.metas.MetasExtractor
metas_extractor.extract = monkeypatched_extract_3_1_19
metas_extractor.get_meta_encoding = monkeypatched_get_meta_encoding_3_1_19
if __name__ == "__main__":
try:
res = heal_localhost_url(
localhost_url="http://xxxxxx:3000/aardvark/bear.jpeg;sorrowful?a=apple&b=banana#carrot",
real_hostname="shotune.com",
)
print(res)
except Exception as exc:
exc_module = exc.__class__.__module__
exc_short_name = exc.__class__.__name__
exc_name = exc_module + "." + exc_short_name
exc_msg = str(exc)
exc_slug = exc_name + ": " + exc_msg
logger.error("unexpected exception: " + exc_slug)
tb_str = traceback.format_exc()
logger.error(tb_str)