From 56af625d71afc62f4830de07079aea2e1c938e62 Mon Sep 17 00:00:00 2001 From: lopagela Date: Thu, 30 Nov 2023 11:41:55 +0100 Subject: [PATCH] Fix the parallel ingestion mode, and make it available through conf (#1336) * Fix the parallel ingestion mode, and make it available through conf Also updated the documentation to show how to configure the ingest mode. * PR feedback: redirect to documentation --- fern/docs/pages/manual/ingestion.mdx | 37 +++++- .../components/ingest/ingest_component.py | 113 +++++++++++------- private_gpt/server/ingest/ingest_service.py | 7 +- private_gpt/settings/settings.py | 24 ++++ settings.yaml | 4 + 5 files changed, 140 insertions(+), 45 deletions(-) diff --git a/fern/docs/pages/manual/ingestion.mdx b/fern/docs/pages/manual/ingestion.mdx index eeb8fdc..39b83cc 100644 --- a/fern/docs/pages/manual/ingestion.mdx +++ b/fern/docs/pages/manual/ingestion.mdx @@ -35,7 +35,7 @@ or using the completions / chat API. ## Ingestion troubleshooting -Are you running out of memory when ingesting files? +### Running out of memory To do not run out of memory, you should ingest your documents without the LLM loaded in your (video) memory. To do so, you should change your configuration to set `llm.mode: mock`. @@ -53,7 +53,42 @@ This configuration allows you to use hardware acceleration for creating embeddin Once your documents are ingested, you can set the `llm.mode` value back to `local` (or your previous custom value). +### Ingestion speed +The ingestion speed depends on the number of documents you are ingesting, and the size of each document. +To speed up the ingestion, you can change the ingestion mode in configuration. + +The following ingestion mode exist: +* `simple`: historic behavior, ingest one document at a time, sequentially +* `batch`: read, parse, and embed multiple documents using batches (batch read, and then batch parse, and then batch embed) +* `parallel`: read, parse, and embed multiple documents in parallel. This is the fastest ingestion mode for local setup. +To change the ingestion mode, you can use the `embedding.ingest_mode` configuration value. The default value is `simple`. + +To configure the number of workers used for parallel or batched ingestion, you can use +the `embedding.count_workers` configuration value. If you set this value too high, you might run out of +memory, so be mindful when setting this value. The default value is `2`. +For `batch` mode, you can easily set this value to your number of threads available on your CPU without +running out of memory. For `parallel` mode, you should be more careful, and set this value to a lower value. + +The configuration below should be enough for users who want to stress more their hardware: +```yaml +embedding: + ingest_mode: parallel + count_workers: 4 +``` + +If your hardware is powerful enough, and that you are loading heavy documents, you can increase the number of workers. +It is recommended to do your own tests to find the optimal value for your hardware. + +If you have a `bash` shell, you can use this set of command to do your own benchmark: + +```bash +# Wipe your local data, to put yourself in a clean state +# This will delete all your ingested documents +make wipe + +time PGPT_PROFILES=mock python ./scripts/ingest_folder.py ~/my-dir/to-ingest/ +``` ## Supported file formats diff --git a/private_gpt/components/ingest/ingest_component.py b/private_gpt/components/ingest/ingest_component.py index f4abaf6..e8ec1e8 100644 --- a/private_gpt/components/ingest/ingest_component.py +++ b/private_gpt/components/ingest/ingest_component.py @@ -21,6 +21,7 @@ from llama_index.ingestion import run_transformations from private_gpt.components.ingest.ingest_helper import IngestionHelper from private_gpt.paths import local_data_path +from private_gpt.settings.settings import Settings logger = logging.getLogger(__name__) @@ -62,7 +63,7 @@ class BaseIngestComponentWithIndex(BaseIngestComponent, abc.ABC): self.show_progress = True self._index_thread_lock = ( - threading.RLock() + threading.Lock() ) # Thread lock! Not Multiprocessing lock self._index = self._initialize_index() @@ -141,18 +142,17 @@ class SimpleIngestComponent(BaseIngestComponentWithIndex): return documents -class MultiWorkerIngestComponent(BaseIngestComponentWithIndex): +class BatchIngestComponent(BaseIngestComponentWithIndex): """Parallelize the file reading and parsing on multiple CPU core. This also makes the embeddings to be computed in batches (on GPU or CPU). """ - BULK_INGEST_WORKER_NUM = max((os.cpu_count() or 1) - 1, 1) - def __init__( self, storage_context: StorageContext, service_context: ServiceContext, + count_workers: int, *args: Any, **kwargs: Any, ) -> None: @@ -162,6 +162,12 @@ class MultiWorkerIngestComponent(BaseIngestComponentWithIndex): assert ( len(self.service_context.transformations) >= 2 ), "Embeddings must be in the transformations" + assert count_workers > 0, "count_workers must be > 0" + self.count_workers = count_workers + + self._file_to_documents_work_pool = multiprocessing.Pool( + processes=self.count_workers + ) def ingest(self, file_name: str, file_data: Path) -> list[Document]: logger.info("Ingesting file_name=%s", file_name) @@ -173,12 +179,13 @@ class MultiWorkerIngestComponent(BaseIngestComponentWithIndex): return self._save_docs(documents) def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]: - with multiprocessing.Pool(processes=self.BULK_INGEST_WORKER_NUM) as pool: - documents = list( - itertools.chain.from_iterable( - pool.starmap(IngestionHelper.transform_file_into_documents, files) + documents = list( + itertools.chain.from_iterable( + self._file_to_documents_work_pool.starmap( + IngestionHelper.transform_file_into_documents, files ) ) + ) logger.info( "Transformed count=%s files into count=%s documents", len(files), @@ -195,7 +202,7 @@ class MultiWorkerIngestComponent(BaseIngestComponentWithIndex): ) # Locking the index to avoid concurrent writes with self._index_thread_lock: - logger.debug("Inserting count=%s nodes in the index", len(nodes)) + logger.info("Inserting count=%s nodes in the index", len(nodes)) self._index.insert_nodes(nodes, show_progress=True) for document in documents: self._index.docstore.set_document_hash( @@ -213,51 +220,43 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex): This use the CPU and GPU in parallel (both running at the same time), and reduce the memory pressure by not loading all the files in memory at the same time. - - FIXME: this is not working as well as planned because of the usage of - the multiprocessing worker pool. """ - BULK_INGEST_WORKER_NUM = max((os.cpu_count() or 1) - 1, 1) - def __init__( self, storage_context: StorageContext, service_context: ServiceContext, + count_workers: int, *args: Any, **kwargs: Any, ) -> None: super().__init__(storage_context, service_context, *args, **kwargs) - # Make an efficient use of the CPU and GPU, the embedding - # must be in the transformations + # To make an efficient use of the CPU and GPU, the embeddings + # must be in the transformations (to be computed in batches) assert ( len(self.service_context.transformations) >= 2 ), "Embeddings must be in the transformations" + assert count_workers > 0, "count_workers must be > 0" + self.count_workers = count_workers # We are doing our own multiprocessing # To do not collide with the multiprocessing of huggingface, we disable it os.environ["TOKENIZERS_PARALLELISM"] = "false" + self._ingest_work_pool = multiprocessing.pool.ThreadPool( + processes=self.count_workers + ) + + self._file_to_documents_work_pool = multiprocessing.Pool( + processes=self.count_workers + ) + def ingest(self, file_name: str, file_data: Path) -> list[Document]: logger.info("Ingesting file_name=%s", file_name) - # FIXME there are some cases where the process is not finished - # causing deadlocks. More information using trace: - # time PGPT_PROFILES=ingest-local python -m trace --trace \ - # ./scripts/ingest_folder.py ... &> ingestion.traces - with multiprocessing.Pool(processes=1) as pool: - # Running in a single (1) process to release the current - # thread, and take a dedicated CPU core for computation - a_documents = pool.apply_async( - IngestionHelper.transform_file_into_documents, (file_name, file_data) - ) - while True: - # FIXME ugly hack to highlight the deadlock in traces - try: - documents = list(a_documents.get(timeout=2)) - except multiprocessing.TimeoutError: - continue - break - pool.close() - pool.terminate() + # Running in a single (1) process to release the current + # thread, and take a dedicated CPU core for computation + documents = self._file_to_documents_work_pool.apply( + IngestionHelper.transform_file_into_documents, (file_name, file_data) + ) logger.info( "Transformed file=%s into count=%s documents", file_name, len(documents) ) @@ -267,12 +266,12 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex): def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]: # Lightweight threads, used for parallelize the # underlying IO calls made in the ingestion - with multiprocessing.pool.ThreadPool( - processes=self.BULK_INGEST_WORKER_NUM - ) as pool: - documents = list( - itertools.chain.from_iterable(pool.starmap(self.ingest, files)) + + documents = list( + itertools.chain.from_iterable( + self._ingest_work_pool.starmap(self.ingest, files) ) + ) return documents def _save_docs(self, documents: list[Document]) -> list[Document]: @@ -284,7 +283,7 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex): ) # Locking the index to avoid concurrent writes with self._index_thread_lock: - logger.debug("Inserting count=%s nodes in the index", len(nodes)) + logger.info("Inserting count=%s nodes in the index", len(nodes)) self._index.insert_nodes(nodes, show_progress=True) for document in documents: self._index.docstore.set_document_hash( @@ -295,3 +294,35 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex): self._save_index() logger.debug("Persisted the index and nodes") return documents + + def __del__(self) -> None: + # We need to do the appropriate cleanup of the multiprocessing pools + # when the object is deleted. Using root logger to avoid + # the logger to be deleted before the pool + logging.debug("Closing the ingest work pool") + self._ingest_work_pool.close() + self._ingest_work_pool.join() + self._ingest_work_pool.terminate() + logging.debug("Closing the file to documents work pool") + self._file_to_documents_work_pool.close() + self._file_to_documents_work_pool.join() + self._file_to_documents_work_pool.terminate() + + +def get_ingestion_component( + storage_context: StorageContext, + service_context: ServiceContext, + settings: Settings, +) -> BaseIngestComponent: + """Get the ingestion component for the given configuration.""" + ingest_mode = settings.embedding.ingest_mode + if ingest_mode == "batch": + return BatchIngestComponent( + storage_context, service_context, settings.embedding.count_workers + ) + elif ingest_mode == "parallel": + return ParallelizedIngestComponent( + storage_context, service_context, settings.embedding.count_workers + ) + else: + return SimpleIngestComponent(storage_context, service_context) diff --git a/private_gpt/server/ingest/ingest_service.py b/private_gpt/server/ingest/ingest_service.py index b0455cc..4112024 100644 --- a/private_gpt/server/ingest/ingest_service.py +++ b/private_gpt/server/ingest/ingest_service.py @@ -11,13 +11,14 @@ from llama_index import ( from llama_index.node_parser import SentenceWindowNodeParser from private_gpt.components.embedding.embedding_component import EmbeddingComponent -from private_gpt.components.ingest.ingest_component import SimpleIngestComponent +from private_gpt.components.ingest.ingest_component import get_ingestion_component from private_gpt.components.llm.llm_component import LLMComponent from private_gpt.components.node_store.node_store_component import NodeStoreComponent from private_gpt.components.vector_store.vector_store_component import ( VectorStoreComponent, ) from private_gpt.server.ingest.model import IngestedDoc +from private_gpt.settings.settings import settings logger = logging.getLogger(__name__) @@ -48,8 +49,8 @@ class IngestService: transformations=[node_parser, embedding_component.embedding_model], ) - self.ingest_component = SimpleIngestComponent( - self.storage_context, self.ingest_service_context + self.ingest_component = get_ingestion_component( + self.storage_context, self.ingest_service_context, settings=settings() ) def ingest(self, file_name: str, file_data: Path) -> list[IngestedDoc]: diff --git a/private_gpt/settings/settings.py b/private_gpt/settings/settings.py index 7308104..125396c 100644 --- a/private_gpt/settings/settings.py +++ b/private_gpt/settings/settings.py @@ -121,6 +121,30 @@ class LocalSettings(BaseModel): class EmbeddingSettings(BaseModel): mode: Literal["local", "openai", "sagemaker", "mock"] + ingest_mode: Literal["simple", "batch", "parallel"] = Field( + "simple", + description=( + "The ingest mode to use for the embedding engine:\n" + "If `simple` - ingest files sequentially and one by one. It is the historic behaviour.\n" + "If `batch` - if multiple files, parse all the files in parallel, " + "and send them in batch to the embedding model.\n" + "If `parallel` - parse the files in parallel using multiple cores, and embedd them in parallel.\n" + "`parallel` is the fastest mode for local setup, as it parallelize IO RW in the index.\n" + "For modes that leverage parallelization, you can specify the number of " + "workers to use with `count_workers`.\n" + ), + ) + count_workers: int = Field( + 2, + description=( + "The number of workers to use for file ingestion.\n" + "In `batch` mode, this is the number of workers used to parse the files.\n" + "In `parallel` mode, this is the number of workers used to parse the files and embed them.\n" + "This is only used if `ingest_mode` is not `simple`.\n" + "Do not go too high with this number, as it might cause memory issues. (especially in `parallel` mode)\n" + "Do not set it higher than your number of threads of your CPU." + ), + ) class SagemakerSettings(BaseModel): diff --git a/settings.yaml b/settings.yaml index 05aec53..815ed09 100644 --- a/settings.yaml +++ b/settings.yaml @@ -1,3 +1,6 @@ +# The default configuration file. +# More information about configuration can be found in the documentation: https://docs.privategpt.dev/ +# Syntax in `private_pgt/settings/settings.py` server: env_name: ${APP_ENV:prod} port: ${PORT:8001} @@ -26,6 +29,7 @@ llm: embedding: # Should be matching the value above in most cases mode: local + ingest_mode: simple vectorstore: database: qdrant