[Bug]: ElasticSearch : Timeout context manager should be used inside a task #2614
Comments
Is there any solution or conclusion here? I run into the same problem when using llama-index.
Ran into the same issue while using llama-index.
@FlorentGrenier try running the application with uvicorn/gunicorn rather than `python app.py`; it still shows the error, but it works.
@carlyrichmond any solution?
Hi @FlorentGrenier, I'm not a Python expert, so I'll defer to our engineers on this one. I do see one commenter suggesting this is an issue with the …
@FlorentGrenier Hi! I'm not super familiar with the LlamaIndex integration for Elasticsearch, but the issue that you have occurs because objects associated with two different async loops are getting mixed up. The reason a second loop is being used is that you are invoking a synchronous retriever interface. That forces LlamaIndex to internally create a second async loop to run the Elasticsearch code on. Once again, I apologize for not being super familiar with LlamaIndex, but it does look like most query functions have an async version, which usually starts with "a", such as `aquery()` or `achat()`.
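For illustration only, a rough sketch of what using the async interface looks like, assuming an existing LlamaIndex `index` object (names and the question are placeholders, not from this thread):

```python
import asyncio

async def ask(index, question: str):
    query_engine = index.as_query_engine()
    # aquery() is the async counterpart of query(); it runs the Elasticsearch calls
    # on the caller's event loop instead of creating a second loop internally.
    response = await query_engine.aquery(question)
    return response

# Example usage from synchronous code:
# asyncio.run(ask(index, "What do my documents say about X?"))
```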
Hi @carlyrichmond, yes, I applied the suggested solution, but it didn't solve my problem.
Hi @miguelgrinberg, LlamaIndex does indeed offer asynchronous query functions, and following a solution provided in the LlamaIndex issue, I'm using the `achat()` function. Here's some more code, for better context:

Streamlit part:

```python
if st.session_state.messages[-1]["role"] == "user":
    with st.chat_message("assistant"):
        with st.spinner("Je recherche dans mes documents..."):
            response = asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))
            response_chat = st.write(response.response)
            source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
            st.markdown(source_badges)
            st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})
```

Response synthesis part:

```python
def load_es_index(self):
    async_client = AsyncElasticsearch(
        hosts=[Configs.elastic_url],
        basic_auth=(Configs.elastic_username, Configs.elastic_password)
    )
    vector_store = ElasticsearchStore(
        index_name=Configs.index_name,
        es_url=Configs.elastic_url,
        es_client=async_client
    )
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store, embed_model=Settings.embed_model)
    self.index = index

def chat_async(self, user_query, similarity_top_k, stream):
    memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
    system_mesage = [
        ChatMessage(role=MessageRole.SYSTEM, content=Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
    ]
    chat_engine = self.index.as_chat_engine(
        chat_mode="context",
        memory=memory,
        llm=Settings.llm,
        system_prompt=system_mesage,
        context_template=Utils.DEFAULT_CONTEXT_PROMPT,
        verbose=True
    )
    # achat() from llama-index
    response = chat_engine.achat(user_query)
    return response
```
@FlorentGrenier your `chat_async()` method is not declared `async` and the `achat()` call is never awaited, so the coroutine it returns is never actually run.
@miguelgrinberg I've already tried this, but nothing changes.
@FlorentGrenier Please provide an updated stack trace if you are still getting errors.
@miguelgrinberg Here:
The streamlit app is already asynchronous. With the changes you've made, you just moved the place in which the 2nd loop is created higher in the stack trace. You are now doing it yourself in this line: `response = asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))`. What you need to do to avoid this issue is to use a fully async application. That likely means converting the function that has the line of code above to async, so that you can just use `await`.
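Schematically, the change being suggested looks like this (a sketch only; the handler names are placeholders and `response_generator` stands in for the ResponseSynthesis instance from the code above):

```python
import asyncio

# Before: a synchronous handler starts a brand-new event loop for every request,
# which is the second loop the error complains about.
def handle_prompt_sync(response_generator, prompt):
    return asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))

# After: the handler is itself a coroutine and is awaited from the app's single loop.
async def handle_prompt(response_generator, prompt):
    return await response_generator.chat_async(user_query=prompt, stream=False)
```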
I tried this (complete code of my streamlit app):

```python
import streamlit as st
#sys.path.append('C:/data/git/***/src')
from src.components.response_synthesis import ResponseSynthesis
from src.utils.utils import Utils
import time
from streamlit_pills import pills
import asyncio
import nest_asyncio

nest_asyncio.apply()

st.set_page_config(page_title="AthenAI", page_icon='🤖', layout="centered", initial_sidebar_state="auto", menu_items=None)

@st.cache_resource(show_spinner=False)
def load_data():
    with st.spinner(text="Chargement des documents. Cela devrait prendre 1 à 2 minutes."):
        response_generator = ResponseSynthesis(local_index=False)
        return response_generator

def generate_source_badges(sources):
    badges = ""
    for file_name, source_info in sources.items():
        badges += f" **Fichier**: `{file_name}` | **Pages**: `{source_info['page']}` | **Dernière modification**: `{source_info['last_modified']}` \n"
    return badges

def response_generator_stream(response_chat):
    for word in response_chat.split():
        yield word + " "
        time.sleep(0.05)

async def main():
    response_generator = load_data()
    st.title("AthenAI 💬")
    with st.expander("Information"):
        st.info("Version de démonstration", icon="ℹ️")
        st.markdown('''
        Sa base de connaissance est constitué de 4 documents :
        - Un document sur le contrat d'apprentissage
        - Un guide CFA
        - Un document sur ParcourSup
        - Un document sur la signature electronique
        ''')
    if "messages" not in st.session_state.keys():
        st.session_state.messages = [
            {"role": "assistant", "content": "Posez moi une question sur la documentation de Val!"}
        ]
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
            if "source" in message.keys():
                st.markdown(message["source"])
    if prompt := st.chat_input("Votre question"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
    if st.session_state.messages[-1]["role"] == "user":
        with st.chat_message("assistant"):
            with st.spinner("Je recherche dans mes documents..."):
                response = await response_generator.chat_async(user_query=prompt, stream=False)
                response_chat = response.response
                st.write(response_chat)
                source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
                st.markdown(source_badges)
                st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})

if __name__ == '__main__':
    asyncio.run(main())
```

Trace:
@FlorentGrenier remove all references to `nest_asyncio`.
Done, but still the same error with the same trace... 😅
It can't be the same stack trace if you removed nest_asyncio... Maybe similar, but not identical. Can you please share it? I don't have a test application to use to debug this, so I need to see the stack trace.
You are still using nest_asyncio.
I'm really sorry, but I don't understand where...

streamlit_app.py:

```python
import streamlit as st
#sys.path.append('C:/data/git/***/src')
from src.components.response_synthesis import ResponseSynthesis
from src.utils.utils import Utils
import time
from streamlit_pills import pills
import asyncio

st.set_page_config(page_title="AthenAI", page_icon='🤖', layout="centered", initial_sidebar_state="auto", menu_items=None)

@st.cache_resource(show_spinner=False)
def load_data():
    with st.spinner(text="Chargement des documents. Cela devrait prendre 1 à 2 minutes."):
        response_generator = ResponseSynthesis(local_index=False)
        return response_generator

def generate_source_badges(sources):
    badges = ""
    for file_name, source_info in sources.items():
        badges += f" **Fichier**: `{file_name}` | **Pages**: `{source_info['page']}` | **Dernière modification**: `{source_info['last_modified']}` \n"
    return badges

def response_generator_stream(response_chat):
    for word in response_chat.split():
        yield word + " "
        time.sleep(0.05)

async def main():
    response_generator = load_data()
    st.title("AthenAI 💬")
    with st.expander("Information"):
        st.info("Version de démonstration", icon="ℹ️")
        st.markdown('''
        Sa base de connaissance est constitué de 4 documents :
        - Un document sur le contrat d'apprentissage
        - Un guide CFA
        - Un document sur ParcourSup
        - Un document sur la signature electronique
        ''')
    if "messages" not in st.session_state.keys():
        st.session_state.messages = [
            {"role": "assistant", "content": "Posez moi une question sur la documentation de Val!"}
        ]
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
            if "source" in message.keys():
                st.markdown(message["source"])
    if prompt := st.chat_input("Votre question"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
    if st.session_state.messages[-1]["role"] == "user":
        with st.chat_message("assistant"):
            with st.spinner("Je recherche dans mes documents..."):
                response = await response_generator.chat_async(user_query=prompt, stream=False)
                response_chat = response.response
                st.write(response_chat)
                source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
                st.markdown(source_badges)
                st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})

if __name__ == '__main__':
    asyncio.run(main())
```

response_synthesis.py:

```python
from elasticsearch import AsyncElasticsearch
from llama_index.core.chat_engine.context import ContextChatEngine
from llama_index.core.chat_engine.simple import SimpleChatEngine
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from llama_index.core.prompts.prompt_type import PromptType
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.core import (
    VectorStoreIndex, PromptTemplate, StorageContext,
    load_index_from_storage, Settings, get_response_synthesizer
)
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import LLM, ChatMessage, MessageRole
from typing import List, Tuple
from llama_index.core.base.response.schema import RESPONSE_TYPE, NodeWithScore
from src.variables.config import Configs
from src.utils.utils import Utils
import os
import asyncio


class ResponseSynthesis:
    def __init__(self, local_index: bool) -> None:
        Settings.embed_model = HuggingFaceEmbedding(model_name=Configs.embed_model, embed_batch_size=Configs.batch_size)
        Settings.llm = Ollama(model=Configs.model, request_timeout=1000)
        if local_index:
            self.load_local_index()
        else:
            self.load_es_index()

    def load_es_index(self) -> None:
        async_client = AsyncElasticsearch(
            hosts=[Configs.elastic_url],
            basic_auth=(Configs.elastic_username, Configs.elastic_password)
        )
        try:
            vector_store = ElasticsearchStore(
                index_name=Configs.index_name,
                es_url=Configs.elastic_url,
                es_client=async_client
            )
            self.index = VectorStoreIndex.from_vector_store(vector_store=vector_store, embed_model=Settings.embed_model)
        finally:
            async_client.transport.close()

    def load_local_index(self) -> None:
        if os.path.exists(Configs.local_index_dir):
            storage_context = StorageContext.from_defaults(persist_dir=Configs.local_index_dir)
            index = load_index_from_storage(storage_context)
            self.index = index
        else:
            raise FileNotFoundError(f"Le dossier de stockage d'index '{Configs.local_index_dir}' n'existe pas. Veuillez d'abord exécuter le script d'indexation.")

    def query(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k) -> RESPONSE_TYPE:
        query_engine = self.index.as_query_engine(similarity_top_k=similarity_top_k, llm=Settings.llm)
        query_engine.update_prompts(
            {"response_synthesizer:text_qa_template": self.build_context_prompt()}
        )
        query_response = query_engine.query(user_query)
        return query_response

    def retriever(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k) -> List[NodeWithScore]:
        retriever = self.index.as_retriever(similarity_top_k=similarity_top_k)
        retrieved_nodes = retriever.retrieve(user_query)
        return retrieved_nodes

    def chat(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k, stream: bool = True):
        memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
        system_mesage = [
            ChatMessage(role=MessageRole.SYSTEM, content=Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
        ]
        chat_engine = self.index.as_chat_engine(
            chat_mode="context",
            memory=memory,
            llm=Settings.llm,
            system_prompt=system_mesage,
            context_template=Utils.DEFAULT_CONTEXT_PROMPT,
            verbose=True
        )
        if stream:
            response = chat_engine.stream_chat(user_query)
        else:
            response = chat_engine.chat(user_query)
        return response

    async def chat_async(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k, stream: bool = True):
        memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
        system_mesage = [
            ChatMessage(role=MessageRole.SYSTEM, content=Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
        ]
        # my_retriever = self.index.as_retriever(similarity_top_k=similarity_top_k)
        chat_engine = self.index.as_chat_engine(
            chat_mode="context",
            memory=memory,
            llm=Settings.llm,
            system_prompt=system_mesage,
            context_template=Utils.DEFAULT_CONTEXT_PROMPT,
            verbose=True
        )
        response = await chat_engine.achat(user_query)
        return response
```
Okay. I'm not exactly sure, but you do have some sync code; maybe that is causing a second loop to run. In any case, here is another possible workaround for the issues you are having:

```python
async_client = AsyncElasticsearch(
    hosts=[Configs.elastic_url],
    basic_auth=(Configs.elastic_username, Configs.elastic_password),
    node_class='httpxasync'  # <--- add this argument
)
```

Also make sure you have the `httpx` package installed.

I'm thinking if that does not work I'm going to have to invest some time and build a small streamlit + llama-index app to test this myself, so let me know. It may take me a couple of weeks, but I can look into it if httpx does not solve your issue.
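Applied to the `load_es_index()` method shown earlier, the workaround would look roughly like this (a sketch only; `Configs` comes from the user's project and the `httpx` package is assumed to be installed):

```python
from elasticsearch import AsyncElasticsearch
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from src.variables.config import Configs  # user's own configuration module

async_client = AsyncElasticsearch(
    hosts=[Configs.elastic_url],
    basic_auth=(Configs.elastic_username, Configs.elastic_password),
    node_class='httpxasync',  # use the httpx-based async node class
)
vector_store = ElasticsearchStore(
    index_name=Configs.index_name,
    es_url=Configs.elastic_url,
    es_client=async_client,
)
```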
Hey! I tested your solution with …
Bug Description
I'm developing a chatbot, and the bug appears on the second request sent.
I opened an issue on the GitHub repository of the llama_index library, but apparently the bug actually comes from the Elasticsearch library.
I also opened a post on the Elasticsearch forum.
Version
elasticsearch: 8.14.0
elastic-transport: 8.13.1
Steps to Reproduce
With llama_index, send a second query to the RetrieverQueryEngine, built from a VectorIndexRetriever, a VectorStoreIndex and an ElasticSearchVectorStore.
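For illustration, a minimal sketch of such a reproduction (index name, URL, and questions are placeholders, not taken from the report; an embedding model is assumed to be configured in Settings):

```python
from llama_index.core import VectorStoreIndex
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.vector_stores.elasticsearch import ElasticsearchStore

# Placeholder connection details; substitute a real cluster and index.
vector_store = ElasticsearchStore(index_name="my-index", es_url="http://localhost:9200")
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

retriever = VectorIndexRetriever(index=index, similarity_top_k=3)
query_engine = RetrieverQueryEngine(retriever=retriever)

print(query_engine.query("first question"))   # first call succeeds
print(query_engine.query("second question"))  # second call raises:
# RuntimeError: Timeout context manager should be used inside a task
```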
Relevant Logs/Tracebacks