217 lines
8.2 KiB
Python
217 lines
8.2 KiB
Python
import logging
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, AnyStr, Literal
|
|
|
|
from injector import inject, singleton
|
|
from llama_index import (
|
|
Document,
|
|
JSONReader,
|
|
ServiceContext,
|
|
StorageContext,
|
|
StringIterableReader,
|
|
VectorStoreIndex,
|
|
load_index_from_storage,
|
|
)
|
|
from llama_index.node_parser import SentenceWindowNodeParser
|
|
from llama_index.readers.file.base import DEFAULT_FILE_READER_CLS
|
|
from pydantic import BaseModel, Field
|
|
|
|
from private_gpt.components.embedding.embedding_component import EmbeddingComponent
|
|
from private_gpt.components.llm.llm_component import LLMComponent
|
|
from private_gpt.components.node_store.node_store_component import NodeStoreComponent
|
|
from private_gpt.components.vector_store.vector_store_component import (
|
|
VectorStoreComponent,
|
|
)
|
|
from private_gpt.paths import local_data_path
|
|
|
|
if TYPE_CHECKING:
|
|
from llama_index.readers.base import BaseReader
|
|
|
|
# Patching the default file reader to support other file types
|
|
FILE_READER_CLS = DEFAULT_FILE_READER_CLS.copy()
|
|
FILE_READER_CLS.update(
|
|
{
|
|
".json": JSONReader,
|
|
}
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IngestedDoc(BaseModel):
|
|
object: Literal["ingest.document"]
|
|
doc_id: str = Field(examples=["c202d5e6-7b69-4869-81cc-dd574ee8ee11"])
|
|
doc_metadata: dict[str, Any] | None = Field(
|
|
examples=[
|
|
{
|
|
"page_label": "2",
|
|
"file_name": "Sales Report Q3 2023.pdf",
|
|
}
|
|
]
|
|
)
|
|
|
|
@staticmethod
|
|
def curate_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
|
|
"""Remove unwanted metadata keys."""
|
|
metadata.pop("doc_id", None)
|
|
metadata.pop("window", None)
|
|
metadata.pop("original_text", None)
|
|
return metadata
|
|
|
|
|
|
@singleton
|
|
class IngestService:
|
|
@inject
|
|
def __init__(
|
|
self,
|
|
llm_component: LLMComponent,
|
|
vector_store_component: VectorStoreComponent,
|
|
embedding_component: EmbeddingComponent,
|
|
node_store_component: NodeStoreComponent,
|
|
) -> None:
|
|
self.llm_service = llm_component
|
|
self.storage_context = StorageContext.from_defaults(
|
|
vector_store=vector_store_component.vector_store,
|
|
docstore=node_store_component.doc_store,
|
|
index_store=node_store_component.index_store,
|
|
)
|
|
self.ingest_service_context = ServiceContext.from_defaults(
|
|
llm=self.llm_service.llm,
|
|
embed_model=embedding_component.embedding_model,
|
|
node_parser=SentenceWindowNodeParser.from_defaults(),
|
|
)
|
|
|
|
def ingest(self, file_name: str, file_data: AnyStr | Path) -> list[IngestedDoc]:
|
|
logger.info("Ingesting file_name=%s", file_name)
|
|
extension = Path(file_name).suffix
|
|
reader_cls = FILE_READER_CLS.get(extension)
|
|
documents: list[Document]
|
|
if reader_cls is None:
|
|
logger.debug(
|
|
"No reader found for extension=%s, using default string reader",
|
|
extension,
|
|
)
|
|
# Read as a plain text
|
|
string_reader = StringIterableReader()
|
|
if isinstance(file_data, Path):
|
|
text = file_data.read_text()
|
|
documents = string_reader.load_data([text])
|
|
elif isinstance(file_data, bytes):
|
|
documents = string_reader.load_data([file_data.decode("utf-8")])
|
|
elif isinstance(file_data, str):
|
|
documents = string_reader.load_data([file_data])
|
|
else:
|
|
raise ValueError(f"Unsupported data type {type(file_data)}")
|
|
else:
|
|
logger.debug("Specific reader found for extension=%s", extension)
|
|
reader: BaseReader = reader_cls()
|
|
if isinstance(file_data, Path):
|
|
# Already a path, nothing to do
|
|
documents = reader.load_data(file_data)
|
|
else:
|
|
# llama-index mainly supports reading from files, so
|
|
# we have to create a tmp file to read for it to work
|
|
# delete=False to avoid a Windows 11 permission error.
|
|
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
|
try:
|
|
path_to_tmp = Path(tmp.name)
|
|
if isinstance(file_data, bytes):
|
|
path_to_tmp.write_bytes(file_data)
|
|
else:
|
|
path_to_tmp.write_text(str(file_data))
|
|
documents = reader.load_data(path_to_tmp)
|
|
finally:
|
|
path_to_tmp.unlink()
|
|
logger.info(
|
|
"Transformed file=%s into count=%s documents", file_name, len(documents)
|
|
)
|
|
for document in documents:
|
|
document.metadata["file_name"] = file_name
|
|
return self._save_docs(documents)
|
|
|
|
def _save_docs(self, documents: list[Document]) -> list[IngestedDoc]:
|
|
for document in documents:
|
|
document.metadata["doc_id"] = document.doc_id
|
|
# We don't want the Embeddings search to receive this metadata
|
|
document.excluded_embed_metadata_keys = ["doc_id"]
|
|
# We don't want the LLM to receive these metadata in the context
|
|
document.excluded_llm_metadata_keys = ["file_name", "doc_id", "page_label"]
|
|
|
|
try:
|
|
# Load the index from storage and insert new documents,
|
|
index = load_index_from_storage(
|
|
storage_context=self.storage_context,
|
|
service_context=self.ingest_service_context,
|
|
store_nodes_override=True, # Force store nodes in index and document stores
|
|
show_progress=True,
|
|
)
|
|
for doc in documents:
|
|
index.insert(doc)
|
|
except ValueError:
|
|
# Or create a new one if there is none
|
|
VectorStoreIndex.from_documents(
|
|
documents,
|
|
storage_context=self.storage_context,
|
|
service_context=self.ingest_service_context,
|
|
store_nodes_override=True, # Force store nodes in index and document stores
|
|
show_progress=True,
|
|
)
|
|
|
|
# persist the index and nodes
|
|
self.storage_context.persist(persist_dir=local_data_path)
|
|
return [
|
|
IngestedDoc(
|
|
object="ingest.document",
|
|
doc_id=document.doc_id,
|
|
doc_metadata=IngestedDoc.curate_metadata(document.metadata),
|
|
)
|
|
for document in documents
|
|
]
|
|
|
|
def list_ingested(self) -> list[IngestedDoc]:
|
|
ingested_docs = []
|
|
try:
|
|
docstore = self.storage_context.docstore
|
|
ingested_docs_ids: set[str] = set()
|
|
|
|
for node in docstore.docs.values():
|
|
if node.ref_doc_id is not None:
|
|
ingested_docs_ids.add(node.ref_doc_id)
|
|
|
|
for doc_id in ingested_docs_ids:
|
|
ref_doc_info = docstore.get_ref_doc_info(ref_doc_id=doc_id)
|
|
doc_metadata = None
|
|
if ref_doc_info is not None and ref_doc_info.metadata is not None:
|
|
doc_metadata = IngestedDoc.curate_metadata(ref_doc_info.metadata)
|
|
ingested_docs.append(
|
|
IngestedDoc(
|
|
object="ingest.document",
|
|
doc_id=doc_id,
|
|
doc_metadata=doc_metadata,
|
|
)
|
|
)
|
|
except ValueError:
|
|
logger.warning("Got an exception when getting list of docs", exc_info=True)
|
|
pass
|
|
logger.debug("Found count=%s ingested documents", len(ingested_docs))
|
|
return ingested_docs
|
|
|
|
def delete(self, doc_id: str) -> None:
|
|
"""Delete an ingested document.
|
|
|
|
:raises ValueError: if the document does not exist
|
|
"""
|
|
logger.info(
|
|
"Deleting the ingested document=%s in the doc and index store", doc_id
|
|
)
|
|
|
|
# Load the index with store_nodes_override=True to be able to delete them
|
|
index = load_index_from_storage(self.storage_context, store_nodes_override=True)
|
|
|
|
# Delete the document from the index
|
|
index.delete_ref_doc(doc_id, delete_from_docstore=True)
|
|
|
|
# Save the index
|
|
self.storage_context.persist(persist_dir=local_data_path)
|