Implement a way of ingesting more documents
Move environment variables to the global scope.
Add a better check for vectorstore existence.
Introduce a new function for better readability.

Co-authored-by: Pulp <51127079+PulpCattel@users.noreply.github.com>
parent 42046c5ec0
commit 7844553ca1

ingest.py: 67 changed lines
@@ -24,6 +24,16 @@ from langchain.docstore.document import Document
 from constants import CHROMA_SETTINGS
 
 
+load_dotenv()
+
+
+# Load environment variables
+persist_directory = os.environ.get('PERSIST_DIRECTORY')
+source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
+embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME')
+chunk_size = 500
+chunk_overlap = 50
+
 # Map file extensions to document loaders and their arguments
 LOADER_MAPPING = {
     ".csv": (CSVLoader, {}),
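Since load_dotenv() now runs at import time, the configuration above is resolved once and shared by every function in the module. A minimal sketch of how these lookups behave, assuming a hypothetical .env file next to the script containing PERSIST_DIRECTORY=db and EMBEDDINGS_MODEL_NAME=all-MiniLM-L6-v2:

import os

from dotenv import load_dotenv  # python-dotenv

# Copies key=value pairs from ./.env into os.environ;
# by default it does not override variables already set in the shell.
load_dotenv()

# Required value: None if missing from both .env and the environment.
persist_directory = os.environ.get('PERSIST_DIRECTORY')
# Optional value with a fallback, as in the diff above.
source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')

print(persist_directory)  # 'db' under the assumed .env
print(source_directory)   # 'source_documents' unless overridden

Moving chunk_size and chunk_overlap up as well keeps the splitter settings next to the rest of the configuration instead of buried inside main().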
@@ -44,7 +54,6 @@ LOADER_MAPPING = {
 
 load_dotenv()
 
-
 def load_single_document(file_path: str) -> Document:
     ext = "." + file_path.rsplit(".", 1)[-1]
     if ext in LOADER_MAPPING:
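For reference, the dispatch that load_single_document performs over LOADER_MAPPING boils down to a table of (callable, kwargs) pairs keyed by file extension. A standalone sketch with plain functions standing in for the LangChain loader classes (the names here are hypothetical):

from typing import Callable, Dict, Tuple

def load_csv(path: str) -> str:
    return f"csv:{path}"

def load_txt(path: str) -> str:
    return f"txt:{path}"

# Extension -> (loader, loader_kwargs), mirroring LOADER_MAPPING.
LOADERS: Dict[str, Tuple[Callable[..., str], dict]] = {
    ".csv": (load_csv, {}),
    ".txt": (load_txt, {}),
}

def load_single(path: str) -> str:
    ext = "." + path.rsplit(".", 1)[-1]
    if ext in LOADERS:
        loader, kwargs = LOADERS[ext]
        return loader(path, **kwargs)
    raise ValueError(f"Unsupported file extension '{ext}'")

print(load_single("notes.txt"))  # txt:notes.txt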
@@ -55,36 +64,60 @@ def load_single_document(file_path: str) -> Document:
     raise ValueError(f"Unsupported file extension '{ext}'")
 
 
-def load_documents(source_dir: str) -> List[Document]:
-    # Loads all documents from source documents directory
+def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
+    """
+    Loads all documents from the source documents directory, ignoring specified files
+    """
     all_files = []
     for ext in LOADER_MAPPING:
         all_files.extend(
             glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
         )
-    return [load_single_document(file_path) for file_path in all_files]
+    filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]
+    return [load_single_document(file_path) for file_path in filtered_files]
 
-
-def main():
-    # Load environment variables
-    persist_directory = os.environ.get('PERSIST_DIRECTORY')
-    source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
-    embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME')
-
-    # Load documents and split in chunks
+def process_documents(ignored_files: List[str] = []) -> List[Document]:
+    """
+    Load documents and split in chunks
+    """
     print(f"Loading documents from {source_directory}")
-    chunk_size = 500
-    chunk_overlap = 50
-    documents = load_documents(source_directory)
+    documents = load_documents(source_directory, ignored_files)
+    if not documents:
+        print("No new documents to load")
+        exit(0)
+    print(f"Loaded {len(documents)} new documents from {source_directory}")
     text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
     texts = text_splitter.split_documents(documents)
-    print(f"Loaded {len(documents)} documents from {source_directory}")
-    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
+    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
+    return texts
 
+def does_vectorstore_exist(persist_directory: str) -> bool:
+    """
+    Checks if vectorstore exists
+    """
+    if os.path.exists(os.path.join(persist_directory, 'index')):
+        if os.path.exists(os.path.join(persist_directory, 'chroma-collections.parquet')) and os.path.exists(os.path.join(persist_directory, 'chroma-embeddings.parquet')):
+            list_index_files = glob.glob(os.path.join(persist_directory, 'index/*.bin'))
+            list_index_files += glob.glob(os.path.join(persist_directory, 'index/*.pkl'))
+            if len(list_index_files) == 4:
+                return True
+    return False
+
+def main():
     # Create embeddings
     embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)
 
-    # Create and store locally vectorstore
-    db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS)
+    if does_vectorstore_exist(persist_directory):
+        # Update and store locally vectorstore
+        print(f"Appending to existing vectorstore at {persist_directory}")
+        db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS)
+        collection = db.get()
+        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
+        db.add_documents(texts)
+    else:
+        # Create and store locally vectorstore
+        print("Creating new vectorstore")
+        texts = process_documents()
+        db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS)
     db.persist()
     db = None
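The core of the append path is deduplication by source: db.get() exposes the metadata of everything already stored, and each 'source' path is handed to process_documents as a file to ignore. A self-contained sketch of that filtering step, using hypothetical file names in place of a real Chroma collection:

from typing import Dict, List

# Rough shape of what db.get() returns for an existing collection
# (values here are made up for illustration).
collection: Dict[str, List[dict]] = {
    'metadatas': [
        {'source': 'source_documents/a.txt'},
        {'source': 'source_documents/b.pdf'},
    ],
}

ignored_files = [metadata['source'] for metadata in collection['metadatas']]

# Mirrors the filtering inside load_documents: only unseen files survive.
all_files = ['source_documents/a.txt', 'source_documents/c.csv']
new_files = [f for f in all_files if f not in ignored_files]
print(new_files)  # ['source_documents/c.csv']

In practice this means ingest.py can simply be re-run after new files are dropped into the source directory: the first run creates the vectorstore, and later runs should embed only the documents that are not yet in it.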