Fix the parallel ingestion mode, and make it available through conf (#1336)

* Fix the parallel ingestion mode, and make it available through conf

Also updated the documentation to show how to configure the ingest mode.

* PR feedback: redirect to documentation
This commit is contained in:
lopagela 2023-11-30 11:41:55 +01:00 committed by GitHub
parent b7ca7d35a0
commit 56af625d71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 140 additions and 45 deletions

View File

@ -35,7 +35,7 @@ or using the completions / chat API.
## Ingestion troubleshooting
Are you running out of memory when ingesting files?
### Running out of memory
To do not run out of memory, you should ingest your documents without the LLM loaded in your (video) memory.
To do so, you should change your configuration to set `llm.mode: mock`.
@ -53,7 +53,42 @@ This configuration allows you to use hardware acceleration for creating embeddin
Once your documents are ingested, you can set the `llm.mode` value back to `local` (or your previous custom value).
### Ingestion speed
The ingestion speed depends on the number of documents you are ingesting, and the size of each document.
To speed up the ingestion, you can change the ingestion mode in configuration.
The following ingestion mode exist:
* `simple`: historic behavior, ingest one document at a time, sequentially
* `batch`: read, parse, and embed multiple documents using batches (batch read, and then batch parse, and then batch embed)
* `parallel`: read, parse, and embed multiple documents in parallel. This is the fastest ingestion mode for local setup.
To change the ingestion mode, you can use the `embedding.ingest_mode` configuration value. The default value is `simple`.
To configure the number of workers used for parallel or batched ingestion, you can use
the `embedding.count_workers` configuration value. If you set this value too high, you might run out of
memory, so be mindful when setting this value. The default value is `2`.
For `batch` mode, you can easily set this value to your number of threads available on your CPU without
running out of memory. For `parallel` mode, you should be more careful, and set this value to a lower value.
The configuration below should be enough for users who want to stress more their hardware:
```yaml
embedding:
ingest_mode: parallel
count_workers: 4
```
If your hardware is powerful enough, and that you are loading heavy documents, you can increase the number of workers.
It is recommended to do your own tests to find the optimal value for your hardware.
If you have a `bash` shell, you can use this set of command to do your own benchmark:
```bash
# Wipe your local data, to put yourself in a clean state
# This will delete all your ingested documents
make wipe
time PGPT_PROFILES=mock python ./scripts/ingest_folder.py ~/my-dir/to-ingest/
```
## Supported file formats

View File

@ -21,6 +21,7 @@ from llama_index.ingestion import run_transformations
from private_gpt.components.ingest.ingest_helper import IngestionHelper
from private_gpt.paths import local_data_path
from private_gpt.settings.settings import Settings
logger = logging.getLogger(__name__)
@ -62,7 +63,7 @@ class BaseIngestComponentWithIndex(BaseIngestComponent, abc.ABC):
self.show_progress = True
self._index_thread_lock = (
threading.RLock()
threading.Lock()
) # Thread lock! Not Multiprocessing lock
self._index = self._initialize_index()
@ -141,18 +142,17 @@ class SimpleIngestComponent(BaseIngestComponentWithIndex):
return documents
class MultiWorkerIngestComponent(BaseIngestComponentWithIndex):
class BatchIngestComponent(BaseIngestComponentWithIndex):
"""Parallelize the file reading and parsing on multiple CPU core.
This also makes the embeddings to be computed in batches (on GPU or CPU).
"""
BULK_INGEST_WORKER_NUM = max((os.cpu_count() or 1) - 1, 1)
def __init__(
self,
storage_context: StorageContext,
service_context: ServiceContext,
count_workers: int,
*args: Any,
**kwargs: Any,
) -> None:
@ -162,6 +162,12 @@ class MultiWorkerIngestComponent(BaseIngestComponentWithIndex):
assert (
len(self.service_context.transformations) >= 2
), "Embeddings must be in the transformations"
assert count_workers > 0, "count_workers must be > 0"
self.count_workers = count_workers
self._file_to_documents_work_pool = multiprocessing.Pool(
processes=self.count_workers
)
def ingest(self, file_name: str, file_data: Path) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
@ -173,12 +179,13 @@ class MultiWorkerIngestComponent(BaseIngestComponentWithIndex):
return self._save_docs(documents)
def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
with multiprocessing.Pool(processes=self.BULK_INGEST_WORKER_NUM) as pool:
documents = list(
itertools.chain.from_iterable(
pool.starmap(IngestionHelper.transform_file_into_documents, files)
documents = list(
itertools.chain.from_iterable(
self._file_to_documents_work_pool.starmap(
IngestionHelper.transform_file_into_documents, files
)
)
)
logger.info(
"Transformed count=%s files into count=%s documents",
len(files),
@ -195,7 +202,7 @@ class MultiWorkerIngestComponent(BaseIngestComponentWithIndex):
)
# Locking the index to avoid concurrent writes
with self._index_thread_lock:
logger.debug("Inserting count=%s nodes in the index", len(nodes))
logger.info("Inserting count=%s nodes in the index", len(nodes))
self._index.insert_nodes(nodes, show_progress=True)
for document in documents:
self._index.docstore.set_document_hash(
@ -213,51 +220,43 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
This use the CPU and GPU in parallel (both running at the same time), and
reduce the memory pressure by not loading all the files in memory at the same time.
FIXME: this is not working as well as planned because of the usage of
the multiprocessing worker pool.
"""
BULK_INGEST_WORKER_NUM = max((os.cpu_count() or 1) - 1, 1)
def __init__(
self,
storage_context: StorageContext,
service_context: ServiceContext,
count_workers: int,
*args: Any,
**kwargs: Any,
) -> None:
super().__init__(storage_context, service_context, *args, **kwargs)
# Make an efficient use of the CPU and GPU, the embedding
# must be in the transformations
# To make an efficient use of the CPU and GPU, the embeddings
# must be in the transformations (to be computed in batches)
assert (
len(self.service_context.transformations) >= 2
), "Embeddings must be in the transformations"
assert count_workers > 0, "count_workers must be > 0"
self.count_workers = count_workers
# We are doing our own multiprocessing
# To do not collide with the multiprocessing of huggingface, we disable it
os.environ["TOKENIZERS_PARALLELISM"] = "false"
self._ingest_work_pool = multiprocessing.pool.ThreadPool(
processes=self.count_workers
)
self._file_to_documents_work_pool = multiprocessing.Pool(
processes=self.count_workers
)
def ingest(self, file_name: str, file_data: Path) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
# FIXME there are some cases where the process is not finished
# causing deadlocks. More information using trace:
# time PGPT_PROFILES=ingest-local python -m trace --trace \
# ./scripts/ingest_folder.py ... &> ingestion.traces
with multiprocessing.Pool(processes=1) as pool:
# Running in a single (1) process to release the current
# thread, and take a dedicated CPU core for computation
a_documents = pool.apply_async(
IngestionHelper.transform_file_into_documents, (file_name, file_data)
)
while True:
# FIXME ugly hack to highlight the deadlock in traces
try:
documents = list(a_documents.get(timeout=2))
except multiprocessing.TimeoutError:
continue
break
pool.close()
pool.terminate()
# Running in a single (1) process to release the current
# thread, and take a dedicated CPU core for computation
documents = self._file_to_documents_work_pool.apply(
IngestionHelper.transform_file_into_documents, (file_name, file_data)
)
logger.info(
"Transformed file=%s into count=%s documents", file_name, len(documents)
)
@ -267,12 +266,12 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
# Lightweight threads, used for parallelize the
# underlying IO calls made in the ingestion
with multiprocessing.pool.ThreadPool(
processes=self.BULK_INGEST_WORKER_NUM
) as pool:
documents = list(
itertools.chain.from_iterable(pool.starmap(self.ingest, files))
documents = list(
itertools.chain.from_iterable(
self._ingest_work_pool.starmap(self.ingest, files)
)
)
return documents
def _save_docs(self, documents: list[Document]) -> list[Document]:
@ -284,7 +283,7 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
)
# Locking the index to avoid concurrent writes
with self._index_thread_lock:
logger.debug("Inserting count=%s nodes in the index", len(nodes))
logger.info("Inserting count=%s nodes in the index", len(nodes))
self._index.insert_nodes(nodes, show_progress=True)
for document in documents:
self._index.docstore.set_document_hash(
@ -295,3 +294,35 @@ class ParallelizedIngestComponent(BaseIngestComponentWithIndex):
self._save_index()
logger.debug("Persisted the index and nodes")
return documents
def __del__(self) -> None:
# We need to do the appropriate cleanup of the multiprocessing pools
# when the object is deleted. Using root logger to avoid
# the logger to be deleted before the pool
logging.debug("Closing the ingest work pool")
self._ingest_work_pool.close()
self._ingest_work_pool.join()
self._ingest_work_pool.terminate()
logging.debug("Closing the file to documents work pool")
self._file_to_documents_work_pool.close()
self._file_to_documents_work_pool.join()
self._file_to_documents_work_pool.terminate()
def get_ingestion_component(
storage_context: StorageContext,
service_context: ServiceContext,
settings: Settings,
) -> BaseIngestComponent:
"""Get the ingestion component for the given configuration."""
ingest_mode = settings.embedding.ingest_mode
if ingest_mode == "batch":
return BatchIngestComponent(
storage_context, service_context, settings.embedding.count_workers
)
elif ingest_mode == "parallel":
return ParallelizedIngestComponent(
storage_context, service_context, settings.embedding.count_workers
)
else:
return SimpleIngestComponent(storage_context, service_context)

View File

@ -11,13 +11,14 @@ from llama_index import (
from llama_index.node_parser import SentenceWindowNodeParser
from private_gpt.components.embedding.embedding_component import EmbeddingComponent
from private_gpt.components.ingest.ingest_component import SimpleIngestComponent
from private_gpt.components.ingest.ingest_component import get_ingestion_component
from private_gpt.components.llm.llm_component import LLMComponent
from private_gpt.components.node_store.node_store_component import NodeStoreComponent
from private_gpt.components.vector_store.vector_store_component import (
VectorStoreComponent,
)
from private_gpt.server.ingest.model import IngestedDoc
from private_gpt.settings.settings import settings
logger = logging.getLogger(__name__)
@ -48,8 +49,8 @@ class IngestService:
transformations=[node_parser, embedding_component.embedding_model],
)
self.ingest_component = SimpleIngestComponent(
self.storage_context, self.ingest_service_context
self.ingest_component = get_ingestion_component(
self.storage_context, self.ingest_service_context, settings=settings()
)
def ingest(self, file_name: str, file_data: Path) -> list[IngestedDoc]:

View File

@ -121,6 +121,30 @@ class LocalSettings(BaseModel):
class EmbeddingSettings(BaseModel):
mode: Literal["local", "openai", "sagemaker", "mock"]
ingest_mode: Literal["simple", "batch", "parallel"] = Field(
"simple",
description=(
"The ingest mode to use for the embedding engine:\n"
"If `simple` - ingest files sequentially and one by one. It is the historic behaviour.\n"
"If `batch` - if multiple files, parse all the files in parallel, "
"and send them in batch to the embedding model.\n"
"If `parallel` - parse the files in parallel using multiple cores, and embedd them in parallel.\n"
"`parallel` is the fastest mode for local setup, as it parallelize IO RW in the index.\n"
"For modes that leverage parallelization, you can specify the number of "
"workers to use with `count_workers`.\n"
),
)
count_workers: int = Field(
2,
description=(
"The number of workers to use for file ingestion.\n"
"In `batch` mode, this is the number of workers used to parse the files.\n"
"In `parallel` mode, this is the number of workers used to parse the files and embed them.\n"
"This is only used if `ingest_mode` is not `simple`.\n"
"Do not go too high with this number, as it might cause memory issues. (especially in `parallel` mode)\n"
"Do not set it higher than your number of threads of your CPU."
),
)
class SagemakerSettings(BaseModel):

View File

@ -1,3 +1,6 @@
# The default configuration file.
# More information about configuration can be found in the documentation: https://docs.privategpt.dev/
# Syntax in `private_pgt/settings/settings.py`
server:
env_name: ${APP_ENV:prod}
port: ${PORT:8001}
@ -26,6 +29,7 @@ llm:
embedding:
# Should be matching the value above in most cases
mode: local
ingest_mode: simple
vectorstore:
database: qdrant