From 91163a247b5e18f8daf976a8fe11484cc3cf266d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Mart=C3=ADnez?=
Date: Thu, 31 Aug 2023 16:36:19 +0200
Subject: [PATCH] Batch embeddings to be processed by chromadb

---
 .gitignore |  2 +-
 ingest.py  | 36 +++++++++++++++++++++++++++---------
 2 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1f7d3a1..96b9a16 100644
--- a/.gitignore
+++ b/.gitignore
@@ -169,5 +169,5 @@ cython_debug/
 #  option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
 
+# vscode
 .vscode/launch.json
-persist_directory/chroma.sqlite3
diff --git a/ingest.py b/ingest.py
index d7a6635..c139389 100755
--- a/ingest.py
+++ b/ingest.py
@@ -31,6 +31,8 @@ if not load_dotenv():
 
 from constants import CHROMA_SETTINGS
 import chromadb
+from chromadb.api.segment import API
+from typing import Iterator
 
 # Load environment variables
 persist_directory = os.environ.get('PERSIST_DIRECTORY')
@@ -126,9 +128,20 @@ def process_documents(ignored_files: List[str] = []) -> List[Document]:
         exit(0)
     print(f"Loaded {len(documents)} new documents from {source_directory}")
     text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
-    texts = text_splitter.split_documents(documents)
-    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} tokens each)")
-    return texts
+    documents = text_splitter.split_documents(documents)
+    print(f"Split into {len(documents)} chunks of text (max. {chunk_size} tokens each)")
+    return documents
+
+def batch_chromadb_insertions(chroma_client: API, documents: List[Document]) -> Iterator[List[Document]]:
+    """
+    Yield the documents to insert in consecutive batches of at most the chroma client's max batch size
+    """
+    # Read the max batch size from the client's private producer.
+    # Note: temp hack given max_batch_size is not yet exposed by ChromaDB API (WIP).
+    max_batch_size = chroma_client._producer.max_batch_size
+    for i in range(0, len(documents), max_batch_size):
+        yield documents[i:i + max_batch_size]
+
 
 def does_vectorstore_exist(persist_directory: str, embeddings: HuggingFaceEmbeddings) -> bool:
     """
@@ -150,17 +163,22 @@ def main():
         print(f"Appending to existing vectorstore at {persist_directory}")
         db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS, client=chroma_client)
         collection = db.get()
-        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
+        documents = process_documents([metadata['source'] for metadata in collection['metadatas']])
         print(f"Creating embeddings. May take some minutes...")
-        db.add_documents(texts)
+        for batched_chromadb_insertion in batch_chromadb_insertions(chroma_client, documents):
+            db.add_documents(batched_chromadb_insertion)
     else:
         # Create and store locally vectorstore
         print("Creating new vectorstore")
-        texts = process_documents()
+        documents = process_documents()
         print(f"Creating embeddings. May take some minutes...")
-        db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS, client=chroma_client)
-    db.persist()
-    db = None
+        # Create the db with the first batch of documents to insert
+        batched_chromadb_insertions = batch_chromadb_insertions(chroma_client, documents)
+        first_insertion = next(batched_chromadb_insertions)
+        db = Chroma.from_documents(first_insertion, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS, client=chroma_client)
+        # Add the rest of batches of documents
+        for batched_chromadb_insertion in batched_chromadb_insertions:
+            db.add_documents(batched_chromadb_insertion)
 
     print(f"Ingestion complete! You can now run privateGPT.py to query your documents")
 