跳转到内容

存储上下文

存储上下文 dataclass #

存储上下文。

存储上下文容器是一个用于存储节点、索引和向量的实用容器。它包含以下内容: - docstore: BaseDocumentStore - index_store: BaseIndexStore - vector_store: BasePydanticVectorStore - graph_store: GraphStore - property_graph_store: PropertyGraphStore(延迟初始化)

参数:

名称 类型 描述 默认
docstore BaseDocumentStore
required
index_store BaseIndexStore
required
vector_stores Dict[str, Annotated[BasePydanticVectorStore, SerializeAsAny]]
required
graph_store GraphStore
required
property_graph_store PropertyGraphStore | None
None
workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@dataclass
class StorageContext:
    """
    Storage context.

    The storage context container is a utility container for storing nodes,
    indices, and vectors. It contains the following:
    - docstore: BaseDocumentStore
    - index_store: BaseIndexStore
    - vector_store: BasePydanticVectorStore
    - graph_store: GraphStore
    - property_graph_store: PropertyGraphStore (lazily initialized)

    """

    docstore: BaseDocumentStore
    index_store: BaseIndexStore
    vector_stores: Dict[str, SerializeAsAny[BasePydanticVectorStore]]
    graph_store: GraphStore
    property_graph_store: Optional[PropertyGraphStore] = None

    @classmethod
    def from_defaults(
        cls,
        docstore: Optional[BaseDocumentStore] = None,
        index_store: Optional[BaseIndexStore] = None,
        vector_store: Optional[BasePydanticVectorStore] = None,
        image_store: Optional[BasePydanticVectorStore] = None,
        vector_stores: Optional[Dict[str, BasePydanticVectorStore]] = None,
        graph_store: Optional[GraphStore] = None,
        property_graph_store: Optional[PropertyGraphStore] = None,
        persist_dir: Optional[str] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> "StorageContext":
        """
        Create a StorageContext from defaults.

        Args:
            docstore (Optional[BaseDocumentStore]): document store
            index_store (Optional[BaseIndexStore]): index store
            vector_store (Optional[BasePydanticVectorStore]): vector store
            graph_store (Optional[GraphStore]): graph store
            image_store (Optional[BasePydanticVectorStore]): image store

        """
        if persist_dir is None:
            docstore = docstore or SimpleDocumentStore()
            index_store = index_store or SimpleIndexStore()
            graph_store = graph_store or SimpleGraphStore()
            image_store = image_store or SimpleVectorStore()

            if vector_store:
                vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
            else:
                vector_stores = vector_stores or {
                    DEFAULT_VECTOR_STORE: SimpleVectorStore()
                }
            if image_store:
                # append image store to vector stores
                vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store
        else:
            docstore = docstore or SimpleDocumentStore.from_persist_dir(
                persist_dir, fs=fs
            )
            index_store = index_store or SimpleIndexStore.from_persist_dir(
                persist_dir, fs=fs
            )
            graph_store = graph_store or SimpleGraphStore.from_persist_dir(
                persist_dir, fs=fs
            )

            try:
                property_graph_store = (
                    property_graph_store
                    or SimplePropertyGraphStore.from_persist_dir(persist_dir, fs=fs)
                )
            except FileNotFoundError:
                property_graph_store = None

            if vector_store:
                vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
            elif vector_stores:
                vector_stores = vector_stores
            else:
                vector_stores = SimpleVectorStore.from_namespaced_persist_dir(
                    persist_dir, fs=fs
                )
            if image_store:
                # append image store to vector stores
                vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store  # type: ignore

        return cls(
            docstore=docstore,
            index_store=index_store,
            vector_stores=vector_stores,  # type: ignore
            graph_store=graph_store,
            property_graph_store=property_graph_store,
        )

    def persist(
        self,
        persist_dir: Union[str, os.PathLike] = DEFAULT_PERSIST_DIR,
        docstore_fname: str = DOCSTORE_FNAME,
        index_store_fname: str = INDEX_STORE_FNAME,
        vector_store_fname: str = VECTOR_STORE_FNAME,
        image_store_fname: str = IMAGE_STORE_FNAME,
        graph_store_fname: str = GRAPH_STORE_FNAME,
        pg_graph_store_fname: str = PG_FNAME,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> None:
        """
        Persist the storage context.

        Args:
            persist_dir (str): directory to persist the storage context

        """
        if fs is not None:
            persist_dir = str(persist_dir)  # NOTE: doesn't support Windows here
            docstore_path = concat_dirs(persist_dir, docstore_fname)
            index_store_path = concat_dirs(persist_dir, index_store_fname)
            graph_store_path = concat_dirs(persist_dir, graph_store_fname)
            pg_graph_store_path = concat_dirs(persist_dir, pg_graph_store_fname)
        else:
            persist_dir = Path(persist_dir)
            docstore_path = str(persist_dir / docstore_fname)
            index_store_path = str(persist_dir / index_store_fname)
            graph_store_path = str(persist_dir / graph_store_fname)
            pg_graph_store_path = str(persist_dir / pg_graph_store_fname)

        self.docstore.persist(persist_path=docstore_path, fs=fs)
        self.index_store.persist(persist_path=index_store_path, fs=fs)
        self.graph_store.persist(persist_path=graph_store_path, fs=fs)

        if self.property_graph_store:
            self.property_graph_store.persist(persist_path=pg_graph_store_path, fs=fs)

        # save each vector store under it's namespace
        for vector_store_name, vector_store in self.vector_stores.items():
            if fs is not None:
                vector_store_path = concat_dirs(
                    str(persist_dir),
                    f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}",
                )
            else:
                vector_store_path = str(
                    Path(persist_dir)
                    / f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}"
                )

            vector_store.persist(persist_path=vector_store_path, fs=fs)

    def to_dict(self) -> dict:
        all_simple = (
            isinstance(self.docstore, SimpleDocumentStore)
            and isinstance(self.index_store, SimpleIndexStore)
            and isinstance(self.graph_store, SimpleGraphStore)
            and isinstance(
                self.property_graph_store, (SimplePropertyGraphStore, type(None))
            )
            and all(
                isinstance(vs, SimpleVectorStore) for vs in self.vector_stores.values()
            )
        )
        if not all_simple:
            raise ValueError(
                "to_dict only available when using simple doc/index/vector stores"
            )

        assert isinstance(self.docstore, SimpleDocumentStore)
        assert isinstance(self.index_store, SimpleIndexStore)
        assert isinstance(self.graph_store, SimpleGraphStore)
        assert isinstance(
            self.property_graph_store, (SimplePropertyGraphStore, type(None))
        )

        return {
            VECTOR_STORE_KEY: {
                key: vector_store.to_dict()
                for key, vector_store in self.vector_stores.items()
                if isinstance(vector_store, SimpleVectorStore)
            },
            DOC_STORE_KEY: self.docstore.to_dict(),
            INDEX_STORE_KEY: self.index_store.to_dict(),
            GRAPH_STORE_KEY: self.graph_store.to_dict(),
            PG_STORE_KEY: (
                self.property_graph_store.to_dict()
                if self.property_graph_store
                else None
            ),
        }

    @classmethod
    def from_dict(cls, save_dict: dict) -> "StorageContext":
        """Create a StorageContext from dict."""
        docstore = SimpleDocumentStore.from_dict(save_dict[DOC_STORE_KEY])
        index_store = SimpleIndexStore.from_dict(save_dict[INDEX_STORE_KEY])
        graph_store = SimpleGraphStore.from_dict(save_dict[GRAPH_STORE_KEY])
        property_graph_store = (
            SimplePropertyGraphStore.from_dict(save_dict[PG_STORE_KEY])
            if save_dict[PG_STORE_KEY]
            else None
        )

        vector_stores: Dict[str, BasePydanticVectorStore] = {}
        for key, vector_store_dict in save_dict[VECTOR_STORE_KEY].items():
            vector_stores[key] = SimpleVectorStore.from_dict(vector_store_dict)

        return cls(
            docstore=docstore,
            index_store=index_store,
            vector_stores=vector_stores,
            graph_store=graph_store,
            property_graph_store=property_graph_store,
        )

    @property
    def vector_store(self) -> BasePydanticVectorStore:
        """Backwrds compatibility for vector_store property."""
        return self.vector_stores[DEFAULT_VECTOR_STORE]

    def add_vector_store(
        self, vector_store: BasePydanticVectorStore, namespace: str
    ) -> None:
        """Add a vector store to the storage context."""
        self.vector_stores[namespace] = vector_store

vector_store property #

向量存储属性的向后兼容性。

from_defaults classmethod #

from_defaults(docstore: Optional[BaseDocumentStore] = None, index_store: Optional[BaseIndexStore] = None, vector_store: Optional[BasePydanticVectorStore] = None, image_store: Optional[BasePydanticVectorStore] = None, vector_stores: Optional[Dict[str, BasePydanticVectorStore]] = None, graph_store: Optional[GraphStore] = None, property_graph_store: Optional[PropertyGraphStore] = None, persist_dir: Optional[str] = None, fs: Optional[AbstractFileSystem] = None) -> StorageContext

从默认值创建一个存储上下文。

参数:

名称 类型 描述 默认
docstore Optional[BaseDocumentStore]

文档存储

None
index_store Optional[BaseIndexStore]

索引存储

None
vector_store Optional[BasePydanticVectorStore]

向量存储

None
graph_store Optional[GraphStore]

图存储

None
image_store Optional[BasePydanticVectorStore]

图像存储

None
workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@classmethod
def from_defaults(
    cls,
    docstore: Optional[BaseDocumentStore] = None,
    index_store: Optional[BaseIndexStore] = None,
    vector_store: Optional[BasePydanticVectorStore] = None,
    image_store: Optional[BasePydanticVectorStore] = None,
    vector_stores: Optional[Dict[str, BasePydanticVectorStore]] = None,
    graph_store: Optional[GraphStore] = None,
    property_graph_store: Optional[PropertyGraphStore] = None,
    persist_dir: Optional[str] = None,
    fs: Optional[fsspec.AbstractFileSystem] = None,
) -> "StorageContext":
    """
    Create a StorageContext from defaults.

    Args:
        docstore (Optional[BaseDocumentStore]): document store
        index_store (Optional[BaseIndexStore]): index store
        vector_store (Optional[BasePydanticVectorStore]): vector store
        graph_store (Optional[GraphStore]): graph store
        image_store (Optional[BasePydanticVectorStore]): image store

    """
    if persist_dir is None:
        docstore = docstore or SimpleDocumentStore()
        index_store = index_store or SimpleIndexStore()
        graph_store = graph_store or SimpleGraphStore()
        image_store = image_store or SimpleVectorStore()

        if vector_store:
            vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
        else:
            vector_stores = vector_stores or {
                DEFAULT_VECTOR_STORE: SimpleVectorStore()
            }
        if image_store:
            # append image store to vector stores
            vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store
    else:
        docstore = docstore or SimpleDocumentStore.from_persist_dir(
            persist_dir, fs=fs
        )
        index_store = index_store or SimpleIndexStore.from_persist_dir(
            persist_dir, fs=fs
        )
        graph_store = graph_store or SimpleGraphStore.from_persist_dir(
            persist_dir, fs=fs
        )

        try:
            property_graph_store = (
                property_graph_store
                or SimplePropertyGraphStore.from_persist_dir(persist_dir, fs=fs)
            )
        except FileNotFoundError:
            property_graph_store = None

        if vector_store:
            vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
        elif vector_stores:
            vector_stores = vector_stores
        else:
            vector_stores = SimpleVectorStore.from_namespaced_persist_dir(
                persist_dir, fs=fs
            )
        if image_store:
            # append image store to vector stores
            vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store  # type: ignore

    return cls(
        docstore=docstore,
        index_store=index_store,
        vector_stores=vector_stores,  # type: ignore
        graph_store=graph_store,
        property_graph_store=property_graph_store,
    )

persist #

persist(persist_dir: Union[str, PathLike] = DEFAULT_PERSIST_DIR, docstore_fname: str = DEFAULT_PERSIST_FNAME, index_store_fname: str = DEFAULT_PERSIST_FNAME, vector_store_fname: str = DEFAULT_PERSIST_FNAME, image_store_fname: str = IMAGE_STORE_FNAME, graph_store_fname: str = DEFAULT_PERSIST_FNAME, pg_graph_store_fname: str = DEFAULT_PG_PERSIST_FNAME, fs: Optional[AbstractFileSystem] = None) -> None

持久化存储上下文。

参数:

名称 类型 描述 默认
persist_dir str

持久化存储上下文的目录

DEFAULT_PERSIST_DIR
workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def persist(
    self,
    persist_dir: Union[str, os.PathLike] = DEFAULT_PERSIST_DIR,
    docstore_fname: str = DOCSTORE_FNAME,
    index_store_fname: str = INDEX_STORE_FNAME,
    vector_store_fname: str = VECTOR_STORE_FNAME,
    image_store_fname: str = IMAGE_STORE_FNAME,
    graph_store_fname: str = GRAPH_STORE_FNAME,
    pg_graph_store_fname: str = PG_FNAME,
    fs: Optional[fsspec.AbstractFileSystem] = None,
) -> None:
    """
    Persist the storage context.

    Args:
        persist_dir (str): directory to persist the storage context

    """
    if fs is not None:
        persist_dir = str(persist_dir)  # NOTE: doesn't support Windows here
        docstore_path = concat_dirs(persist_dir, docstore_fname)
        index_store_path = concat_dirs(persist_dir, index_store_fname)
        graph_store_path = concat_dirs(persist_dir, graph_store_fname)
        pg_graph_store_path = concat_dirs(persist_dir, pg_graph_store_fname)
    else:
        persist_dir = Path(persist_dir)
        docstore_path = str(persist_dir / docstore_fname)
        index_store_path = str(persist_dir / index_store_fname)
        graph_store_path = str(persist_dir / graph_store_fname)
        pg_graph_store_path = str(persist_dir / pg_graph_store_fname)

    self.docstore.persist(persist_path=docstore_path, fs=fs)
    self.index_store.persist(persist_path=index_store_path, fs=fs)
    self.graph_store.persist(persist_path=graph_store_path, fs=fs)

    if self.property_graph_store:
        self.property_graph_store.persist(persist_path=pg_graph_store_path, fs=fs)

    # save each vector store under it's namespace
    for vector_store_name, vector_store in self.vector_stores.items():
        if fs is not None:
            vector_store_path = concat_dirs(
                str(persist_dir),
                f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}",
            )
        else:
            vector_store_path = str(
                Path(persist_dir)
                / f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}"
            )

        vector_store.persist(persist_path=vector_store_path, fs=fs)

from_dict classmethod #

from_dict(save_dict: dict) -> StorageContext

从字典创建存储上下文。

workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@classmethod
def from_dict(cls, save_dict: dict) -> "StorageContext":
    """Create a StorageContext from dict."""
    docstore = SimpleDocumentStore.from_dict(save_dict[DOC_STORE_KEY])
    index_store = SimpleIndexStore.from_dict(save_dict[INDEX_STORE_KEY])
    graph_store = SimpleGraphStore.from_dict(save_dict[GRAPH_STORE_KEY])
    property_graph_store = (
        SimplePropertyGraphStore.from_dict(save_dict[PG_STORE_KEY])
        if save_dict[PG_STORE_KEY]
        else None
    )

    vector_stores: Dict[str, BasePydanticVectorStore] = {}
    for key, vector_store_dict in save_dict[VECTOR_STORE_KEY].items():
        vector_stores[key] = SimpleVectorStore.from_dict(vector_store_dict)

    return cls(
        docstore=docstore,
        index_store=index_store,
        vector_stores=vector_stores,
        graph_store=graph_store,
        property_graph_store=property_graph_store,
    )

add_vector_store #

add_vector_store(vector_store: BasePydanticVectorStore, namespace: str) -> None

向存储上下文添加向量存储。

workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
273
274
275
276
277
def add_vector_store(
    self, vector_store: BasePydanticVectorStore, namespace: str
) -> None:
    """Add a vector store to the storage context."""
    self.vector_stores[namespace] = vector_store

选项: 成员:- StorageContext

LlamaIndex 的顶层导入。

响应 dataclass #

响应对象。

如果 streaming=False 则返回。

参数:

名称 类型 描述 默认
response str | None
required
source_nodes List[NodeWithScore]

内置可变序列。

如果未提供参数,构造函数将创建一个新的空列表。 如果指定了参数,则它必须是一个可迭代对象。

<dynamic>
metadata Dict[str, Any] | None
None

属性:

名称 类型 描述
response

响应文本。

workflows/handler.py 中的源代码llama_index/core/base/response/schema.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@dataclass
class Response:
    """
    Response object.

    Returned if streaming=False.

    Attributes:
        response: The response text.

    """

    response: Optional[str]
    source_nodes: List[NodeWithScore] = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None

    def __str__(self) -> str:
        """Convert to string representation."""
        return self.response or "None"

    def get_formatted_sources(self, length: int = 100) -> str:
        """Get formatted sources text."""
        texts = []
        for source_node in self.source_nodes:
            fmt_text_chunk = truncate_text(source_node.node.get_content(), length)
            doc_id = source_node.node.node_id or "None"
            source_text = f"> Source (Doc id: {doc_id}): {fmt_text_chunk}"
            texts.append(source_text)
        return "\n\n".join(texts)

get_formatted_sources #

get_formatted_sources(length: int = 100) -> str

获取格式化后的源文本。

workflows/handler.py 中的源代码llama_index/core/base/response/schema.py
34
35
36
37
38
39
40
41
42
def get_formatted_sources(self, length: int = 100) -> str:
    """Get formatted sources text."""
    texts = []
    for source_node in self.source_nodes:
        fmt_text_chunk = truncate_text(source_node.node.get_content(), length)
        doc_id = source_node.node.node_id or "None"
        source_text = f"> Source (Doc id: {doc_id}): {fmt_text_chunk}"
        texts.append(source_text)
    return "\n\n".join(texts)

索引结构类型 #

Bases: str, Enum

索引结构类型。用于标识索引的“类型”。

属性:

名称 类型 描述
TREE tree

树状索引。请参阅 :ref:Ref-Indices-Tree 了解树状索引。

LIST list

摘要索引。请参阅 :ref:Ref-Indices-List 了解摘要索引。

KEYWORD_TABLE keyword_table

关键词表索引。请参阅 :ref:Ref-Indices-Table 获取关键词表索引。

DICT dict

Faiss向量存储索引。有关Faiss向量存储索引的更多信息,请参阅 :ref:Ref-Indices-VectorStore

SIMPLE_DICT simple_dict

简单向量存储索引。更多关于简单向量存储索引的信息请参阅 :ref:Ref-Indices-VectorStore

WEAVIATE weaviate

Weaviate 向量存储索引。 更多关于 Weaviate 向量存储索引的信息,请参阅:Ref-Indices-VectorStore

PINECONE pinecone

Pinecone 向量存储索引。 更多关于 Pinecone 向量存储索引的信息,请参阅:Ref-Indices-VectorStore

DEEPLAKE deeplake

DeepLake 向量存储索引。 有关 Pinecone 向量存储索引的更多信息,请参阅:Ref-Indices-VectorStore

QDRANT qdrant

Qdrant向量存储索引。 请参阅:Ref-Indices-VectorStore 获取有关Qdrant向量存储索引的更多信息。

LANCEDB lancedb

LanceDB 向量存储索引 请参阅:Ref-Indices-VectorStore 获取有关 LanceDB 向量存储索引的更多信息。

MILVUS milvus

Milvus向量存储索引。 更多关于Milvus向量存储索引的信息,请参阅:Ref-Indices-VectorStore

CHROMA chroma

Chroma 向量存储索引。 请参阅 :ref:Ref-Indices-VectorStore 获取有关 Chroma 向量存储索引的更多信息。

OPENSEARCH opensearch

Opensearch 向量存储索引。 请参阅 :ref:Ref-Indices-VectorStore 获取有关 Opensearch 向量存储索引的更多信息。

MYSCALE myscale

MyScale 向量存储索引。 更多关于 MyScale 向量存储索引的信息,请参阅:Ref-Indices-VectorStore

CLICKHOUSE clickhouse

ClickHouse 向量存储索引。 请参阅 :ref:Ref-Indices-VectorStore 获取有关 ClickHouse 向量存储索引的更多信息。

EPSILLA epsilla

Epsilla 向量存储索引。 更多关于 Epsilla 向量存储索引的信息,请参阅:Ref-Indices-VectorStore

CHATGPT_RETRIEVAL_PLUGIN chatgpt_retrieval_plugin

ChatGPT检索插件索引。

SQL SQL

SQL结构化存储索引。 有关SQL向量存储索引的更多信息,请参阅:Ref-Indices-StructStore

DASHVECTOR dashvector

DashVector 向量存储索引。 更多关于 DashVector 向量存储索引的信息,请参阅:Ref-Indices-VectorStore

KG kg

知识图谱索引。 请参阅 :ref:Ref-Indices-Knowledge-Graph 获取知识图谱索引。

DOCUMENT_SUMMARY document_summary

文档摘要索引。 请参阅 :ref:Ref-Indices-Document-Summary 了解摘要索引。

workflows/handler.py 中的源代码llama_index/core/data_structs/struct_type.py
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class IndexStructType(str, Enum):
    """
    Index struct type. Identifier for a "type" of index.

    Attributes:
        TREE ("tree"): Tree index. See :ref:`Ref-Indices-Tree` for tree indices.
        LIST ("list"): Summary index. See :ref:`Ref-Indices-List` for summary indices.
        KEYWORD_TABLE ("keyword_table"): Keyword table index. See
            :ref:`Ref-Indices-Table`
            for keyword table indices.
        DICT ("dict"): Faiss Vector Store Index. See
            :ref:`Ref-Indices-VectorStore`
            for more information on the faiss vector store index.
        SIMPLE_DICT ("simple_dict"): Simple Vector Store Index. See
            :ref:`Ref-Indices-VectorStore`
            for more information on the simple vector store index.
        WEAVIATE ("weaviate"): Weaviate Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Weaviate vector store index.
        PINECONE ("pinecone"): Pinecone Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Pinecone vector store index.
        DEEPLAKE ("deeplake"): DeepLake Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Pinecone vector store index.
        QDRANT ("qdrant"): Qdrant Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Qdrant vector store index.
        LANCEDB ("lancedb"): LanceDB Vector Store Index
            See :ref:`Ref-Indices-VectorStore`
            for more information on the LanceDB vector store index.
        MILVUS ("milvus"): Milvus Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Milvus vector store index.
        CHROMA ("chroma"): Chroma Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Chroma vector store index.
        OPENSEARCH ("opensearch"): Opensearch Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Opensearch vector store index.
        MYSCALE ("myscale"): MyScale Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the MyScale vector store index.
        CLICKHOUSE ("clickhouse"): ClickHouse Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the ClickHouse vector store index.
        EPSILLA ("epsilla"): Epsilla Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Epsilla vector store index.
        CHATGPT_RETRIEVAL_PLUGIN ("chatgpt_retrieval_plugin"): ChatGPT
            retrieval plugin index.
        SQL ("SQL"): SQL Structured Store Index.
            See :ref:`Ref-Indices-StructStore`
            for more information on the SQL vector store index.
        DASHVECTOR ("dashvector"): DashVector Vector Store Index.
            See :ref:`Ref-Indices-VectorStore`
            for more information on the Dashvecotor vector store index.
        KG ("kg"): Knowledge Graph index.
            See :ref:`Ref-Indices-Knowledge-Graph` for KG indices.
        DOCUMENT_SUMMARY ("document_summary"): Document Summary Index.
            See :ref:`Ref-Indices-Document-Summary` for Summary Indices.

    """

    # TODO: refactor so these are properties on the base class

    NODE = "node"
    TREE = "tree"
    LIST = "list"
    KEYWORD_TABLE = "keyword_table"

    # faiss
    DICT = "dict"
    # simple
    SIMPLE_DICT = "simple_dict"
    WEAVIATE = "weaviate"
    PINECONE = "pinecone"
    QDRANT = "qdrant"
    LANCEDB = "lancedb"
    MILVUS = "milvus"
    CHROMA = "chroma"
    MYSCALE = "myscale"
    CLICKHOUSE = "clickhouse"
    VECTOR_STORE = "vector_store"
    OPENSEARCH = "opensearch"
    DASHVECTOR = "dashvector"
    CHATGPT_RETRIEVAL_PLUGIN = "chatgpt_retrieval_plugin"
    DEEPLAKE = "deeplake"
    EPSILLA = "epsilla"
    # multimodal
    MULTIMODAL_VECTOR_STORE = "multimodal"
    # for SQL index
    SQL = "sql"
    # for KG index
    KG = "kg"
    SIMPLE_KG = "simple_kg"
    SIMPLE_LPG = "simple_lpg"
    NEBULAGRAPH = "nebulagraph"
    FALKORDB = "falkordb"

    # EMPTY
    EMPTY = "empty"
    COMPOSITE = "composite"

    PANDAS = "pandas"

    DOCUMENT_SUMMARY = "document_summary"

    # Managed
    VECTARA = "vectara"
    POSTGRESML = "postgresml"

模拟嵌入 #

基类:EventBaseEmbedding

模拟嵌入。

用于令牌预测。

参数:

名称 类型 描述 默认
embed_dim int

嵌入维度

required
workflows/handler.py 中的源代码llama_index/core/embeddings/mock_embed_model.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class MockEmbedding(BaseEmbedding):
    """
    Mock embedding.

    Used for token prediction.

    Args:
        embed_dim (int): embedding dimension

    """

    embed_dim: int

    def __init__(self, embed_dim: int, **kwargs: Any) -> None:
        """Init params."""
        super().__init__(embed_dim=embed_dim, **kwargs)

    @classmethod
    def class_name(cls) -> str:
        return "MockEmbedding"

    def _get_vector(self) -> List[float]:
        return [0.5] * self.embed_dim

    async def _aget_text_embedding(self, text: str) -> List[float]:
        return self._get_vector()

    async def _aget_query_embedding(self, query: str) -> List[float]:
        return self._get_vector()

    def _get_query_embedding(self, query: str) -> List[float]:
        """Get query embedding."""
        return self._get_vector()

    def _get_text_embedding(self, text: str) -> List[float]:
        """Get text embedding."""
        return self._get_vector()

可组合图 #

可组合图。

workflows/handler.py 中的源代码llama_index/core/indices/composability/graph.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class ComposableGraph:
    """Composable graph."""

    def __init__(
        self,
        all_indices: Dict[str, BaseIndex],
        root_id: str,
        storage_context: Optional[StorageContext] = None,
    ) -> None:
        """Init params."""
        self._all_indices = all_indices
        self._root_id = root_id
        self.storage_context = storage_context

    @property
    def root_id(self) -> str:
        return self._root_id

    @property
    def all_indices(self) -> Dict[str, BaseIndex]:
        return self._all_indices

    @property
    def root_index(self) -> BaseIndex:
        return self._all_indices[self._root_id]

    @property
    def index_struct(self) -> IndexStruct:
        return self._all_indices[self._root_id].index_struct

    @classmethod
    def from_indices(
        cls,
        root_index_cls: Type[BaseIndex],
        children_indices: Sequence[BaseIndex],
        index_summaries: Optional[Sequence[str]] = None,
        storage_context: Optional[StorageContext] = None,
        **kwargs: Any,
    ) -> "ComposableGraph":  # type: ignore
        """Create composable graph using this index class as the root."""
        from llama_index.core import Settings

        with Settings.callback_manager.as_trace("graph_construction"):
            if index_summaries is None:
                for index in children_indices:
                    if index.index_struct.summary is None:
                        raise ValueError(
                            "Summary must be set for children indices. "
                            "If the index does a summary "
                            "(through index.index_struct.summary), then "
                            "it must be specified with then `index_summaries` "
                            "argument in this function. We will support "
                            "automatically setting the summary in the future."
                        )
                index_summaries = [
                    index.index_struct.summary for index in children_indices
                ]
            else:
                # set summaries for each index
                for index, summary in zip(children_indices, index_summaries):
                    index.index_struct.summary = summary

            if len(children_indices) != len(index_summaries):
                raise ValueError("indices and index_summaries must have same length!")

            # construct index nodes
            index_nodes = []
            for index, summary in zip(children_indices, index_summaries):
                assert isinstance(index.index_struct, IndexStruct)
                index_node = IndexNode(
                    text=summary,
                    index_id=index.index_id,
                    relationships={
                        NodeRelationship.SOURCE: RelatedNodeInfo(
                            node_id=index.index_id, node_type=ObjectType.INDEX
                        )
                    },
                )
                index_nodes.append(index_node)

            # construct root index
            root_index = root_index_cls(
                nodes=index_nodes,
                storage_context=storage_context,
                **kwargs,
            )
            # type: ignore
            all_indices: List[BaseIndex] = [
                *cast(List[BaseIndex], children_indices),
                root_index,
            ]

            return cls(
                all_indices={index.index_id: index for index in all_indices},
                root_id=root_index.index_id,
                storage_context=storage_context,
            )

    def get_index(self, index_struct_id: Optional[str] = None) -> BaseIndex:
        """Get index from index struct id."""
        if index_struct_id is None:
            index_struct_id = self._root_id
        return self._all_indices[index_struct_id]

    def as_query_engine(self, **kwargs: Any) -> BaseQueryEngine:
        # NOTE: lazy import
        from llama_index.core.query_engine.graph_query_engine import (
            ComposableGraphQueryEngine,
        )

        return ComposableGraphQueryEngine(self, **kwargs)

from_indices classmethod #

from_indices(root_index_cls: Type[BaseIndex], children_indices: Sequence[BaseIndex], index_summaries: Optional[Sequence[str]] = None, storage_context: Optional[StorageContext] = None, **kwargs: Any) -> ComposableGraph

使用此索引类作为根节点创建可组合图。

workflows/handler.py 中的源代码llama_index/core/indices/composability/graph.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@classmethod
def from_indices(
    cls,
    root_index_cls: Type[BaseIndex],
    children_indices: Sequence[BaseIndex],
    index_summaries: Optional[Sequence[str]] = None,
    storage_context: Optional[StorageContext] = None,
    **kwargs: Any,
) -> "ComposableGraph":  # type: ignore
    """Create composable graph using this index class as the root."""
    from llama_index.core import Settings

    with Settings.callback_manager.as_trace("graph_construction"):
        if index_summaries is None:
            for index in children_indices:
                if index.index_struct.summary is None:
                    raise ValueError(
                        "Summary must be set for children indices. "
                        "If the index does a summary "
                        "(through index.index_struct.summary), then "
                        "it must be specified with then `index_summaries` "
                        "argument in this function. We will support "
                        "automatically setting the summary in the future."
                    )
            index_summaries = [
                index.index_struct.summary for index in children_indices
            ]
        else:
            # set summaries for each index
            for index, summary in zip(children_indices, index_summaries):
                index.index_struct.summary = summary

        if len(children_indices) != len(index_summaries):
            raise ValueError("indices and index_summaries must have same length!")

        # construct index nodes
        index_nodes = []
        for index, summary in zip(children_indices, index_summaries):
            assert isinstance(index.index_struct, IndexStruct)
            index_node = IndexNode(
                text=summary,
                index_id=index.index_id,
                relationships={
                    NodeRelationship.SOURCE: RelatedNodeInfo(
                        node_id=index.index_id, node_type=ObjectType.INDEX
                    )
                },
            )
            index_nodes.append(index_node)

        # construct root index
        root_index = root_index_cls(
            nodes=index_nodes,
            storage_context=storage_context,
            **kwargs,
        )
        # type: ignore
        all_indices: List[BaseIndex] = [
            *cast(List[BaseIndex], children_indices),
            root_index,
        ]

        return cls(
            all_indices={index.index_id: index for index in all_indices},
            root_id=root_index.index_id,
            storage_context=storage_context,
        )

