 import signal
 import sys
 import logging
-import yaml
 import textract
 import functools
 import progressbar
 import io
 from fusearch.index import Index
 from fusearch.model import Document
-from tokenizer import get_tokenizer, tokfreq, Tokenizer
+from fusearch.tokenizer import get_tokenizer, tokfreq, Tokenizer
+from fusearch.util import bytes_to_str, file_generator_ext, filename_without_extension, mtime, pickle_loader
+from fusearch.config import Config
 from multiprocessing import Process, Queue, cpu_count
-import queue
 import collections.abc
-from util import *
-from config import Config

 progressbar_index_widgets_ = [
-    ' [',
-    progressbar.Timer(format='Elapsed %(elapsed)s'), ', ',
-    progressbar.SimpleProgress(), ' files'
-    #'count: ', progressbar.Counter(),
-    '] ',
+    " [",
+    progressbar.Timer(format="Elapsed %(elapsed)s"),
+    ", ",
+    progressbar.SimpleProgress(),
+    " files"
+    #'count: ', progressbar.Counter(),
+    "] ",
     progressbar.Bar(),
-    ' (', progressbar.ETA(), ') ',
+    " (",
+    progressbar.ETA(),
+    ") ",
 ]


-
 def cleanup() -> None:
     pass

@@ -86,37 +87,34 @@ def daemonize() -> None:
     fork_exit_parent()
     os.setsid()
     fork_exit_parent()
-    os.chdir('/')
+    os.chdir("/")
     config_signal_handlers()
     os.umask(0o022)
     redirect_stream(sys.stdin, None)
-    redirect_stream(sys.stdout, open('/tmp/fusearch.out', 'a'))
-    redirect_stream(sys.stderr, open('/tmp/fusearch.err', 'a'))
+    redirect_stream(sys.stdout, open("/tmp/fusearch.out", "a"))
+    redirect_stream(sys.stderr, open("/tmp/fusearch.err", "a"))
     fusearch_main()


 def config_argparse() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser(description="fusearch daemon", epilog="")
-    parser.add_argument('-f', '--foreground', action='store_true',
-                        help="Don't daemonize")
-    parser.add_argument('-c', '--config', type=str,
-                        default='/etc/fusearch/config.yaml',
-                        help="config file")
+    parser.add_argument("-f", "--foreground", action="store_true", help="Don't daemonize")
+    parser.add_argument("-c", "--config", type=str, default="/etc/fusearch/config.yaml", help="config file")
     return parser


 def to_text(file: str) -> str:
     assert os.path.isfile(file)
     try:
-        txt_b = textract.process(file, method='pdftotext')
+        txt_b = textract.process(file, method="pdftotext")
         # TODO more intelligent decoding? there be dragons
         txt = bytes_to_str(txt_b)
-        #print(file)
-        #print(len(txt))
-        #print(txt[:80])
-        #print('-------------------')
+        # print(file)
+        # print(len(txt))
+        # print(txt[:80])
+        # print('-------------------')
     except Exception as e:
-        txt = ''
+        txt = ""
         logging.exception("Exception while extracting text from '%s'", file)
         # TODO mark it as failed instead of empty text
     return txt
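
Note: fork_exit_parent() and redirect_stream(), which daemonize() calls above, are defined outside this diff. As a minimal sketch of the classic double-fork helpers they presumably correspond to (names taken from the call sites, bodies assumed):

import os

def fork_exit_parent() -> None:
    # Fork and terminate the parent, so the child detaches from whoever started it.
    if os.fork() > 0:
        os._exit(0)

def redirect_stream(stream, target_file) -> None:
    # Rebind a standard stream's file descriptor; None presumably means /dev/null.
    if target_file is None:
        target_file = open(os.devnull, "a")
    os.dup2(target_file.fileno(), stream.fileno())

Forking once on each side of os.setsid(), as daemonize() does, ensures the daemon is not a session leader and cannot reacquire a controlling terminal.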
@@ -125,37 +123,28 @@ def to_text(file: str) -> str:
 def document_from_file(file: str, tokenizer: Tokenizer) -> Document:
     mtime_latest = mtime(file)
     filename = filename_without_extension(file)
-    txt = filename + '\n' + to_text(file)
+    txt = filename + "\n" + to_text(file)
     # Detect language and check that the document makes sense, OCR returns garbage sometimes
     # TODO: add filename to content
-    document = Document(
-        url=file,
-        filename=filename,
-        content=txt,
-        tokfreq=tokfreq(tokenizer(txt)),
-        mtime=mtime_latest)
+    document = Document(url=file, filename=filename, content=txt, tokfreq=tokfreq(tokenizer(txt)), mtime=mtime_latest)
     return document


 def needs_indexing(index: Index, file: str) -> bool:
     mtime_latest = mtime(file)
-    #document = index.document_from_url(file)
+    # document = index.document_from_url(file)
     mtime_last_known = index.mtime(file)
     if not mtime_last_known or mtime_last_known and mtime_latest > mtime_last_known:
-        #logging.debug("needs_indexing: need '%s'", file)
+        # logging.debug("needs_indexing: need '%s'", file)
         return True
     else:
-        #logging.debug("needs_indexing: NOT need '%s'", file)
+        # logging.debug("needs_indexing: NOT need '%s'", file)
         return False


 def get_index(path: str, config: Config) -> Index:
-    index_db = os.path.join(path, '.fusearch.db')
-    index = Index({
-        'provider':'sqlite',
-        'filename': index_db,
-        'create_db': True
-    }, tokenizer=get_tokenizer(config))
+    index_db = os.path.join(path, ".fusearch.db")
+    index = Index({"provider": "sqlite", "filename": index_db, "create_db": True}, tokenizer=get_tokenizer(config))
     logging.debug("get_index: '%s' %d docs", index_db, index.doc_count)
     return index

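
Since `and` binds tighter than `or`, the condition in needs_indexing() reduces to "file unknown to the index, or modified since it was indexed". A behavior-preserving simplification (a suggestion, not part of this commit):

def needs_indexing(index: Index, file: str) -> bool:
    # Reindex files the index has never seen, or whose mtime advanced since last indexing.
    mtime_last_known = index.mtime(file)
    return not mtime_last_known or mtime(file) > mtime_last_known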
@@ -167,7 +156,6 @@ def __init__(self, path, config):
         self.index = get_index(path, config)
         assert os.path.isdir(path)

-
     def __call__(self) -> collections.abc.Iterable:
         """:returns a generator of files which are updated from the mtime in the index"""
         file_needs_indexing = functools.partial(needs_indexing, self.index)
@@ -176,22 +164,24 @@ def __call__(self) -> collections.abc.Iterable:

 def file_producer(path: str, config: Config, file_queue: Queue, file_inventory: io.IOBase) -> None:
     for file in pickle_loader(file_inventory):
-        #logging.debug("file_producer: %s", file)
+        # logging.debug("file_producer: %s", file)
         file_queue.put(file)
     logging.debug("file_producer is done")


 def text_extract(config: Config, file_queue: Queue, document_queue: Queue):
-    #logging.debug("text_extract started")
+    # logging.debug("text_extract started")
     tokenizer = get_tokenizer(config)
     while True:
         file = file_queue.get()
         if file is None:
             logging.debug("text_extract is done")
             return
-        logging.debug("text_extract: file_queue.qsize %d document_queue.qsize %d", file_queue.qsize(), document_queue.qsize())
+        logging.debug(
+            "text_extract: file_queue.qsize %d document_queue.qsize %d", file_queue.qsize(), document_queue.qsize()
+        )
         logging.debug("text_extract: '%s'", file)
-        #logging.debug("text_extract: %s", file)
+        # logging.debug("text_extract: %s", file)
         document = document_from_file(file, tokenizer)
         document_queue.put(document)

@@ -218,6 +208,7 @@ def document_consumer(path: str, config: Config, document_queue: Queue, file_cou
         pbar.update(file_i)
         file_i += 1

+
 def gather_files(path, config, file_inventory) -> int:
     """:returns file count"""
     if not os.path.isdir(path):
@@ -227,23 +218,25 @@ def gather_files(path, config, file_inventory) -> int:
     logging.info("Calculating number of files to index (.=100files)")
     if config.verbose:
         widgets = [
-            ' [',
-            progressbar.Timer(format='Elapsed %(elapsed)s'), ' ',
-            'count: ', progressbar.Counter(),
-            '] ',
+            " [",
+            progressbar.Timer(format="Elapsed %(elapsed)s"),
+            " ",
+            "count: ",
+            progressbar.Counter(),
+            "] ",
             progressbar.BouncingBar(),
         ]
         pbar = progressbar.ProgressBar(widgets=widgets)
     file_count = 0
     for file in NeedsIndexFileGenerator(path, config)():
         pickle.dump(file, file_inventory)
         file_count += 1
-        #if config.verbose and (file_count % 100) == 0:
+        # if config.verbose and (file_count % 100) == 0:
         #    sys.stdout.write('.')
         #    sys.stdout.flush()
         if config.verbose:
             pbar.update(file_count)
-    #if config.verbose:
+    # if config.verbose:
     #    sys.stdout.write('\n')
     if config.verbose:
         pbar.finish()
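
gather_files() spools one pickled filename at a time into file_inventory rather than holding the whole listing in memory, and file_producer() later replays it through pickle_loader() from fusearch.util. That helper is not shown in this diff; a minimal sketch of the pattern it presumably implements (repeated pickle.load() until the stream is exhausted):

import pickle
from typing import Iterator

def pickle_loader(stream) -> Iterator:
    # Yield each object that pickle.dump() appended, stopping cleanly at end of file.
    while True:
        try:
            yield pickle.load(stream)
        except EOFError:
            return

The inventory file must be rewound (stream.seek(0)) between the dump phase and the replay phase for this round-trip to work.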
@@ -260,24 +253,31 @@ def index_do(path, config) -> None:
     else:
         index_serial(path, config, file_count, file_inventory)

+
 def index_parallel(path: str, config: Config, file_count: int, file_inventory) -> None:
     #
     # file_producer -> N * test_extract -> document_consumer
     #
     # TODO: check that processes are alive to prevent deadlocks on exceptions in children
-    file_queue = Queue(cpu_count()*8)
+    file_queue = Queue(cpu_count() * 8)
     document_queue = Queue(256)
     text_extract_procs = []
-    file_producer_proc = Process(name='file producer', target=file_producer, daemon=True,
-                                 args=(path, config, file_queue, file_inventory))
+    file_producer_proc = Process(
+        name="file producer", target=file_producer, daemon=True, args=(path, config, file_queue, file_inventory)
+    )
     file_producer_proc.start()

-    document_consumer_proc = Process(name='document consumer', target=document_consumer, daemon=True,
-                                     args=(path, config, document_queue, file_count))
+    document_consumer_proc = Process(
+        name="document consumer", target=document_consumer, daemon=True, args=(path, config, document_queue, file_count)
+    )

     for i in range(cpu_count()):
-        p = Process(name='text extractor {}'.format(i), target=text_extract, daemon=True,
-                    args=(config, file_queue, document_queue))
+        p = Process(
+            name="text extractor {}".format(i),
+            target=text_extract,
+            daemon=True,
+            args=(config, file_queue, document_queue),
+        )
         text_extract_procs.append(p)
         p.start()
     document_consumer_proc.start()
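
These hunks wire up a file_producer -> N x text_extract -> document_consumer pipeline over bounded multiprocessing queues. text_extract() exits when it reads a None sentinel (see the earlier hunk), so the teardown elided between this hunk and the next presumably sends one sentinel per worker; a sketch of that conventional shutdown, assumed rather than taken from the elided lines:

from multiprocessing import Queue

def stop_extractors(file_queue: Queue, text_extract_procs: list) -> None:
    # One None per extractor: each worker consumes exactly one sentinel and returns.
    for _ in text_extract_procs:
        file_queue.put(None)
    for p in text_extract_procs:
        p.join()

Two caveats in the code as shown: multiprocessing's Queue.qsize(), which text_extract() logs, raises NotImplementedError on macOS, and daemon=True children are terminated abruptly if the parent exits before joining them.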
@@ -297,6 +297,7 @@ def index_parallel(path: str, config: Config, file_count: int, file_inventory) -
     document_consumer_proc.join()
     logging.info("Parallel indexing finished")

+
 def index_serial(path, config, file_count, file_inventory):
     if config.verbose:
         pbar = progressbar.ProgressBar(max_value=file_count, widgets=progressbar_index_widgets_)
@@ -332,10 +333,10 @@ def script_name() -> str:

 def config_logging() -> None:
     import time
+
     logging.getLogger().setLevel(logging.DEBUG)
     logging.getLogger("requests").setLevel(logging.WARNING)
-    logging.basicConfig(format='{}: %(asctime)sZ %(name)s %(levelname)s %(message)s'.
-                        format(script_name()))
+    logging.basicConfig(format="{}: %(asctime)sZ %(name)s %(levelname)s %(message)s".format(script_name()))
     logging.Formatter.converter = time.gmtime

@@ -348,5 +349,5 @@ def main() -> int:
     fusearch_main(args)


-if __name__ == '__main__':
+if __name__ == "__main__":
     sys.exit(main())