Merge pull request #292 from jiangzhuo/feature/multiprocessing-for-document-loading
Optimize load_documents function with multiprocessing
This commit is contained in:
		
						commit
						20554a7c9d
					
				
							
								
								
									
										13
									
								
								ingest.py
								
								
								
								
							
							
						
						
									
										13
									
								
								ingest.py
								
								
								
								
							|  | @ -3,6 +3,8 @@ import os | ||||||
| import glob | import glob | ||||||
| from typing import List | from typing import List | ||||||
| from dotenv import load_dotenv | from dotenv import load_dotenv | ||||||
|  | from multiprocessing import Pool | ||||||
|  | from tqdm import tqdm | ||||||
| 
 | 
 | ||||||
| from langchain.document_loaders import ( | from langchain.document_loaders import ( | ||||||
|     CSVLoader, |     CSVLoader, | ||||||
|  | @ -79,7 +81,6 @@ def load_single_document(file_path: str) -> Document: | ||||||
| 
 | 
 | ||||||
|     raise ValueError(f"Unsupported file extension '{ext}'") |     raise ValueError(f"Unsupported file extension '{ext}'") | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
| def load_documents(source_dir: str) -> List[Document]: | def load_documents(source_dir: str) -> List[Document]: | ||||||
|     # Loads all documents from source documents directory |     # Loads all documents from source documents directory | ||||||
|     all_files = [] |     all_files = [] | ||||||
|  | @ -87,7 +88,15 @@ def load_documents(source_dir: str) -> List[Document]: | ||||||
|         all_files.extend( |         all_files.extend( | ||||||
|             glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True) |             glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True) | ||||||
|         ) |         ) | ||||||
|     return [load_single_document(file_path) for file_path in all_files] | 
 | ||||||
|  |     with Pool(processes=os.cpu_count()) as pool: | ||||||
|  |         results = [] | ||||||
|  |         with tqdm(total=len(all_files), desc='Loading documents', ncols=80) as pbar: | ||||||
|  |             for i, doc in enumerate(pool.imap_unordered(load_single_document, all_files)): | ||||||
|  |                 results.append(doc) | ||||||
|  |                 pbar.update() | ||||||
|  | 
 | ||||||
|  |     return results | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def main(): | def main(): | ||||||
|  |  | ||||||
|  | @ -10,3 +10,4 @@ extract-msg==0.41.1 | ||||||
| tabulate==0.9.0 | tabulate==0.9.0 | ||||||
| pandoc==2.3 | pandoc==2.3 | ||||||
| pypandoc==1.11 | pypandoc==1.11 | ||||||
|  | tqdm==4.65.0 | ||||||
		Loading…
	
		Reference in New Issue