forked from microsoft/promptflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuild_index.py
64 lines (47 loc) · 2.11 KB
/
build_index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import PyPDF2
import faiss
import os
from utils.oai import OAIEmbedding
from utils.index import FAISSIndex
from utils.logging import log
from utils.lock import acquire_lock
def create_faiss_index(pdf_path: str) -> str:
    """Build (or reuse) a FAISS index for the text of a PDF file.

    The chunking parameters are read from the CHUNK_SIZE and CHUNK_OVERLAP
    environment variables (both required, integer character counts). The index
    is persisted under ``.index/<pdf_path>.index_<size>_<overlap>`` so repeated
    calls with the same PDF and parameters reuse the existing index.

    :param pdf_path: path to the source PDF file.
    :return: directory path where the FAISS index is persisted.
    :raises KeyError: if CHUNK_SIZE or CHUNK_OVERLAP is not set.
    :raises ValueError: if they are set but not valid integers.
    """
    # os.environ[...] (not .get) so a missing variable raises a KeyError that
    # names the variable, instead of an opaque TypeError from int(None).
    chunk_size = int(os.environ["CHUNK_SIZE"])
    chunk_overlap = int(os.environ["CHUNK_OVERLAP"])
    log(f"Chunk size: {chunk_size}, chunk overlap: {chunk_overlap}")

    index_persistent_path = ".index/" + pdf_path + f".index_{chunk_size}_{chunk_overlap}"
    lock_path = index_persistent_path + ".lock"
    log("Index path: " + os.path.abspath(index_persistent_path))

    # The lock serializes concurrent builders of the same index directory
    # (acquire_lock is a project helper — presumably a file lock; verify).
    with acquire_lock(lock_path):
        if os.path.exists(os.path.join(index_persistent_path, "index.faiss")):
            log("Index already exists, bypassing index creation")
            return index_persistent_path

        # exist_ok=True avoids the check-then-create race of the previous
        # `if not exists: makedirs` pattern.
        os.makedirs(index_persistent_path, exist_ok=True)

        log("Building index")
        pdf_reader = PyPDF2.PdfReader(pdf_path)
        # extract_text() may return None for pages with no extractable text
        # in some PyPDF2 versions; `or ""` keeps the join safe. str.join is
        # linear, unlike repeated `+=` concatenation.
        text = "".join(page.extract_text() or "" for page in pdf_reader.pages)

        # Chunk the text into segments of chunk_size characters with
        # chunk_overlap characters of overlap between consecutive segments.
        segments = split_text(text, chunk_size, chunk_overlap)
        log(f"Number of segments: {len(segments)}")

        # 1536 is assumed to match OAIEmbedding's vector dimension
        # (the OpenAI ada-002 embedding size) — TODO confirm.
        index = FAISSIndex(index=faiss.IndexFlatL2(1536), embedding=OAIEmbedding())
        index.insert_batch(segments)
        index.save(index_persistent_path)
        log("Index built: " + index_persistent_path)
        return index_persistent_path
# Split the text into chunks with CHUNK_SIZE and CHUNK_OVERLAP as character count
# Split the text into chunks with CHUNK_SIZE and CHUNK_OVERLAP as character count
def split_text(text, chunk_size, chunk_overlap):
    """Split *text* into chunks of at most ``chunk_size`` characters, with
    consecutive chunks overlapping by ``chunk_overlap`` characters.

    :param text: the string to split; an empty string yields an empty list.
    :param chunk_size: maximum characters per chunk (must be > chunk_overlap).
    :param chunk_overlap: characters shared between consecutive chunks
        (must be >= 0 and < chunk_size).
    :return: list of non-empty chunk strings covering all of *text*.
    :raises ValueError: if the chunk parameters are invalid.
    """
    # Explicit validation: the previous implementation raised a confusing
    # ZeroDivisionError when chunk_size == chunk_overlap.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError(
            f"chunk_overlap must be in [0, chunk_size), got {chunk_overlap}"
        )

    # Each chunk starts `step` characters after the previous one, so adjacent
    # chunks share exactly chunk_overlap characters. Iterating the starts
    # directly (rather than precomputing a chunk count and appending a tail)
    # never produces an empty trailing chunk or a negative slice index.
    step = chunk_size - chunk_overlap
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]