get_index #

get_index(index_struct_id: Optional[str] = None) -> BaseIndex

根据索引结构ID获取索引。

workflows/handler.py 中的源代码llama_index/core/indices/composability/graph.py
115
116
117
118
119
def get_index(self, index_struct_id: Optional[str] = None) -> BaseIndex:
    """Get index from index struct id."""
    if index_struct_id is None:
        index_struct_id = self._root_id
    return self._all_indices[index_struct_id]

文档摘要索引 #

基类:EventBaseIndex[IndexDocumentSummary]

文档摘要索引。

参数:

名称 类型 描述 默认
response_synthesizer BaseSynthesizer

一个用于生成摘要的响应合成器。

None
summary_query str

用于为每个文档生成摘要的查询。

DEFAULT_SUMMARY_QUERY
show_progress bool

是否显示tqdm进度条。 默认为 False。

False
embed_summaries bool

是否嵌入摘要。 运行基于嵌入的默认检索器时需要此项。 默认为 True。

True
workflows/handler.py 中的源代码llama_index/core/indices/document_summary/base.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
class DocumentSummaryIndex(BaseIndex[IndexDocumentSummary]):
    """
    Document Summary Index.

    Args:
        response_synthesizer (BaseSynthesizer): A response synthesizer for generating
            summaries.
        summary_query (str): The query to use to generate the summary for each document.
        show_progress (bool): Whether to show tqdm progress bars.
            Defaults to False.
        embed_summaries (bool): Whether to embed the summaries.
            This is required for running the default embedding-based retriever.
            Defaults to True.

    """

    index_struct_cls = IndexDocumentSummary

    def __init__(
        self,
        nodes: Optional[Sequence[BaseNode]] = None,
        objects: Optional[Sequence[IndexNode]] = None,
        index_struct: Optional[IndexDocumentSummary] = None,
        llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        storage_context: Optional[StorageContext] = None,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        summary_query: str = DEFAULT_SUMMARY_QUERY,
        show_progress: bool = False,
        embed_summaries: bool = True,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        self._llm = llm or Settings.llm
        self._embed_model = embed_model or Settings.embed_model
        self._response_synthesizer = response_synthesizer or get_response_synthesizer(
            llm=self._llm, response_mode=ResponseMode.TREE_SUMMARIZE
        )
        self._summary_query = summary_query
        self._embed_summaries = embed_summaries

        super().__init__(
            nodes=nodes,
            index_struct=index_struct,
            storage_context=storage_context,
            show_progress=show_progress,
            objects=objects,
            **kwargs,
        )

    @property
    def vector_store(self) -> BasePydanticVectorStore:
        return self._vector_store

    def as_retriever(
        self,
        retriever_mode: Union[str, _RetrieverMode] = _RetrieverMode.EMBEDDING,
        **kwargs: Any,
    ) -> BaseRetriever:
        """
        Get retriever.

        Args:
            retriever_mode (Union[str, DocumentSummaryRetrieverMode]): A retriever mode.
                Defaults to DocumentSummaryRetrieverMode.EMBEDDING.

        """
        from llama_index.core.indices.document_summary.retrievers import (
            DocumentSummaryIndexEmbeddingRetriever,
            DocumentSummaryIndexLLMRetriever,
        )

        LLMRetriever = DocumentSummaryIndexLLMRetriever
        EmbeddingRetriever = DocumentSummaryIndexEmbeddingRetriever

        if retriever_mode == _RetrieverMode.EMBEDDING:
            if not self._embed_summaries:
                raise ValueError(
                    "Cannot use embedding retriever if embed_summaries is False"
                )

            return EmbeddingRetriever(
                self,
                object_map=self._object_map,
                embed_model=self._embed_model,
                **kwargs,
            )
        if retriever_mode == _RetrieverMode.LLM:
            return LLMRetriever(
                self, object_map=self._object_map, llm=self._llm, **kwargs
            )
        else:
            raise ValueError(f"Unknown retriever mode: {retriever_mode}")

    def get_document_summary(self, doc_id: str) -> str:
        """
        Get document summary by doc id.

        Args:
            doc_id (str): A document id.

        """
        if doc_id not in self._index_struct.doc_id_to_summary_id:
            raise ValueError(f"doc_id {doc_id} not in index")
        summary_id = self._index_struct.doc_id_to_summary_id[doc_id]
        return self.docstore.get_node(summary_id).get_content()

    def _add_nodes_to_index(
        self,
        index_struct: IndexDocumentSummary,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
    ) -> None:
        """Add nodes to index."""
        doc_id_to_nodes = defaultdict(list)
        for node in nodes:
            if node.ref_doc_id is None:
                raise ValueError(
                    "ref_doc_id of node cannot be None when building a document "
                    "summary index"
                )
            doc_id_to_nodes[node.ref_doc_id].append(node)

        summary_node_dict = {}
        items = doc_id_to_nodes.items()
        iterable_with_progress = get_tqdm_iterable(
            items, show_progress, "Summarizing documents"
        )

        for doc_id, nodes in iterable_with_progress:
            print(f"current doc id: {doc_id}")
            nodes_with_scores = [NodeWithScore(node=n) for n in nodes]
            # get the summary for each doc_id
            summary_response = self._response_synthesizer.synthesize(
                query=self._summary_query,
                nodes=nodes_with_scores,
            )
            summary_response = cast(Response, summary_response)
            docid_first_node = doc_id_to_nodes.get(doc_id, [TextNode()])[0]
            summary_node_dict[doc_id] = TextNode(
                text=summary_response.response,
                relationships={
                    NodeRelationship.SOURCE: RelatedNodeInfo(node_id=doc_id)
                },
                metadata=docid_first_node.metadata,
                excluded_embed_metadata_keys=docid_first_node.excluded_embed_metadata_keys,
                excluded_llm_metadata_keys=docid_first_node.excluded_llm_metadata_keys,
            )
            self.docstore.add_documents([summary_node_dict[doc_id]])
            logger.info(
                f"> Generated summary for doc {doc_id}: {summary_response.response}"
            )

        for doc_id, nodes in doc_id_to_nodes.items():
            index_struct.add_summary_and_nodes(summary_node_dict[doc_id], nodes)

        if self._embed_summaries:
            summary_nodes = list(summary_node_dict.values())
            id_to_embed_map = embed_nodes(
                summary_nodes, self._embed_model, show_progress=show_progress
            )

            summary_nodes_with_embedding = []
            for node in summary_nodes:
                node_with_embedding = node.model_copy()
                node_with_embedding.embedding = id_to_embed_map[node.node_id]
                summary_nodes_with_embedding.append(node_with_embedding)
            self._vector_store.add(summary_nodes_with_embedding)

    def _build_index_from_nodes(
        self,
        nodes: Sequence[BaseNode],
        **build_kwargs: Any,
    ) -> IndexDocumentSummary:
        """Build index from nodes."""
        # first get doc_id to nodes_dict, generate a summary for each doc_id,
        # then build the index struct
        index_struct = IndexDocumentSummary()
        self._add_nodes_to_index(index_struct, nodes, self._show_progress)
        return index_struct

    def _insert(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
        """Insert a document."""
        self._add_nodes_to_index(self._index_struct, nodes)

    def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:
        pass

    def delete_nodes(
        self,
        node_ids: List[str],
        delete_from_docstore: bool = False,
        **delete_kwargs: Any,
    ) -> None:
        """
        Delete a list of nodes from the index.

        Args:
            node_ids (List[str]): A list of node_ids from the nodes to delete

        """
        index_nodes = self._index_struct.node_id_to_summary_id.keys()
        for node in node_ids:
            if node not in index_nodes:
                logger.warning(f"node_id {node} not found, will not be deleted.")
                node_ids.remove(node)

        self._index_struct.delete_nodes(node_ids)

        remove_summary_ids = [
            summary_id
            for summary_id in self._index_struct.summary_id_to_node_ids
            if len(self._index_struct.summary_id_to_node_ids[summary_id]) == 0
        ]

        remove_docs = [
            doc_id
            for doc_id in self._index_struct.doc_id_to_summary_id
            if self._index_struct.doc_id_to_summary_id[doc_id] in remove_summary_ids
        ]

        for doc_id in remove_docs:
            self.delete_ref_doc(doc_id)

    def delete_ref_doc(
        self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
    ) -> None:
        """
        Delete a document from the index.
        All nodes in the index related to the document will be deleted.
        """
        ref_doc_info = self.docstore.get_ref_doc_info(ref_doc_id)
        if ref_doc_info is None:
            logger.warning(f"ref_doc_id {ref_doc_id} not found, nothing deleted.")
            return
        self._index_struct.delete(ref_doc_id)
        self._vector_store.delete(ref_doc_id)

        if delete_from_docstore:
            self.docstore.delete_ref_doc(ref_doc_id, raise_error=False)

        self._storage_context.index_store.add_index_struct(self._index_struct)

    @property
    def ref_doc_info(self) -> Dict[str, RefDocInfo]:
        """Retrieve a dict mapping of ingested documents and their nodes+metadata."""
        ref_doc_ids = list(self._index_struct.doc_id_to_summary_id.keys())

        all_ref_doc_info = {}
        for ref_doc_id in ref_doc_ids:
            ref_doc_info = self.docstore.get_ref_doc_info(ref_doc_id)
            if not ref_doc_info:
                continue

            all_ref_doc_info[ref_doc_id] = ref_doc_info
        return all_ref_doc_info

ref_doc_info property #

ref_doc_info: Dict[str, RefDocInfo]

检索已摄取文档及其节点+元数据的字典映射。

as_retriever #

as_retriever(retriever_mode: Union[str, _RetrieverMode] = EMBEDDING, **kwargs: Any) -> BaseRetriever

获取检索器。

参数:

名称 类型 描述 默认
retriever_mode Union[str, DocumentSummaryRetrieverMode]

检索器模式。 默认为 DocumentSummaryRetrieverMode.EMBEDDING。

EMBEDDING
workflows/handler.py 中的源代码llama_index/core/indices/document_summary/base.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def as_retriever(
    self,
    retriever_mode: Union[str, _RetrieverMode] = _RetrieverMode.EMBEDDING,
    **kwargs: Any,
) -> BaseRetriever:
    """
    Get retriever.

    Args:
        retriever_mode (Union[str, DocumentSummaryRetrieverMode]): A retriever mode.
            Defaults to DocumentSummaryRetrieverMode.EMBEDDING.

    """
    from llama_index.core.indices.document_summary.retrievers import (
        DocumentSummaryIndexEmbeddingRetriever,
        DocumentSummaryIndexLLMRetriever,
    )

    LLMRetriever = DocumentSummaryIndexLLMRetriever
    EmbeddingRetriever = DocumentSummaryIndexEmbeddingRetriever

    if retriever_mode == _RetrieverMode.EMBEDDING:
        if not self._embed_summaries:
            raise ValueError(
                "Cannot use embedding retriever if embed_summaries is False"
            )

        return EmbeddingRetriever(
            self,
            object_map=self._object_map,
            embed_model=self._embed_model,
            **kwargs,
        )
    if retriever_mode == _RetrieverMode.LLM:
        return LLMRetriever(
            self, object_map=self._object_map, llm=self._llm, **kwargs
        )
    else:
        raise ValueError(f"Unknown retriever mode: {retriever_mode}")

get_document_summary #

get_document_summary(doc_id: str) -> str

根据文档ID获取文档摘要。

参数:

名称 类型 描述 默认
doc_id str

一个文档标识符。

required
workflows/handler.py 中的源代码llama_index/core/indices/document_summary/base.py
152
153
154
155
156
157
158
159
160
161
162
163
def get_document_summary(self, doc_id: str) -> str:
    """
    Get document summary by doc id.

    Args:
        doc_id (str): A document id.

    """
    if doc_id not in self._index_struct.doc_id_to_summary_id:
        raise ValueError(f"doc_id {doc_id} not in index")
    summary_id = self._index_struct.doc_id_to_summary_id[doc_id]
    return self.docstore.get_node(summary_id).get_content()

delete_nodes #

delete_nodes(node_ids: List[str], delete_from_docstore: bool = False, **delete_kwargs: Any) -> None

从索引中删除节点列表。

参数:

名称 类型 描述 默认
node_ids List[str]

要删除的节点对应的节点ID列表

required
workflows/handler.py 中的源代码llama_index/core/indices/document_summary/base.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def delete_nodes(
    self,
    node_ids: List[str],
    delete_from_docstore: bool = False,
    **delete_kwargs: Any,
) -> None:
    """
    Delete a list of nodes from the index.

    Args:
        node_ids (List[str]): A list of node_ids from the nodes to delete

    """
    index_nodes = self._index_struct.node_id_to_summary_id.keys()
    for node in node_ids:
        if node not in index_nodes:
            logger.warning(f"node_id {node} not found, will not be deleted.")
            node_ids.remove(node)

    self._index_struct.delete_nodes(node_ids)

    remove_summary_ids = [
        summary_id
        for summary_id in self._index_struct.summary_id_to_node_ids
        if len(self._index_struct.summary_id_to_node_ids[summary_id]) == 0
    ]

    remove_docs = [
        doc_id
        for doc_id in self._index_struct.doc_id_to_summary_id
        if self._index_struct.doc_id_to_summary_id[doc_id] in remove_summary_ids
    ]

    for doc_id in remove_docs:
        self.delete_ref_doc(doc_id)

delete_ref_doc #

delete_ref_doc(ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any) -> None

从索引中删除文档。 索引中与该文档相关的所有节点将被删除。

workflows/handler.py 中的源代码llama_index/core/indices/document_summary/base.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def delete_ref_doc(
    self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
) -> None:
    """
    Delete a document from the index.
    All nodes in the index related to the document will be deleted.
    """
    ref_doc_info = self.docstore.get_ref_doc_info(ref_doc_id)
    if ref_doc_info is None:
        logger.warning(f"ref_doc_id {ref_doc_id} not found, nothing deleted.")
        return
    self._index_struct.delete(ref_doc_id)
    self._vector_store.delete(ref_doc_id)

    if delete_from_docstore:
        self.docstore.delete_ref_doc(ref_doc_id, raise_error=False)

    self._storage_context.index_store.add_index_struct(self._index_struct)

关键词表索引 #

基类:EventBaseKeywordTableIndex

关键词表索引。

该索引使用GPT模型从文本中提取关键词。

workflows/handler.py 中的源代码llama_index/core/indices/keyword_table/base.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class KeywordTableIndex(BaseKeywordTableIndex):
    """
    Keyword Table Index.

    This index uses a GPT model to extract keywords from the text.

    """

    def _extract_keywords(self, text: str) -> Set[str]:
        """Extract keywords from text."""
        response = self._llm.predict(
            self.keyword_extract_template,
            text=text,
        )
        return extract_keywords_given_response(response, start_token="KEYWORDS:")

    async def _async_extract_keywords(self, text: str) -> Set[str]:
        """Extract keywords from text."""
        response = await self._llm.apredict(
            self.keyword_extract_template,
            text=text,
        )
        return extract_keywords_given_response(response, start_token="KEYWORDS:")

知识图谱索引 #

基类:EventBaseIndex[KG]

知识图谱索引。

通过提取三元组构建知识图谱,并在查询时利用该知识图谱。

参数:

名称 类型 描述 默认
kg_triplet_extract_template BasePromptTemplate

用于提取三元组的提示词。

None
max_triplets_per_chunk int

要提取的三元组最大数量。

10
storage_context Optional[StorageContext]

要使用的存储上下文。

None
graph_store Optional[GraphStore]

要使用的图存储。

required
show_progress bool

是否显示tqdm进度条。默认为False。

False
include_embeddings bool

是否在索引中包含嵌入向量。 默认为 False。

False
max_object_length int

三元组中对象的最大长度。 默认为128。

128
kg_triplet_extract_fn Optional[Callable]

用于提取三元组的函数。默认为 None。

None
workflows/handler.py 中的源代码llama_index/core/indices/knowledge_graph/base.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
@deprecated.deprecated(
    version="0.10.53",
    reason=(
        "The KnowledgeGraphIndex class has been deprecated. "
        "Please use the new PropertyGraphIndex class instead. "
        "If a certain graph store integration is missing in the new class, "
        "please open an issue on the GitHub repository or contribute it!"
    ),
)
class KnowledgeGraphIndex(BaseIndex[KG]):
    """
    Knowledge Graph Index.

    Build a KG by extracting triplets, and leveraging the KG during query-time.

    Args:
        kg_triplet_extract_template (BasePromptTemplate): The prompt to use for
            extracting triplets.
        max_triplets_per_chunk (int): The maximum number of triplets to extract.
        storage_context (Optional[StorageContext]): The storage context to use.
        graph_store (Optional[GraphStore]): The graph store to use.
        show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
        include_embeddings (bool): Whether to include embeddings in the index.
            Defaults to False.
        max_object_length (int): The maximum length of the object in a triplet.
            Defaults to 128.
        kg_triplet_extract_fn (Optional[Callable]): The function to use for
            extracting triplets. Defaults to None.

    """

    index_struct_cls = KG

    def __init__(
        self,
        nodes: Optional[Sequence[BaseNode]] = None,
        objects: Optional[Sequence[IndexNode]] = None,
        index_struct: Optional[KG] = None,
        llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        storage_context: Optional[StorageContext] = None,
        kg_triplet_extract_template: Optional[BasePromptTemplate] = None,
        max_triplets_per_chunk: int = 10,
        include_embeddings: bool = False,
        show_progress: bool = False,
        max_object_length: int = 128,
        kg_triplet_extract_fn: Optional[Callable] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        # need to set parameters before building index in base class.
        self.include_embeddings = include_embeddings
        self.max_triplets_per_chunk = max_triplets_per_chunk
        self.kg_triplet_extract_template = (
            kg_triplet_extract_template or DEFAULT_KG_TRIPLET_EXTRACT_PROMPT
        )
        # NOTE: Partially format keyword extract template here.
        self.kg_triplet_extract_template = (
            self.kg_triplet_extract_template.partial_format(
                max_knowledge_triplets=self.max_triplets_per_chunk
            )
        )
        self._max_object_length = max_object_length
        self._kg_triplet_extract_fn = kg_triplet_extract_fn

        self._llm = llm or Settings.llm
        self._embed_model = embed_model or Settings.embed_model

        super().__init__(
            nodes=nodes,
            index_struct=index_struct,
            storage_context=storage_context,
            show_progress=show_progress,
            objects=objects,
            **kwargs,
        )

        # TODO: legacy conversion - remove in next release
        if (
            len(self.index_struct.table) > 0
            and isinstance(self.graph_store, SimpleGraphStore)
            and len(self.graph_store._data.graph_dict) == 0
        ):
            logger.warning("Upgrading previously saved KG index to new storage format.")
            self.graph_store._data.graph_dict = self.index_struct.rel_map

    @property
    def graph_store(self) -> GraphStore:
        return self._graph_store

    def as_retriever(
        self,
        retriever_mode: Optional[str] = None,
        embed_model: Optional[BaseEmbedding] = None,
        **kwargs: Any,
    ) -> BaseRetriever:
        from llama_index.core.indices.knowledge_graph.retrievers import (
            KGRetrieverMode,
            KGTableRetriever,
        )

        if len(self.index_struct.embedding_dict) > 0 and retriever_mode is None:
            retriever_mode = KGRetrieverMode.HYBRID
        elif retriever_mode is None:
            retriever_mode = KGRetrieverMode.KEYWORD
        elif isinstance(retriever_mode, str):
            retriever_mode = KGRetrieverMode(retriever_mode)
        else:
            retriever_mode = retriever_mode

        return KGTableRetriever(
            self,
            object_map=self._object_map,
            llm=self._llm,
            embed_model=embed_model or self._embed_model,
            retriever_mode=retriever_mode,
            **kwargs,
        )

    def _extract_triplets(self, text: str) -> List[Tuple[str, str, str]]:
        if self._kg_triplet_extract_fn is not None:
            return self._kg_triplet_extract_fn(text)
        else:
            return self._llm_extract_triplets(text)

    def _llm_extract_triplets(self, text: str) -> List[Tuple[str, str, str]]:
        """Extract keywords from text."""
        response = self._llm.predict(
            self.kg_triplet_extract_template,
            text=text,
        )
        return self._parse_triplet_response(
            response, max_length=self._max_object_length
        )

    @staticmethod
    def _parse_triplet_response(
        response: str, max_length: int = 128
    ) -> List[Tuple[str, str, str]]:
        knowledge_strs = response.strip().split("\n")
        results = []
        for text in knowledge_strs:
            if "(" not in text or ")" not in text or text.index(")") < text.index("("):
                # skip empty lines and non-triplets
                continue
            triplet_part = text[text.index("(") + 1 : text.index(")")]
            tokens = triplet_part.split(",")
            if len(tokens) != 3:
                continue

            if any(len(s.encode("utf-8")) > max_length for s in tokens):
                # We count byte-length instead of len() for UTF-8 chars,
                # will skip if any of the tokens are too long.
                # This is normally due to a poorly formatted triplet
                # extraction, in more serious KG building cases
                # we'll need NLP models to better extract triplets.
                continue

            subj, pred, obj = map(str.strip, tokens)
            if not subj or not pred or not obj:
                # skip partial triplets
                continue

            # Strip double quotes and Capitalize triplets for disambiguation
            subj, pred, obj = (
                entity.strip('"').capitalize() for entity in [subj, pred, obj]
            )

            results.append((subj, pred, obj))
        return results

    def _build_index_from_nodes(
        self, nodes: Sequence[BaseNode], **build_kwargs: Any
    ) -> KG:
        """Build the index from nodes."""
        # do simple concatenation
        index_struct = self.index_struct_cls()
        nodes_with_progress = get_tqdm_iterable(
            nodes, self._show_progress, "Processing nodes"
        )
        for n in nodes_with_progress:
            triplets = self._extract_triplets(
                n.get_content(metadata_mode=MetadataMode.LLM)
            )
            logger.debug(f"> Extracted triplets: {triplets}")
            for triplet in triplets:
                subj, _, obj = triplet
                self.upsert_triplet(triplet)
                index_struct.add_node([subj, obj], n)

            if self.include_embeddings:
                triplet_texts = [str(t) for t in triplets]

                embed_outputs = self._embed_model.get_text_embedding_batch(
                    triplet_texts, show_progress=self._show_progress
                )
                for rel_text, rel_embed in zip(triplet_texts, embed_outputs):
                    index_struct.add_to_embedding_dict(rel_text, rel_embed)

        return index_struct

    def _insert(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
        """Insert a document."""
        for n in nodes:
            triplets = self._extract_triplets(
                n.get_content(metadata_mode=MetadataMode.LLM)
            )
            logger.debug(f"Extracted triplets: {triplets}")
            for triplet in triplets:
                subj, _, obj = triplet
                triplet_str = str(triplet)
                self.upsert_triplet(triplet)
                self._index_struct.add_node([subj, obj], n)
                if (
                    self.include_embeddings
                    and triplet_str not in self._index_struct.embedding_dict
                ):
                    rel_embedding = self._embed_model.get_text_embedding(triplet_str)
                    self._index_struct.add_to_embedding_dict(triplet_str, rel_embedding)

        # Update the storage context's index_store
        self._storage_context.index_store.add_index_struct(self._index_struct)

    def upsert_triplet(
        self, triplet: Tuple[str, str, str], include_embeddings: bool = False
    ) -> None:
        """
        Insert triplets and optionally embeddings.

        Used for manual insertion of KG triplets (in the form
        of (subject, relationship, object)).

        Args:
            triplet (tuple): Knowledge triplet
            embedding (Any, optional): Embedding option for the triplet. Defaults to None.

        """
        self._graph_store.upsert_triplet(*triplet)
        triplet_str = str(triplet)
        if include_embeddings:
            set_embedding = self._embed_model.get_text_embedding(triplet_str)
            self._index_struct.add_to_embedding_dict(str(triplet), set_embedding)
            self._storage_context.index_store.add_index_struct(self._index_struct)

    def add_node(self, keywords: List[str], node: BaseNode) -> None:
        """
        Add node.

        Used for manual insertion of nodes (keyed by keywords).

        Args:
            keywords (List[str]): Keywords to index the node.
            node (Node): Node to be indexed.

        """
        self._index_struct.add_node(keywords, node)
        self._docstore.add_documents([node], allow_update=True)

    def upsert_triplet_and_node(
        self,
        triplet: Tuple[str, str, str],
        node: BaseNode,
        include_embeddings: bool = False,
    ) -> None:
        """
        Upsert KG triplet and node.

        Calls both upsert_triplet and add_node.
        Behavior is idempotent; if Node already exists,
        only triplet will be added.

        Args:
            keywords (List[str]): Keywords to index the node.
            node (Node): Node to be indexed.
            include_embeddings (bool): Option to add embeddings for triplets. Defaults to False

        """
        subj, _, obj = triplet
        self.upsert_triplet(triplet)
        self.add_node([subj, obj], node)
        triplet_str = str(triplet)
        if include_embeddings:
            set_embedding = self._embed_model.get_text_embedding(triplet_str)
            self._index_struct.add_to_embedding_dict(str(triplet), set_embedding)
            self._storage_context.index_store.add_index_struct(self._index_struct)

    def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:
        """Delete a node."""
        raise NotImplementedError("Delete is not supported for KG index yet.")

    @property
    def ref_doc_info(self) -> Dict[str, RefDocInfo]:
        """Retrieve a dict mapping of ingested documents and their nodes+metadata."""
        node_doc_ids_sets = list(self._index_struct.table.values())
        node_doc_ids = list(set().union(*node_doc_ids_sets))
        nodes = self.docstore.get_nodes(node_doc_ids)

        all_ref_doc_info = {}
        for node in nodes:
            ref_node = node.source_node
            if not ref_node:
                continue

            ref_doc_info = self.docstore.get_ref_doc_info(ref_node.node_id)
            if not ref_doc_info:
                continue

            all_ref_doc_info[ref_node.node_id] = ref_doc_info
        return all_ref_doc_info

    def get_networkx_graph(self, limit: int = 100) -> Any:
        """
        Get networkx representation of the graph structure.

        Args:
            limit (int): Number of starting nodes to be included in the graph.

        NOTE: This function requires networkx to be installed.
        NOTE: This is a beta feature.

        """
        try:
            import networkx as nx
        except ImportError:
            raise ImportError(
                "Please install networkx to visualize the graph: `pip install networkx`"
            )

        g = nx.Graph()
        subjs = list(self.index_struct.table.keys())

        # add edges
        rel_map = self._graph_store.get_rel_map(subjs=subjs, depth=1, limit=limit)

        added_nodes = set()
        for keyword in rel_map:
            for path in rel_map[keyword]:
                subj = keyword
                for i in range(0, len(path), 2):
                    if i + 2 >= len(path):
                        break

                    if subj not in added_nodes:
                        g.add_node(subj)
                        added_nodes.add(subj)

                    rel = path[i + 1]
                    obj = path[i + 2]

                    g.add_edge(subj, obj, label=rel, title=rel)
                    subj = obj
        return g

    @property
    def query_context(self) -> Dict[str, Any]:
        return {GRAPH_STORE_KEY: self._graph_store}

ref_doc_info property #

ref_doc_info: Dict[str, RefDocInfo]

检索已摄取文档及其节点+元数据的字典映射。

upsert_triplet #

upsert_triplet(triplet: Tuple[str, str, str], include_embeddings: bool = False) -> None

插入三元组及可选的嵌入向量。

用于手动插入知识图谱三元组(形式为(主体,关系,客体))。

参数:

名称 类型 描述 默认
triplet tuple

知识三元组

required
embedding Any

三元组的嵌入选项。默认为 None。

required
workflows/handler.py 中的源代码llama_index/core/indices/knowledge_graph/base.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def upsert_triplet(
    self, triplet: Tuple[str, str, str], include_embeddings: bool = False
) -> None:
    """
    Insert triplets and optionally embeddings.

    Used for manual insertion of KG triplets (in the form
    of (subject, relationship, object)).

    Args:
        triplet (tuple): Knowledge triplet
        embedding (Any, optional): Embedding option for the triplet. Defaults to None.

    """
    self._graph_store.upsert_triplet(*triplet)
    triplet_str = str(triplet)
    if include_embeddings:
        set_embedding = self._embed_model.get_text_embedding(triplet_str)
        self._index_struct.add_to_embedding_dict(str(triplet), set_embedding)
        self._storage_context.index_store.add_index_struct(self._index_struct)

add_node #

add_node(keywords: List[str], node: BaseNode) -> None

添加节点。

用于手动插入节点(通过关键词索引)。

参数:

名称 类型 描述 默认
keywords List[str]

用于索引节点的关键词。

required
node Node

待索引的节点。

required
workflows/handler.py 中的源代码llama_index/core/indices/knowledge_graph/base.py
277
278
279
280
281
282
283
284
285
286
287
288
289
def add_node(self, keywords: List[str], node: BaseNode) -> None:
    """
    Add node.

    Used for manual insertion of nodes (keyed by keywords).

    Args:
        keywords (List[str]): Keywords to index the node.
        node (Node): Node to be indexed.

    """
    self._index_struct.add_node(keywords, node)
    self._docstore.add_documents([node], allow_update=True)

upsert_triplet_and_node #

upsert_triplet_and_node(triplet: Tuple[str, str, str], node: BaseNode, include_embeddings: bool = False) -> None

更新知识图谱三元组与节点。

同时调用 upsert_triplet 和 add_node。 该行为是幂等的;如果节点已存在, 则仅会添加三元组。

参数:

名称 类型 描述 默认
keywords List[str]

用于索引节点的关键词。

required
node Node

待索引的节点。

required
include_embeddings bool

为三元组添加嵌入的选项。默认为 False

False
workflows/handler.py 中的源代码llama_index/core/indices/knowledge_graph/base.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def upsert_triplet_and_node(
    self,
    triplet: Tuple[str, str, str],
    node: BaseNode,
    include_embeddings: bool = False,
) -> None:
    """
    Upsert KG triplet and node.

    Calls both upsert_triplet and add_node.
    Behavior is idempotent; if Node already exists,
    only triplet will be added.

    Args:
        keywords (List[str]): Keywords to index the node.
        node (Node): Node to be indexed.
        include_embeddings (bool): Option to add embeddings for triplets. Defaults to False

    """
    subj, _, obj = triplet
    self.upsert_triplet(triplet)
    self.add_node([subj, obj], node)
    triplet_str = str(triplet)
    if include_embeddings:
        set_embedding = self._embed_model.get_text_embedding(triplet_str)
        self._index_struct.add_to_embedding_dict(str(triplet), set_embedding)
        self._storage_context.index_store.add_index_struct(self._index_struct)

get_networkx_graph #

get_networkx_graph(limit: int = 100) -> Any

获取图结构的networkx表示。

参数:

名称 类型 描述 默认
limit int

图中包含的起始节点数量。

100

注意:此功能需要安装 networkx。 注意:这是一个测试版功能。

workflows/handler.py 中的源代码llama_index/core/indices/knowledge_graph/base.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def get_networkx_graph(self, limit: int = 100) -> Any:
    """
    Get networkx representation of the graph structure.

    Args:
        limit (int): Number of starting nodes to be included in the graph.

    NOTE: This function requires networkx to be installed.
    NOTE: This is a beta feature.

    """
    try:
        import networkx as nx
    except ImportError:
        raise ImportError(
            "Please install networkx to visualize the graph: `pip install networkx`"
        )

    g = nx.Graph()
    subjs = list(self.index_struct.table.keys())

    # add edges
    rel_map = self._graph_store.get_rel_map(subjs=subjs, depth=1, limit=limit)

    added_nodes = set()
    for keyword in rel_map:
        for path in rel_map[keyword]:
            subj = keyword
            for i in range(0, len(path), 2):
                if i + 2 >= len(path):
                    break

                if subj not in added_nodes:
                    g.add_node(subj)
                    added_nodes.add(subj)

                rel = path[i + 1]
                obj = path[i + 2]

                g.add_edge(subj, obj, label=rel, title=rel)
                subj = obj
    return g

属性图索引 #

基类:EventBaseIndex[IndexLPG]

属性图的索引。

参数:

名称 类型 描述 默认
nodes Optional[Sequence[BaseNode]]

要插入索引的节点列表。

None
llm Optional[大语言模型]

用于提取三元组的语言模型。默认为 Settings.llm

None
kg_extractors Optional[List[TransformComponent]]

应用于节点以提取三元组的转换列表。 默认为 [SimpleLLMPathExtractor(llm=llm), ImplicitEdgeExtractor()]

None
property_graph_store Optional[PropertyGraphStore]

要使用的属性图存储。如果未提供,将创建一个新的 SimplePropertyGraphStore

None
vector_store Optional[BasePydanticVectorStore]

如果图存储不支持向量查询,则使用的向量存储索引。

None
use_async bool

是否对转换使用异步模式。默认为 True

True
embed_model Optional[EmbedType]

用于嵌入节点的嵌入模型。 如果未提供,当embed_kg_nodes=True时将使用Settings.embed_model

None
embed_kg_nodes bool

是否嵌入知识图谱节点。默认为 True

True
callback_manager Optional[CallbackManager]

要使用的回调管理器。

None
transformations Optional[List[TransformComponent]]

在将节点插入索引之前应用的一系列转换。 这些转换在 kg_extractors 之前应用。

None
storage_context Optional[StorageContext]

要使用的存储上下文。

None
show_progress bool

是否显示转换进度条。默认为 False

False
workflows/handler.py 中的源代码llama_index/core/indices/property_graph/base.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
class PropertyGraphIndex(BaseIndex[IndexLPG]):
    """
    An index for a property graph.

    Args:
        nodes (Optional[Sequence[BaseNode]]):
            A list of nodes to insert into the index.
        llm (Optional[LLM]):
            The language model to use for extracting triplets. Defaults to `Settings.llm`.
        kg_extractors (Optional[List[TransformComponent]]):
            A list of transformations to apply to the nodes to extract triplets.
            Defaults to `[SimpleLLMPathExtractor(llm=llm), ImplicitEdgeExtractor()]`.
        property_graph_store (Optional[PropertyGraphStore]):
            The property graph store to use. If not provided, a new `SimplePropertyGraphStore` will be created.
        vector_store (Optional[BasePydanticVectorStore]):
            The vector store index to use, if the graph store does not support vector queries.
        use_async (bool):
            Whether to use async for transformations. Defaults to `True`.
        embed_model (Optional[EmbedType]):
            The embedding model to use for embedding nodes.
            If not provided, `Settings.embed_model` will be used if `embed_kg_nodes=True`.
        embed_kg_nodes (bool):
            Whether to embed the KG nodes. Defaults to `True`.
        callback_manager (Optional[CallbackManager]):
            The callback manager to use.
        transformations (Optional[List[TransformComponent]]):
            A list of transformations to apply to the nodes before inserting them into the index.
            These are applied prior to the `kg_extractors`.
        storage_context (Optional[StorageContext]):
            The storage context to use.
        show_progress (bool):
            Whether to show progress bars for transformations. Defaults to `False`.

    """

    index_struct_cls = IndexLPG

    def __init__(
        self,
        nodes: Optional[Sequence[BaseNode]] = None,
        llm: Optional[LLM] = None,
        kg_extractors: Optional[List[TransformComponent]] = None,
        property_graph_store: Optional[PropertyGraphStore] = None,
        # vector related params
        vector_store: Optional[BasePydanticVectorStore] = None,
        use_async: bool = True,
        embed_model: Optional[EmbedType] = None,
        embed_kg_nodes: bool = True,
        # parent class params
        callback_manager: Optional[CallbackManager] = None,
        transformations: Optional[List[TransformComponent]] = None,
        storage_context: Optional[StorageContext] = None,
        show_progress: bool = False,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        storage_context = storage_context or StorageContext.from_defaults(
            property_graph_store=property_graph_store
        )

        # lazily initialize the graph store on the storage context
        if property_graph_store is not None:
            storage_context.property_graph_store = property_graph_store
        elif storage_context.property_graph_store is None:
            storage_context.property_graph_store = SimplePropertyGraphStore()

        if vector_store is not None:
            storage_context.vector_stores[DEFAULT_VECTOR_STORE] = vector_store

        if embed_kg_nodes and (
            storage_context.property_graph_store.supports_vector_queries
            or embed_kg_nodes
        ):
            self._embed_model = (
                resolve_embed_model(embed_model)
                if embed_model
                else Settings.embed_model
            )
        else:
            self._embed_model = None  # type: ignore

        self._kg_extractors = kg_extractors or [
            SimpleLLMPathExtractor(llm=llm or Settings.llm),
            ImplicitPathExtractor(),
        ]
        self._use_async = use_async
        self._llm = llm
        self._embed_kg_nodes = embed_kg_nodes
        self._override_vector_store = (
            vector_store is not None
            or not storage_context.property_graph_store.supports_vector_queries
        )

        super().__init__(
            nodes=nodes,
            callback_manager=callback_manager,
            storage_context=storage_context,
            transformations=transformations,
            show_progress=show_progress,
            **kwargs,
        )

    @classmethod
    def from_existing(
        cls: Type["PropertyGraphIndex"],
        property_graph_store: PropertyGraphStore,
        vector_store: Optional[BasePydanticVectorStore] = None,
        # general params
        llm: Optional[LLM] = None,
        kg_extractors: Optional[List[TransformComponent]] = None,
        # vector related params
        use_async: bool = True,
        embed_model: Optional[EmbedType] = None,
        embed_kg_nodes: bool = True,
        # parent class params
        callback_manager: Optional[CallbackManager] = None,
        transformations: Optional[List[TransformComponent]] = None,
        storage_context: Optional[StorageContext] = None,
        show_progress: bool = False,
        **kwargs: Any,
    ) -> "PropertyGraphIndex":
        """Create an index from an existing property graph store (and optional vector store)."""
        return cls(
            nodes=[],  # no nodes to insert
            property_graph_store=property_graph_store,
            vector_store=vector_store,
            llm=llm,
            kg_extractors=kg_extractors,
            use_async=use_async,
            embed_model=embed_model,
            embed_kg_nodes=embed_kg_nodes,
            callback_manager=callback_manager,
            transformations=transformations,
            storage_context=storage_context,
            show_progress=show_progress,
            **kwargs,
        )

    @property
    def property_graph_store(self) -> PropertyGraphStore:
        """Get the labelled property graph store."""
        assert self.storage_context.property_graph_store is not None

        return self.storage_context.property_graph_store

    @property
    def vector_store(self) -> Optional[BasePydanticVectorStore]:
        if self._embed_kg_nodes and self._override_vector_store:
            return self.storage_context.vector_store
        else:
            return None

    def _insert_nodes(self, nodes: Sequence[BaseNode]) -> Sequence[BaseNode]:
        """Insert nodes to the index struct."""
        if len(nodes) == 0:
            return nodes

        # run transformations on nodes to extract triplets
        if self._use_async:
            nodes = asyncio.run(
                arun_transformations(
                    nodes, self._kg_extractors, show_progress=self._show_progress
                )
            )
        else:
            nodes = run_transformations(
                nodes, self._kg_extractors, show_progress=self._show_progress
            )

        # ensure all nodes have nodes and/or relations in metadata
        assert all(
            node.metadata.get(KG_NODES_KEY) is not None
            or node.metadata.get(KG_RELATIONS_KEY) is not None
            for node in nodes
        )

        kg_nodes_to_insert: List[LabelledNode] = []
        kg_rels_to_insert: List[Relation] = []
        for node in nodes:
            # remove nodes and relations from metadata
            kg_nodes = node.metadata.pop(KG_NODES_KEY, [])
            kg_rels = node.metadata.pop(KG_RELATIONS_KEY, [])

            # add source id to properties
            for kg_node in kg_nodes:
                kg_node.properties[TRIPLET_SOURCE_KEY] = node.id_
            for kg_rel in kg_rels:
                kg_rel.properties[TRIPLET_SOURCE_KEY] = node.id_

            # add nodes and relations to insert lists
            kg_nodes_to_insert.extend(kg_nodes)
            kg_rels_to_insert.extend(kg_rels)

        # filter out duplicate kg nodes
        kg_node_ids = {node.id for node in kg_nodes_to_insert}
        existing_kg_nodes = self.property_graph_store.get(ids=list(kg_node_ids))
        existing_kg_node_ids = {node.id for node in existing_kg_nodes}
        kg_nodes_to_insert = [
            node for node in kg_nodes_to_insert if node.id not in existing_kg_node_ids
        ]

        # filter out duplicate llama nodes
        existing_nodes = self.property_graph_store.get_llama_nodes(
            [node.id_ for node in nodes]
        )
        existing_node_hashes = {node.hash for node in existing_nodes}
        nodes = [node for node in nodes if node.hash not in existing_node_hashes]

        # embed nodes (if needed)
        if self._embed_kg_nodes:
            # embed llama-index nodes
            node_texts = [
                node.get_content(metadata_mode=MetadataMode.EMBED) for node in nodes
            ]

            if self._use_async:
                embeddings = asyncio.run(
                    self._embed_model.aget_text_embedding_batch(
                        node_texts, show_progress=self._show_progress
                    )
                )
            else:
                embeddings = self._embed_model.get_text_embedding_batch(
                    node_texts, show_progress=self._show_progress
                )

            for node, embedding in zip(nodes, embeddings):
                node.embedding = embedding

            # embed kg nodes
            kg_node_texts = [str(kg_node) for kg_node in kg_nodes_to_insert]

            if self._use_async:
                kg_embeddings = asyncio.run(
                    self._embed_model.aget_text_embedding_batch(
                        kg_node_texts, show_progress=self._show_progress
                    )
                )
            else:
                kg_embeddings = self._embed_model.get_text_embedding_batch(
                    kg_node_texts,
                    show_progress=self._show_progress,
                )

            for kg_node, embedding in zip(kg_nodes_to_insert, kg_embeddings):
                kg_node.embedding = embedding

        # if graph store doesn't support vectors, or the vector index was provided, use it
        if self.vector_store is not None and len(kg_nodes_to_insert) > 0:
            self._insert_nodes_to_vector_index(kg_nodes_to_insert)

        if len(nodes) > 0:
            self.property_graph_store.upsert_llama_nodes(nodes)

        if len(kg_nodes_to_insert) > 0:
            self.property_graph_store.upsert_nodes(kg_nodes_to_insert)

        # important: upsert relations after nodes
        if len(kg_rels_to_insert) > 0:
            self.property_graph_store.upsert_relations(kg_rels_to_insert)

        # refresh schema if needed
        if self.property_graph_store.supports_structured_queries:
            self.property_graph_store.get_schema(refresh=True)

        return nodes

    def _insert_nodes_to_vector_index(self, nodes: List[LabelledNode]) -> None:
        """Insert vector nodes."""
        assert self.vector_store is not None

        llama_nodes: List[TextNode] = []
        for node in nodes:
            if node.embedding is not None:
                llama_nodes.append(
                    TextNode(
                        text=str(node),
                        metadata={VECTOR_SOURCE_KEY: node.id, **node.properties},
                        embedding=[*node.embedding],
                    )
                )
                if not self.vector_store.stores_text:
                    llama_nodes[-1].id_ = node.id

            # clear the embedding to save memory, its not used now
            node.embedding = None

        self.vector_store.add(llama_nodes)

    def _build_index_from_nodes(
        self, nodes: Optional[Sequence[BaseNode]], **build_kwargs: Any
    ) -> IndexLPG:
        """Build index from nodes."""
        nodes = self._insert_nodes(nodes or [])

        # this isn't really used or needed
        return IndexLPG()

    def as_retriever(
        self,
        sub_retrievers: Optional[List["BasePGRetriever"]] = None,
        include_text: bool = True,
        **kwargs: Any,
    ) -> BaseRetriever:
        """
        Return a retriever for the index.

        Args:
            sub_retrievers (Optional[List[BasePGRetriever]]):
                A list of sub-retrievers to use. If not provided, a default list will be used:
                `[LLMSynonymRetriever, VectorContextRetriever]` if the graph store supports vector queries.
            include_text (bool):
                Whether to include source-text in the retriever results.
            **kwargs:
                Additional kwargs to pass to the retriever.

        """
        from llama_index.core.indices.property_graph.retriever import (
            PGRetriever,
        )
        from llama_index.core.indices.property_graph.sub_retrievers.vector import (
            VectorContextRetriever,
        )
        from llama_index.core.indices.property_graph.sub_retrievers.llm_synonym import (
            LLMSynonymRetriever,
        )

        if sub_retrievers is None:
            sub_retrievers = [
                LLMSynonymRetriever(
                    graph_store=self.property_graph_store,
                    include_text=include_text,
                    llm=self._llm,
                    **kwargs,
                ),
            ]

            if self._embed_model and (
                self.property_graph_store.supports_vector_queries or self.vector_store
            ):
                sub_retrievers.append(
                    VectorContextRetriever(
                        graph_store=self.property_graph_store,
                        vector_store=self.vector_store,
                        include_text=include_text,
                        embed_model=self._embed_model,
                        **kwargs,
                    )
                )

        return PGRetriever(sub_retrievers, use_async=self._use_async, **kwargs)

    def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:
        """Delete a node."""
        self.property_graph_store.delete(ids=[node_id])

    def _insert(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
        """Index-specific logic for inserting nodes to the index struct."""
        self._insert_nodes(nodes)

    @property
    def ref_doc_info(self) -> Dict[str, RefDocInfo]:
        """Retrieve a dict mapping of ingested documents and their nodes+metadata."""
        raise NotImplementedError(
            "Ref doc info not implemented for PropertyGraphIndex. "
            "All inserts are already upserts."
        )

property_graph_store property #

property_graph_store: PropertyGraphStore

获取带标签的属性图存储。

ref_doc_info property #

ref_doc_info: Dict[str, RefDocInfo]

检索已摄取文档及其节点+元数据的字典映射。

from_existing classmethod #

from_existing(property_graph_store: PropertyGraphStore, vector_store: Optional[BasePydanticVectorStore] = None, llm: Optional[大语言模型] = None, kg_extractors: Optional[List[TransformComponent]] = None, use_async: bool = True, embed_model: Optional[EmbedType] = None, embed_kg_nodes: bool = True, callback_manager: Optional[CallbackManager] = None, transformations: Optional[List[TransformComponent]] = None, storage_context: Optional[StorageContext] = None, show_progress: bool = False, **kwargs: Any) -> PropertyGraphIndex

从现有的属性图存储(及可选的向量存储)创建索引。

workflows/handler.py 中的源代码llama_index/core/indices/property_graph/base.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
@classmethod
def from_existing(
    cls: Type["PropertyGraphIndex"],
    property_graph_store: PropertyGraphStore,
    vector_store: Optional[BasePydanticVectorStore] = None,
    # general params
    llm: Optional[LLM] = None,
    kg_extractors: Optional[List[TransformComponent]] = None,
    # vector related params
    use_async: bool = True,
    embed_model: Optional[EmbedType] = None,
    embed_kg_nodes: bool = True,
    # parent class params
    callback_manager: Optional[CallbackManager] = None,
    transformations: Optional[List[TransformComponent]] = None,
    storage_context: Optional[StorageContext] = None,
    show_progress: bool = False,
    **kwargs: Any,
) -> "PropertyGraphIndex":
    """Create an index from an existing property graph store (and optional vector store)."""
    return cls(
        nodes=[],  # no nodes to insert
        property_graph_store=property_graph_store,
        vector_store=vector_store,
        llm=llm,
        kg_extractors=kg_extractors,
        use_async=use_async,
        embed_model=embed_model,
        embed_kg_nodes=embed_kg_nodes,
        callback_manager=callback_manager,
        transformations=transformations,
        storage_context=storage_context,
        show_progress=show_progress,
        **kwargs,
    )

as_retriever #

as_retriever(sub_retrievers: Optional[List[BasePGRetriever]] = None, include_text: bool = True, **kwargs: Any) -> BaseRetriever

返回索引的检索器。

参数:

名称 类型 描述 默认
sub_retrievers Optional[List[BasePGRetriever]]

要使用的子检索器列表。如果未提供,将使用默认列表:如果图存储支持向量查询,则使用 [LLMSynonymRetriever, VectorContextRetriever]

None
include_text bool

是否在检索器结果中包含源文本。

True
**kwargs Any

传递给检索器的额外关键字参数。

{}
workflows/handler.py 中的源代码llama_index/core/indices/property_graph/base.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def as_retriever(
    self,
    sub_retrievers: Optional[List["BasePGRetriever"]] = None,
    include_text: bool = True,
    **kwargs: Any,
) -> BaseRetriever:
    """
    Return a retriever for the index.

    Args:
        sub_retrievers (Optional[List[BasePGRetriever]]):
            A list of sub-retrievers to use. If not provided, a default list will be used:
            `[LLMSynonymRetriever, VectorContextRetriever]` if the graph store supports vector queries.
        include_text (bool):
            Whether to include source-text in the retriever results.
        **kwargs:
            Additional kwargs to pass to the retriever.

    """
    from llama_index.core.indices.property_graph.retriever import (
        PGRetriever,
    )
    from llama_index.core.indices.property_graph.sub_retrievers.vector import (
        VectorContextRetriever,
    )
    from llama_index.core.indices.property_graph.sub_retrievers.llm_synonym import (
        LLMSynonymRetriever,
    )

    if sub_retrievers is None:
        sub_retrievers = [
            LLMSynonymRetriever(
                graph_store=self.property_graph_store,
                include_text=include_text,
                llm=self._llm,
                **kwargs,
            ),
        ]

        if self._embed_model and (
            self.property_graph_store.supports_vector_queries or self.vector_store
        ):
            sub_retrievers.append(
                VectorContextRetriever(
                    graph_store=self.property_graph_store,
                    vector_store=self.vector_store,
                    include_text=include_text,
                    embed_model=self._embed_model,
                    **kwargs,
                )
            )

    return PGRetriever(sub_retrievers, use_async=self._use_async, **kwargs)

RAKE关键词表索引 #

基类:EventBaseKeywordTableIndex

RAKE关键词表索引。

该索引使用RAKE关键词提取器从文本中提取关键词。

workflows/handler.py 中的源代码llama_index/core/indices/keyword_table/rake_base.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class RAKEKeywordTableIndex(BaseKeywordTableIndex):
    """
    RAKE Keyword Table Index.

    This index uses a RAKE keyword extractor to extract keywords from the text.

    """

    def _extract_keywords(self, text: str) -> Set[str]:
        """Extract keywords from text."""
        return rake_extract_keywords(text, max_keywords=self.max_keywords_per_chunk)

    def as_retriever(
        self,
        retriever_mode: Union[
            str, KeywordTableRetrieverMode
        ] = KeywordTableRetrieverMode.RAKE,
        **kwargs: Any,
    ) -> BaseRetriever:
        return super().as_retriever(retriever_mode=retriever_mode, **kwargs)

简单关键词表索引 #

基类:EventBaseKeywordTableIndex

简单关键词表格索引。

该索引使用简单的正则表达式提取器从文本中提取关键词。

workflows/handler.py 中的源代码llama_index/core/indices/keyword_table/simple_base.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class SimpleKeywordTableIndex(BaseKeywordTableIndex):
    """
    Simple Keyword Table Index.

    This index uses a simple regex extractor to extract keywords from the text.

    """

    def _extract_keywords(self, text: str) -> Set[str]:
        """Extract keywords from text."""
        return simple_extract_keywords(text, self.max_keywords_per_chunk)

    def as_retriever(
        self,
        retriever_mode: Union[
            str, KeywordTableRetrieverMode
        ] = KeywordTableRetrieverMode.SIMPLE,
        **kwargs: Any,
    ) -> BaseRetriever:
        return super().as_retriever(retriever_mode=retriever_mode, **kwargs)

摘要索引 #

基类:EventBaseIndex[IndexList]

摘要索引。

摘要索引是一种简单的数据结构,其中节点按顺序存储。在索引构建过程中,文档文本被分割成块,转换为节点,并存储在列表中。

在查询期间,摘要索引会遍历所有节点 (可使用一些可选筛选参数),并从所有节点中 综合生成答案。

参数:

名称 类型 描述 默认
text_qa_template Optional[BasePromptTemplate]

问答提示 (参见 :ref:Prompt-Templates)。 注意:这是一个已弃用的字段。

required
show_progress bool

是否显示tqdm进度条。默认为False。

False
workflows/handler.py 中的源代码llama_index/core/indices/list/base.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class SummaryIndex(BaseIndex[IndexList]):
    """
    Summary Index.

    The summary index is a simple data structure where nodes are stored in
    a sequence. During index construction, the document texts are
    chunked up, converted to nodes, and stored in a list.

    During query time, the summary index iterates through the nodes
    with some optional filter parameters, and synthesizes an
    answer from all the nodes.

    Args:
        text_qa_template (Optional[BasePromptTemplate]): A Question-Answer Prompt
            (see :ref:`Prompt-Templates`).
            NOTE: this is a deprecated field.
        show_progress (bool): Whether to show tqdm progress bars. Defaults to False.

    """

    index_struct_cls = IndexList

    def __init__(
        self,
        nodes: Optional[Sequence[BaseNode]] = None,
        objects: Optional[Sequence[IndexNode]] = None,
        index_struct: Optional[IndexList] = None,
        show_progress: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        super().__init__(
            nodes=nodes,
            index_struct=index_struct,
            show_progress=show_progress,
            objects=objects,
            **kwargs,
        )

    def as_retriever(
        self,
        retriever_mode: Union[str, ListRetrieverMode] = ListRetrieverMode.DEFAULT,
        llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        **kwargs: Any,
    ) -> BaseRetriever:
        from llama_index.core.indices.list.retrievers import (
            SummaryIndexEmbeddingRetriever,
            SummaryIndexLLMRetriever,
            SummaryIndexRetriever,
        )

        if retriever_mode == ListRetrieverMode.DEFAULT:
            return SummaryIndexRetriever(self, object_map=self._object_map, **kwargs)
        elif retriever_mode == ListRetrieverMode.EMBEDDING:
            embed_model = embed_model or Settings.embed_model
            return SummaryIndexEmbeddingRetriever(
                self, object_map=self._object_map, embed_model=embed_model, **kwargs
            )
        elif retriever_mode == ListRetrieverMode.LLM:
            llm = llm or Settings.llm
            return SummaryIndexLLMRetriever(
                self, object_map=self._object_map, llm=llm, **kwargs
            )
        else:
            raise ValueError(f"Unknown retriever mode: {retriever_mode}")

    def _build_index_from_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **build_kwargs: Any,
    ) -> IndexList:
        """
        Build the index from documents.

        Args:
            documents (List[BaseDocument]): A list of documents.

        Returns:
            IndexList: The created summary index.

        """
        index_struct = IndexList()
        nodes_with_progress = get_tqdm_iterable(
            nodes, show_progress, "Processing nodes"
        )
        for n in nodes_with_progress:
            index_struct.add_node(n)
        return index_struct

    def _insert(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
        """Insert a document."""
        for n in nodes:
            self._index_struct.add_node(n)

    def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:
        """Delete a node."""
        cur_node_ids = self._index_struct.nodes
        cur_nodes = self._docstore.get_nodes(cur_node_ids)
        nodes_to_keep = [n for n in cur_nodes if n.node_id != node_id]
        self._index_struct.nodes = [n.node_id for n in nodes_to_keep]

    @property
    def ref_doc_info(self) -> Dict[str, RefDocInfo]:
        """Retrieve a dict mapping of ingested documents and their nodes+metadata."""
        node_doc_ids = self._index_struct.nodes
        nodes = self.docstore.get_nodes(node_doc_ids)

        all_ref_doc_info = {}
        for node in nodes:
            ref_node = node.source_node
            if not ref_node:
                continue

            ref_doc_info = self.docstore.get_ref_doc_info(ref_node.node_id)
            if not ref_doc_info:
                continue

            all_ref_doc_info[ref_node.node_id] = ref_doc_info
        return all_ref_doc_info

ref_doc_info property #

ref_doc_info: Dict[str, RefDocInfo]

检索已摄取文档及其节点+元数据的字典映射。

树状索引 #

基类:EventBaseIndex[IndexGraph]

树状索引。

树状索引是一种树形结构的索引,其中每个节点都是子节点的摘要。在索引构建过程中,该树以自底向上的方式构建,直到我们得到一组根节点。

在查询时有几种不同的选项(参见:Ref-Query)。 主要选项是从根节点向下遍历树。 另一种方法是从根节点直接合成答案。

参数:

名称 类型 描述 默认
summary_template Optional[BasePromptTemplate]

一个摘要生成提示 (参见 :ref:Prompt-Templates)。

None
insert_prompt Optional[BasePromptTemplate]

树形插入提示 (参见 :ref:Prompt-Templates)。

None
num_children int

每个节点应拥有的子节点数量。

10
build_tree bool

是否在索引构建期间构建树。

True
show_progress bool

是否显示进度条。默认为 False。

False
workflows/handler.py 中的源代码llama_index/core/indices/tree/base.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class TreeIndex(BaseIndex[IndexGraph]):
    """
    Tree Index.

    The tree index is a tree-structured index, where each node is a summary of
    the children nodes. During index construction, the tree is constructed
    in a bottoms-up fashion until we end up with a set of root_nodes.

    There are a few different options during query time (see :ref:`Ref-Query`).
    The main option is to traverse down the tree from the root nodes.
    A secondary answer is to directly synthesize the answer from the root nodes.

    Args:
        summary_template (Optional[BasePromptTemplate]): A Summarization Prompt
            (see :ref:`Prompt-Templates`).
        insert_prompt (Optional[BasePromptTemplate]): An Tree Insertion Prompt
            (see :ref:`Prompt-Templates`).
        num_children (int): The number of children each node should have.
        build_tree (bool): Whether to build the tree during index construction.
        show_progress (bool): Whether to show progress bars. Defaults to False.

    """

    index_struct_cls = IndexGraph

    def __init__(
        self,
        nodes: Optional[Sequence[BaseNode]] = None,
        objects: Optional[Sequence[IndexNode]] = None,
        index_struct: Optional[IndexGraph] = None,
        llm: Optional[LLM] = None,
        summary_template: Optional[BasePromptTemplate] = None,
        insert_prompt: Optional[BasePromptTemplate] = None,
        num_children: int = 10,
        build_tree: bool = True,
        use_async: bool = False,
        show_progress: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        # need to set parameters before building index in base class.
        self.num_children = num_children
        self.summary_template = summary_template or DEFAULT_SUMMARY_PROMPT
        self.insert_prompt: BasePromptTemplate = insert_prompt or DEFAULT_INSERT_PROMPT
        self.build_tree = build_tree
        self._use_async = use_async
        self._llm = llm or Settings.llm
        super().__init__(
            nodes=nodes,
            index_struct=index_struct,
            show_progress=show_progress,
            objects=objects,
            **kwargs,
        )

    def as_retriever(
        self,
        retriever_mode: Union[str, TreeRetrieverMode] = TreeRetrieverMode.SELECT_LEAF,
        embed_model: Optional[BaseEmbedding] = None,
        **kwargs: Any,
    ) -> BaseRetriever:
        # NOTE: lazy import
        from llama_index.core.indices.tree.all_leaf_retriever import (
            TreeAllLeafRetriever,
        )
        from llama_index.core.indices.tree.select_leaf_embedding_retriever import (
            TreeSelectLeafEmbeddingRetriever,
        )
        from llama_index.core.indices.tree.select_leaf_retriever import (
            TreeSelectLeafRetriever,
        )
        from llama_index.core.indices.tree.tree_root_retriever import (
            TreeRootRetriever,
        )

        self._validate_build_tree_required(TreeRetrieverMode(retriever_mode))

        if retriever_mode == TreeRetrieverMode.SELECT_LEAF:
            return TreeSelectLeafRetriever(self, object_map=self._object_map, **kwargs)
        elif retriever_mode == TreeRetrieverMode.SELECT_LEAF_EMBEDDING:
            embed_model = embed_model or Settings.embed_model
            return TreeSelectLeafEmbeddingRetriever(
                self, embed_model=embed_model, object_map=self._object_map, **kwargs
            )
        elif retriever_mode == TreeRetrieverMode.ROOT:
            return TreeRootRetriever(self, object_map=self._object_map, **kwargs)
        elif retriever_mode == TreeRetrieverMode.ALL_LEAF:
            return TreeAllLeafRetriever(self, object_map=self._object_map, **kwargs)
        else:
            raise ValueError(f"Unknown retriever mode: {retriever_mode}")

    def _validate_build_tree_required(self, retriever_mode: TreeRetrieverMode) -> None:
        """Check if index supports modes that require trees."""
        if retriever_mode in REQUIRE_TREE_MODES and not self.build_tree:
            raise ValueError(
                "Index was constructed without building trees, "
                f"but retriever mode {retriever_mode} requires trees."
            )

    def _build_index_from_nodes(
        self, nodes: Sequence[BaseNode], **build_kwargs: Any
    ) -> IndexGraph:
        """Build the index from nodes."""
        index_builder = GPTTreeIndexBuilder(
            self.num_children,
            self.summary_template,
            llm=self._llm,
            use_async=self._use_async,
            show_progress=self._show_progress,
            docstore=self._docstore,
        )
        return index_builder.build_from_nodes(nodes, build_tree=self.build_tree)

    def _insert(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
        """Insert a document."""
        # TODO: allow to customize insert prompt
        inserter = TreeIndexInserter(
            self.index_struct,
            llm=self._llm,
            num_children=self.num_children,
            insert_prompt=self.insert_prompt,
            summary_prompt=self.summary_template,
            docstore=self._docstore,
        )
        inserter.insert(nodes)

    def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:
        """Delete a node."""
        raise NotImplementedError("Delete not implemented for tree index.")

    @property
    def ref_doc_info(self) -> Dict[str, RefDocInfo]:
        """Retrieve a dict mapping of ingested documents and their nodes+metadata."""
        node_doc_ids = list(self.index_struct.all_nodes.values())
        nodes = self.docstore.get_nodes(node_doc_ids)

        all_ref_doc_info = {}
        for node in nodes:
            ref_node = node.source_node
            if not ref_node:
                continue

            ref_doc_info = self.docstore.get_ref_doc_info(ref_node.node_id)
            if not ref_doc_info:
                continue

            all_ref_doc_info[ref_node.node_id] = ref_doc_info
        return all_ref_doc_info

ref_doc_info property #

ref_doc_info: Dict[str, RefDocInfo]

检索已摄取文档及其节点+元数据的字典映射。

向量存储索引 #

基类:EventBaseIndex[IndexDict]

向量存储索引。

参数:

名称 类型 描述 默认
use_async bool

是否使用异步调用。默认为 False。

False
show_progress bool

是否显示tqdm进度条。默认为False。

False
store_nodes_override bool

设置为True以始终在索引存储和文档存储中存储节点对象,即使向量存储保留文本。默认为False

False
workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
class VectorStoreIndex(BaseIndex[IndexDict]):
    """
    Vector Store Index.

    Args:
        use_async (bool): Whether to use asynchronous calls. Defaults to False.
        show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
        store_nodes_override (bool): set to True to always store Node objects in index
            store and document store even if vector store keeps text. Defaults to False

    """

    index_struct_cls = IndexDict

    def __init__(
        self,
        nodes: Optional[Sequence[BaseNode]] = None,
        # vector store index params
        use_async: bool = False,
        store_nodes_override: bool = False,
        embed_model: Optional[EmbedType] = None,
        insert_batch_size: int = 2048,
        # parent class params
        objects: Optional[Sequence[IndexNode]] = None,
        index_struct: Optional[IndexDict] = None,
        storage_context: Optional[StorageContext] = None,
        callback_manager: Optional[CallbackManager] = None,
        transformations: Optional[List[TransformComponent]] = None,
        show_progress: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        self._use_async = use_async
        self._store_nodes_override = store_nodes_override
        self._embed_model = resolve_embed_model(
            embed_model or Settings.embed_model, callback_manager=callback_manager
        )

        self._insert_batch_size = insert_batch_size
        super().__init__(
            nodes=nodes,
            index_struct=index_struct,
            storage_context=storage_context,
            show_progress=show_progress,
            objects=objects,
            callback_manager=callback_manager,
            transformations=transformations,
            **kwargs,
        )

    @classmethod
    def from_vector_store(
        cls,
        vector_store: BasePydanticVectorStore,
        embed_model: Optional[EmbedType] = None,
        **kwargs: Any,
    ) -> "VectorStoreIndex":
        if not vector_store.stores_text:
            raise ValueError(
                "Cannot initialize from a vector store that does not store text."
            )

        kwargs.pop("storage_context", None)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)

        return cls(
            nodes=[],
            embed_model=embed_model,
            storage_context=storage_context,
            **kwargs,
        )

    @property
    def vector_store(self) -> BasePydanticVectorStore:
        return self._vector_store

    def as_retriever(self, **kwargs: Any) -> BaseRetriever:
        # NOTE: lazy import
        from llama_index.core.indices.vector_store.retrievers import (
            VectorIndexRetriever,
        )

        return VectorIndexRetriever(
            self,
            node_ids=list(self.index_struct.nodes_dict.values()),
            callback_manager=self._callback_manager,
            object_map=self._object_map,
            **kwargs,
        )

    def _get_node_with_embedding(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
    ) -> List[BaseNode]:
        """
        Get tuples of id, node, and embedding.

        Allows us to store these nodes in a vector store.
        Embeddings are called in batches.

        """
        id_to_embed_map = embed_nodes(
            nodes, self._embed_model, show_progress=show_progress
        )

        results = []
        for node in nodes:
            embedding = id_to_embed_map[node.node_id]
            result = node.model_copy()
            result.embedding = embedding
            results.append(result)
        return results

    async def _aget_node_with_embedding(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
    ) -> List[BaseNode]:
        """
        Asynchronously get tuples of id, node, and embedding.

        Allows us to store these nodes in a vector store.
        Embeddings are called in batches.

        """
        id_to_embed_map = await async_embed_nodes(
            nodes=nodes,
            embed_model=self._embed_model,
            show_progress=show_progress,
        )

        results = []
        for node in nodes:
            embedding = id_to_embed_map[node.node_id]
            result = node.model_copy()
            result.embedding = embedding
            results.append(result)
        return results

    async def _async_add_nodes_to_index(
        self,
        index_struct: IndexDict,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **insert_kwargs: Any,
    ) -> None:
        """Asynchronously add nodes to index."""
        if not nodes:
            return

        for nodes_batch in iter_batch(nodes, self._insert_batch_size):
            nodes_batch = await self._aget_node_with_embedding(
                nodes_batch, show_progress
            )
            new_ids = await self._vector_store.async_add(nodes_batch, **insert_kwargs)

            # if the vector store doesn't store text, we need to add the nodes to the
            # index struct and document store
            if not self._vector_store.stores_text or self._store_nodes_override:
                for node, new_id in zip(nodes_batch, new_ids):
                    # NOTE: remove embedding from node to avoid duplication
                    node_without_embedding = node.model_copy()
                    node_without_embedding.embedding = None

                    index_struct.add_node(node_without_embedding, text_id=new_id)
                    await self._docstore.async_add_documents(
                        [node_without_embedding], allow_update=True
                    )
            else:
                # NOTE: if the vector store keeps text,
                # we only need to add image and index nodes
                for node, new_id in zip(nodes_batch, new_ids):
                    if isinstance(node, (ImageNode, IndexNode)):
                        # NOTE: remove embedding from node to avoid duplication
                        node_without_embedding = node.model_copy()
                        node_without_embedding.embedding = None

                        index_struct.add_node(node_without_embedding, text_id=new_id)
                        await self._docstore.async_add_documents(
                            [node_without_embedding], allow_update=True
                        )

    def _add_nodes_to_index(
        self,
        index_struct: IndexDict,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **insert_kwargs: Any,
    ) -> None:
        """Add document to index."""
        if not nodes:
            return

        for nodes_batch in iter_batch(nodes, self._insert_batch_size):
            nodes_batch = self._get_node_with_embedding(nodes_batch, show_progress)
            new_ids = self._vector_store.add(nodes_batch, **insert_kwargs)

            if not self._vector_store.stores_text or self._store_nodes_override:
                # NOTE: if the vector store doesn't store text,
                # we need to add the nodes to the index struct and document store
                for node, new_id in zip(nodes_batch, new_ids):
                    # NOTE: remove embedding from node to avoid duplication
                    node_without_embedding = node.model_copy()
                    node_without_embedding.embedding = None

                    index_struct.add_node(node_without_embedding, text_id=new_id)
                    self._docstore.add_documents(
                        [node_without_embedding], allow_update=True
                    )
            else:
                # NOTE: if the vector store keeps text,
                # we only need to add image and index nodes
                for node, new_id in zip(nodes_batch, new_ids):
                    if isinstance(node, (ImageNode, IndexNode)):
                        # NOTE: remove embedding from node to avoid duplication
                        node_without_embedding = node.model_copy()
                        node_without_embedding.embedding = None

                        index_struct.add_node(node_without_embedding, text_id=new_id)
                        self._docstore.add_documents(
                            [node_without_embedding], allow_update=True
                        )

    def _build_index_from_nodes(
        self,
        nodes: Sequence[BaseNode],
        **insert_kwargs: Any,
    ) -> IndexDict:
        """Build index from nodes."""
        index_struct = self.index_struct_cls()
        if self._use_async:
            tasks = [
                self._async_add_nodes_to_index(
                    index_struct,
                    nodes,
                    show_progress=self._show_progress,
                    **insert_kwargs,
                )
            ]
            run_async_tasks(tasks)
        else:
            self._add_nodes_to_index(
                index_struct,
                nodes,
                show_progress=self._show_progress,
                **insert_kwargs,
            )
        return index_struct

    def build_index_from_nodes(
        self,
        nodes: Sequence[BaseNode],
        **insert_kwargs: Any,
    ) -> IndexDict:
        """
        Build the index from nodes.

        NOTE: Overrides BaseIndex.build_index_from_nodes.
            VectorStoreIndex only stores nodes in document store
            if vector store does not store text
        """
        # Filter out the nodes that don't have content
        content_nodes = [
            node
            for node in nodes
            if node.get_content(metadata_mode=MetadataMode.EMBED) != ""
        ]

        # Report if some nodes are missing content
        if len(content_nodes) != len(nodes):
            print("Some nodes are missing content, skipping them...")

        return self._build_index_from_nodes(content_nodes, **insert_kwargs)

    def _insert(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
        """Insert a document."""
        self._add_nodes_to_index(self._index_struct, nodes, **insert_kwargs)

    def _validate_serializable(self, nodes: Sequence[BaseNode]) -> None:
        """Validate that the nodes are serializable."""
        for node in nodes:
            if isinstance(node, IndexNode):
                try:
                    node.dict()
                except ValueError:
                    self._object_map[node.index_id] = node.obj
                    node.obj = None

    async def ainsert_nodes(
        self, nodes: Sequence[BaseNode], **insert_kwargs: Any
    ) -> None:
        """
        Insert nodes.

        NOTE: overrides BaseIndex.ainsert_nodes.
            VectorStoreIndex only stores nodes in document store
            if vector store does not store text
        """
        self._validate_serializable(nodes)

        with self._callback_manager.as_trace("insert_nodes"):
            await self._async_add_nodes_to_index(
                self._index_struct, nodes, **insert_kwargs
            )
            self._storage_context.index_store.add_index_struct(self._index_struct)

    def insert_nodes(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
        """
        Insert nodes.

        NOTE: overrides BaseIndex.insert_nodes.
            VectorStoreIndex only stores nodes in document store
            if vector store does not store text
        """
        self._validate_serializable(nodes)

        with self._callback_manager.as_trace("insert_nodes"):
            self._insert(nodes, **insert_kwargs)
            self._storage_context.index_store.add_index_struct(self._index_struct)

    def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:
        pass

    async def adelete_nodes(
        self,
        node_ids: List[str],
        delete_from_docstore: bool = False,
        **delete_kwargs: Any,
    ) -> None:
        """
        Delete a list of nodes from the index.

        Args:
            node_ids (List[str]): A list of node_ids from the nodes to delete

        """
        # delete nodes from vector store
        await self._vector_store.adelete_nodes(node_ids, **delete_kwargs)

        # delete from docstore only if needed
        if (
            not self._vector_store.stores_text or self._store_nodes_override
        ) and delete_from_docstore:
            for node_id in node_ids:
                self._index_struct.delete(node_id)
                await self._docstore.adelete_document(node_id, raise_error=False)
            self._storage_context.index_store.add_index_struct(self._index_struct)

    def delete_nodes(
        self,
        node_ids: List[str],
        delete_from_docstore: bool = False,
        **delete_kwargs: Any,
    ) -> None:
        """
        Delete a list of nodes from the index.

        Args:
            node_ids (List[str]): A list of node_ids from the nodes to delete

        """
        # delete nodes from vector store
        self._vector_store.delete_nodes(node_ids, **delete_kwargs)

        # delete from docstore only if needed
        if (
            not self._vector_store.stores_text or self._store_nodes_override
        ) and delete_from_docstore:
            for node_id in node_ids:
                self._index_struct.delete(node_id)
                self._docstore.delete_document(node_id, raise_error=False)
            self._storage_context.index_store.add_index_struct(self._index_struct)

    def _delete_from_index_struct(self, ref_doc_id: str) -> None:
        # delete from index_struct only if needed
        if not self._vector_store.stores_text or self._store_nodes_override:
            ref_doc_info = self._docstore.get_ref_doc_info(ref_doc_id)
            if ref_doc_info is not None:
                for node_id in ref_doc_info.node_ids:
                    self._index_struct.delete(node_id)
                    self._vector_store.delete(node_id)

    def _delete_from_docstore(self, ref_doc_id: str) -> None:
        # delete from docstore only if needed
        if not self._vector_store.stores_text or self._store_nodes_override:
            self._docstore.delete_ref_doc(ref_doc_id, raise_error=False)

    def delete_ref_doc(
        self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
    ) -> None:
        """Delete a document and it's nodes by using ref_doc_id."""
        self._vector_store.delete(ref_doc_id, **delete_kwargs)
        self._delete_from_index_struct(ref_doc_id)
        if delete_from_docstore:
            self._delete_from_docstore(ref_doc_id)
        self._storage_context.index_store.add_index_struct(self._index_struct)

    async def _adelete_from_index_struct(self, ref_doc_id: str) -> None:
        """Delete from index_struct only if needed."""
        if not self._vector_store.stores_text or self._store_nodes_override:
            ref_doc_info = await self._docstore.aget_ref_doc_info(ref_doc_id)
            if ref_doc_info is not None:
                for node_id in ref_doc_info.node_ids:
                    self._index_struct.delete(node_id)
                    self._vector_store.delete(node_id)

    async def _adelete_from_docstore(self, ref_doc_id: str) -> None:
        """Delete from docstore only if needed."""
        if not self._vector_store.stores_text or self._store_nodes_override:
            await self._docstore.adelete_ref_doc(ref_doc_id, raise_error=False)

    async def adelete_ref_doc(
        self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
    ) -> None:
        """Delete a document and it's nodes by using ref_doc_id."""
        tasks = [
            self._vector_store.adelete(ref_doc_id, **delete_kwargs),
            self._adelete_from_index_struct(ref_doc_id),
        ]
        if delete_from_docstore:
            tasks.append(self._adelete_from_docstore(ref_doc_id))

        await asyncio.gather(*tasks)

        self._storage_context.index_store.add_index_struct(self._index_struct)

    @property
    def ref_doc_info(self) -> Dict[str, RefDocInfo]:
        """Retrieve a dict mapping of ingested documents and their nodes+metadata."""
        if not self._vector_store.stores_text or self._store_nodes_override:
            node_doc_ids = list(self.index_struct.nodes_dict.values())
            nodes = self.docstore.get_nodes(node_doc_ids)

            all_ref_doc_info = {}
            for node in nodes:
                ref_node = node.source_node
                if not ref_node:
                    continue

                ref_doc_info = self.docstore.get_ref_doc_info(ref_node.node_id)
                if not ref_doc_info:
                    continue

                all_ref_doc_info[ref_node.node_id] = ref_doc_info
            return all_ref_doc_info
        else:
            raise NotImplementedError(
                "Vector store integrations that store text in the vector store are "
                "not supported by ref_doc_info yet."
            )

ref_doc_info property #

ref_doc_info: Dict[str, RefDocInfo]

检索已摄取文档及其节点+元数据的字典映射。

build_index_from_nodes #

build_index_from_nodes(nodes: Sequence[BaseNode], **insert_kwargs: Any) -> IndexDict

从节点构建索引。

Overrides BaseIndex.build_index_from_nodes.

如果向量存储不存储文本,VectorStoreIndex 仅在文档存储中存储节点

workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def build_index_from_nodes(
    self,
    nodes: Sequence[BaseNode],
    **insert_kwargs: Any,
) -> IndexDict:
    """
    Build the index from nodes.

    NOTE: Overrides BaseIndex.build_index_from_nodes.
        VectorStoreIndex only stores nodes in document store
        if vector store does not store text
    """
    # Filter out the nodes that don't have content
    content_nodes = [
        node
        for node in nodes
        if node.get_content(metadata_mode=MetadataMode.EMBED) != ""
    ]

    # Report if some nodes are missing content
    if len(content_nodes) != len(nodes):
        print("Some nodes are missing content, skipping them...")

    return self._build_index_from_nodes(content_nodes, **insert_kwargs)

ainsert_nodes async #

ainsert_nodes(nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None

插入节点。

overrides BaseIndex.ainsert_nodes.

如果向量存储不存储文本,VectorStoreIndex 仅在文档存储中存储节点

workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
async def ainsert_nodes(
    self, nodes: Sequence[BaseNode], **insert_kwargs: Any
) -> None:
    """
    Insert nodes.

    NOTE: overrides BaseIndex.ainsert_nodes.
        VectorStoreIndex only stores nodes in document store
        if vector store does not store text
    """
    self._validate_serializable(nodes)

    with self._callback_manager.as_trace("insert_nodes"):
        await self._async_add_nodes_to_index(
            self._index_struct, nodes, **insert_kwargs
        )
        self._storage_context.index_store.add_index_struct(self._index_struct)

insert_nodes #

insert_nodes(nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None

插入节点。

overrides BaseIndex.insert_nodes.

如果向量存储不存储文本,VectorStoreIndex 仅在文档存储中存储节点

workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
343
344
345
346
347
348
349
350
351
352
353
354
355
def insert_nodes(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:
    """
    Insert nodes.

    NOTE: overrides BaseIndex.insert_nodes.
        VectorStoreIndex only stores nodes in document store
        if vector store does not store text
    """
    self._validate_serializable(nodes)

    with self._callback_manager.as_trace("insert_nodes"):
        self._insert(nodes, **insert_kwargs)
        self._storage_context.index_store.add_index_struct(self._index_struct)

adelete_nodes async #

adelete_nodes(node_ids: List[str], delete_from_docstore: bool = False, **delete_kwargs: Any) -> None

从索引中删除节点列表。

参数:

名称 类型 描述 默认
node_ids List[str]

要删除的节点对应的节点ID列表

required
workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
async def adelete_nodes(
    self,
    node_ids: List[str],
    delete_from_docstore: bool = False,
    **delete_kwargs: Any,
) -> None:
    """
    Delete a list of nodes from the index.

    Args:
        node_ids (List[str]): A list of node_ids from the nodes to delete

    """
    # delete nodes from vector store
    await self._vector_store.adelete_nodes(node_ids, **delete_kwargs)

    # delete from docstore only if needed
    if (
        not self._vector_store.stores_text or self._store_nodes_override
    ) and delete_from_docstore:
        for node_id in node_ids:
            self._index_struct.delete(node_id)
            await self._docstore.adelete_document(node_id, raise_error=False)
        self._storage_context.index_store.add_index_struct(self._index_struct)

delete_nodes #

delete_nodes(node_ids: List[str], delete_from_docstore: bool = False, **delete_kwargs: Any) -> None

从索引中删除节点列表。

参数:

名称 类型 描述 默认
node_ids List[str]

要删除的节点对应的节点ID列表

required
workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def delete_nodes(
    self,
    node_ids: List[str],
    delete_from_docstore: bool = False,
    **delete_kwargs: Any,
) -> None:
    """
    Delete a list of nodes from the index.

    Args:
        node_ids (List[str]): A list of node_ids from the nodes to delete

    """
    # delete nodes from vector store
    self._vector_store.delete_nodes(node_ids, **delete_kwargs)

    # delete from docstore only if needed
    if (
        not self._vector_store.stores_text or self._store_nodes_override
    ) and delete_from_docstore:
        for node_id in node_ids:
            self._index_struct.delete(node_id)
            self._docstore.delete_document(node_id, raise_error=False)
        self._storage_context.index_store.add_index_struct(self._index_struct)

delete_ref_doc #

delete_ref_doc(ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any) -> None

通过使用 ref_doc_id 删除文档及其节点。

workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
424
425
426
427
428
429
430
431
432
def delete_ref_doc(
    self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
) -> None:
    """Delete a document and it's nodes by using ref_doc_id."""
    self._vector_store.delete(ref_doc_id, **delete_kwargs)
    self._delete_from_index_struct(ref_doc_id)
    if delete_from_docstore:
        self._delete_from_docstore(ref_doc_id)
    self._storage_context.index_store.add_index_struct(self._index_struct)

adelete_ref_doc async #

adelete_ref_doc(ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any) -> None

通过使用 ref_doc_id 删除文档及其节点。

workflows/handler.py 中的源代码llama_index/core/indices/vector_store/base.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
async def adelete_ref_doc(
    self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any
) -> None:
    """Delete a document and it's nodes by using ref_doc_id."""
    tasks = [
        self._vector_store.adelete(ref_doc_id, **delete_kwargs),
        self._adelete_from_index_struct(ref_doc_id),
    ]
    if delete_from_docstore:
        tasks.append(self._adelete_from_docstore(ref_doc_id))

    await asyncio.gather(*tasks)

    self._storage_context.index_store.add_index_struct(self._index_struct)

SQL文档上下文构建器 #

为给定SQL表集合构建上下文的构建器。

参数:

名称 类型 描述 默认
sql_database Optional[SQLDatabase]

要使用的SQL数据库,

required
text_splitter Optional[TextSplitter]

要使用的文本分割器。

None
table_context_prompt Optional[BasePromptTemplate]

一个表格上下文提示(参见:Prompt-Templates)。

None
refine_table_context_prompt Optional[BasePromptTemplate]

一个优化表格上下文提示(参见:Prompt-Templates)。

None
table_context_task Optional[str]

对表格上下文执行的查询。如果用户未提供查询字符串,则使用默认查询字符串。

None
workflows/handler.py 中的源代码llama_index/core/indices/common/struct_store/base.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class SQLDocumentContextBuilder:
    """
    Builder that builds context for a given set of SQL tables.

    Args:
        sql_database (Optional[SQLDatabase]): SQL database to use,
        text_splitter (Optional[TextSplitter]): Text Splitter to use.
        table_context_prompt (Optional[BasePromptTemplate]): A
            Table Context Prompt (see :ref:`Prompt-Templates`).
        refine_table_context_prompt (Optional[BasePromptTemplate]):
            A Refine Table Context Prompt (see :ref:`Prompt-Templates`).
        table_context_task (Optional[str]): The query to perform
            on the table context. A default query string is used
            if none is provided by the user.

    """

    def __init__(
        self,
        sql_database: SQLDatabase,
        llm: Optional[LLM] = None,
        text_splitter: Optional[TextSplitter] = None,
        table_context_prompt: Optional[BasePromptTemplate] = None,
        refine_table_context_prompt: Optional[BasePromptTemplate] = None,
        table_context_task: Optional[str] = None,
    ) -> None:
        """Initialize params."""
        # TODO: take in an entire index instead of forming a response builder
        if sql_database is None:
            raise ValueError("sql_database must be provided.")
        self._sql_database = sql_database
        self._text_splitter = text_splitter
        self._llm = llm or Settings.llm
        self._prompt_helper = Settings._prompt_helper or PromptHelper.from_llm_metadata(
            self._llm.metadata,
        )
        self._callback_manager = Settings.callback_manager
        self._table_context_prompt = (
            table_context_prompt or DEFAULT_TABLE_CONTEXT_PROMPT
        )
        self._refine_table_context_prompt = (
            refine_table_context_prompt or DEFAULT_REFINE_TABLE_CONTEXT_PROMPT_SEL
        )
        self._table_context_task = table_context_task or DEFAULT_TABLE_CONTEXT_QUERY

    def build_all_context_from_documents(
        self,
        documents_dict: Dict[str, List[BaseNode]],
    ) -> Dict[str, str]:
        """Build context for all tables in the database."""
        context_dict = {}
        for table_name in self._sql_database.get_usable_table_names():
            context_dict[table_name] = self.build_table_context_from_documents(
                documents_dict[table_name], table_name
            )
        return context_dict

    def build_table_context_from_documents(
        self,
        documents: Sequence[BaseNode],
        table_name: str,
    ) -> str:
        """Build context from documents for a single table."""
        schema = self._sql_database.get_single_table_info(table_name)
        prompt_with_schema = self._table_context_prompt.partial_format(schema=schema)
        prompt_with_schema.metadata["prompt_type"] = PromptType.QUESTION_ANSWER
        refine_prompt_with_schema = self._refine_table_context_prompt.partial_format(
            schema=schema
        )
        refine_prompt_with_schema.metadata["prompt_type"] = PromptType.REFINE

        text_splitter = (
            self._text_splitter
            or self._prompt_helper.get_text_splitter_given_prompt(
                prompt_with_schema, llm=self._llm
            )
        )
        # we use the ResponseBuilder to iteratively go through all texts
        response_builder = get_response_synthesizer(
            llm=self._llm,
            text_qa_template=prompt_with_schema,
            refine_template=refine_prompt_with_schema,
        )
        with self._callback_manager.event(
            CBEventType.CHUNKING,
            payload={EventPayload.DOCUMENTS: documents},
        ) as event:
            text_chunks = []
            for doc in documents:
                chunks = text_splitter.split_text(
                    doc.get_content(metadata_mode=MetadataMode.LLM)
                )
                text_chunks.extend(chunks)

            event.on_end(
                payload={EventPayload.CHUNKS: text_chunks},
            )

        # feed in the "query_str" or the task
        table_context = response_builder.get_response(
            text_chunks=text_chunks, query_str=self._table_context_task
        )
        return cast(str, table_context)

build_all_context_from_documents #

build_all_context_from_documents(documents_dict: Dict[str, List[BaseNode]]) -> Dict[str, str]

为数据库中的所有表构建上下文。

workflows/handler.py 中的源代码llama_index/core/indices/common/struct_store/base.py
75
76
77
78
79
80
81
82
83
84
85
def build_all_context_from_documents(
    self,
    documents_dict: Dict[str, List[BaseNode]],
) -> Dict[str, str]:
    """Build context for all tables in the database."""
    context_dict = {}
    for table_name in self._sql_database.get_usable_table_names():
        context_dict[table_name] = self.build_table_context_from_documents(
            documents_dict[table_name], table_name
        )
    return context_dict

build_table_context_from_documents #

build_table_context_from_documents(documents: Sequence[BaseNode], table_name: str) -> str

从文档中为单个表格构建上下文。

workflows/handler.py 中的源代码llama_index/core/indices/common/struct_store/base.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def build_table_context_from_documents(
    self,
    documents: Sequence[BaseNode],
    table_name: str,
) -> str:
    """Build context from documents for a single table."""
    schema = self._sql_database.get_single_table_info(table_name)
    prompt_with_schema = self._table_context_prompt.partial_format(schema=schema)
    prompt_with_schema.metadata["prompt_type"] = PromptType.QUESTION_ANSWER
    refine_prompt_with_schema = self._refine_table_context_prompt.partial_format(
        schema=schema
    )
    refine_prompt_with_schema.metadata["prompt_type"] = PromptType.REFINE

    text_splitter = (
        self._text_splitter
        or self._prompt_helper.get_text_splitter_given_prompt(
            prompt_with_schema, llm=self._llm
        )
    )
    # we use the ResponseBuilder to iteratively go through all texts
    response_builder = get_response_synthesizer(
        llm=self._llm,
        text_qa_template=prompt_with_schema,
        refine_template=refine_prompt_with_schema,
    )
    with self._callback_manager.event(
        CBEventType.CHUNKING,
        payload={EventPayload.DOCUMENTS: documents},
    ) as event:
        text_chunks = []
        for doc in documents:
            chunks = text_splitter.split_text(
                doc.get_content(metadata_mode=MetadataMode.LLM)
            )
            text_chunks.extend(chunks)

        event.on_end(
            payload={EventPayload.CHUNKS: text_chunks},
        )

    # feed in the "query_str" or the task
    table_context = response_builder.get_response(
        text_chunks=text_chunks, query_str=self._table_context_task
    )
    return cast(str, table_context)

提示助手 #

基类:EventBaseComponent

提示助手。

通用提示助手,可帮助处理LLM上下文窗口的令牌限制问题。

其核心原理是通过从大型语言模型的上下文窗口大小开始计算可用上下文空间,并为提示模板和输出预留令牌空间。

它提供了“重新打包”文本块(从索引中检索)的实用功能,以最大限度地利用可用的上下文窗口(从而减少所需的LLM调用次数),或截断它们以使其适应单个LLM调用。

参数:

名称 类型 描述 默认
context_window int

LLM的上下文窗口。

3900
num_output int

LLM的输出数量。

256
chunk_overlap_ratio float

分块重叠作为分块大小的比例

0.1
chunk_size_limit Optional[int]

使用的最大分块大小。

required
tokenizer Optional[Callable[[str], List]]

要使用的分词器。

required
separator str

文本分割器的分隔符

' '
workflows/handler.py 中的源代码llama_index/core/indices/prompt_helper.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
class PromptHelper(BaseComponent):
    """
    Prompt helper.

    General prompt helper that can help deal with LLM context window token limitations.

    At its core, it calculates available context size by starting with the context
    window size of an LLM and reserve token space for the prompt template, and the
    output.

    It provides utility for "repacking" text chunks (retrieved from index) to maximally
    make use of the available context window (and thereby reducing the number of LLM
    calls needed), or truncating them so that they fit in a single LLM call.

    Args:
        context_window (int):                   Context window for the LLM.
        num_output (int):                       Number of outputs for the LLM.
        chunk_overlap_ratio (float):            Chunk overlap as a ratio of chunk size
        chunk_size_limit (Optional[int]):         Maximum chunk size to use.
        tokenizer (Optional[Callable[[str], List]]): Tokenizer to use.
        separator (str):                        Separator for text splitter

    """

    context_window: int = Field(
        default=DEFAULT_CONTEXT_WINDOW,
        description="The maximum context size that will get sent to the LLM.",
    )
    num_output: int = Field(
        default=DEFAULT_NUM_OUTPUTS,
        description="The amount of token-space to leave in input for generation.",
    )
    chunk_overlap_ratio: float = Field(
        default=DEFAULT_CHUNK_OVERLAP_RATIO,
        description="The percentage token amount that each chunk should overlap.",
    )
    chunk_size_limit: Optional[int] = Field(description="The maximum size of a chunk.")
    separator: str = Field(
        default=" ", description="The separator when chunking tokens."
    )

    _token_counter: TokenCounter = PrivateAttr()

    def __init__(
        self,
        context_window: int = DEFAULT_CONTEXT_WINDOW,
        num_output: int = DEFAULT_NUM_OUTPUTS,
        chunk_overlap_ratio: float = DEFAULT_CHUNK_OVERLAP_RATIO,
        chunk_size_limit: Optional[int] = None,
        tokenizer: Optional[Callable[[str], List]] = None,
        separator: str = " ",
    ) -> None:
        """Init params."""
        if chunk_overlap_ratio > 1.0 or chunk_overlap_ratio < 0.0:
            raise ValueError("chunk_overlap_ratio must be a float between 0. and 1.")
        super().__init__(
            context_window=context_window,
            num_output=num_output,
            chunk_overlap_ratio=chunk_overlap_ratio,
            chunk_size_limit=chunk_size_limit,
            separator=separator,
        )

        # TODO: make configurable
        self._token_counter = TokenCounter(tokenizer=tokenizer)

    @classmethod
    def from_llm_metadata(
        cls,
        llm_metadata: LLMMetadata,
        chunk_overlap_ratio: float = DEFAULT_CHUNK_OVERLAP_RATIO,
        chunk_size_limit: Optional[int] = None,
        tokenizer: Optional[Callable[[str], List]] = None,
        separator: str = " ",
    ) -> "PromptHelper":
        """
        Create from llm predictor.

        This will autofill values like context_window and num_output.

        """
        context_window = llm_metadata.context_window

        if llm_metadata.num_output == -1:
            num_output = DEFAULT_NUM_OUTPUTS
        else:
            num_output = llm_metadata.num_output

        return cls(
            context_window=context_window,
            num_output=num_output,
            chunk_overlap_ratio=chunk_overlap_ratio,
            chunk_size_limit=chunk_size_limit,
            tokenizer=tokenizer,
            separator=separator,
        )

    @classmethod
    def class_name(cls) -> str:
        return "PromptHelper"

    def _get_available_context_size(self, num_prompt_tokens: int) -> int:
        """
        Get available context size.

        This is calculated as:
            available context window = total context window
                - input (partially filled prompt)
                - output (room reserved for response)

        Notes:
        - Available context size is further clamped to be non-negative.

        """
        context_size_tokens = self.context_window - num_prompt_tokens - self.num_output
        if context_size_tokens < 0:
            raise ValueError(
                f"Calculated available context size {context_size_tokens} was"
                " not non-negative."
            )
        return context_size_tokens

    def _get_tools_from_llm(
        self, llm: Optional[LLM] = None, tools: Optional[List["BaseTool"]] = None
    ) -> List["BaseTool"]:
        from llama_index.core.program.function_program import get_function_tool

        tools = tools or []
        if isinstance(llm, StructuredLLM):
            tools.append(get_function_tool(llm.output_cls))

        return tools

    def _get_available_chunk_size(
        self,
        prompt: BasePromptTemplate,
        num_chunks: int = 1,
        padding: int = 5,
        llm: Optional[LLM] = None,
        tools: Optional[List["BaseTool"]] = None,
    ) -> int:
        """
        Get available chunk size.

        This is calculated as:
            available chunk size = available context window  // number_chunks
                - padding

        Notes:
        - By default, we use padding of 5 (to save space for formatting needs).
        - Available chunk size is further clamped to chunk_size_limit if specified.

        """
        tools = self._get_tools_from_llm(llm=llm, tools=tools)

        if isinstance(prompt, SelectorPromptTemplate):
            prompt = prompt.select(llm=llm)

        if isinstance(prompt, ChatPromptTemplate):
            messages: List[ChatMessage] = prompt.message_templates

            # account for partial formatting
            partial_messages = []
            for message in messages:
                partial_message = deepcopy(message)

                # TODO: This does not count tokens in non-text blocks
                prompt_kwargs = prompt.kwargs or {}
                partial_message.blocks = format_content_blocks(
                    partial_message.blocks, **prompt_kwargs
                )

                # add to list of partial messages
                partial_messages.append(partial_message)

            num_prompt_tokens = self._token_counter.estimate_tokens_in_messages(
                partial_messages
            )
        else:
            prompt_str = get_empty_prompt_txt(prompt)
            num_prompt_tokens = self._token_counter.get_string_tokens(prompt_str)

        num_prompt_tokens += self._token_counter.estimate_tokens_in_tools(
            [x.metadata.to_openai_tool() for x in tools]
        )

        # structured llms cannot have system prompts currently -- check the underlying llm
        if isinstance(llm, StructuredLLM):
            num_prompt_tokens += self._token_counter.get_string_tokens(
                llm.llm.system_prompt or ""
            )
        elif llm is not None:
            num_prompt_tokens += self._token_counter.get_string_tokens(
                llm.system_prompt or ""
            )

        available_context_size = self._get_available_context_size(num_prompt_tokens)
        result = available_context_size // num_chunks - padding
        if self.chunk_size_limit is not None:
            result = min(result, self.chunk_size_limit)
        return result

    def get_text_splitter_given_prompt(
        self,
        prompt: BasePromptTemplate,
        num_chunks: int = 1,
        padding: int = DEFAULT_PADDING,
        llm: Optional[LLM] = None,
        tools: Optional[List["BaseTool"]] = None,
    ) -> TokenTextSplitter:
        """
        Get text splitter configured to maximally pack available context window,
        taking into account of given prompt, and desired number of chunks.
        """
        chunk_size = self._get_available_chunk_size(
            prompt, num_chunks, padding=padding, llm=llm, tools=tools
        )
        if chunk_size <= 0:
            raise ValueError(f"Chunk size {chunk_size} is not positive.")
        chunk_overlap = int(self.chunk_overlap_ratio * chunk_size)
        return TokenTextSplitter(
            separator=self.separator,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            tokenizer=self._token_counter.tokenizer,
        )

    def truncate(
        self,
        prompt: BasePromptTemplate,
        text_chunks: Sequence[str],
        padding: int = DEFAULT_PADDING,
        llm: Optional[LLM] = None,
        tools: Optional[List["BaseTool"]] = None,
    ) -> List[str]:
        """Truncate text chunks to fit available context window."""
        text_splitter = self.get_text_splitter_given_prompt(
            prompt,
            num_chunks=len(text_chunks),
            padding=padding,
            llm=llm,
            tools=tools,
        )
        return [truncate_text(chunk, text_splitter) for chunk in text_chunks]

    def repack(
        self,
        prompt: BasePromptTemplate,
        text_chunks: Sequence[str],
        padding: int = DEFAULT_PADDING,
        llm: Optional[LLM] = None,
        tools: Optional[List["BaseTool"]] = None,
    ) -> List[str]:
        """
        Repack text chunks to fit available context window.

        This will combine text chunks into consolidated chunks
        that more fully "pack" the prompt template given the context_window.

        """
        text_splitter = self.get_text_splitter_given_prompt(
            prompt, padding=padding, llm=llm, tools=tools
        )
        combined_str = "\n\n".join([c.strip() for c in text_chunks if c.strip()])
        return text_splitter.split_text(combined_str)

from_llm_metadata classmethod #

from_llm_metadata(llm_metadata: LLMMetadata, chunk_overlap_ratio: float = DEFAULT_CHUNK_OVERLAP_RATIO, chunk_size_limit: Optional[int] = None, tokenizer: Optional[Callable[[str], List]] = None, separator: str = ' ') -> PromptHelper

从LLM预测器创建。

这将自动填充诸如上下文窗口和输出数量等值。

workflows/handler.py 中的源代码llama_index/core/indices/prompt_helper.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@classmethod
def from_llm_metadata(
    cls,
    llm_metadata: LLMMetadata,
    chunk_overlap_ratio: float = DEFAULT_CHUNK_OVERLAP_RATIO,
    chunk_size_limit: Optional[int] = None,
    tokenizer: Optional[Callable[[str], List]] = None,
    separator: str = " ",
) -> "PromptHelper":
    """
    Create from llm predictor.

    This will autofill values like context_window and num_output.

    """
    context_window = llm_metadata.context_window

    if llm_metadata.num_output == -1:
        num_output = DEFAULT_NUM_OUTPUTS
    else:
        num_output = llm_metadata.num_output

    return cls(
        context_window=context_window,
        num_output=num_output,
        chunk_overlap_ratio=chunk_overlap_ratio,
        chunk_size_limit=chunk_size_limit,
        tokenizer=tokenizer,
        separator=separator,
    )

get_text_splitter_given_prompt #

get_text_splitter_given_prompt(prompt: BasePromptTemplate, num_chunks: int = 1, padding: int = DEFAULT_PADDING, llm: Optional[大语言模型] = None, tools: Optional[List[BaseTool]] = None) -> TokenTextSplitter

获取配置好的文本分割器,以最大程度地利用可用上下文窗口,同时考虑给定的提示和期望的分块数量。

workflows/handler.py 中的源代码llama_index/core/indices/prompt_helper.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def get_text_splitter_given_prompt(
    self,
    prompt: BasePromptTemplate,
    num_chunks: int = 1,
    padding: int = DEFAULT_PADDING,
    llm: Optional[LLM] = None,
    tools: Optional[List["BaseTool"]] = None,
) -> TokenTextSplitter:
    """
    Get text splitter configured to maximally pack available context window,
    taking into account of given prompt, and desired number of chunks.
    """
    chunk_size = self._get_available_chunk_size(
        prompt, num_chunks, padding=padding, llm=llm, tools=tools
    )
    if chunk_size <= 0:
        raise ValueError(f"Chunk size {chunk_size} is not positive.")
    chunk_overlap = int(self.chunk_overlap_ratio * chunk_size)
    return TokenTextSplitter(
        separator=self.separator,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        tokenizer=self._token_counter.tokenizer,
    )

截断 #

truncate(prompt: BasePromptTemplate, text_chunks: Sequence[str], padding: int = DEFAULT_PADDING, llm: Optional[大语言模型] = None, tools: Optional[List[BaseTool]] = None) -> List[str]

截断文本块以适应可用的上下文窗口。

workflows/handler.py 中的源代码llama_index/core/indices/prompt_helper.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def truncate(
    self,
    prompt: BasePromptTemplate,
    text_chunks: Sequence[str],
    padding: int = DEFAULT_PADDING,
    llm: Optional[LLM] = None,
    tools: Optional[List["BaseTool"]] = None,
) -> List[str]:
    """Truncate text chunks to fit available context window."""
    text_splitter = self.get_text_splitter_given_prompt(
        prompt,
        num_chunks=len(text_chunks),
        padding=padding,
        llm=llm,
        tools=tools,
    )
    return [truncate_text(chunk, text_splitter) for chunk in text_chunks]

重新打包 #

repack(prompt: BasePromptTemplate, text_chunks: Sequence[str], padding: int = DEFAULT_PADDING, llm: Optional[大语言模型] = None, tools: Optional[List[BaseTool]] = None) -> List[str]

重新打包文本块以适应可用的上下文窗口。

这将把文本块合并为整合块,以便在给定上下文窗口的情况下更充分地"填充"提示模板。

workflows/handler.py 中的源代码llama_index/core/indices/prompt_helper.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def repack(
    self,
    prompt: BasePromptTemplate,
    text_chunks: Sequence[str],
    padding: int = DEFAULT_PADDING,
    llm: Optional[LLM] = None,
    tools: Optional[List["BaseTool"]] = None,
) -> List[str]:
    """
    Repack text chunks to fit available context window.

    This will combine text chunks into consolidated chunks
    that more fully "pack" the prompt template given the context_window.

    """
    text_splitter = self.get_text_splitter_given_prompt(
        prompt, padding=padding, llm=llm, tools=tools
    )
    combined_str = "\n\n".join([c.strip() for c in text_chunks if c.strip()])
    return text_splitter.split_text(combined_str)

基础提示模板 #

Bases: BaseModel, ABC

参数:

名称 类型 描述 默认
metadata Dict[str, Any]
required
template_vars List[str]
required
kwargs Dict[str, str]
required
output_parser BaseOutputParser | None
required
workflows/handler.py 中的源代码llama_index/core/prompts/base.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class BasePromptTemplate(BaseModel, ABC):  # type: ignore[no-redef]
    model_config = ConfigDict(arbitrary_types_allowed=True)
    metadata: Dict[str, Any]
    template_vars: List[str]
    kwargs: Dict[str, str]
    output_parser: Optional[BaseOutputParser]
    template_var_mappings: Optional[Dict[str, Any]] = Field(
        default_factory=dict,  # type: ignore
        description="Template variable mappings (Optional).",
    )
    function_mappings: Optional[Dict[str, AnnotatedCallable]] = Field(
        default_factory=dict,  # type: ignore
        description=(
            "Function mappings (Optional). This is a mapping from template "
            "variable names to functions that take in the current kwargs and "
            "return a string."
        ),
    )

    def _map_template_vars(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """For keys in template_var_mappings, swap in the right keys."""
        template_var_mappings = self.template_var_mappings or {}
        return {template_var_mappings.get(k, k): v for k, v in kwargs.items()}

    def _map_function_vars(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """
        For keys in function_mappings, compute values and combine w/ kwargs.

        Users can pass in functions instead of fixed values as format variables.
        For each function, we call the function with the current kwargs,
        get back the value, and then use that value in the template
        for the corresponding format variable.

        """
        function_mappings = self.function_mappings or {}
        # first generate the values for the functions
        new_kwargs = {}
        for k, v in function_mappings.items():
            # TODO: figure out what variables to pass into each function
            # is it the kwargs specified during query time? just the fixed kwargs?
            # all kwargs?
            new_kwargs[k] = v(**kwargs)

        # then, add the fixed variables only if not in new_kwargs already
        # (implying that function mapping will override fixed variables)
        for k, v in kwargs.items():
            if k not in new_kwargs:
                new_kwargs[k] = v

        return new_kwargs

    def _map_all_vars(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Map both template and function variables.

        We (1) first call function mappings to compute functions,
        and then (2) call the template_var_mappings.

        """
        # map function
        new_kwargs = self._map_function_vars(kwargs)
        # map template vars (to point to existing format vars in string template)
        return self._map_template_vars(new_kwargs)

    @abstractmethod
    def partial_format(self, **kwargs: Any) -> "BasePromptTemplate": ...

    @abstractmethod
    def format(self, llm: Optional[BaseLLM] = None, **kwargs: Any) -> str: ...

    @abstractmethod
    def format_messages(
        self, llm: Optional[BaseLLM] = None, **kwargs: Any
    ) -> List[ChatMessage]: ...

    @abstractmethod
    def get_template(self, llm: Optional[BaseLLM] = None) -> str: ...

聊天提示模板 #

基类:EventBasePromptTemplate

参数:

名称 类型 描述 默认
message_templates List[ChatMessage]
required
workflows/handler.py 中的源代码llama_index/core/prompts/base.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
class ChatPromptTemplate(BasePromptTemplate):  # type: ignore[no-redef]
    message_templates: List[ChatMessage]

    def __init__(
        self,
        message_templates: Sequence[ChatMessage],
        prompt_type: str = PromptType.CUSTOM,
        output_parser: Optional[BaseOutputParser] = None,
        metadata: Optional[Dict[str, Any]] = None,
        template_var_mappings: Optional[Dict[str, Any]] = None,
        function_mappings: Optional[Dict[str, Callable]] = None,
        **kwargs: Any,
    ):
        if metadata is None:
            metadata = {}
        metadata["prompt_type"] = prompt_type

        template_vars = []
        for message_template in message_templates:
            template_vars.extend(get_template_vars(message_template.content or ""))

        super().__init__(
            message_templates=message_templates,
            kwargs=kwargs,
            metadata=metadata,
            output_parser=output_parser,
            template_vars=template_vars,
            template_var_mappings=template_var_mappings,
            function_mappings=function_mappings,
        )

    @classmethod
    def from_messages(
        cls,
        message_templates: Union[List[Tuple[str, str]], List[ChatMessage]],
        **kwargs: Any,
    ) -> "ChatPromptTemplate":
        """From messages."""
        if isinstance(message_templates[0], tuple):
            message_templates = [
                ChatMessage.from_str(role=role, content=content)  # type: ignore[arg-type]
                for role, content in message_templates
            ]
        return cls(message_templates=message_templates, **kwargs)  # type: ignore[arg-type]

    def partial_format(self, **kwargs: Any) -> "ChatPromptTemplate":
        prompt = deepcopy(self)
        prompt.kwargs.update(kwargs)
        return prompt

    def format(
        self,
        llm: Optional[BaseLLM] = None,
        messages_to_prompt: Optional[Callable[[Sequence[ChatMessage]], str]] = None,
        **kwargs: Any,
    ) -> str:
        del llm  # unused
        messages = self.format_messages(**kwargs)

        if messages_to_prompt is not None:
            return messages_to_prompt(messages)

        return default_messages_to_prompt(messages)

    def format_messages(
        self, llm: Optional[BaseLLM] = None, **kwargs: Any
    ) -> List[ChatMessage]:
        del llm  # unused
        """Format the prompt into a list of chat messages."""
        all_kwargs = {
            **self.kwargs,
            **kwargs,
        }
        mapped_all_kwargs = self._map_all_vars(all_kwargs)

        messages: List[ChatMessage] = []
        for message_template in self.message_templates:
            # Handle messages with multiple blocks
            if message_template.blocks:
                formatted_blocks: List[ContentBlock] = []
                for block in message_template.blocks:
                    if isinstance(block, TextBlock):
                        template_vars = get_template_vars(block.text)
                        relevant_kwargs = {
                            k: v
                            for k, v in mapped_all_kwargs.items()
                            if k in template_vars
                        }
                        formatted_text = format_string(block.text, **relevant_kwargs)
                        formatted_blocks.append(TextBlock(text=formatted_text))
                    else:
                        # For non-text blocks (like images), keep them as is
                        # TODO: can images be formatted as variables?
                        formatted_blocks.append(block)

                message = message_template.model_copy()
                message.blocks = formatted_blocks
                messages.append(message)
            else:
                # Handle empty messages (if any)
                messages.append(message_template.model_copy())

        if self.output_parser is not None:
            messages = self.output_parser.format_messages(messages)

        return messages

    def get_template(self, llm: Optional[BaseLLM] = None) -> str:
        return default_messages_to_prompt(self.message_templates)

from_messages classmethod #

from_messages(message_templates: Union[List[Tuple[str, str]], List[ChatMessage]], **kwargs: Any) -> ChatPromptTemplate

来自消息。

workflows/handler.py 中的源代码llama_index/core/prompts/base.py
248
249
250
251
252
253
254
255
256
257
258
259
260
@classmethod
def from_messages(
    cls,
    message_templates: Union[List[Tuple[str, str]], List[ChatMessage]],
    **kwargs: Any,
) -> "ChatPromptTemplate":
    """From messages."""
    if isinstance(message_templates[0], tuple):
        message_templates = [
            ChatMessage.from_str(role=role, content=content)  # type: ignore[arg-type]
            for role, content in message_templates
        ]
    return cls(message_templates=message_templates, **kwargs)  # type: ignore[arg-type]

提示模板 #

基类:EventBasePromptTemplate

参数:

名称 类型 描述 默认
template str
required
workflows/handler.py 中的源代码llama_index/core/prompts/base.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class PromptTemplate(BasePromptTemplate):  # type: ignore[no-redef]
    template: str

    def __init__(
        self,
        template: str,
        prompt_type: str = PromptType.CUSTOM,
        output_parser: Optional[BaseOutputParser] = None,
        metadata: Optional[Dict[str, Any]] = None,
        template_var_mappings: Optional[Dict[str, Any]] = None,
        function_mappings: Optional[Dict[str, Callable]] = None,
        **kwargs: Any,
    ) -> None:
        if metadata is None:
            metadata = {}
        metadata["prompt_type"] = prompt_type

        template_vars = get_template_vars(template)

        super().__init__(
            template=template,
            template_vars=template_vars,
            kwargs=kwargs,
            metadata=metadata,
            output_parser=output_parser,
            template_var_mappings=template_var_mappings,
            function_mappings=function_mappings,
        )

    def partial_format(self, **kwargs: Any) -> "PromptTemplate":
        """Partially format the prompt."""
        # NOTE: this is a hack to get around deepcopy failing on output parser
        output_parser = self.output_parser
        self.output_parser = None

        # get function and fixed kwargs, and add that to a copy
        # of the current prompt object
        prompt = deepcopy(self)
        prompt.kwargs.update(kwargs)

        # NOTE: put the output parser back
        prompt.output_parser = output_parser
        self.output_parser = output_parser
        return prompt

    def format(
        self,
        llm: Optional[BaseLLM] = None,
        completion_to_prompt: Optional[Callable[[str], str]] = None,
        **kwargs: Any,
    ) -> str:
        """Format the prompt into a string."""
        del llm  # unused
        all_kwargs = {
            **self.kwargs,
            **kwargs,
        }

        mapped_all_kwargs = self._map_all_vars(all_kwargs)
        prompt = format_string(self.template, **mapped_all_kwargs)

        if self.output_parser is not None:
            prompt = self.output_parser.format(prompt)

        if completion_to_prompt is not None:
            prompt = completion_to_prompt(prompt)

        return prompt

    def format_messages(
        self, llm: Optional[BaseLLM] = None, **kwargs: Any
    ) -> List[ChatMessage]:
        """Format the prompt into a list of chat messages."""
        del llm  # unused
        prompt = self.format(**kwargs)
        return prompt_to_messages(prompt)

    def get_template(self, llm: Optional[BaseLLM] = None) -> str:
        return self.template

partial_format #

partial_format(**kwargs: Any) -> PromptTemplate

部分格式化提示词。

workflows/handler.py 中的源代码llama_index/core/prompts/base.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def partial_format(self, **kwargs: Any) -> "PromptTemplate":
    """Partially format the prompt."""
    # NOTE: this is a hack to get around deepcopy failing on output parser
    output_parser = self.output_parser
    self.output_parser = None

    # get function and fixed kwargs, and add that to a copy
    # of the current prompt object
    prompt = deepcopy(self)
    prompt.kwargs.update(kwargs)

    # NOTE: put the output parser back
    prompt.output_parser = output_parser
    self.output_parser = output_parser
    return prompt

格式 #

format(llm: Optional[BaseLLM] = None, completion_to_prompt: Optional[Callable[[str], str]] = None, **kwargs: Any) -> str

将提示格式化为字符串。

workflows/handler.py 中的源代码llama_index/core/prompts/base.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def format(
    self,
    llm: Optional[BaseLLM] = None,
    completion_to_prompt: Optional[Callable[[str], str]] = None,
    **kwargs: Any,
) -> str:
    """Format the prompt into a string."""
    del llm  # unused
    all_kwargs = {
        **self.kwargs,
        **kwargs,
    }

    mapped_all_kwargs = self._map_all_vars(all_kwargs)
    prompt = format_string(self.template, **mapped_all_kwargs)

    if self.output_parser is not None:
        prompt = self.output_parser.format(prompt)

    if completion_to_prompt is not None:
        prompt = completion_to_prompt(prompt)

    return prompt

format_messages #

format_messages(llm: Optional[BaseLLM] = None, **kwargs: Any) -> List[ChatMessage]

将提示格式化为聊天消息列表。

workflows/handler.py 中的源代码llama_index/core/prompts/base.py
205
206
207
208
209
210
211
def format_messages(
    self, llm: Optional[BaseLLM] = None, **kwargs: Any
) -> List[ChatMessage]:
    """Format the prompt into a list of chat messages."""
    del llm  # unused
    prompt = self.format(**kwargs)
    return prompt_to_messages(prompt)

选择器提示模板 #

基类:EventBasePromptTemplate

参数:

名称 类型 描述 默认
default_template BasePromptTemplate
required
conditionals Sequence[Tuple[Callable[list, bool], BasePromptTemplate]] | None
None
workflows/handler.py 中的源代码llama_index/core/prompts/base.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
class SelectorPromptTemplate(BasePromptTemplate):  # type: ignore[no-redef]
    default_template: SerializeAsAny[BasePromptTemplate]
    conditionals: Optional[
        Sequence[Tuple[Callable[[BaseLLM], bool], BasePromptTemplate]]
    ] = None

    def __init__(
        self,
        default_template: BasePromptTemplate,
        conditionals: Optional[
            Sequence[Tuple[Callable[[BaseLLM], bool], BasePromptTemplate]]
        ] = None,
    ):
        metadata = default_template.metadata
        kwargs = default_template.kwargs
        template_vars = default_template.template_vars
        output_parser = default_template.output_parser
        super().__init__(
            default_template=default_template,
            conditionals=conditionals,
            metadata=metadata,
            kwargs=kwargs,
            template_vars=template_vars,
            output_parser=output_parser,
        )

    def select(self, llm: Optional[BaseLLM] = None) -> BasePromptTemplate:
        # ensure output parser is up to date
        self.default_template.output_parser = self.output_parser

        if llm is None:
            return self.default_template

        if self.conditionals is not None:
            for condition, prompt in self.conditionals:
                if condition(llm):
                    # ensure output parser is up to date
                    prompt.output_parser = self.output_parser
                    return prompt

        return self.default_template

    def partial_format(self, **kwargs: Any) -> "SelectorPromptTemplate":
        default_template = self.default_template.partial_format(**kwargs)
        if self.conditionals is None:
            conditionals = None
        else:
            conditionals = [
                (condition, prompt.partial_format(**kwargs))
                for condition, prompt in self.conditionals
            ]
        return SelectorPromptTemplate(
            default_template=default_template, conditionals=conditionals
        )

    def format(self, llm: Optional[BaseLLM] = None, **kwargs: Any) -> str:
        """Format the prompt into a string."""
        prompt = self.select(llm=llm)
        return prompt.format(**kwargs)

    def format_messages(
        self, llm: Optional[BaseLLM] = None, **kwargs: Any
    ) -> List[ChatMessage]:
        """Format the prompt into a list of chat messages."""
        prompt = self.select(llm=llm)
        return prompt.format_messages(**kwargs)

    def get_template(self, llm: Optional[BaseLLM] = None) -> str:
        prompt = self.select(llm=llm)
        return prompt.get_template(llm=llm)

格式 #

format(llm: Optional[BaseLLM] = None, **kwargs: Any) -> str

将提示格式化为字符串。

workflows/handler.py 中的源代码llama_index/core/prompts/base.py
383
384
385
386
def format(self, llm: Optional[BaseLLM] = None, **kwargs: Any) -> str:
    """Format the prompt into a string."""
    prompt = self.select(llm=llm)
    return prompt.format(**kwargs)

format_messages #

format_messages(llm: Optional[BaseLLM] = None, **kwargs: Any) -> List[ChatMessage]

将提示格式化为聊天消息列表。

workflows/handler.py 中的源代码llama_index/core/prompts/base.py
388
389
390
391
392
393
def format_messages(
    self, llm: Optional[BaseLLM] = None, **kwargs: Any
) -> List[ChatMessage]:
    """Format the prompt into a list of chat messages."""
    prompt = self.select(llm=llm)
    return prompt.format_messages(**kwargs)

简单目录读取器 #

基类:BaseReader, ResourcesReaderMixin, FileSystemReaderMixin

简易目录读取器。

从文件目录加载文件。 根据文件扩展名自动选择最佳文件读取器。

参数:

名称 类型 描述 默认
input_dir Union[Path, str]

目录路径。

None
input_files List

要读取的文件路径列表 (可选;将覆盖 input_dir、exclude)

None
exclude List

要排除的Python文件路径的通配符(可选)

None
exclude_hidden bool

是否排除隐藏文件(点文件)。

True
exclude_empty bool

是否排除空文件(可选)。

False
encoding str

文件的编码方式。 默认是 utf-8。

'utf-8'
errors str

如何处理编码和解码错误, 请参阅 https://docs.python.org/3/library/functions.html#open

'ignore'
recursive bool

是否在子目录中递归搜索。 默认为False。

False
filename_as_id bool

是否使用文件名作为文档ID。 默认为False。

False
required_exts Optional[List[str]]

所需扩展列表。 默认为无。

None
file_extractor Optional[Dict[str, BaseReader]]

文件扩展名到 BaseReader 类的映射,用于指定如何将该文件转换为文本。如果未指定,则使用 DEFAULT_FILE_READER_CLS 中的默认值。

None
num_files_limit Optional[int]

最大读取文件数量。 默认值为无。

None
file_metadata Optional[Callable[[str], Dict]]

一个接收文件名并返回文档元数据字典的函数。 默认为 None。

None
raise_on_error bool

如果无法读取文件,是否引发错误。

False
fs Optional[AbstractFileSystem]

要使用的文件系统。默认值

None
workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
class SimpleDirectoryReader(BaseReader, ResourcesReaderMixin, FileSystemReaderMixin):
    """
    Simple directory reader.

    Load files from file directory.
    Automatically select the best file reader given file extensions.

    Args:
        input_dir (Union[Path, str]): Path to the directory.
        input_files (List): List of file paths to read
            (Optional; overrides input_dir, exclude)
        exclude (List): glob of python file paths to exclude (Optional)
        exclude_hidden (bool): Whether to exclude hidden files (dotfiles).
        exclude_empty (bool): Whether to exclude empty files (Optional).
        encoding (str): Encoding of the files.
            Default is utf-8.
        errors (str): how encoding and decoding errors are to be handled,
              see https://docs.python.org/3/library/functions.html#open
        recursive (bool): Whether to recursively search in subdirectories.
            False by default.
        filename_as_id (bool): Whether to use the filename as the document id.
            False by default.
        required_exts (Optional[List[str]]): List of required extensions.
            Default is None.
        file_extractor (Optional[Dict[str, BaseReader]]): A mapping of file
            extension to a BaseReader class that specifies how to convert that file
            to text. If not specified, use default from DEFAULT_FILE_READER_CLS.
        num_files_limit (Optional[int]): Maximum number of files to read.
            Default is None.
        file_metadata (Optional[Callable[[str], Dict]]): A function that takes
            in a filename and returns a Dict of metadata for the Document.
            Default is None.
        raise_on_error (bool): Whether to raise an error if a file cannot be read.
        fs (Optional[fsspec.AbstractFileSystem]): File system to use. Defaults
        to using the local file system. Can be changed to use any remote file system
        exposed via the fsspec interface.

    """

    supported_suffix_fn: Callable = _try_loading_included_file_formats

    def __init__(
        self,
        input_dir: Optional[Union[Path, str]] = None,
        input_files: Optional[list] = None,
        exclude: Optional[list] = None,
        exclude_hidden: bool = True,
        exclude_empty: bool = False,
        errors: str = "ignore",
        recursive: bool = False,
        encoding: str = "utf-8",
        filename_as_id: bool = False,
        required_exts: Optional[list[str]] = None,
        file_extractor: Optional[dict[str, BaseReader]] = None,
        num_files_limit: Optional[int] = None,
        file_metadata: Optional[Callable[[str], dict]] = None,
        raise_on_error: bool = False,
        fs: fsspec.AbstractFileSystem | None = None,
    ) -> None:
        """Initialize with parameters."""
        super().__init__()

        if not input_dir and not input_files:
            raise ValueError("Must provide either `input_dir` or `input_files`.")

        self.fs = fs or get_default_fs()
        self.errors = errors
        self.encoding = encoding

        self.exclude = exclude
        self.recursive = recursive
        self.exclude_hidden = exclude_hidden
        self.exclude_empty = exclude_empty
        self.required_exts = required_exts
        self.num_files_limit = num_files_limit
        self.raise_on_error = raise_on_error
        _Path = Path if is_default_fs(self.fs) else PurePosixPath

        if input_files:
            self.input_files = []
            for path in input_files:
                if not self.fs.isfile(path):
                    raise ValueError(f"File {path} does not exist.")
                input_file = _Path(path)
                self.input_files.append(input_file)
        elif input_dir:
            if not self.fs.isdir(input_dir):
                raise ValueError(f"Directory {input_dir} does not exist.")
            self.input_dir = _Path(input_dir)
            self.exclude = exclude
            self.input_files = self._add_files(self.input_dir)

        self.file_extractor = file_extractor or {}
        self.file_metadata = file_metadata or _DefaultFileMetadataFunc(self.fs)
        self.filename_as_id = filename_as_id

    def is_hidden(self, path: Path | PurePosixPath) -> bool:
        return any(
            part.startswith(".") and part not in [".", ".."] for part in path.parts
        )

    def is_empty_file(self, path: Path | PurePosixPath) -> bool:
        return self.fs.isfile(str(path)) and self.fs.info(str(path)).get("size", 0) == 0

    def _is_directory(self, path: Path | PurePosixPath) -> bool:
        """
        Check if a path is a directory, with special handling for S3 filesystems.

        For S3 filesystems, directories are often represented as 0-byte objects
        ending with '/'. This method provides more reliable directory detection
        than fs.isdir() alone.
        """
        try:
            # First try the standard isdir check
            if self.fs.isdir(path):
                return True

            # For non-default filesystems (like S3), also check for directory placeholders
            if not is_default_fs(self.fs):
                try:
                    info = self.fs.info(str(path))
                    # Check if it's a 0-byte object ending with '/'
                    # This is how S3 typically represents directory placeholders
                    if (
                        info.get("size", 0) == 0
                        and str(path).endswith("/")
                        and info.get("type") != "file"
                    ):
                        return True
                except Exception:
                    # If we can't get info, fall back to the original isdir check
                    pass

            return False
        except Exception:
            # If anything fails, assume it's not a directory to be safe
            return False

    def _add_files(self, input_dir: Path | PurePosixPath) -> list[Path | PurePosixPath]:
        """Add files."""
        all_files: set[Path | PurePosixPath] = set()
        rejected_files: set[Path | PurePosixPath] = set()
        rejected_dirs: set[Path | PurePosixPath] = set()
        # Default to POSIX paths for non-default file systems (e.g. S3)
        _Path = Path if is_default_fs(self.fs) else PurePosixPath

        if self.exclude is not None:
            for excluded_pattern in self.exclude:
                if self.recursive:
                    # Recursive glob
                    excluded_glob = _Path(input_dir) / _Path("**") / excluded_pattern
                else:
                    # Non-recursive glob
                    excluded_glob = _Path(input_dir) / excluded_pattern
                for file in self.fs.glob(str(excluded_glob)):
                    if self.fs.isdir(file):
                        rejected_dirs.add(_Path(str(file)))
                    else:
                        rejected_files.add(_Path(str(file)))

        file_refs: list[Union[Path, PurePosixPath]] = []
        limit = (
            self.num_files_limit
            if self.num_files_limit is not None and self.num_files_limit > 0
            else None
        )
        c = 0
        depth = 1000 if self.recursive else 1
        for root, _, files in self.fs.walk(
            str(input_dir), topdown=True, maxdepth=depth
        ):
            for file in files:
                c += 1
                if limit and c > limit:
                    break
                file_refs.append(_Path(root, file))

        for ref in file_refs:
            # Manually check if file is hidden or directory instead of
            # in glob for backwards compatibility.
            is_dir = self._is_directory(ref)
            skip_because_hidden = self.exclude_hidden and self.is_hidden(ref)
            skip_because_empty = self.exclude_empty and self.is_empty_file(ref)
            skip_because_bad_ext = (
                self.required_exts is not None and ref.suffix not in self.required_exts
            )
            skip_because_excluded = ref in rejected_files
            if not skip_because_excluded:
                if is_dir:
                    ref_parent_dir = ref
                else:
                    ref_parent_dir = self.fs._parent(ref)
                for rejected_dir in rejected_dirs:
                    if str(ref_parent_dir).startswith(str(rejected_dir)):
                        skip_because_excluded = True
                        logger.debug(
                            "Skipping %s because it in parent dir %s which is in %s",
                            ref,
                            ref_parent_dir,
                            rejected_dir,
                        )
                        break

            if (
                is_dir
                or skip_because_hidden
                or skip_because_bad_ext
                or skip_because_excluded
                or skip_because_empty
            ):
                continue
            else:
                all_files.add(ref)

        new_input_files = sorted(all_files)

        if len(new_input_files) == 0:
            raise ValueError(f"No files found in {input_dir}.")

        # print total number of files added
        logger.debug(
            f"> [SimpleDirectoryReader] Total files added: {len(new_input_files)}"
        )

        return new_input_files

    def _exclude_metadata(self, documents: list[Document]) -> list[Document]:
        """
        Exclude metadata from documents.

        Args:
            documents (List[Document]): List of documents.

        """
        for doc in documents:
            # Keep only metadata['file_path'] in both embedding and llm content
            # str, which contain extreme important context that about the chunks.
            # Dates is provided for convenience of postprocessor such as
            # TimeWeightedPostprocessor, but excluded for embedding and LLMprompts
            doc.excluded_embed_metadata_keys.extend(
                [
                    "file_name",
                    "file_type",
                    "file_size",
                    "creation_date",
                    "last_modified_date",
                    "last_accessed_date",
                ]
            )
            doc.excluded_llm_metadata_keys.extend(
                [
                    "file_name",
                    "file_type",
                    "file_size",
                    "creation_date",
                    "last_modified_date",
                    "last_accessed_date",
                ]
            )

        return documents

    def list_resources(self, *args: Any, **kwargs: Any) -> list[str]:
        """List files in the given filesystem."""
        return [str(x) for x in self.input_files]

    def get_resource_info(self, resource_id: str, *args: Any, **kwargs: Any) -> dict:
        info_result = self.fs.info(resource_id)

        creation_date = _format_file_timestamp(
            info_result.get("created"), include_time=True
        )
        last_modified_date = _format_file_timestamp(
            info_result.get("mtime"), include_time=True
        )

        info_dict = {
            "file_path": resource_id,
            "file_size": info_result.get("size"),
            "creation_date": creation_date,
            "last_modified_date": last_modified_date,
        }

        # Ignore None values
        return {
            meta_key: meta_value
            for meta_key, meta_value in info_dict.items()
            if meta_value is not None
        }

    def load_resource(
        self, resource_id: str, *args: Any, **kwargs: Any
    ) -> list[Document]:
        file_metadata = kwargs.get("file_metadata", self.file_metadata)
        file_extractor = kwargs.get("file_extractor", self.file_extractor)
        filename_as_id = kwargs.get("filename_as_id", self.filename_as_id)
        encoding = kwargs.get("encoding", self.encoding)
        errors = kwargs.get("errors", self.errors)
        raise_on_error = kwargs.get("raise_on_error", self.raise_on_error)
        fs = kwargs.get("fs", self.fs)

        _Path = Path if is_default_fs(fs) else PurePosixPath

        return SimpleDirectoryReader.load_file(
            input_file=_Path(resource_id),
            file_metadata=file_metadata,
            file_extractor=file_extractor,
            filename_as_id=filename_as_id,
            encoding=encoding,
            errors=errors,
            raise_on_error=raise_on_error,
            fs=fs,
            **kwargs,
        )

    async def aload_resource(
        self, resource_id: str, *args: Any, **kwargs: Any
    ) -> list[Document]:
        file_metadata = kwargs.get("file_metadata", self.file_metadata)
        file_extractor = kwargs.get("file_extractor", self.file_extractor)
        filename_as_id = kwargs.get("filename_as_id", self.filename_as_id)
        encoding = kwargs.get("encoding", self.encoding)
        errors = kwargs.get("errors", self.errors)
        raise_on_error = kwargs.get("raise_on_error", self.raise_on_error)
        fs = kwargs.get("fs", self.fs)
        _Path = Path if is_default_fs(fs) else PurePosixPath

        return await SimpleDirectoryReader.aload_file(
            input_file=_Path(resource_id),
            file_metadata=file_metadata,
            file_extractor=file_extractor,
            filename_as_id=filename_as_id,
            encoding=encoding,
            errors=errors,
            raise_on_error=raise_on_error,
            fs=fs,
            **kwargs,
        )

    def read_file_content(self, input_file: Path, **kwargs: Any) -> bytes:
        """Read file content."""
        fs: fsspec.AbstractFileSystem = kwargs.get("fs", self.fs)
        with fs.open(input_file, errors=self.errors, encoding=self.encoding) as f:
            # default mode is 'rb', we can cast the return value of f.read()
            return cast(bytes, f.read())

    @staticmethod
    def load_file(
        input_file: Path | PurePosixPath,
        file_metadata: Callable[[str], dict],
        file_extractor: dict[str, BaseReader],
        filename_as_id: bool = False,
        encoding: str = "utf-8",
        errors: str = "ignore",
        raise_on_error: bool = False,
        fs: fsspec.AbstractFileSystem | None = None,
    ) -> list[Document]:
        """
        Static method for loading file.

        NOTE: necessarily as a static method for parallel processing.

        Args:
            input_file (Path): File path to read
            file_metadata ([Callable[[str], Dict]]): A function that takes
                in a filename and returns a Dict of metadata for the Document.
            file_extractor (Dict[str, BaseReader]): A mapping of file
                extension to a BaseReader class that specifies how to convert that file
                to text.
            filename_as_id (bool): Whether to use the filename as the document id.
            encoding (str): Encoding of the files.
                Default is utf-8.
            errors (str): how encoding and decoding errors are to be handled,
                see https://docs.python.org/3/library/functions.html#open
            raise_on_error (bool): Whether to raise an error if a file cannot be read.
            fs (Optional[fsspec.AbstractFileSystem]): File system to use. Defaults
                to using the local file system. Can be changed to use any remote file system

        Returns:
            List[Document]: loaded documents

        """
        # TODO: make this less redundant
        default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
        default_file_reader_suffix = list(default_file_reader_cls.keys())
        metadata: dict | None = None
        documents: list[Document] = []

        if file_metadata is not None:
            metadata = file_metadata(str(input_file))

        file_suffix = input_file.suffix.lower()
        if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
            # use file readers
            if file_suffix not in file_extractor:
                # instantiate file reader if not already
                reader_cls = default_file_reader_cls[file_suffix]
                file_extractor[file_suffix] = reader_cls()
            reader = file_extractor[file_suffix]

            # load data -- catch all errors except for ImportError
            try:
                kwargs: dict[str, Any] = {"extra_info": metadata}
                if fs and not is_default_fs(fs):
                    kwargs["fs"] = fs
                docs = reader.load_data(input_file, **kwargs)
            except ImportError as e:
                # ensure that ImportError is raised so user knows
                # about missing dependencies
                raise ImportError(str(e))
            except Exception as e:
                if raise_on_error:
                    raise Exception("Error loading file") from e
                # otherwise, just skip the file and report the error
                print(
                    f"Failed to load file {input_file} with error: {e}. Skipping...",
                    flush=True,
                )
                return []

            # iterate over docs if needed
            if filename_as_id:
                for i, doc in enumerate(docs):
                    doc.id_ = f"{input_file!s}_part_{i}"

            documents.extend(docs)
        else:
            # do standard read
            fs = fs or get_default_fs()
            with fs.open(input_file, errors=errors, encoding=encoding) as f:
                data = cast(bytes, f.read()).decode(encoding, errors=errors)

            doc = Document(text=data, metadata=metadata or {})  # type: ignore
            if filename_as_id:
                doc.id_ = str(input_file)

            documents.append(doc)

        return documents

    @staticmethod
    async def aload_file(
        input_file: Path | PurePosixPath,
        file_metadata: Callable[[str], dict],
        file_extractor: dict[str, BaseReader],
        filename_as_id: bool = False,
        encoding: str = "utf-8",
        errors: str = "ignore",
        raise_on_error: bool = False,
        fs: fsspec.AbstractFileSystem | None = None,
    ) -> list[Document]:
        """Load file asynchronously."""
        # TODO: make this less redundant
        default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
        default_file_reader_suffix = list(default_file_reader_cls.keys())
        metadata: dict | None = None
        documents: list[Document] = []

        if file_metadata is not None:
            metadata = file_metadata(str(input_file))

        file_suffix = input_file.suffix.lower()
        if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
            # use file readers
            if file_suffix not in file_extractor:
                # instantiate file reader if not already
                reader_cls = default_file_reader_cls[file_suffix]
                file_extractor[file_suffix] = reader_cls()
            reader = file_extractor[file_suffix]

            # load data -- catch all errors except for ImportError
            try:
                kwargs: dict[str, Any] = {"extra_info": metadata}
                if fs and not is_default_fs(fs):
                    kwargs["fs"] = fs
                docs = await reader.aload_data(input_file, **kwargs)
            except ImportError as e:
                # ensure that ImportError is raised so user knows
                # about missing dependencies
                raise ImportError(str(e))
            except Exception as e:
                if raise_on_error:
                    raise
                # otherwise, just skip the file and report the error
                print(
                    f"Failed to load file {input_file} with error: {e}. Skipping...",
                    flush=True,
                )
                return []

            # iterate over docs if needed
            if filename_as_id:
                for i, doc in enumerate(docs):
                    doc.id_ = f"{input_file!s}_part_{i}"

            documents.extend(docs)
        else:
            # do standard read
            fs = fs or get_default_fs()
            with fs.open(input_file, errors=errors, encoding=encoding) as f:
                data = cast(bytes, f.read()).decode(encoding, errors=errors)

            doc = Document(text=data, metadata=metadata or {})  # type: ignore
            if filename_as_id:
                doc.id_ = str(input_file)

            documents.append(doc)

        return documents

    def load_data(
        self,
        show_progress: bool = False,
        num_workers: int | None = None,
        fs: fsspec.AbstractFileSystem | None = None,
    ) -> list[Document]:
        """
        Load data from the input directory.

        Args:
            show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
            num_workers  (Optional[int]): Number of workers to parallelize data-loading over.
            fs (Optional[fsspec.AbstractFileSystem]): File system to use. If fs was specified
                in the constructor, it will override the fs parameter here.

        Returns:
            List[Document]: A list of documents.

        """
        documents = []

        fs = fs or self.fs
        load_file_with_args = partial(
            SimpleDirectoryReader.load_file,
            file_metadata=self.file_metadata,
            file_extractor=self.file_extractor,
            filename_as_id=self.filename_as_id,
            encoding=self.encoding,
            errors=self.errors,
            raise_on_error=self.raise_on_error,
            fs=fs,
        )

        if num_workers and num_workers > 1:
            num_cpus = multiprocessing.cpu_count()
            if num_workers > num_cpus:
                warnings.warn(
                    "Specified num_workers exceed number of CPUs in the system. "
                    "Setting `num_workers` down to the maximum CPU count."
                )
                num_workers = num_cpus

            with multiprocessing.get_context("spawn").Pool(num_workers) as pool:
                map_iterator = cast(
                    Iterable[list[Document]],
                    get_tqdm_iterable(
                        pool.imap(load_file_with_args, self.input_files),
                        show_progress=show_progress,
                        desc="Loading files",
                        total=len(self.input_files),
                    ),
                )
                for result in map_iterator:
                    documents.extend(result)

        else:
            files_to_process = cast(
                list[Union[Path, PurePosixPath]],
                get_tqdm_iterable(
                    self.input_files,
                    show_progress=show_progress,
                    desc="Loading files",
                ),
            )
            for input_file in files_to_process:
                documents.extend(load_file_with_args(input_file))

        return self._exclude_metadata(documents)

    async def aload_data(
        self,
        show_progress: bool = False,
        num_workers: int | None = None,
        fs: fsspec.AbstractFileSystem | None = None,
    ) -> list[Document]:
        """
        Load data from the input directory.

        Args:
            show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
            num_workers  (Optional[int]): Number of workers to parallelize data-loading over.
            fs (Optional[fsspec.AbstractFileSystem]): File system to use. If fs was specified
                in the constructor, it will override the fs parameter here.

        Returns:
            List[Document]: A list of documents.

        """
        files_to_process = self.input_files
        fs = fs or self.fs

        coroutines = [
            SimpleDirectoryReader.aload_file(
                input_file,
                self.file_metadata,
                self.file_extractor,
                self.filename_as_id,
                self.encoding,
                self.errors,
                self.raise_on_error,
                fs,
            )
            for input_file in files_to_process
        ]

        if num_workers:
            document_lists = await run_jobs(
                coroutines, show_progress=show_progress, workers=num_workers
            )
        elif show_progress:
            _asyncio = get_asyncio_module(show_progress=show_progress)
            document_lists = await _asyncio.gather(*coroutines)
        else:
            document_lists = await asyncio.gather(*coroutines)
        documents = [doc for doc_list in document_lists for doc in doc_list]

        return self._exclude_metadata(documents)

    def iter_data(
        self, show_progress: bool = False
    ) -> Generator[list[Document], Any, Any]:
        """
        Load data iteratively from the input directory.

        Args:
            show_progress (bool): Whether to show tqdm progress bars. Defaults to False.

        Returns:
            Generator[List[Document]]: A list of documents.

        """
        files_to_process = cast(
            list[Union[Path, PurePosixPath]],
            get_tqdm_iterable(
                self.input_files,
                show_progress=show_progress,
                desc="Loading files",
            ),
        )
        for input_file in files_to_process:
            documents = SimpleDirectoryReader.load_file(
                input_file=input_file,
                file_metadata=self.file_metadata,
                file_extractor=self.file_extractor,
                filename_as_id=self.filename_as_id,
                encoding=self.encoding,
                errors=self.errors,
                raise_on_error=self.raise_on_error,
                fs=self.fs,
            )

            documents = self._exclude_metadata(documents)

            if len(documents) > 0:
                yield documents

list_resources #

list_resources(*args: Any, **kwargs: Any) -> list[str]

列出给定文件系统中的文件。

workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
470
471
472
def list_resources(self, *args: Any, **kwargs: Any) -> list[str]:
    """List files in the given filesystem."""
    return [str(x) for x in self.input_files]

read_file_content #

read_file_content(input_file: Path, **kwargs: Any) -> bytes

读取文件内容。

workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
547
548
549
550
551
552
def read_file_content(self, input_file: Path, **kwargs: Any) -> bytes:
    """Read file content."""
    fs: fsspec.AbstractFileSystem = kwargs.get("fs", self.fs)
    with fs.open(input_file, errors=self.errors, encoding=self.encoding) as f:
        # default mode is 'rb', we can cast the return value of f.read()
        return cast(bytes, f.read())

load_file staticmethod #

load_file(input_file: Path | PurePosixPath, file_metadata: Callable[[str], dict], file_extractor: dict[str, BaseReader], filename_as_id: bool = False, encoding: str = 'utf-8', errors: str = 'ignore', raise_on_error: bool = False, fs: AbstractFileSystem | None = None) -> list[文档]

用于加载文件的静态方法。

注意:必须作为静态方法以实现并行处理。

参数:

名称 类型 描述 默认
input_file Path

要读取的文件路径

required
file_metadata [Callable[[str], Dict]]

一个接收文件名并返回文档元数据字典的函数。

required
file_extractor Dict[str, BaseReader]

文件扩展名到 BaseReader 类的映射,用于指定如何将该文件转换为文本。

required
filename_as_id bool

是否使用文件名作为文档标识符。

False
encoding str

文件的编码方式。 默认是 utf-8。

'utf-8'
errors str

如何处理编码和解码错误, 请参阅 https://docs.python.org/3/library/functions.html#open

'ignore'
raise_on_error bool

如果无法读取文件,是否引发错误。

False
fs Optional[AbstractFileSystem]

要使用的文件系统。默认使用本地文件系统。可更改为使用任何远程文件系统

None

返回:

类型 描述
list[文档]

List[Document]: 已加载的文档

workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
@staticmethod
def load_file(
    input_file: Path | PurePosixPath,
    file_metadata: Callable[[str], dict],
    file_extractor: dict[str, BaseReader],
    filename_as_id: bool = False,
    encoding: str = "utf-8",
    errors: str = "ignore",
    raise_on_error: bool = False,
    fs: fsspec.AbstractFileSystem | None = None,
) -> list[Document]:
    """
    Static method for loading file.

    NOTE: necessarily as a static method for parallel processing.

    Args:
        input_file (Path): File path to read
        file_metadata ([Callable[[str], Dict]]): A function that takes
            in a filename and returns a Dict of metadata for the Document.
        file_extractor (Dict[str, BaseReader]): A mapping of file
            extension to a BaseReader class that specifies how to convert that file
            to text.
        filename_as_id (bool): Whether to use the filename as the document id.
        encoding (str): Encoding of the files.
            Default is utf-8.
        errors (str): how encoding and decoding errors are to be handled,
            see https://docs.python.org/3/library/functions.html#open
        raise_on_error (bool): Whether to raise an error if a file cannot be read.
        fs (Optional[fsspec.AbstractFileSystem]): File system to use. Defaults
            to using the local file system. Can be changed to use any remote file system

    Returns:
        List[Document]: loaded documents

    """
    # TODO: make this less redundant
    default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
    default_file_reader_suffix = list(default_file_reader_cls.keys())
    metadata: dict | None = None
    documents: list[Document] = []

    if file_metadata is not None:
        metadata = file_metadata(str(input_file))

    file_suffix = input_file.suffix.lower()
    if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
        # use file readers
        if file_suffix not in file_extractor:
            # instantiate file reader if not already
            reader_cls = default_file_reader_cls[file_suffix]
            file_extractor[file_suffix] = reader_cls()
        reader = file_extractor[file_suffix]

        # load data -- catch all errors except for ImportError
        try:
            kwargs: dict[str, Any] = {"extra_info": metadata}
            if fs and not is_default_fs(fs):
                kwargs["fs"] = fs
            docs = reader.load_data(input_file, **kwargs)
        except ImportError as e:
            # ensure that ImportError is raised so user knows
            # about missing dependencies
            raise ImportError(str(e))
        except Exception as e:
            if raise_on_error:
                raise Exception("Error loading file") from e
            # otherwise, just skip the file and report the error
            print(
                f"Failed to load file {input_file} with error: {e}. Skipping...",
                flush=True,
            )
            return []

        # iterate over docs if needed
        if filename_as_id:
            for i, doc in enumerate(docs):
                doc.id_ = f"{input_file!s}_part_{i}"

        documents.extend(docs)
    else:
        # do standard read
        fs = fs or get_default_fs()
        with fs.open(input_file, errors=errors, encoding=encoding) as f:
            data = cast(bytes, f.read()).decode(encoding, errors=errors)

        doc = Document(text=data, metadata=metadata or {})  # type: ignore
        if filename_as_id:
            doc.id_ = str(input_file)

        documents.append(doc)

    return documents

aload_file async staticmethod #

aload_file(input_file: Path | PurePosixPath, file_metadata: Callable[[str], dict], file_extractor: dict[str, BaseReader], filename_as_id: bool = False, encoding: str = 'utf-8', errors: str = 'ignore', raise_on_error: bool = False, fs: AbstractFileSystem | None = None) -> list[文档]

异步加载文件。

workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
@staticmethod
async def aload_file(
    input_file: Path | PurePosixPath,
    file_metadata: Callable[[str], dict],
    file_extractor: dict[str, BaseReader],
    filename_as_id: bool = False,
    encoding: str = "utf-8",
    errors: str = "ignore",
    raise_on_error: bool = False,
    fs: fsspec.AbstractFileSystem | None = None,
) -> list[Document]:
    """Load file asynchronously."""
    # TODO: make this less redundant
    default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
    default_file_reader_suffix = list(default_file_reader_cls.keys())
    metadata: dict | None = None
    documents: list[Document] = []

    if file_metadata is not None:
        metadata = file_metadata(str(input_file))

    file_suffix = input_file.suffix.lower()
    if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
        # use file readers
        if file_suffix not in file_extractor:
            # instantiate file reader if not already
            reader_cls = default_file_reader_cls[file_suffix]
            file_extractor[file_suffix] = reader_cls()
        reader = file_extractor[file_suffix]

        # load data -- catch all errors except for ImportError
        try:
            kwargs: dict[str, Any] = {"extra_info": metadata}
            if fs and not is_default_fs(fs):
                kwargs["fs"] = fs
            docs = await reader.aload_data(input_file, **kwargs)
        except ImportError as e:
            # ensure that ImportError is raised so user knows
            # about missing dependencies
            raise ImportError(str(e))
        except Exception as e:
            if raise_on_error:
                raise
            # otherwise, just skip the file and report the error
            print(
                f"Failed to load file {input_file} with error: {e}. Skipping...",
                flush=True,
            )
            return []

        # iterate over docs if needed
        if filename_as_id:
            for i, doc in enumerate(docs):
                doc.id_ = f"{input_file!s}_part_{i}"

        documents.extend(docs)
    else:
        # do standard read
        fs = fs or get_default_fs()
        with fs.open(input_file, errors=errors, encoding=encoding) as f:
            data = cast(bytes, f.read()).decode(encoding, errors=errors)

        doc = Document(text=data, metadata=metadata or {})  # type: ignore
        if filename_as_id:
            doc.id_ = str(input_file)

        documents.append(doc)

    return documents

load_data #

load_data(show_progress: bool = False, num_workers: int | None = None, fs: AbstractFileSystem | None = None) -> list[文档]

从输入目录加载数据。

参数:

名称 类型 描述 默认
show_progress bool

是否显示tqdm进度条。默认为False。

False
num_workers Optional[int]

用于并行化数据加载的工作进程数量。

None
fs Optional[AbstractFileSystem]

要使用的文件系统。如果在构造函数中指定了fs,它将覆盖此处的fs参数。

None

返回:

类型 描述
list[文档]

List[Document]: 文档列表。

workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
def load_data(
    self,
    show_progress: bool = False,
    num_workers: int | None = None,
    fs: fsspec.AbstractFileSystem | None = None,
) -> list[Document]:
    """
    Load data from the input directory.

    Args:
        show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
        num_workers  (Optional[int]): Number of workers to parallelize data-loading over.
        fs (Optional[fsspec.AbstractFileSystem]): File system to use. If fs was specified
            in the constructor, it will override the fs parameter here.

    Returns:
        List[Document]: A list of documents.

    """
    documents = []

    fs = fs or self.fs
    load_file_with_args = partial(
        SimpleDirectoryReader.load_file,
        file_metadata=self.file_metadata,
        file_extractor=self.file_extractor,
        filename_as_id=self.filename_as_id,
        encoding=self.encoding,
        errors=self.errors,
        raise_on_error=self.raise_on_error,
        fs=fs,
    )

    if num_workers and num_workers > 1:
        num_cpus = multiprocessing.cpu_count()
        if num_workers > num_cpus:
            warnings.warn(
                "Specified num_workers exceed number of CPUs in the system. "
                "Setting `num_workers` down to the maximum CPU count."
            )
            num_workers = num_cpus

        with multiprocessing.get_context("spawn").Pool(num_workers) as pool:
            map_iterator = cast(
                Iterable[list[Document]],
                get_tqdm_iterable(
                    pool.imap(load_file_with_args, self.input_files),
                    show_progress=show_progress,
                    desc="Loading files",
                    total=len(self.input_files),
                ),
            )
            for result in map_iterator:
                documents.extend(result)

    else:
        files_to_process = cast(
            list[Union[Path, PurePosixPath]],
            get_tqdm_iterable(
                self.input_files,
                show_progress=show_progress,
                desc="Loading files",
            ),
        )
        for input_file in files_to_process:
            documents.extend(load_file_with_args(input_file))

    return self._exclude_metadata(documents)

aload_data async #

aload_data(show_progress: bool = False, num_workers: int | None = None, fs: AbstractFileSystem | None = None) -> list[文档]

从输入目录加载数据。

参数:

名称 类型 描述 默认
show_progress bool

是否显示tqdm进度条。默认为False。

False
num_workers Optional[int]

用于并行化数据加载的工作进程数量。

None
fs Optional[AbstractFileSystem]

要使用的文件系统。如果在构造函数中指定了fs,它将覆盖此处的fs参数。

None

返回:

类型 描述
list[文档]

List[Document]: 文档列表。

workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
async def aload_data(
    self,
    show_progress: bool = False,
    num_workers: int | None = None,
    fs: fsspec.AbstractFileSystem | None = None,
) -> list[Document]:
    """
    Load data from the input directory.

    Args:
        show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
        num_workers  (Optional[int]): Number of workers to parallelize data-loading over.
        fs (Optional[fsspec.AbstractFileSystem]): File system to use. If fs was specified
            in the constructor, it will override the fs parameter here.

    Returns:
        List[Document]: A list of documents.

    """
    files_to_process = self.input_files
    fs = fs or self.fs

    coroutines = [
        SimpleDirectoryReader.aload_file(
            input_file,
            self.file_metadata,
            self.file_extractor,
            self.filename_as_id,
            self.encoding,
            self.errors,
            self.raise_on_error,
            fs,
        )
        for input_file in files_to_process
    ]

    if num_workers:
        document_lists = await run_jobs(
            coroutines, show_progress=show_progress, workers=num_workers
        )
    elif show_progress:
        _asyncio = get_asyncio_module(show_progress=show_progress)
        document_lists = await _asyncio.gather(*coroutines)
    else:
        document_lists = await asyncio.gather(*coroutines)
    documents = [doc for doc_list in document_lists for doc in doc_list]

    return self._exclude_metadata(documents)

iter_data #

iter_data(show_progress: bool = False) -> Generator[list[文档], Any, Any]

从输入目录迭代加载数据。

参数:

名称 类型 描述 默认
show_progress bool

是否显示tqdm进度条。默认为False。

False

返回:

类型 描述
Any

Generator[List[Document]]: 文档列表。

workflows/handler.py 中的源代码llama_index/core/readers/file/base.py
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
def iter_data(
    self, show_progress: bool = False
) -> Generator[list[Document], Any, Any]:
    """
    Load data iteratively from the input directory.

    Args:
        show_progress (bool): Whether to show tqdm progress bars. Defaults to False.

    Returns:
        Generator[List[Document]]: A list of documents.

    """
    files_to_process = cast(
        list[Union[Path, PurePosixPath]],
        get_tqdm_iterable(
            self.input_files,
            show_progress=show_progress,
            desc="Loading files",
        ),
    )
    for input_file in files_to_process:
        documents = SimpleDirectoryReader.load_file(
            input_file=input_file,
            file_metadata=self.file_metadata,
            file_extractor=self.file_extractor,
            filename_as_id=self.filename_as_id,
            encoding=self.encoding,
            errors=self.errors,
            raise_on_error=self.raise_on_error,
            fs=self.fs,
        )

        documents = self._exclude_metadata(documents)

        if len(documents) > 0:
            yield documents

文档 #

基类:Event节点

数据文档的通用接口。

本文档连接至数据源。

workflows/handler.py 中的源代码llama_index/core/schema.py
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
class Document(Node):
    """
    Generic interface for a data document.

    This document connects to data sources.
    """

    def __init__(self, **data: Any) -> None:
        """
        Keeps backward compatibility with old 'Document' versions.

        If 'text' was passed, store it in 'text_resource'.
        If 'doc_id' was passed, store it in 'id_'.
        If 'extra_info' was passed, store it in 'metadata'.
        """
        if "doc_id" in data:
            value = data.pop("doc_id")
            if "id_" in data:
                msg = "'doc_id' is deprecated and 'id_' will be used instead"
                logging.warning(msg)
            else:
                data["id_"] = value

        if "extra_info" in data:
            value = data.pop("extra_info")
            if "metadata" in data:
                msg = "'extra_info' is deprecated and 'metadata' will be used instead"
                logging.warning(msg)
            else:
                data["metadata"] = value

        if data.get("text"):
            text = data.pop("text")
            if "text_resource" in data:
                text_resource = (
                    data["text_resource"]
                    if isinstance(data["text_resource"], MediaResource)
                    else MediaResource.model_validate(data["text_resource"])
                )
                if (text_resource.text or "").strip() != text.strip():
                    msg = (
                        "'text' is deprecated and 'text_resource' will be used instead"
                    )
                    logging.warning(msg)
            else:
                data["text_resource"] = MediaResource(text=text)

        super().__init__(**data)

    @model_serializer(mode="wrap")
    def custom_model_dump(
        self, handler: SerializerFunctionWrapHandler, info: SerializationInfo
    ) -> Dict[str, Any]:
        """For full backward compatibility with the text field, we customize the model serializer."""
        data = super().custom_model_dump(handler, info)
        exclude_set = set(info.exclude or [])
        if "text" not in exclude_set:
            data["text"] = self.text
        return data

    @property
    def text(self) -> str:
        """Provided for backward compatibility, it returns the content of text_resource."""
        return self.get_content()

    @classmethod
    def get_type(cls) -> str:
        """Get Document type."""
        return ObjectType.DOCUMENT

    @property
    def doc_id(self) -> str:
        """Get document ID."""
        return self.id_

    @doc_id.setter
    def doc_id(self, id_: str) -> None:
        self.id_ = id_

    def __str__(self) -> str:
        source_text_truncated = truncate_text(
            self.get_content().strip(), TRUNCATE_LENGTH
        )
        source_text_wrapped = textwrap.fill(
            f"Text: {source_text_truncated}\n", width=WRAP_WIDTH
        )
        return f"Doc ID: {self.doc_id}\n{source_text_wrapped}"

    @deprecated(
        version="0.12.2",
        reason="'get_doc_id' is deprecated, access the 'id_' property instead.",
    )
    def get_doc_id(self) -> str:  # pragma: nocover
        return self.id_

    def to_langchain_format(self) -> LCDocument:
        """Convert struct to LangChain document format."""
        from llama_index.core.bridge.langchain import (
            Document as LCDocument,  # type: ignore
        )

        metadata = self.metadata or {}
        return LCDocument(page_content=self.text, metadata=metadata, id=self.id_)

    @classmethod
    def from_langchain_format(cls, doc: LCDocument) -> Document:
        """Convert struct from LangChain document format."""
        if doc.id:
            return cls(text=doc.page_content, metadata=doc.metadata, id_=doc.id)
        return cls(text=doc.page_content, metadata=doc.metadata)

    def to_haystack_format(self) -> HaystackDocument:
        """Convert struct to Haystack document format."""
        from haystack import Document as HaystackDocument  # type: ignore

        return HaystackDocument(
            content=self.text, meta=self.metadata, embedding=self.embedding, id=self.id_
        )

    @classmethod
    def from_haystack_format(cls, doc: HaystackDocument) -> Document:
        """Convert struct from Haystack document format."""
        return cls(
            text=doc.content, metadata=doc.meta, embedding=doc.embedding, id_=doc.id
        )

    def to_embedchain_format(self) -> Dict[str, Any]:
        """Convert struct to EmbedChain document format."""
        return {
            "doc_id": self.id_,
            "data": {"content": self.text, "meta_data": self.metadata},
        }

    @classmethod
    def from_embedchain_format(cls, doc: Dict[str, Any]) -> Document:
        """Convert struct from EmbedChain document format."""
        return cls(
            text=doc["data"]["content"],
            metadata=doc["data"]["meta_data"],
            id_=doc["doc_id"],
        )

    def to_semantic_kernel_format(self) -> MemoryRecord:
        """Convert struct to Semantic Kernel document format."""
        import numpy as np
        from semantic_kernel.memory.memory_record import MemoryRecord  # type: ignore

        return MemoryRecord(
            id=self.id_,
            text=self.text,
            additional_metadata=self.get_metadata_str(),
            embedding=np.array(self.embedding) if self.embedding else None,
        )

    @classmethod
    def from_semantic_kernel_format(cls, doc: MemoryRecord) -> Document:
        """Convert struct from Semantic Kernel document format."""
        return cls(
            text=doc._text,
            metadata={"additional_metadata": doc._additional_metadata},
            embedding=doc._embedding.tolist() if doc._embedding is not None else None,
            id_=doc._id,
        )

    def to_vectorflow(self, client: Any) -> None:
        """Send a document to vectorflow, since they don't have a document object."""
        # write document to temp file
        import tempfile

        with tempfile.NamedTemporaryFile() as f:
            f.write(self.text.encode("utf-8"))
            f.flush()
            client.embed(f.name)

    @classmethod
    def example(cls) -> Document:
        return Document(
            text=SAMPLE_TEXT,
            metadata={"filename": "README.md", "category": "codebase"},
        )

    @classmethod
    def class_name(cls) -> str:
        return "Document"

    def to_cloud_document(self) -> CloudDocument:
        """Convert to LlamaCloud document type."""
        from llama_cloud.types.cloud_document import CloudDocument  # type: ignore

        return CloudDocument(
            text=self.text,
            metadata=self.metadata,
            excluded_embed_metadata_keys=self.excluded_embed_metadata_keys,
            excluded_llm_metadata_keys=self.excluded_llm_metadata_keys,
            id=self.id_,
        )

    @classmethod
    def from_cloud_document(
        cls,
        doc: CloudDocument,
    ) -> Document:
        """Convert from LlamaCloud document type."""
        return Document(
            text=doc.text,
            metadata=doc.metadata,
            excluded_embed_metadata_keys=doc.excluded_embed_metadata_keys,
            excluded_llm_metadata_keys=doc.excluded_llm_metadata_keys,
            id_=doc.id,
        )

文本 property #

text: str

为向后兼容性提供,它返回 text_resource 的内容。

doc_id property writable #

doc_id: str

获取文档ID。

custom_model_dump #

custom_model_dump(handler: SerializerFunctionWrapHandler, info: SerializationInfo) -> Dict[str, Any]

为了与文本字段完全向后兼容,我们自定义了模型序列化器。

workflows/handler.py 中的源代码llama_index/core/schema.py
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
@model_serializer(mode="wrap")
def custom_model_dump(
    self, handler: SerializerFunctionWrapHandler, info: SerializationInfo
) -> Dict[str, Any]:
    """For full backward compatibility with the text field, we customize the model serializer."""
    data = super().custom_model_dump(handler, info)
    exclude_set = set(info.exclude or [])
    if "text" not in exclude_set:
        data["text"] = self.text
    return data

get_type classmethod #

get_type() -> str

获取文档类型。

workflows/handler.py 中的源代码llama_index/core/schema.py
1077
1078
1079
1080
@classmethod
def get_type(cls) -> str:
    """Get Document type."""
    return ObjectType.DOCUMENT

to_langchain_format #

to_langchain_format() -> Document

将结构体转换为 LangChain 文档格式。

workflows/handler.py 中的源代码llama_index/core/schema.py
1107
1108
1109
1110
1111
1112
1113
1114
def to_langchain_format(self) -> LCDocument:
    """Convert struct to LangChain document format."""
    from llama_index.core.bridge.langchain import (
        Document as LCDocument,  # type: ignore
    )

    metadata = self.metadata or {}
    return LCDocument(page_content=self.text, metadata=metadata, id=self.id_)

from_langchain_format classmethod #

from_langchain_format(doc: Document) -> 文档

将结构从LangChain文档格式转换。

workflows/handler.py 中的源代码llama_index/core/schema.py
1116
1117
1118
1119
1120
1121
@classmethod
def from_langchain_format(cls, doc: LCDocument) -> Document:
    """Convert struct from LangChain document format."""
    if doc.id:
        return cls(text=doc.page_content, metadata=doc.metadata, id_=doc.id)
    return cls(text=doc.page_content, metadata=doc.metadata)

to_haystack_format #

to_haystack_format() -> Document

将结构体转换为Haystack文档格式。

workflows/handler.py 中的源代码llama_index/core/schema.py
1123
1124
1125
1126
1127
1128
1129
def to_haystack_format(self) -> HaystackDocument:
    """Convert struct to Haystack document format."""
    from haystack import Document as HaystackDocument  # type: ignore

    return HaystackDocument(
        content=self.text, meta=self.metadata, embedding=self.embedding, id=self.id_
    )

from_haystack_format classmethod #

from_haystack_format(doc: Document) -> 文档

将结构从Haystack文档格式转换。

workflows/handler.py 中的源代码llama_index/core/schema.py
1131
1132
1133
1134
1135
1136
@classmethod
def from_haystack_format(cls, doc: HaystackDocument) -> Document:
    """Convert struct from Haystack document format."""
    return cls(
        text=doc.content, metadata=doc.meta, embedding=doc.embedding, id_=doc.id
    )

to_embedchain_format #

to_embedchain_format() -> Dict[str, Any]

将结构体转换为EmbedChain文档格式。

workflows/handler.py 中的源代码llama_index/core/schema.py
1138
1139
1140
1141
1142
1143
def to_embedchain_format(self) -> Dict[str, Any]:
    """Convert struct to EmbedChain document format."""
    return {
        "doc_id": self.id_,
        "data": {"content": self.text, "meta_data": self.metadata},
    }

from_embedchain_format classmethod #

from_embedchain_format(doc: Dict[str, Any]) -> 文档

将结构从EmbedChain文档格式转换。

workflows/handler.py 中的源代码llama_index/core/schema.py
1145
1146
1147
1148
1149
1150
1151
1152
@classmethod
def from_embedchain_format(cls, doc: Dict[str, Any]) -> Document:
    """Convert struct from EmbedChain document format."""
    return cls(
        text=doc["data"]["content"],
        metadata=doc["data"]["meta_data"],
        id_=doc["doc_id"],
    )

to_semantic_kernel_format #

to_semantic_kernel_format() -> MemoryRecord

将结构体转换为语义内核文档格式。

workflows/handler.py 中的源代码llama_index/core/schema.py
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
def to_semantic_kernel_format(self) -> MemoryRecord:
    """Convert struct to Semantic Kernel document format."""
    import numpy as np
    from semantic_kernel.memory.memory_record import MemoryRecord  # type: ignore

    return MemoryRecord(
        id=self.id_,
        text=self.text,
        additional_metadata=self.get_metadata_str(),
        embedding=np.array(self.embedding) if self.embedding else None,
    )

from_semantic_kernel_format classmethod #

from_semantic_kernel_format(doc: MemoryRecord) -> 文档

将结构从语义内核文档格式转换。

workflows/handler.py 中的源代码llama_index/core/schema.py
1166
1167
1168
1169
1170
1171
1172
1173
1174
@classmethod
def from_semantic_kernel_format(cls, doc: MemoryRecord) -> Document:
    """Convert struct from Semantic Kernel document format."""
    return cls(
        text=doc._text,
        metadata={"additional_metadata": doc._additional_metadata},
        embedding=doc._embedding.tolist() if doc._embedding is not None else None,
        id_=doc._id,
    )

to_vectorflow #

to_vectorflow(client: Any) -> None

发送文档到 vectorflow,因为他们没有文档对象。

workflows/handler.py 中的源代码llama_index/core/schema.py
1176
1177
1178
1179
1180
1181
1182
1183
1184
def to_vectorflow(self, client: Any) -> None:
    """Send a document to vectorflow, since they don't have a document object."""
    # write document to temp file
    import tempfile

    with tempfile.NamedTemporaryFile() as f:
        f.write(self.text.encode("utf-8"))
        f.flush()
        client.embed(f.name)

to_cloud_document #

to_cloud_document() -> CloudDocument

转换为LlamaCloud文档类型。

workflows/handler.py 中的源代码llama_index/core/schema.py
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
def to_cloud_document(self) -> CloudDocument:
    """Convert to LlamaCloud document type."""
    from llama_cloud.types.cloud_document import CloudDocument  # type: ignore

    return CloudDocument(
        text=self.text,
        metadata=self.metadata,
        excluded_embed_metadata_keys=self.excluded_embed_metadata_keys,
        excluded_llm_metadata_keys=self.excluded_llm_metadata_keys,
        id=self.id_,
    )

from_cloud_document classmethod #

from_cloud_document(doc: CloudDocument) -> 文档

从LlamaCloud文档类型转换。

workflows/handler.py 中的源代码llama_index/core/schema.py
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
@classmethod
def from_cloud_document(
    cls,
    doc: CloudDocument,
) -> Document:
    """Convert from LlamaCloud document type."""
    return Document(
        text=doc.text,
        metadata=doc.metadata,
        excluded_embed_metadata_keys=doc.excluded_embed_metadata_keys,
        excluded_llm_metadata_keys=doc.excluded_llm_metadata_keys,
        id_=doc.id,
    )

查询包 dataclass #

基类:EventDataClassJsonMixin

查询捆绑包。

该数据类包含原始查询字符串及相关的转换操作。

参数:

名称 类型 描述 默认
query_str str

原始用户指定的查询字符串。 目前所有非基于嵌入的查询都使用此功能。

required
custom_embedding_strs list[str]

用于嵌入查询的字符串列表。 目前所有基于嵌入的查询都在使用此列表。

None
embedding list[float]

查询的存储嵌入。

None
image_path str
None
workflows/handler.py 中的源代码llama_index/core/schema.py
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
@dataclass
class QueryBundle(DataClassJsonMixin):
    """
    Query bundle.

    This dataclass contains the original query string and associated transformations.

    Args:
        query_str (str): the original user-specified query string.
            This is currently used by all non embedding-based queries.
        custom_embedding_strs (list[str]): list of strings used for embedding the query.
            This is currently used by all embedding-based queries.
        embedding (list[float]): the stored embedding for the query.

    """

    query_str: str
    # using single image path as query input
    image_path: Optional[str] = None
    custom_embedding_strs: Optional[List[str]] = None
    embedding: Optional[List[float]] = None

    @property
    def embedding_strs(self) -> List[str]:
        """Use custom embedding strs if specified, otherwise use query str."""
        if self.custom_embedding_strs is None:
            if len(self.query_str) == 0:
                return []
            return [self.query_str]
        else:
            return self.custom_embedding_strs

    @property
    def embedding_image(self) -> List[ImageType]:
        """Use image path for image retrieval."""
        if self.image_path is None:
            return []
        return [self.image_path]

    def __str__(self) -> str:
        """Convert to string representation."""
        return self.query_str

embedding_strs property #

embedding_strs: List[str]

如果指定了自定义嵌入字符串,则使用自定义嵌入字符串,否则使用查询字符串。

embedding_image property #

embedding_image: List[ImageType]

使用图像路径进行图像检索。

服务上下文 #

服务上下文容器。

注意:已弃用,请改用 llama_index.settings.Settings 或将模块传递到本地函数/方法/接口。

workflows/handler.py 中的源代码llama_index/core/service_context.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ServiceContext:
    """
    Service Context container.

    NOTE: Deprecated, use llama_index.settings.Settings instead or pass in
    modules to local functions/methods/interfaces.

    """

    def __init__(self, **kwargs: Any) -> None:
        raise ValueError(
            "ServiceContext is deprecated. Use llama_index.settings.Settings instead, "
            "or pass in modules to local functions/methods/interfaces.\n"
            "See the docs for updated usage/migration: \n"
            "https://docs.llamaindex.ai/en/stable/module_guides/supporting_modules/service_context_migration/"
        )

    @classmethod
    def from_defaults(
        cls,
        **kwargs: Any,
    ) -> "ServiceContext":
        """
        Create a ServiceContext from defaults.

        NOTE: Deprecated, use llama_index.settings.Settings instead or pass in
        modules to local functions/methods/interfaces.

        """
        raise ValueError(
            "ServiceContext is deprecated. Use llama_index.settings.Settings instead, "
            "or pass in modules to local functions/methods/interfaces.\n"
            "See the docs for updated usage/migration: \n"
            "https://docs.llamaindex.ai/en/stable/module_guides/supporting_modules/service_context_migration/"
        )

from_defaults classmethod #

from_defaults(**kwargs: Any) -> ServiceContext

从默认值创建 ServiceContext。

注意:已弃用,请改用 llama_index.settings.Settings 或将模块传递到本地函数/方法/接口中。

workflows/handler.py 中的源代码llama_index/core/service_context.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@classmethod
def from_defaults(
    cls,
    **kwargs: Any,
) -> "ServiceContext":
    """
    Create a ServiceContext from defaults.

    NOTE: Deprecated, use llama_index.settings.Settings instead or pass in
    modules to local functions/methods/interfaces.

    """
    raise ValueError(
        "ServiceContext is deprecated. Use llama_index.settings.Settings instead, "
        "or pass in modules to local functions/methods/interfaces.\n"
        "See the docs for updated usage/migration: \n"
        "https://docs.llamaindex.ai/en/stable/module_guides/supporting_modules/service_context_migration/"
    )

存储上下文 dataclass #

存储上下文。

存储上下文容器是一个用于存储节点、索引和向量的实用容器。它包含以下内容: - docstore: BaseDocumentStore - index_store: BaseIndexStore - vector_store: BasePydanticVectorStore - graph_store: GraphStore - property_graph_store: PropertyGraphStore(延迟初始化)

参数:

名称 类型 描述 默认
docstore BaseDocumentStore
required
index_store BaseIndexStore
required
vector_stores Dict[str, Annotated[BasePydanticVectorStore, SerializeAsAny]]
required
graph_store GraphStore
required
property_graph_store PropertyGraphStore | None
None
workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@dataclass
class StorageContext:
    """
    Storage context.

    The storage context container is a utility container for storing nodes,
    indices, and vectors. It contains the following:
    - docstore: BaseDocumentStore
    - index_store: BaseIndexStore
    - vector_store: BasePydanticVectorStore
    - graph_store: GraphStore
    - property_graph_store: PropertyGraphStore (lazily initialized)

    """

    docstore: BaseDocumentStore
    index_store: BaseIndexStore
    vector_stores: Dict[str, SerializeAsAny[BasePydanticVectorStore]]
    graph_store: GraphStore
    property_graph_store: Optional[PropertyGraphStore] = None

    @classmethod
    def from_defaults(
        cls,
        docstore: Optional[BaseDocumentStore] = None,
        index_store: Optional[BaseIndexStore] = None,
        vector_store: Optional[BasePydanticVectorStore] = None,
        image_store: Optional[BasePydanticVectorStore] = None,
        vector_stores: Optional[Dict[str, BasePydanticVectorStore]] = None,
        graph_store: Optional[GraphStore] = None,
        property_graph_store: Optional[PropertyGraphStore] = None,
        persist_dir: Optional[str] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> "StorageContext":
        """
        Create a StorageContext from defaults.

        Args:
            docstore (Optional[BaseDocumentStore]): document store
            index_store (Optional[BaseIndexStore]): index store
            vector_store (Optional[BasePydanticVectorStore]): vector store
            graph_store (Optional[GraphStore]): graph store
            image_store (Optional[BasePydanticVectorStore]): image store

        """
        if persist_dir is None:
            docstore = docstore or SimpleDocumentStore()
            index_store = index_store or SimpleIndexStore()
            graph_store = graph_store or SimpleGraphStore()
            image_store = image_store or SimpleVectorStore()

            if vector_store:
                vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
            else:
                vector_stores = vector_stores or {
                    DEFAULT_VECTOR_STORE: SimpleVectorStore()
                }
            if image_store:
                # append image store to vector stores
                vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store
        else:
            docstore = docstore or SimpleDocumentStore.from_persist_dir(
                persist_dir, fs=fs
            )
            index_store = index_store or SimpleIndexStore.from_persist_dir(
                persist_dir, fs=fs
            )
            graph_store = graph_store or SimpleGraphStore.from_persist_dir(
                persist_dir, fs=fs
            )

            try:
                property_graph_store = (
                    property_graph_store
                    or SimplePropertyGraphStore.from_persist_dir(persist_dir, fs=fs)
                )
            except FileNotFoundError:
                property_graph_store = None

            if vector_store:
                vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
            elif vector_stores:
                vector_stores = vector_stores
            else:
                vector_stores = SimpleVectorStore.from_namespaced_persist_dir(
                    persist_dir, fs=fs
                )
            if image_store:
                # append image store to vector stores
                vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store  # type: ignore

        return cls(
            docstore=docstore,
            index_store=index_store,
            vector_stores=vector_stores,  # type: ignore
            graph_store=graph_store,
            property_graph_store=property_graph_store,
        )

    def persist(
        self,
        persist_dir: Union[str, os.PathLike] = DEFAULT_PERSIST_DIR,
        docstore_fname: str = DOCSTORE_FNAME,
        index_store_fname: str = INDEX_STORE_FNAME,
        vector_store_fname: str = VECTOR_STORE_FNAME,
        image_store_fname: str = IMAGE_STORE_FNAME,
        graph_store_fname: str = GRAPH_STORE_FNAME,
        pg_graph_store_fname: str = PG_FNAME,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> None:
        """
        Persist the storage context.

        Args:
            persist_dir (str): directory to persist the storage context

        """
        if fs is not None:
            persist_dir = str(persist_dir)  # NOTE: doesn't support Windows here
            docstore_path = concat_dirs(persist_dir, docstore_fname)
            index_store_path = concat_dirs(persist_dir, index_store_fname)
            graph_store_path = concat_dirs(persist_dir, graph_store_fname)
            pg_graph_store_path = concat_dirs(persist_dir, pg_graph_store_fname)
        else:
            persist_dir = Path(persist_dir)
            docstore_path = str(persist_dir / docstore_fname)
            index_store_path = str(persist_dir / index_store_fname)
            graph_store_path = str(persist_dir / graph_store_fname)
            pg_graph_store_path = str(persist_dir / pg_graph_store_fname)

        self.docstore.persist(persist_path=docstore_path, fs=fs)
        self.index_store.persist(persist_path=index_store_path, fs=fs)
        self.graph_store.persist(persist_path=graph_store_path, fs=fs)

        if self.property_graph_store:
            self.property_graph_store.persist(persist_path=pg_graph_store_path, fs=fs)

        # save each vector store under it's namespace
        for vector_store_name, vector_store in self.vector_stores.items():
            if fs is not None:
                vector_store_path = concat_dirs(
                    str(persist_dir),
                    f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}",
                )
            else:
                vector_store_path = str(
                    Path(persist_dir)
                    / f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}"
                )

            vector_store.persist(persist_path=vector_store_path, fs=fs)

    def to_dict(self) -> dict:
        all_simple = (
            isinstance(self.docstore, SimpleDocumentStore)
            and isinstance(self.index_store, SimpleIndexStore)
            and isinstance(self.graph_store, SimpleGraphStore)
            and isinstance(
                self.property_graph_store, (SimplePropertyGraphStore, type(None))
            )
            and all(
                isinstance(vs, SimpleVectorStore) for vs in self.vector_stores.values()
            )
        )
        if not all_simple:
            raise ValueError(
                "to_dict only available when using simple doc/index/vector stores"
            )

        assert isinstance(self.docstore, SimpleDocumentStore)
        assert isinstance(self.index_store, SimpleIndexStore)
        assert isinstance(self.graph_store, SimpleGraphStore)
        assert isinstance(
            self.property_graph_store, (SimplePropertyGraphStore, type(None))
        )

        return {
            VECTOR_STORE_KEY: {
                key: vector_store.to_dict()
                for key, vector_store in self.vector_stores.items()
                if isinstance(vector_store, SimpleVectorStore)
            },
            DOC_STORE_KEY: self.docstore.to_dict(),
            INDEX_STORE_KEY: self.index_store.to_dict(),
            GRAPH_STORE_KEY: self.graph_store.to_dict(),
            PG_STORE_KEY: (
                self.property_graph_store.to_dict()
                if self.property_graph_store
                else None
            ),
        }

    @classmethod
    def from_dict(cls, save_dict: dict) -> "StorageContext":
        """Create a StorageContext from dict."""
        docstore = SimpleDocumentStore.from_dict(save_dict[DOC_STORE_KEY])
        index_store = SimpleIndexStore.from_dict(save_dict[INDEX_STORE_KEY])
        graph_store = SimpleGraphStore.from_dict(save_dict[GRAPH_STORE_KEY])
        property_graph_store = (
            SimplePropertyGraphStore.from_dict(save_dict[PG_STORE_KEY])
            if save_dict[PG_STORE_KEY]
            else None
        )

        vector_stores: Dict[str, BasePydanticVectorStore] = {}
        for key, vector_store_dict in save_dict[VECTOR_STORE_KEY].items():
            vector_stores[key] = SimpleVectorStore.from_dict(vector_store_dict)

        return cls(
            docstore=docstore,
            index_store=index_store,
            vector_stores=vector_stores,
            graph_store=graph_store,
            property_graph_store=property_graph_store,
        )

    @property
    def vector_store(self) -> BasePydanticVectorStore:
        """Backwrds compatibility for vector_store property."""
        return self.vector_stores[DEFAULT_VECTOR_STORE]

    def add_vector_store(
        self, vector_store: BasePydanticVectorStore, namespace: str
    ) -> None:
        """Add a vector store to the storage context."""
        self.vector_stores[namespace] = vector_store

vector_store property #

向量存储属性的向后兼容性。

from_defaults classmethod #

from_defaults(docstore: Optional[BaseDocumentStore] = None, index_store: Optional[BaseIndexStore] = None, vector_store: Optional[BasePydanticVectorStore] = None, image_store: Optional[BasePydanticVectorStore] = None, vector_stores: Optional[Dict[str, BasePydanticVectorStore]] = None, graph_store: Optional[GraphStore] = None, property_graph_store: Optional[PropertyGraphStore] = None, persist_dir: Optional[str] = None, fs: Optional[AbstractFileSystem] = None) -> StorageContext

从默认值创建一个存储上下文。

参数:

名称 类型 描述 默认
docstore Optional[BaseDocumentStore]

文档存储

None
index_store Optional[BaseIndexStore]

索引存储

None
vector_store Optional[BasePydanticVectorStore]

向量存储

None
graph_store Optional[GraphStore]

图存储

None
image_store Optional[BasePydanticVectorStore]

图像存储

None
workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@classmethod
def from_defaults(
    cls,
    docstore: Optional[BaseDocumentStore] = None,
    index_store: Optional[BaseIndexStore] = None,
    vector_store: Optional[BasePydanticVectorStore] = None,
    image_store: Optional[BasePydanticVectorStore] = None,
    vector_stores: Optional[Dict[str, BasePydanticVectorStore]] = None,
    graph_store: Optional[GraphStore] = None,
    property_graph_store: Optional[PropertyGraphStore] = None,
    persist_dir: Optional[str] = None,
    fs: Optional[fsspec.AbstractFileSystem] = None,
) -> "StorageContext":
    """
    Create a StorageContext from defaults.

    Args:
        docstore (Optional[BaseDocumentStore]): document store
        index_store (Optional[BaseIndexStore]): index store
        vector_store (Optional[BasePydanticVectorStore]): vector store
        graph_store (Optional[GraphStore]): graph store
        image_store (Optional[BasePydanticVectorStore]): image store

    """
    if persist_dir is None:
        docstore = docstore or SimpleDocumentStore()
        index_store = index_store or SimpleIndexStore()
        graph_store = graph_store or SimpleGraphStore()
        image_store = image_store or SimpleVectorStore()

        if vector_store:
            vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
        else:
            vector_stores = vector_stores or {
                DEFAULT_VECTOR_STORE: SimpleVectorStore()
            }
        if image_store:
            # append image store to vector stores
            vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store
    else:
        docstore = docstore or SimpleDocumentStore.from_persist_dir(
            persist_dir, fs=fs
        )
        index_store = index_store or SimpleIndexStore.from_persist_dir(
            persist_dir, fs=fs
        )
        graph_store = graph_store or SimpleGraphStore.from_persist_dir(
            persist_dir, fs=fs
        )

        try:
            property_graph_store = (
                property_graph_store
                or SimplePropertyGraphStore.from_persist_dir(persist_dir, fs=fs)
            )
        except FileNotFoundError:
            property_graph_store = None

        if vector_store:
            vector_stores = {DEFAULT_VECTOR_STORE: vector_store}
        elif vector_stores:
            vector_stores = vector_stores
        else:
            vector_stores = SimpleVectorStore.from_namespaced_persist_dir(
                persist_dir, fs=fs
            )
        if image_store:
            # append image store to vector stores
            vector_stores[IMAGE_VECTOR_STORE_NAMESPACE] = image_store  # type: ignore

    return cls(
        docstore=docstore,
        index_store=index_store,
        vector_stores=vector_stores,  # type: ignore
        graph_store=graph_store,
        property_graph_store=property_graph_store,
    )

persist #

persist(persist_dir: Union[str, PathLike] = DEFAULT_PERSIST_DIR, docstore_fname: str = DEFAULT_PERSIST_FNAME, index_store_fname: str = DEFAULT_PERSIST_FNAME, vector_store_fname: str = DEFAULT_PERSIST_FNAME, image_store_fname: str = IMAGE_STORE_FNAME, graph_store_fname: str = DEFAULT_PERSIST_FNAME, pg_graph_store_fname: str = DEFAULT_PG_PERSIST_FNAME, fs: Optional[AbstractFileSystem] = None) -> None

持久化存储上下文。

参数:

名称 类型 描述 默认
persist_dir str

持久化存储上下文的目录

DEFAULT_PERSIST_DIR
workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def persist(
    self,
    persist_dir: Union[str, os.PathLike] = DEFAULT_PERSIST_DIR,
    docstore_fname: str = DOCSTORE_FNAME,
    index_store_fname: str = INDEX_STORE_FNAME,
    vector_store_fname: str = VECTOR_STORE_FNAME,
    image_store_fname: str = IMAGE_STORE_FNAME,
    graph_store_fname: str = GRAPH_STORE_FNAME,
    pg_graph_store_fname: str = PG_FNAME,
    fs: Optional[fsspec.AbstractFileSystem] = None,
) -> None:
    """
    Persist the storage context.

    Args:
        persist_dir (str): directory to persist the storage context

    """
    if fs is not None:
        persist_dir = str(persist_dir)  # NOTE: doesn't support Windows here
        docstore_path = concat_dirs(persist_dir, docstore_fname)
        index_store_path = concat_dirs(persist_dir, index_store_fname)
        graph_store_path = concat_dirs(persist_dir, graph_store_fname)
        pg_graph_store_path = concat_dirs(persist_dir, pg_graph_store_fname)
    else:
        persist_dir = Path(persist_dir)
        docstore_path = str(persist_dir / docstore_fname)
        index_store_path = str(persist_dir / index_store_fname)
        graph_store_path = str(persist_dir / graph_store_fname)
        pg_graph_store_path = str(persist_dir / pg_graph_store_fname)

    self.docstore.persist(persist_path=docstore_path, fs=fs)
    self.index_store.persist(persist_path=index_store_path, fs=fs)
    self.graph_store.persist(persist_path=graph_store_path, fs=fs)

    if self.property_graph_store:
        self.property_graph_store.persist(persist_path=pg_graph_store_path, fs=fs)

    # save each vector store under it's namespace
    for vector_store_name, vector_store in self.vector_stores.items():
        if fs is not None:
            vector_store_path = concat_dirs(
                str(persist_dir),
                f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}",
            )
        else:
            vector_store_path = str(
                Path(persist_dir)
                / f"{vector_store_name}{NAMESPACE_SEP}{vector_store_fname}"
            )

        vector_store.persist(persist_path=vector_store_path, fs=fs)

from_dict classmethod #

from_dict(save_dict: dict) -> StorageContext

从字典创建存储上下文。

workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@classmethod
def from_dict(cls, save_dict: dict) -> "StorageContext":
    """Create a StorageContext from dict."""
    docstore = SimpleDocumentStore.from_dict(save_dict[DOC_STORE_KEY])
    index_store = SimpleIndexStore.from_dict(save_dict[INDEX_STORE_KEY])
    graph_store = SimpleGraphStore.from_dict(save_dict[GRAPH_STORE_KEY])
    property_graph_store = (
        SimplePropertyGraphStore.from_dict(save_dict[PG_STORE_KEY])
        if save_dict[PG_STORE_KEY]
        else None
    )

    vector_stores: Dict[str, BasePydanticVectorStore] = {}
    for key, vector_store_dict in save_dict[VECTOR_STORE_KEY].items():
        vector_stores[key] = SimpleVectorStore.from_dict(vector_store_dict)

    return cls(
        docstore=docstore,
        index_store=index_store,
        vector_stores=vector_stores,
        graph_store=graph_store,
        property_graph_store=property_graph_store,
    )

add_vector_store #

add_vector_store(vector_store: BasePydanticVectorStore, namespace: str) -> None

向存储上下文添加向量存储。

workflows/handler.py 中的源代码llama_index/core/storage/storage_context.py
273
274
275
276
277
def add_vector_store(
    self, vector_store: BasePydanticVectorStore, namespace: str
) -> None:
    """Add a vector store to the storage context."""
    self.vector_stores[namespace] = vector_store

SQL数据库 #

SQL数据库。

该类提供了一个围绕SQLAlchemy引擎的封装器,用于与SQL数据库进行交互。 它提供了执行SQL命令、向表中插入数据以及检索数据库模式信息的方法。 它还支持可选功能,例如包含或排除特定表、对表信息进行行采样、 在表信息中包含索引以及支持视图。

基于 langchain SQLDatabase。 https://github.com/langchain-ai/langchain/blob/e355606b1100097665207ca259de6dc548d44c78/libs/langchain/langchain/utilities/sql_database.py#L39

参数:

名称 类型 描述 默认
engine Engine

用于数据库操作的 SQLAlchemy 引擎实例。

required
schema Optional[str]

要使用的模式名称(如果有的话)。

None
metadata Optional[MetaData]

要使用的元数据实例(如果有的话)。

None
ignore_tables Optional[List[str]]

要忽略的表名列表。如果设置了此项, include_tables 必须为 None。

None
include_tables Optional[List[str]]

要包含的表名列表。如果设置了此项, ignore_tables 必须为 None。

None
sample_rows_in_table_info int

要包含在表格信息中的样本行数。

3
indexes_in_table_info bool

是否在表信息中包含索引。

False
custom_table_info Optional[dict]

自定义表格信息以供使用。

None
view_support bool

是否支持视图。

False
max_string_length int

要使用的最大字符串长度。

300
workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class SQLDatabase:
    """
    SQL Database.

    This class provides a wrapper around the SQLAlchemy engine to interact with a SQL
    database.
    It provides methods to execute SQL commands, insert data into tables, and retrieve
    information about the database schema.
    It also supports optional features such as including or excluding specific tables,
    sampling rows for table info,
    including indexes in table info, and supporting views.

    Based on langchain SQLDatabase.
    https://github.com/langchain-ai/langchain/blob/e355606b1100097665207ca259de6dc548d44c78/libs/langchain/langchain/utilities/sql_database.py#L39

    Args:
        engine (Engine): The SQLAlchemy engine instance to use for database operations.
        schema (Optional[str]): The name of the schema to use, if any.
        metadata (Optional[MetaData]): The metadata instance to use, if any.
        ignore_tables (Optional[List[str]]): List of table names to ignore. If set,
            include_tables must be None.
        include_tables (Optional[List[str]]): List of table names to include. If set,
            ignore_tables must be None.
        sample_rows_in_table_info (int): The number of sample rows to include in table
            info.
        indexes_in_table_info (bool): Whether to include indexes in table info.
        custom_table_info (Optional[dict]): Custom table info to use.
        view_support (bool): Whether to support views.
        max_string_length (int): The maximum string length to use.

    """

    def __init__(
        self,
        engine: Engine,
        schema: Optional[str] = None,
        metadata: Optional[MetaData] = None,
        ignore_tables: Optional[List[str]] = None,
        include_tables: Optional[List[str]] = None,
        sample_rows_in_table_info: int = 3,
        indexes_in_table_info: bool = False,
        custom_table_info: Optional[dict] = None,
        view_support: bool = False,
        max_string_length: int = 300,
    ):
        """Create engine from database URI."""
        self._engine = engine
        self._schema = schema
        if include_tables and ignore_tables:
            raise ValueError("Cannot specify both include_tables and ignore_tables")

        self._inspector = inspect(self._engine)

        # including view support by adding the views as well as tables to the all
        # tables list if view_support is True
        self._all_tables = set(
            self._inspector.get_table_names(schema=schema)
            + (self._inspector.get_view_names(schema=schema) if view_support else [])
        )

        self._include_tables = set(include_tables) if include_tables else set()
        if self._include_tables:
            missing_tables = self._include_tables - self._all_tables
            if missing_tables:
                raise ValueError(
                    f"include_tables {missing_tables} not found in database"
                )
        self._ignore_tables = set(ignore_tables) if ignore_tables else set()
        if self._ignore_tables:
            missing_tables = self._ignore_tables - self._all_tables
            if missing_tables:
                raise ValueError(
                    f"ignore_tables {missing_tables} not found in database"
                )
        usable_tables = self.get_usable_table_names()
        self._usable_tables = set(usable_tables) if usable_tables else self._all_tables

        if not isinstance(sample_rows_in_table_info, int):
            raise TypeError("sample_rows_in_table_info must be an integer")

        self._sample_rows_in_table_info = sample_rows_in_table_info
        self._indexes_in_table_info = indexes_in_table_info

        self._custom_table_info = custom_table_info
        if self._custom_table_info:
            if not isinstance(self._custom_table_info, dict):
                raise TypeError(
                    "table_info must be a dictionary with table names as keys and the "
                    "desired table info as values"
                )
            # only keep the tables that are also present in the database
            intersection = set(self._custom_table_info).intersection(self._all_tables)
            self._custom_table_info = {
                table: info
                for table, info in self._custom_table_info.items()
                if table in intersection
            }

        self._max_string_length = max_string_length

        self._metadata = metadata or MetaData()
        # including view support if view_support = true
        self._metadata.reflect(
            views=view_support,
            bind=self._engine,
            only=list(self._usable_tables),
            schema=self._schema,
        )

    @property
    def engine(self) -> Engine:
        """Return SQL Alchemy engine."""
        return self._engine

    @property
    def metadata_obj(self) -> MetaData:
        """Return SQL Alchemy metadata."""
        return self._metadata

    @classmethod
    def from_uri(
        cls, database_uri: str, engine_args: Optional[dict] = None, **kwargs: Any
    ) -> "SQLDatabase":
        """Construct a SQLAlchemy engine from URI."""
        _engine_args = engine_args or {}
        return cls(create_engine(database_uri, **_engine_args), **kwargs)

    @property
    def dialect(self) -> str:
        """Return string representation of dialect to use."""
        return self._engine.dialect.name

    def get_usable_table_names(self) -> Iterable[str]:
        """Get names of tables available."""
        if self._include_tables:
            return sorted(self._include_tables)
        return sorted(self._all_tables - self._ignore_tables)

    def get_table_columns(self, table_name: str) -> List[Any]:
        """Get table columns."""
        return self._inspector.get_columns(table_name)

    def get_single_table_info(self, table_name: str) -> str:
        """Get table info for a single table."""
        # same logic as table_info, but with specific table names
        template = "Table '{table_name}' has columns: {columns}, "
        try:
            # try to retrieve table comment
            table_comment = self._inspector.get_table_comment(
                table_name, schema=self._schema
            )["text"]
            if table_comment:
                template += f"with comment: ({table_comment}) "
        except NotImplementedError:
            # get_table_comment raises NotImplementedError for a dialect that does not support comments.
            pass

        template += "{foreign_keys}."
        columns = []
        for column in self._inspector.get_columns(table_name, schema=self._schema):
            if column.get("comment"):
                columns.append(
                    f"{column['name']} ({column['type']!s}): '{column.get('comment')}'"
                )
            else:
                columns.append(f"{column['name']} ({column['type']!s})")

        column_str = ", ".join(columns)
        foreign_keys = []
        for foreign_key in self._inspector.get_foreign_keys(
            table_name, schema=self._schema
        ):
            foreign_keys.append(
                f"{foreign_key['constrained_columns']} -> "
                f"{foreign_key['referred_table']}.{foreign_key['referred_columns']}"
            )
        foreign_key_str = (
            foreign_keys
            and " and foreign keys: {}".format(", ".join(foreign_keys))
            or ""
        )
        return template.format(
            table_name=table_name, columns=column_str, foreign_keys=foreign_key_str
        )

    def insert_into_table(self, table_name: str, data: dict) -> None:
        """Insert data into a table."""
        table = self._metadata.tables[table_name]
        stmt = insert(table).values(**data)
        with self._engine.begin() as connection:
            connection.execute(stmt)

    def truncate_word(self, content: Any, *, length: int, suffix: str = "...") -> str:
        """
        Truncate a string to a certain number of words, based on the max string
        length.
        """
        if not isinstance(content, str) or length <= 0:
            return content

        if len(content) <= length:
            return content

        return content[: length - len(suffix)].rsplit(" ", 1)[0] + suffix

    def run_sql(self, command: str) -> Tuple[str, Dict]:
        """
        Execute a SQL statement and return a string representing the results.

        If the statement returns rows, a string of the results is returned.
        If the statement returns no rows, an empty string is returned.
        """
        with self._engine.begin() as connection:
            try:
                if self._schema:
                    command = command.replace("FROM ", f"FROM {self._schema}.")
                    command = command.replace("JOIN ", f"JOIN {self._schema}.")
                cursor = connection.execute(text(command))
            except (ProgrammingError, OperationalError) as exc:
                raise NotImplementedError(
                    f"Statement {command!r} is invalid SQL.\nError: {exc.orig}"
                ) from exc
            if cursor.returns_rows:
                result = cursor.fetchall()
                # truncate the results to the max string length
                # we can't use str(result) directly because it automatically truncates long strings
                truncated_results = []
                for row in result:
                    # truncate each column, then convert the row to a tuple
                    truncated_row = tuple(
                        self.truncate_word(column, length=self._max_string_length)
                        for column in row
                    )
                    truncated_results.append(truncated_row)
                return str(truncated_results), {
                    "result": truncated_results,
                    "col_keys": list(cursor.keys()),
                }
        return "", {}

引擎 property #

engine: Engine

返回 SQL Alchemy 引擎。

metadata_obj property #

metadata_obj: MetaData

返回 SQL Alchemy 元数据。

方言 property #

dialect: str

返回要使用的方言的字符串表示。

from_uri classmethod #

from_uri(database_uri: str, engine_args: Optional[dict] = None, **kwargs: Any) -> SQLDatabase

从URI构建SQLAlchemy引擎。

workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
129
130
131
132
133
134
135
@classmethod
def from_uri(
    cls, database_uri: str, engine_args: Optional[dict] = None, **kwargs: Any
) -> "SQLDatabase":
    """Construct a SQLAlchemy engine from URI."""
    _engine_args = engine_args or {}
    return cls(create_engine(database_uri, **_engine_args), **kwargs)

get_usable_table_names #

get_usable_table_names() -> Iterable[str]

获取可用表的名称。

workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
142
143
144
145
146
def get_usable_table_names(self) -> Iterable[str]:
    """Get names of tables available."""
    if self._include_tables:
        return sorted(self._include_tables)
    return sorted(self._all_tables - self._ignore_tables)

get_table_columns #

get_table_columns(table_name: str) -> List[Any]

获取表列。

workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
148
149
150
def get_table_columns(self, table_name: str) -> List[Any]:
    """Get table columns."""
    return self._inspector.get_columns(table_name)

get_single_table_info #

get_single_table_info(table_name: str) -> str

获取单个表的信息。

workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def get_single_table_info(self, table_name: str) -> str:
    """Get table info for a single table."""
    # same logic as table_info, but with specific table names
    template = "Table '{table_name}' has columns: {columns}, "
    try:
        # try to retrieve table comment
        table_comment = self._inspector.get_table_comment(
            table_name, schema=self._schema
        )["text"]
        if table_comment:
            template += f"with comment: ({table_comment}) "
    except NotImplementedError:
        # get_table_comment raises NotImplementedError for a dialect that does not support comments.
        pass

    template += "{foreign_keys}."
    columns = []
    for column in self._inspector.get_columns(table_name, schema=self._schema):
        if column.get("comment"):
            columns.append(
                f"{column['name']} ({column['type']!s}): '{column.get('comment')}'"
            )
        else:
            columns.append(f"{column['name']} ({column['type']!s})")

    column_str = ", ".join(columns)
    foreign_keys = []
    for foreign_key in self._inspector.get_foreign_keys(
        table_name, schema=self._schema
    ):
        foreign_keys.append(
            f"{foreign_key['constrained_columns']} -> "
            f"{foreign_key['referred_table']}.{foreign_key['referred_columns']}"
        )
    foreign_key_str = (
        foreign_keys
        and " and foreign keys: {}".format(", ".join(foreign_keys))
        or ""
    )
    return template.format(
        table_name=table_name, columns=column_str, foreign_keys=foreign_key_str
    )

insert_into_table #

insert_into_table(table_name: str, data: dict) -> None

将数据插入表中。

workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
195
196
197
198
199
200
def insert_into_table(self, table_name: str, data: dict) -> None:
    """Insert data into a table."""
    table = self._metadata.tables[table_name]
    stmt = insert(table).values(**data)
    with self._engine.begin() as connection:
        connection.execute(stmt)

truncate_word #

truncate_word(content: Any, *, length: int, suffix: str = '...') -> str

根据最大字符串长度,将字符串截断至特定数量的单词。

workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
202
203
204
205
206
207
208
209
210
211
212
213
def truncate_word(self, content: Any, *, length: int, suffix: str = "...") -> str:
    """
    Truncate a string to a certain number of words, based on the max string
    length.
    """
    if not isinstance(content, str) or length <= 0:
        return content

    if len(content) <= length:
        return content

    return content[: length - len(suffix)].rsplit(" ", 1)[0] + suffix

run_sql #

run_sql(command: str) -> Tuple[str, Dict]

执行一条SQL语句并返回代表结果的字符串。

如果语句返回行,则返回结果字符串。 如果语句未返回任何行,则返回空字符串。

workflows/handler.py 中的源代码llama_index/core/utilities/sql_wrapper.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def run_sql(self, command: str) -> Tuple[str, Dict]:
    """
    Execute a SQL statement and return a string representing the results.

    If the statement returns rows, a string of the results is returned.
    If the statement returns no rows, an empty string is returned.
    """
    with self._engine.begin() as connection:
        try:
            if self._schema:
                command = command.replace("FROM ", f"FROM {self._schema}.")
                command = command.replace("JOIN ", f"JOIN {self._schema}.")
            cursor = connection.execute(text(command))
        except (ProgrammingError, OperationalError) as exc:
            raise NotImplementedError(
                f"Statement {command!r} is invalid SQL.\nError: {exc.orig}"
            ) from exc
        if cursor.returns_rows:
            result = cursor.fetchall()
            # truncate the results to the max string length
            # we can't use str(result) directly because it automatically truncates long strings
            truncated_results = []
            for row in result:
                # truncate each column, then convert the row to a tuple
                truncated_row = tuple(
                    self.truncate_word(column, length=self._max_string_length)
                    for column in row
                )
                truncated_results.append(truncated_row)
            return str(truncated_results), {
                "result": truncated_results,
                "col_keys": list(cursor.keys()),
            }
    return "", {}

set_global_handler #

set_global_handler(eval_mode: str, **eval_params: Any) -> None

设置全局评估处理器。

workflows/handler.py 中的源代码llama_index/core/callbacks/global_handlers.py
 6
 7
 8
 9
10
11
12
def set_global_handler(eval_mode: str, **eval_params: Any) -> None:
    """Set global eval handlers."""
    import llama_index.core

    handler = create_global_handler(eval_mode, **eval_params)
    if handler:
        llama_index.core.global_handler = handler

load_graph_from_storage #

load_graph_from_storage(storage_context: StorageContext, root_id: str, **kwargs: Any) -> ComposableGraph

从存储上下文中加载可组合图。

参数:

名称 类型 描述 默认
storage_context StorageContext

包含文档存储、索引存储和向量存储的存储上下文。

required
root_id str

图的根索引ID。

required
**kwargs Any

传递给索引构造函数的额外关键字参数。

{}
workflows/handler.py 中的源代码llama_index/core/indices/loading.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def load_graph_from_storage(
    storage_context: StorageContext,
    root_id: str,
    **kwargs: Any,
) -> ComposableGraph:
    """
    Load composable graph from storage context.

    Args:
        storage_context (StorageContext): storage context containing
            docstore, index store and vector store.
        root_id (str): ID of the root index of the graph.
        **kwargs: Additional keyword args to pass to the index constructors.

    """
    indices = load_indices_from_storage(storage_context, index_ids=None, **kwargs)
    all_indices = {index.index_id: index for index in indices}
    return ComposableGraph(all_indices=all_indices, root_id=root_id)

load_index_from_storage #

load_index_from_storage(storage_context: StorageContext, index_id: Optional[str] = None, **kwargs: Any) -> BaseIndex

从存储上下文中加载索引。

参数:

名称 类型 描述 默认
storage_context StorageContext

包含文档存储、索引存储和向量存储的存储上下文。

required
index_id Optional[str]

要加载的索引ID。 默认为None,表示假设索引存储中仅存在单个索引并加载它。

None
**kwargs Any

传递给索引构造函数的额外关键字参数。

{}
workflows/handler.py 中的源代码llama_index/core/indices/loading.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def load_index_from_storage(
    storage_context: StorageContext,
    index_id: Optional[str] = None,
    **kwargs: Any,
) -> BaseIndex:
    """
    Load index from storage context.

    Args:
        storage_context (StorageContext): storage context containing
            docstore, index store and vector store.
        index_id (Optional[str]): ID of the index to load.
            Defaults to None, which assumes there's only a single index
            in the index store and load it.
        **kwargs: Additional keyword args to pass to the index constructors.

    """
    index_ids: Optional[Sequence[str]]
    if index_id is None:
        index_ids = None
    else:
        index_ids = [index_id]

    indices = load_indices_from_storage(storage_context, index_ids=index_ids, **kwargs)

    if len(indices) == 0:
        raise ValueError(
            "No index in storage context, check if you specified the right persist_dir."
        )
    elif len(indices) > 1:
        raise ValueError(
            f"Expected to load a single index, but got {len(indices)} instead. "
            "Please specify index_id."
        )

    return indices[0]

load_indices_from_storage #

load_indices_from_storage(storage_context: StorageContext, index_ids: Optional[Sequence[str]] = None, **kwargs: Any) -> List[BaseIndex]

从存储上下文中加载多个索引。

参数:

名称 类型 描述 默认
storage_context StorageContext

包含文档存储、索引存储和向量存储的存储上下文。

required
index_id Optional[Sequence[str]]

要加载的索引ID。 默认为None,表示加载索引存储中的所有索引。

required
**kwargs Any

传递给索引构造函数的额外关键字参数。

{}
workflows/handler.py 中的源代码llama_index/core/indices/loading.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def load_indices_from_storage(
    storage_context: StorageContext,
    index_ids: Optional[Sequence[str]] = None,
    **kwargs: Any,
) -> List[BaseIndex]:
    """
    Load multiple indices from storage context.

    Args:
        storage_context (StorageContext): storage context containing
            docstore, index store and vector store.
        index_id (Optional[Sequence[str]]): IDs of the indices to load.
            Defaults to None, which loads all indices in the index store.
        **kwargs: Additional keyword args to pass to the index constructors.

    """
    if index_ids is None:
        logger.info("Loading all indices.")
        index_structs = storage_context.index_store.index_structs()
    else:
        logger.info(f"Loading indices with ids: {index_ids}")
        index_structs = []
        for index_id in index_ids:
            index_struct = storage_context.index_store.get_index_struct(index_id)
            if index_struct is None:
                raise ValueError(f"Failed to load index with ID {index_id}")
            index_structs.append(index_struct)

    indices = []
    for index_struct in index_structs:
        type_ = index_struct.get_type()
        index_cls = INDEX_STRUCT_TYPE_TO_INDEX_CLASS[type_]
        index = index_cls(
            index_struct=index_struct, storage_context=storage_context, **kwargs
        )
        indices.append(index)
    return indices

download_loader #

download_loader(loader_class: str, loader_hub_url: str = '', refresh_cache: bool = False, use_gpt_index_import: bool = False, custom_path: Optional[str] = None) -> Type[BaseReader]

从加载器中心下载单个加载器。

参数:

名称 类型 描述 默认
loader_class str

您想要下载的加载器类的名称, 例如 SimpleWebPageReader

required
refresh_cache bool

如果为真,将跳过本地缓存,直接从远程仓库获取加载器。

False
use_gpt_index_import bool

如果为真,加载器文件将使用 llama_index作为基础依赖项。默认情况下(为假), 加载器文件使用llama_index作为基础依赖项。 注意:这是在我们完全将所有使用迁移到llama_index之前的临时解决方案。

False
custom_path Optional[str]

自定义目录路径,用于下载加载器。

None

返回:

类型 描述
Type[BaseReader]

一个加载器。

workflows/handler.py 中的源代码llama_index/core/readers/download.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@deprecated(
    "`download_loader()` is deprecated. "
    "Please install tool using pip install directly instead."
)
def download_loader(
    loader_class: str,
    loader_hub_url: str = "",
    refresh_cache: bool = False,
    use_gpt_index_import: bool = False,
    custom_path: Optional[str] = None,
) -> Type[BaseReader]:  # pragma: no cover
    """
    Download a single loader from the Loader Hub.

    Args:
        loader_class: The name of the loader class you want to download,
            such as `SimpleWebPageReader`.
        refresh_cache: If true, the local cache will be skipped and the
            loader will be fetched directly from the remote repo.
        use_gpt_index_import: If true, the loader files will use
            llama_index as the base dependency. By default (False),
            the loader files use llama_index as the base dependency.
            NOTE: this is a temporary workaround while we fully migrate all usages
            to llama_index.
        custom_path: Custom dirpath to download loader into.

    Returns:
        A Loader.

    """
    # maintain during deprecation period
    del loader_hub_url
    del refresh_cache
    del use_gpt_index_import
    del custom_path

    mappings_path = os.path.join(
        os.path.abspath(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir)
        ),
        "command_line/mappings.json",
    )
    with open(mappings_path) as f:
        mappings = json.load(f)

    if loader_class in mappings:
        new_import_parent = mappings[loader_class]
        new_install_parent = new_import_parent.replace(".", "-").replace("_", "-")
    else:
        raise ValueError(f"Failed to find python package for class {loader_class}")

    reader_cls = download_integration(
        module_str=new_install_parent,
        module_import_str=new_import_parent,
        cls_name=loader_class,
    )
    if not issubclass(reader_cls, BaseReader):
        raise ValueError(
            f"Loader class {loader_class} must be a subclass of BaseReader."
        )

    return reader_cls

get_response_synthesizer #

get_response_synthesizer(llm: Optional[大语言模型] = None, prompt_helper: Optional[PromptHelper] = None, text_qa_template: Optional[BasePromptTemplate] = None, refine_template: Optional[BasePromptTemplate] = None, summary_template: Optional[BasePromptTemplate] = None, simple_template: Optional[BasePromptTemplate] = None, response_mode: ResponseMode = 紧凑型, callback_manager: Optional[CallbackManager] = None, use_async: bool = False, streaming: bool = False, structured_answer_filtering: bool = False, output_cls: Optional[Type[BaseModel]] = None, program_factory: Optional[Callable[[BasePromptTemplate], BasePydanticProgram]] = None, verbose: bool = False) -> BaseSynthesizer

获取一个响应合成器。

workflows/handler.py 中的源代码llama_index/core/response_synthesizers/factory.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def get_response_synthesizer(
    llm: Optional[LLM] = None,
    prompt_helper: Optional[PromptHelper] = None,
    text_qa_template: Optional[BasePromptTemplate] = None,
    refine_template: Optional[BasePromptTemplate] = None,
    summary_template: Optional[BasePromptTemplate] = None,
    simple_template: Optional[BasePromptTemplate] = None,
    response_mode: ResponseMode = ResponseMode.COMPACT,
    callback_manager: Optional[CallbackManager] = None,
    use_async: bool = False,
    streaming: bool = False,
    structured_answer_filtering: bool = False,
    output_cls: Optional[Type[BaseModel]] = None,
    program_factory: Optional[
        Callable[[BasePromptTemplate], BasePydanticProgram]
    ] = None,
    verbose: bool = False,
) -> BaseSynthesizer:
    """Get a response synthesizer."""
    text_qa_template = text_qa_template or DEFAULT_TEXT_QA_PROMPT_SEL
    refine_template = refine_template or DEFAULT_REFINE_PROMPT_SEL
    simple_template = simple_template or DEFAULT_SIMPLE_INPUT_PROMPT
    summary_template = summary_template or DEFAULT_TREE_SUMMARIZE_PROMPT_SEL

    callback_manager = callback_manager or Settings.callback_manager
    llm = llm or Settings.llm
    prompt_helper = (
        prompt_helper
        or Settings._prompt_helper
        or PromptHelper.from_llm_metadata(
            llm.metadata,
        )
    )

    if response_mode == ResponseMode.REFINE:
        return Refine(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            text_qa_template=text_qa_template,
            refine_template=refine_template,
            output_cls=output_cls,
            streaming=streaming,
            structured_answer_filtering=structured_answer_filtering,
            program_factory=program_factory,
            verbose=verbose,
        )
    elif response_mode == ResponseMode.COMPACT:
        return CompactAndRefine(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            text_qa_template=text_qa_template,
            refine_template=refine_template,
            output_cls=output_cls,
            streaming=streaming,
            structured_answer_filtering=structured_answer_filtering,
            program_factory=program_factory,
            verbose=verbose,
        )
    elif response_mode == ResponseMode.TREE_SUMMARIZE:
        return TreeSummarize(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            summary_template=summary_template,
            output_cls=output_cls,
            streaming=streaming,
            use_async=use_async,
            verbose=verbose,
        )
    elif response_mode == ResponseMode.SIMPLE_SUMMARIZE:
        return SimpleSummarize(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            text_qa_template=text_qa_template,
            streaming=streaming,
        )
    elif response_mode == ResponseMode.GENERATION:
        return Generation(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            simple_template=simple_template,
            streaming=streaming,
        )
    elif response_mode == ResponseMode.ACCUMULATE:
        return Accumulate(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            text_qa_template=text_qa_template,
            output_cls=output_cls,
            streaming=streaming,
            use_async=use_async,
        )
    elif response_mode == ResponseMode.COMPACT_ACCUMULATE:
        return CompactAndAccumulate(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            text_qa_template=text_qa_template,
            output_cls=output_cls,
            streaming=streaming,
            use_async=use_async,
        )
    elif response_mode == ResponseMode.NO_TEXT:
        return NoText(
            callback_manager=callback_manager,
            streaming=streaming,
        )
    elif response_mode == ResponseMode.CONTEXT_ONLY:
        return ContextOnly(
            callback_manager=callback_manager,
            streaming=streaming,
        )
    else:
        raise ValueError(f"Unknown mode: {response_mode}")

set_global_service_context #

set_global_service_context(service_context: Optional[ServiceContext]) -> None

用于设置全局服务上下文的辅助函数。

workflows/handler.py 中的源代码llama_index/core/service_context.py
41
42
43
44
45
46
47
48
def set_global_service_context(service_context: Optional[ServiceContext]) -> None:
    """Helper function to set the global service context."""
    raise ValueError(
        "ServiceContext is deprecated. Use llama_index.settings.Settings instead, "
        "or pass in modules to local functions/methods/interfaces.\n"
        "See the docs for updated usage/migration: \n"
        "https://docs.llamaindex.ai/en/stable/module_guides/supporting_modules/service_context_migration/"
    )

选项: 成员:- load_index_from_storage - load_indices_from_storage