2
2
from collections import Counter
3
3
import queue
4
4
import logging
5
- import requests
5
+ from pathlib import Path
6
6
7
7
from resources import constant
8
- from sparql_endpoint import SparqlEndpoint
8
+ from wikidata_endpoint import WikidataEndpoint
9
+ from wikidata_endpoint import WikidataEndpointConfiguration
9
10
10
11
11
12
class Relation :
@@ -67,7 +68,9 @@ def __init__(self, thread_id, work_queue):
67
68
self ._cluster_annotations = {"relations" : Counter ()}
68
69
69
70
self ._mysql_connection = constant .create_mysql_connection ()
70
- self ._sparql_endpoint = SparqlEndpoint (constant .WIKIDATA_API_URL )
71
+ self ._wikidata_endpoint_config = WikidataEndpointConfiguration (Path ("resources/wikidata_endpoint_config.ini" ))
72
+ self ._wikidata_endpoint = WikidataEndpoint (self ._wikidata_endpoint_config )
73
+ self ._chunk_size = constant .STANDARD_CHUNK_SIZE
71
74
72
75
def run (self ):
73
76
while self ._analyze_cluster ():
@@ -85,28 +88,33 @@ def _analyze_cluster(self):
85
88
return False
86
89
87
90
def _analyze_entities(self, cluster):
    """Fetch Wikidata relations for every entity in *cluster* and aggregate them.

    Entities are queried in slices of ``self._chunk_size``; the endpoint's
    timeout callback may shrink the chunk size between iterations, so each
    slice is re-read from the current value.

    :param cluster: object with an ``entities`` sequence to be annotated.
    """
    index = 0
    metrics = RelationMetrics(len(cluster.entities))

    while index < len(cluster.entities):
        chunk = cluster.entities[index:index + self._chunk_size]
        query = constant.named_entity_relations_sparql_query(chunk)
        logging.info(
            f"Executing SPARQL query for batch [{index}, {index + len(chunk)}] on Thread #{self._thread_id}")
        with self._wikidata_endpoint.request() as request:
            relations = [Relation.from_wikidata_record(record) for record in
                         request.post(query,
                                      on_timeout=self._on_timeout_wikidata_endpoint,
                                      on_error=self._on_error_wikidata_endpoint)]
            # Advance by the number of entities actually queried, NOT by
            # self._chunk_size: the timeout callback may have shrunk
            # _chunk_size after `chunk` was sliced, and advancing by the
            # smaller value would re-process (and double-count) entities.
            # An empty result leaves `index` unchanged so the same batch is
            # retried with the (possibly reduced) chunk size.
            # NOTE(review): if a batch can legitimately yield zero relations,
            # this retry-forever policy livelocks — confirm endpoint semantics.
            index += len(chunk) if relations else 0
            logging.info(
                f"Finished executing SPARQL query on Thread #{self._thread_id}")
            ClusterAnnotator._count_relations(relations, metrics)

    ClusterAnnotator._print_relations(cluster, metrics)
109
111
112
+ def _on_timeout_wikidata_endpoint (self , request ):
113
+ self ._chunk_size = int (self ._chunk_size * (3 / 4 ))
114
+
115
+ def _on_error_wikidata_endpoint (self , request , error ):
116
+ pass
117
+
110
118
@staticmethod
111
119
def _count_relations (relations , metrics ):
112
120
for relation in relations :
0 commit comments