跳转到内容

自然语言SQL表

BaseQueryEngine #

Bases: PromptMixin, DispatcherSpanMixin

基础查询引擎。

workflows/handler.py 中的源代码llama_index/core/base/base_query_engine.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class BaseQueryEngine(PromptMixin, DispatcherSpanMixin):
    """Base query engine."""

    def __init__(
        self,
        callback_manager: Optional[CallbackManager],
    ) -> None:
        self.callback_manager = callback_manager or CallbackManager([])

    def _get_prompts(self) -> Dict[str, Any]:
        """Get prompts."""
        return {}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""

    @dispatcher.span
    def query(self, str_or_query_bundle: QueryType) -> RESPONSE_TYPE:
        dispatcher.event(QueryStartEvent(query=str_or_query_bundle))
        with self.callback_manager.as_trace("query"):
            if isinstance(str_or_query_bundle, str):
                str_or_query_bundle = QueryBundle(str_or_query_bundle)
            query_result = self._query(str_or_query_bundle)
        dispatcher.event(
            QueryEndEvent(query=str_or_query_bundle, response=query_result)
        )
        return query_result

    @dispatcher.span
    async def aquery(self, str_or_query_bundle: QueryType) -> RESPONSE_TYPE:
        dispatcher.event(QueryStartEvent(query=str_or_query_bundle))
        with self.callback_manager.as_trace("query"):
            if isinstance(str_or_query_bundle, str):
                str_or_query_bundle = QueryBundle(str_or_query_bundle)
            query_result = await self._aquery(str_or_query_bundle)
        dispatcher.event(
            QueryEndEvent(query=str_or_query_bundle, response=query_result)
        )
        return query_result

    def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        raise NotImplementedError(
            "This query engine does not support retrieve, use query directly"
        )

    def synthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        raise NotImplementedError(
            "This query engine does not support synthesize, use query directly"
        )

    async def asynthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        raise NotImplementedError(
            "This query engine does not support asynthesize, use aquery directly"
        )

    @abstractmethod
    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        pass

    @abstractmethod
    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        pass

自然语言SQL表查询引擎 #

基类:EventBaseSQLTableQueryEngine

自然语言SQL表查询引擎。

阅读 NLStructStoreQueryEngine 的文档字符串以获取关于自然语言 SQL 的更多信息。

注意:任何文本转SQL应用程序都应注意,执行任意SQL查询可能存在安全风险。建议根据需要采取预防措施,例如使用受限角色、只读数据库、沙盒环境等。

workflows/handler.py 中的源代码llama_index/core/indices/struct_store/sql_query.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
class NLSQLTableQueryEngine(BaseSQLTableQueryEngine):
    """
    Natural language SQL Table query engine.

    Read NLStructStoreQueryEngine's docstring for more info on NL SQL.

    NOTE: Any Text-to-SQL application should be aware that executing
    arbitrary SQL queries can be a security risk. It is recommended to
    take precautions as needed, such as using restricted roles, read-only
    databases, sandboxing, etc.
    """

    def __init__(
        self,
        sql_database: SQLDatabase,
        llm: Optional[LLM] = None,
        text_to_sql_prompt: Optional[BasePromptTemplate] = None,
        context_query_kwargs: Optional[dict] = None,
        synthesize_response: bool = True,
        markdown_response: bool = False,
        response_synthesis_prompt: Optional[BasePromptTemplate] = None,
        refine_synthesis_prompt: Optional[BasePromptTemplate] = None,
        tables: Optional[Union[List[str], List[Table]]] = None,
        table_retriever: Optional[ObjectRetriever[SQLTableSchema]] = None,
        rows_retrievers: Optional[dict[str, BaseRetriever]] = None,
        cols_retrievers: Optional[dict[str, dict[str, BaseRetriever]]] = None,
        context_str_prefix: Optional[str] = None,
        embed_model: Optional[BaseEmbedding] = None,
        sql_only: bool = False,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        # self._tables = tables
        self._sql_retriever = NLSQLRetriever(
            sql_database,
            llm=llm,
            text_to_sql_prompt=text_to_sql_prompt,
            context_query_kwargs=context_query_kwargs,
            tables=tables,
            table_retriever=table_retriever,
            rows_retrievers=rows_retrievers,
            cols_retrievers=cols_retrievers,
            context_str_prefix=context_str_prefix,
            embed_model=embed_model,
            sql_only=sql_only,
            callback_manager=callback_manager,
            verbose=verbose,
        )
        super().__init__(
            synthesize_response=synthesize_response,
            markdown_response=markdown_response,
            response_synthesis_prompt=response_synthesis_prompt,
            refine_synthesis_prompt=refine_synthesis_prompt,
            llm=llm,
            callback_manager=callback_manager,
            verbose=verbose,
            **kwargs,
        )

    @property
    def sql_retriever(self) -> NLSQLRetriever:
        """Get SQL retriever."""
        return self._sql_retriever

sql_retriever property #

sql_retriever: NLSQLRetriever

获取 SQL 检索器。

PGVectorSQL查询引擎 #

基类:EventBaseSQLTableQueryEngine

PGvector SQL 查询引擎。

普通文本到SQL查询引擎的修改版本,因为我们可以在SQL查询中推断嵌入向量。

注意:这是一个测试版功能

注意:任何文本转SQL应用程序都应注意,执行任意SQL查询可能存在安全风险。建议根据需要采取预防措施,例如使用受限角色、只读数据库、沙盒环境等。

workflows/handler.py 中的源代码llama_index/core/indices/struct_store/sql_query.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
class PGVectorSQLQueryEngine(BaseSQLTableQueryEngine):
    """
    PGvector SQL query engine.

    A modified version of the normal text-to-SQL query engine because
    we can infer embedding vectors in the sql query.

    NOTE: this is a beta feature

    NOTE: Any Text-to-SQL application should be aware that executing
    arbitrary SQL queries can be a security risk. It is recommended to
    take precautions as needed, such as using restricted roles, read-only
    databases, sandboxing, etc.

    """

    def __init__(
        self,
        sql_database: SQLDatabase,
        llm: Optional[LLM] = None,
        text_to_sql_prompt: Optional[BasePromptTemplate] = None,
        context_query_kwargs: Optional[dict] = None,
        synthesize_response: bool = True,
        response_synthesis_prompt: Optional[BasePromptTemplate] = None,
        refine_synthesis_prompt: Optional[BasePromptTemplate] = None,
        tables: Optional[Union[List[str], List[Table]]] = None,
        context_str_prefix: Optional[str] = None,
        sql_only: bool = False,
        callback_manager: Optional[CallbackManager] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        text_to_sql_prompt = text_to_sql_prompt or DEFAULT_TEXT_TO_SQL_PGVECTOR_PROMPT
        self._sql_retriever = NLSQLRetriever(
            sql_database,
            llm=llm,
            text_to_sql_prompt=text_to_sql_prompt,
            context_query_kwargs=context_query_kwargs,
            tables=tables,
            sql_parser_mode=SQLParserMode.PGVECTOR,
            context_str_prefix=context_str_prefix,
            sql_only=sql_only,
            callback_manager=callback_manager,
        )
        super().__init__(
            synthesize_response=synthesize_response,
            response_synthesis_prompt=response_synthesis_prompt,
            refine_synthesis_prompt=refine_synthesis_prompt,
            llm=llm,
            callback_manager=callback_manager,
            **kwargs,
        )

    @property
    def sql_retriever(self) -> NLSQLRetriever:
        """Get SQL retriever."""
        return self._sql_retriever

sql_retriever property #

sql_retriever: NLSQLRetriever

获取 SQL 检索器。

SQL表检索查询引擎 #

基类:EventBaseSQLTableQueryEngine

SQL表检索器查询引擎。

workflows/handler.py 中的源代码llama_index/core/indices/struct_store/sql_query.py
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
class SQLTableRetrieverQueryEngine(BaseSQLTableQueryEngine):
    """SQL Table retriever query engine."""

    def __init__(
        self,
        sql_database: SQLDatabase,
        table_retriever: ObjectRetriever[SQLTableSchema],
        rows_retrievers: Optional[dict[str, BaseRetriever]] = None,
        cols_retrievers: Optional[dict[str, dict[str, BaseRetriever]]] = None,
        llm: Optional[LLM] = None,
        text_to_sql_prompt: Optional[BasePromptTemplate] = None,
        context_query_kwargs: Optional[dict] = None,
        synthesize_response: bool = True,
        response_synthesis_prompt: Optional[BasePromptTemplate] = None,
        refine_synthesis_prompt: Optional[BasePromptTemplate] = None,
        context_str_prefix: Optional[str] = None,
        sql_only: bool = False,
        callback_manager: Optional[CallbackManager] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        self._sql_retriever = NLSQLRetriever(
            sql_database,
            llm=llm,
            text_to_sql_prompt=text_to_sql_prompt,
            context_query_kwargs=context_query_kwargs,
            table_retriever=table_retriever,
            rows_retrievers=rows_retrievers,
            cols_retrievers=cols_retrievers,
            context_str_prefix=context_str_prefix,
            sql_only=sql_only,
            callback_manager=callback_manager,
            verbose=kwargs.get("verbose", False),
        )
        super().__init__(
            synthesize_response=synthesize_response,
            response_synthesis_prompt=response_synthesis_prompt,
            refine_synthesis_prompt=refine_synthesis_prompt,
            llm=llm,
            callback_manager=callback_manager,
            **kwargs,
        )

    @property
    def sql_retriever(self) -> NLSQLRetriever:
        """Get SQL retriever."""
        return self._sql_retriever

sql_retriever property #

sql_retriever: NLSQLRetriever

获取 SQL 检索器。

引用查询引擎 #

基类:EventBaseQueryEngine

引文查询引擎。

参数:

名称 类型 描述 默认
retriever BaseRetriever

一个检索器对象。

required
response_synthesizer Optional[BaseSynthesizer]

一个 BaseSynthesizer 对象。

None
citation_chunk_size int

引用块的大小,默认=512。用于控制来源的粒度。

DEFAULT_CITATION_CHUNK_SIZE
citation_chunk_overlap int

引用节点的重叠数量,默认值=20。

DEFAULT_CITATION_CHUNK_OVERLAP
text_splitter Optional[TextSplitter]

用于创建引用源节点的文本分割器。默认为 SentenceSplitter。

None
callback_manager Optional[CallbackManager]

一个回调管理器。

None
metadata_mode MetadataMode

一个控制元数据如何包含在引用提示中的 MetadataMode 对象。

NONE
workflows/handler.py 中的源代码llama_index/core/query_engine/citation_query_engine.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
class CitationQueryEngine(BaseQueryEngine):
    """
    Citation query engine.

    Args:
        retriever (BaseRetriever): A retriever object.
        response_synthesizer (Optional[BaseSynthesizer]):
            A BaseSynthesizer object.
        citation_chunk_size (int):
            Size of citation chunks, default=512. Useful for controlling
            granularity of sources.
        citation_chunk_overlap (int): Overlap of citation nodes, default=20.
        text_splitter (Optional[TextSplitter]):
            A text splitter for creating citation source nodes. Default is
            a SentenceSplitter.
        callback_manager (Optional[CallbackManager]): A callback manager.
        metadata_mode (MetadataMode): A MetadataMode object that controls how
            metadata is included in the citation prompt.

    """

    def __init__(
        self,
        retriever: BaseRetriever,
        llm: Optional[LLM] = None,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        citation_chunk_size: int = DEFAULT_CITATION_CHUNK_SIZE,
        citation_chunk_overlap: int = DEFAULT_CITATION_CHUNK_OVERLAP,
        text_splitter: Optional[TextSplitter] = None,
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
        callback_manager: Optional[CallbackManager] = None,
        metadata_mode: MetadataMode = MetadataMode.NONE,
    ) -> None:
        self.text_splitter = text_splitter or SentenceSplitter(
            chunk_size=citation_chunk_size, chunk_overlap=citation_chunk_overlap
        )
        self._retriever = retriever

        callback_manager = callback_manager or Settings.callback_manager
        llm = llm or Settings.llm

        self._response_synthesizer = response_synthesizer or get_response_synthesizer(
            llm=llm,
            callback_manager=callback_manager,
            text_qa_template=CITATION_QA_TEMPLATE,
            refine_template=CITATION_REFINE_TEMPLATE,
            response_mode=ResponseMode.COMPACT,
            use_async=False,
            streaming=False,
        )

        self._node_postprocessors = node_postprocessors or []
        self._metadata_mode = metadata_mode

        for node_postprocessor in self._node_postprocessors:
            node_postprocessor.callback_manager = callback_manager

        super().__init__(callback_manager=callback_manager)

    @classmethod
    def from_args(
        cls,
        index: BaseGPTIndex,
        llm: Optional[LLM] = None,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        citation_chunk_size: int = DEFAULT_CITATION_CHUNK_SIZE,
        citation_chunk_overlap: int = DEFAULT_CITATION_CHUNK_OVERLAP,
        text_splitter: Optional[TextSplitter] = None,
        citation_qa_template: BasePromptTemplate = CITATION_QA_TEMPLATE,
        citation_refine_template: BasePromptTemplate = CITATION_REFINE_TEMPLATE,
        retriever: Optional[BaseRetriever] = None,
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
        # response synthesizer args
        response_mode: ResponseMode = ResponseMode.COMPACT,
        use_async: bool = False,
        streaming: bool = False,
        # class-specific args
        metadata_mode: MetadataMode = MetadataMode.NONE,
        **kwargs: Any,
    ) -> "CitationQueryEngine":
        """
        Initialize a CitationQueryEngine object.".

        Args:
            index: (BastGPTIndex): index to use for querying
            llm: (Optional[LLM]): LLM object to use for response generation.
            citation_chunk_size (int):
                Size of citation chunks, default=512. Useful for controlling
                granularity of sources.
            citation_chunk_overlap (int): Overlap of citation nodes, default=20.
            text_splitter (Optional[TextSplitter]):
                A text splitter for creating citation source nodes. Default is
                a SentenceSplitter.
            citation_qa_template (BasePromptTemplate): Template for initial citation QA
            citation_refine_template (BasePromptTemplate):
                Template for citation refinement.
            retriever (BaseRetriever): A retriever object.
            node_postprocessors (Optional[List[BaseNodePostprocessor]]): A list of
                node postprocessors.
            verbose (bool): Whether to print out debug info.
            response_mode (ResponseMode): A ResponseMode object.
            use_async (bool): Whether to use async.
            streaming (bool): Whether to use streaming.
            optimizer (Optional[BaseTokenUsageOptimizer]): A BaseTokenUsageOptimizer
                object.

        """
        retriever = retriever or index.as_retriever(**kwargs)

        response_synthesizer = response_synthesizer or get_response_synthesizer(
            llm=llm,
            text_qa_template=citation_qa_template,
            refine_template=citation_refine_template,
            response_mode=response_mode,
            use_async=use_async,
            streaming=streaming,
        )

        return cls(
            retriever=retriever,
            llm=llm,
            response_synthesizer=response_synthesizer,
            callback_manager=Settings.callback_manager,
            citation_chunk_size=citation_chunk_size,
            citation_chunk_overlap=citation_chunk_overlap,
            text_splitter=text_splitter,
            node_postprocessors=node_postprocessors,
            metadata_mode=metadata_mode,
        )

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {"response_synthesizer": self._response_synthesizer}

    def _create_citation_nodes(self, nodes: List[NodeWithScore]) -> List[NodeWithScore]:
        """Modify retrieved nodes to be granular sources."""
        new_nodes: List[NodeWithScore] = []
        for node in nodes:
            text_chunks = self.text_splitter.split_text(
                node.node.get_content(metadata_mode=self._metadata_mode)
            )

            for text_chunk in text_chunks:
                text = f"Source {len(new_nodes) + 1}:\n{text_chunk}\n"

                new_node = NodeWithScore(
                    node=TextNode.model_validate(node.node.model_dump()),
                    score=node.score,
                )
                new_node.node.set_content(text)
                new_nodes.append(new_node)
        return new_nodes

    def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        nodes = self._retriever.retrieve(query_bundle)

        for postprocessor in self._node_postprocessors:
            nodes = postprocessor.postprocess_nodes(nodes, query_bundle=query_bundle)

        return nodes

    async def aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        nodes = await self._retriever.aretrieve(query_bundle)

        for postprocessor in self._node_postprocessors:
            nodes = postprocessor.postprocess_nodes(nodes, query_bundle=query_bundle)

        return nodes

    @property
    def retriever(self) -> BaseRetriever:
        """Get the retriever object."""
        return self._retriever

    def synthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        nodes = self._create_citation_nodes(nodes)
        return self._response_synthesizer.synthesize(
            query=query_bundle,
            nodes=nodes,
            additional_source_nodes=additional_source_nodes,
        )

    async def asynthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        nodes = self._create_citation_nodes(nodes)
        return await self._response_synthesizer.asynthesize(
            query=query_bundle,
            nodes=nodes,
            additional_source_nodes=additional_source_nodes,
        )

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            with self.callback_manager.event(
                CBEventType.RETRIEVE,
                payload={EventPayload.QUERY_STR: query_bundle.query_str},
            ) as retrieve_event:
                nodes = self.retrieve(query_bundle)
                nodes = self._create_citation_nodes(nodes)

                retrieve_event.on_end(payload={EventPayload.NODES: nodes})

            response = self._response_synthesizer.synthesize(
                query=query_bundle,
                nodes=nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            with self.callback_manager.event(
                CBEventType.RETRIEVE,
                payload={EventPayload.QUERY_STR: query_bundle.query_str},
            ) as retrieve_event:
                nodes = await self.aretrieve(query_bundle)
                nodes = self._create_citation_nodes(nodes)

                retrieve_event.on_end(payload={EventPayload.NODES: nodes})

            response = await self._response_synthesizer.asynthesize(
                query=query_bundle,
                nodes=nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

retriever property #

retriever: BaseRetriever

获取检索器对象。

from_args classmethod #

from_args(index: BaseGPTIndex, llm: Optional[大语言模型] = None, response_synthesizer: Optional[BaseSynthesizer] = None, citation_chunk_size: int = DEFAULT_CITATION_CHUNK_SIZE, citation_chunk_overlap: int = DEFAULT_CITATION_CHUNK_OVERLAP, text_splitter: Optional[TextSplitter] = None, citation_qa_template: BasePromptTemplate = CITATION_QA_TEMPLATE, citation_refine_template: BasePromptTemplate = CITATION_REFINE_TEMPLATE, retriever: Optional[BaseRetriever] = None, node_postprocessors: Optional[List[BaseNodePostprocessor]] = None, response_mode: ResponseMode = 紧凑型, use_async: bool = False, streaming: bool = False, metadata_mode: MetadataMode = NONE, **kwargs: Any) -> CitationQueryEngine

初始化一个CitationQueryEngine对象。

参数:

名称 类型 描述 默认
index BaseGPTIndex

(BastGPTIndex): 用于查询的索引

required
llm Optional[大语言模型]

(可选[LLM]): 用于生成响应的LLM对象。

None
citation_chunk_size int

引文块的大小,默认=512。用于控制来源的粒度。

DEFAULT_CITATION_CHUNK_SIZE
citation_chunk_overlap int

引用节点的重叠数量,默认值=20。

DEFAULT_CITATION_CHUNK_OVERLAP
text_splitter Optional[TextSplitter]

用于创建引用源节点的文本分割器。默认为 SentenceSplitter。

None
citation_qa_template BasePromptTemplate

初始引证问答模板

CITATION_QA_TEMPLATE
citation_refine_template BasePromptTemplate

引文优化模板。

CITATION_REFINE_TEMPLATE
retriever BaseRetriever

一个检索器对象。

None
node_postprocessors Optional[List[BaseNodePostprocessor]]

一个节点后处理器的列表。

None
verbose bool

是否打印调试信息。

required
response_mode ResponseMode

一个 ResponseMode 对象。

紧凑型
use_async bool

是否使用异步。

False
streaming bool

是否使用流式传输。

False
optimizer Optional[BaseTokenUsageOptimizer]

一个 BaseTokenUsageOptimizer 对象。

required
workflows/handler.py 中的源代码llama_index/core/query_engine/citation_query_engine.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@classmethod
def from_args(
    cls,
    index: BaseGPTIndex,
    llm: Optional[LLM] = None,
    response_synthesizer: Optional[BaseSynthesizer] = None,
    citation_chunk_size: int = DEFAULT_CITATION_CHUNK_SIZE,
    citation_chunk_overlap: int = DEFAULT_CITATION_CHUNK_OVERLAP,
    text_splitter: Optional[TextSplitter] = None,
    citation_qa_template: BasePromptTemplate = CITATION_QA_TEMPLATE,
    citation_refine_template: BasePromptTemplate = CITATION_REFINE_TEMPLATE,
    retriever: Optional[BaseRetriever] = None,
    node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
    # response synthesizer args
    response_mode: ResponseMode = ResponseMode.COMPACT,
    use_async: bool = False,
    streaming: bool = False,
    # class-specific args
    metadata_mode: MetadataMode = MetadataMode.NONE,
    **kwargs: Any,
) -> "CitationQueryEngine":
    """
    Initialize a CitationQueryEngine object.".

    Args:
        index: (BastGPTIndex): index to use for querying
        llm: (Optional[LLM]): LLM object to use for response generation.
        citation_chunk_size (int):
            Size of citation chunks, default=512. Useful for controlling
            granularity of sources.
        citation_chunk_overlap (int): Overlap of citation nodes, default=20.
        text_splitter (Optional[TextSplitter]):
            A text splitter for creating citation source nodes. Default is
            a SentenceSplitter.
        citation_qa_template (BasePromptTemplate): Template for initial citation QA
        citation_refine_template (BasePromptTemplate):
            Template for citation refinement.
        retriever (BaseRetriever): A retriever object.
        node_postprocessors (Optional[List[BaseNodePostprocessor]]): A list of
            node postprocessors.
        verbose (bool): Whether to print out debug info.
        response_mode (ResponseMode): A ResponseMode object.
        use_async (bool): Whether to use async.
        streaming (bool): Whether to use streaming.
        optimizer (Optional[BaseTokenUsageOptimizer]): A BaseTokenUsageOptimizer
            object.

    """
    retriever = retriever or index.as_retriever(**kwargs)

    response_synthesizer = response_synthesizer or get_response_synthesizer(
        llm=llm,
        text_qa_template=citation_qa_template,
        refine_template=citation_refine_template,
        response_mode=response_mode,
        use_async=use_async,
        streaming=streaming,
    )

    return cls(
        retriever=retriever,
        llm=llm,
        response_synthesizer=response_synthesizer,
        callback_manager=Settings.callback_manager,
        citation_chunk_size=citation_chunk_size,
        citation_chunk_overlap=citation_chunk_overlap,
        text_splitter=text_splitter,
        node_postprocessors=node_postprocessors,
        metadata_mode=metadata_mode,
    )

Cogniswitch查询引擎 #

基类:EventBaseQueryEngine

workflows/handler.py 中的源代码llama_index/core/query_engine/cogniswitch_query_engine.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class CogniswitchQueryEngine(BaseQueryEngine):
    def __init__(self, cs_token: str, OAI_token: str, apiKey: str) -> None:
        """
        The required fields.

        Args:
            cs_token (str): Cogniswitch token.
            OAI_token (str): OpenAI token.
            apiKey (str): Oauth token.

        """
        self.cs_token = cs_token
        self.OAI_token = OAI_token
        self.apiKey = apiKey
        self.knowledge_request_endpoint = (
            "https://api.cogniswitch.ai:8243/cs-api/0.0.1/cs/knowledgeRequest"
        )
        self.headers = {
            "apiKey": self.apiKey,
            "platformToken": self.cs_token,
            "openAIToken": self.OAI_token,
        }

    def query_knowledge(self, query: str) -> Response:
        """
        Send a query to the Cogniswitch service and retrieve the response.

        Args:
            query (str): Query to be answered.

        Returns:
            dict: Response JSON from the Cogniswitch service.

        """
        data = {"query": query}
        response = requests.post(
            self.knowledge_request_endpoint,
            headers=self.headers,
            data=data,
        )
        if response.status_code == 200:
            resp = response.json()
            answer = resp["data"]["answer"]

            return Response(response=answer)
        else:
            error_message = response.json()["message"]
            return Response(response=error_message)

    def _query(self, query_bundle: QueryBundle) -> Response:
        return self.query_knowledge(query_bundle.query_str)

    async def _aquery(self, query_bundle: QueryBundle) -> Response:
        return self.query_knowledge(query_bundle.query_str)

    def _get_prompt_modules(self) -> Dict[str, Any]:
        """Get prompts."""
        return {}

query_knowledge #

query_knowledge(query: str) -> 响应

向 Cogniswitch 服务发送查询并获取响应。

参数:

名称 类型 描述 默认
query str

需要回答的查询。

required

返回:

名称 类型 描述
dict 响应

Cogniswitch 服务的响应 JSON。

workflows/handler.py 中的源代码llama_index/core/query_engine/cogniswitch_query_engine.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def query_knowledge(self, query: str) -> Response:
    """
    Send a query to the Cogniswitch service and retrieve the response.

    Args:
        query (str): Query to be answered.

    Returns:
        dict: Response JSON from the Cogniswitch service.

    """
    data = {"query": query}
    response = requests.post(
        self.knowledge_request_endpoint,
        headers=self.headers,
        data=data,
    )
    if response.status_code == 200:
        resp = response.json()
        answer = resp["data"]["answer"]

        return Response(response=answer)
    else:
        error_message = response.json()["message"]
        return Response(response=error_message)

自定义查询引擎 #

Bases: BaseModel, BaseQueryEngine

自定义查询引擎。

子类可以定义额外的属性作为Pydantic字段。 子类必须实现custom_query方法,该方法接收查询字符串 并返回一个Response对象或字符串作为输出。

它们可以选择性地实现 acustom_query 方法以获得异步支持。

参数:

名称 类型 描述 默认
callback_manager CallbackManager
<llama_index.core.callbacks.base.CallbackManager object at 0x7f82e15e9f40>
workflows/handler.py 中的源代码llama_index/core/query_engine/custom.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class CustomQueryEngine(BaseModel, BaseQueryEngine):
    """
    Custom query engine.

    Subclasses can define additional attributes as Pydantic fields.
    Subclasses must implement the `custom_query` method, which takes a query string
    and returns either a Response object or a string as output.

    They can optionally implement the `acustom_query` method for async support.

    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    callback_manager: CallbackManager = Field(
        default_factory=lambda: CallbackManager([]), exclude=True
    )

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {}

    def query(self, str_or_query_bundle: QueryType) -> RESPONSE_TYPE:
        with self.callback_manager.as_trace("query"):
            # if query bundle, just run the query
            if isinstance(str_or_query_bundle, QueryBundle):
                query_str = str_or_query_bundle.query_str
            else:
                query_str = str_or_query_bundle
            raw_response = self.custom_query(query_str)
            return (
                Response(raw_response)
                if isinstance(raw_response, str)
                else raw_response
            )

    async def aquery(self, str_or_query_bundle: QueryType) -> RESPONSE_TYPE:
        with self.callback_manager.as_trace("query"):
            if isinstance(str_or_query_bundle, QueryBundle):
                query_str = str_or_query_bundle.query_str
            else:
                query_str = str_or_query_bundle
            raw_response = await self.acustom_query(query_str)
            return (
                Response(raw_response)
                if isinstance(raw_response, str)
                else raw_response
            )

    @abstractmethod
    def custom_query(self, query_str: str) -> STR_OR_RESPONSE_TYPE:
        """Run a custom query."""

    async def acustom_query(self, query_str: str) -> STR_OR_RESPONSE_TYPE:
        """Run a custom query asynchronously."""
        # by default, just run the synchronous version
        return self.custom_query(query_str)

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        raise NotImplementedError("This query engine does not support _query.")

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        raise NotImplementedError("This query engine does not support _aquery.")

custom_query abstractmethod #

custom_query(query_str: str) -> STR_OR_RESPONSE_TYPE

运行自定义查询。

workflows/handler.py 中的源代码llama_index/core/query_engine/custom.py
64
65
66
@abstractmethod
def custom_query(self, query_str: str) -> STR_OR_RESPONSE_TYPE:
    """Run a custom query."""

acustom_query async #

acustom_query(query_str: str) -> STR_OR_RESPONSE_TYPE

异步运行自定义查询。

workflows/handler.py 中的源代码llama_index/core/query_engine/custom.py
68
69
70
71
async def acustom_query(self, query_str: str) -> STR_OR_RESPONSE_TYPE:
    """Run a custom query asynchronously."""
    # by default, just run the synchronous version
    return self.custom_query(query_str)

FLARE指令查询引擎 #

基类:EventBaseQueryEngine

FLARE Instruct 查询引擎。

这是使用检索增强指令的FLARE版本。

注意:这是一个测试版功能。接口可能会更改,并且可能不总是给出正确答案。

参数:

名称 类型 描述 默认
query_engine BaseQueryEngine

要使用的查询引擎

required
llm Optional[大语言模型]

LLM模型。默认为None。

None
instruct_prompt Optional[PromptTemplate]

指令提示。默认为空。

None
lookahead_answer_inserter Optional[BaseLookaheadAnswerInserter]

前瞻答案插入器。默认为 None。

None
done_output_parser Optional[IsDoneOutputParser]

完成输出解析器。 默认为空。

None
query_task_output_parser Optional[QueryTaskOutputParser]

查询任务输出解析器。默认为 None。

None
max_iterations int

最大迭代次数。默认为10。

10
max_lookahead_query_tasks int

最大前瞻查询任务数。默认为1。

1
callback_manager Optional[CallbackManager]

回调管理器。 默认为空。

None
verbose bool

提供详细输出。默认为 False。

False
workflows/handler.py 中的源代码llama_index/core/query_engine/flare/base.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class FLAREInstructQueryEngine(BaseQueryEngine):
    """
    FLARE Instruct query engine.

    This is the version of FLARE that uses retrieval-encouraging instructions.

    NOTE: this is a beta feature. Interfaces might change, and it might not
    always give correct answers.

    Args:
        query_engine (BaseQueryEngine): query engine to use
        llm (Optional[LLM]): LLM model. Defaults to None.
        instruct_prompt (Optional[PromptTemplate]): instruct prompt. Defaults to None.
        lookahead_answer_inserter (Optional[BaseLookaheadAnswerInserter]):
            lookahead answer inserter. Defaults to None.
        done_output_parser (Optional[IsDoneOutputParser]): done output parser.
            Defaults to None.
        query_task_output_parser (Optional[QueryTaskOutputParser]):
            query task output parser. Defaults to None.
        max_iterations (int): max iterations. Defaults to 10.
        max_lookahead_query_tasks (int): max lookahead query tasks. Defaults to 1.
        callback_manager (Optional[CallbackManager]): callback manager.
            Defaults to None.
        verbose (bool): give verbose outputs. Defaults to False.

    """

    def __init__(
        self,
        query_engine: BaseQueryEngine,
        llm: Optional[LLM] = None,
        instruct_prompt: Optional[BasePromptTemplate] = None,
        lookahead_answer_inserter: Optional[BaseLookaheadAnswerInserter] = None,
        done_output_parser: Optional[IsDoneOutputParser] = None,
        query_task_output_parser: Optional[QueryTaskOutputParser] = None,
        max_iterations: int = 10,
        max_lookahead_query_tasks: int = 1,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = False,
    ) -> None:
        """Init params."""
        super().__init__(callback_manager=callback_manager)
        self._query_engine = query_engine
        self._llm = llm or Settings.llm
        self._instruct_prompt = instruct_prompt or DEFAULT_INSTRUCT_PROMPT
        self._lookahead_answer_inserter = lookahead_answer_inserter or (
            LLMLookaheadAnswerInserter(llm=self._llm)
        )
        self._done_output_parser = done_output_parser or IsDoneOutputParser()
        self._query_task_output_parser = (
            query_task_output_parser or QueryTaskOutputParser()
        )
        self._max_iterations = max_iterations
        self._max_lookahead_query_tasks = max_lookahead_query_tasks
        self._verbose = verbose

    def _get_prompts(self) -> Dict[str, Any]:
        """Get prompts."""
        return {
            "instruct_prompt": self._instruct_prompt,
        }

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "instruct_prompt" in prompts:
            self._instruct_prompt = prompts["instruct_prompt"]

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "query_engine": self._query_engine,
            "lookahead_answer_inserter": self._lookahead_answer_inserter,
        }

    def _get_relevant_lookahead_response(self, updated_lookahead_resp: str) -> str:
        """Get relevant lookahead response."""
        # if there's remaining query tasks, then truncate the response
        # until the start position of the first tag
        # there may be remaining query tasks because the _max_lookahead_query_tasks
        # is less than the total number of generated [Search(query)] tags
        remaining_query_tasks = self._query_task_output_parser.parse(
            updated_lookahead_resp
        )
        if len(remaining_query_tasks) == 0:
            relevant_lookahead_resp = updated_lookahead_resp
        else:
            first_task = remaining_query_tasks[0]
            relevant_lookahead_resp = updated_lookahead_resp[: first_task.start_idx]
        return relevant_lookahead_resp

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Query and get response."""
        print_text(f"Query: {query_bundle.query_str}\n", color="green")
        cur_response = ""
        source_nodes = []
        for iter in range(self._max_iterations):
            if self._verbose:
                print_text(f"Current response: {cur_response}\n", color="blue")
            # generate "lookahead response" that contains "[Search(query)]" tags
            # e.g.
            # The colors on the flag of Ghana have the following meanings. Red is
            # for [Search(Ghana flag meaning)],...
            lookahead_resp = self._llm.predict(
                self._instruct_prompt,
                query_str=query_bundle.query_str,
                existing_answer=cur_response,
            )
            lookahead_resp = lookahead_resp.strip()
            if self._verbose:
                print_text(f"Lookahead response: {lookahead_resp}\n", color="pink")

            is_done, fmt_lookahead = self._done_output_parser.parse(lookahead_resp)
            if is_done:
                cur_response = cur_response.strip() + " " + fmt_lookahead.strip()
                break

            # parse lookahead response into query tasks
            query_tasks = self._query_task_output_parser.parse(lookahead_resp)

            # get answers for each query task
            query_tasks = query_tasks[: self._max_lookahead_query_tasks]
            query_answers = []
            for _, query_task in enumerate(query_tasks):
                answer_obj = self._query_engine.query(query_task.query_str)
                if not isinstance(answer_obj, Response):
                    raise ValueError(
                        f"Expected Response object, got {type(answer_obj)} instead."
                    )
                query_answer = str(answer_obj)
                query_answers.append(query_answer)
                source_nodes.extend(answer_obj.source_nodes)

            # fill in the lookahead response template with the query answers
            # from the query engine
            updated_lookahead_resp = self._lookahead_answer_inserter.insert(
                lookahead_resp, query_tasks, query_answers, prev_response=cur_response
            )

            # get "relevant" lookahead response by truncating the updated
            # lookahead response until the start position of the first tag
            # also remove the prefix from the lookahead response, so that
            # we can concatenate it with the existing response
            relevant_lookahead_resp_wo_prefix = self._get_relevant_lookahead_response(
                updated_lookahead_resp
            )

            if self._verbose:
                print_text(
                    "Updated lookahead response: "
                    + f"{relevant_lookahead_resp_wo_prefix}\n",
                    color="pink",
                )

            # append the relevant lookahead response to the final response
            cur_response = (
                cur_response.strip() + " " + relevant_lookahead_resp_wo_prefix.strip()
            )

        # NOTE: at the moment, does not support streaming
        return Response(response=cur_response, source_nodes=source_nodes)

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        return self._query(query_bundle)

    def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        # if the query engine is a retriever, then use the retrieve method
        if isinstance(self._query_engine, RetrieverQueryEngine):
            return self._query_engine.retrieve(query_bundle)
        else:
            raise NotImplementedError(
                "This query engine does not support retrieve, use query directly"
            )

    async def aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        # if the query engine is a retriever, then use the retrieve method
        if isinstance(self._query_engine, RetrieverQueryEngine):
            return await self._query_engine.aretrieve(query_bundle)
        else:
            raise NotImplementedError(
                "This query engine does not support retrieve, use query directly"
            )

可组合图查询引擎 #

基类:EventBaseQueryEngine

可组合图查询引擎。

该查询引擎可在组合图上运行。 它能够为其子索引接收自定义查询引擎。

参数:

名称 类型 描述 默认
graph ComposableGraph

一个可组合图对象。

required
custom_query_engines Optional[Dict[str, BaseQueryEngine]]

自定义查询引擎的字典。

None
recursive bool

是否递归查询图。

True
**kwargs Any

传递给底层索引查询引擎的额外参数。

{}
workflows/handler.py 中的源代码llama_index/core/query_engine/graph_query_engine.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class ComposableGraphQueryEngine(BaseQueryEngine):
    """
    Composable graph query engine.

    This query engine can operate over a ComposableGraph.
    It can take in custom query engines for its sub-indices.

    Args:
        graph (ComposableGraph): A ComposableGraph object.
        custom_query_engines (Optional[Dict[str, BaseQueryEngine]]): A dictionary of
            custom query engines.
        recursive (bool): Whether to recursively query the graph.
        **kwargs: additional arguments to be passed to the underlying index query
            engine.

    """

    def __init__(
        self,
        graph: ComposableGraph,
        custom_query_engines: Optional[Dict[str, BaseQueryEngine]] = None,
        recursive: bool = True,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        self._graph = graph
        self._custom_query_engines = custom_query_engines or {}
        self._kwargs = kwargs

        # additional configs
        self._recursive = recursive
        callback_manager = Settings.callback_manager
        super().__init__(callback_manager=callback_manager)

    def _get_prompt_modules(self) -> Dict[str, Any]:
        """Get prompt modules."""
        return {}

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        return self._query_index(query_bundle, index_id=None, level=0)

    @dispatcher.span
    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        return self._query_index(query_bundle, index_id=None, level=0)

    def _query_index(
        self,
        query_bundle: QueryBundle,
        index_id: Optional[str] = None,
        level: int = 0,
    ) -> RESPONSE_TYPE:
        """Query a single index."""
        index_id = index_id or self._graph.root_id

        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            # get query engine
            if index_id in self._custom_query_engines:
                query_engine = self._custom_query_engines[index_id]
            else:
                query_engine = self._graph.get_index(index_id).as_query_engine(
                    **self._kwargs
                )

            with self.callback_manager.event(
                CBEventType.RETRIEVE,
                payload={EventPayload.QUERY_STR: query_bundle.query_str},
            ) as retrieve_event:
                nodes = query_engine.retrieve(query_bundle)
                retrieve_event.on_end(payload={EventPayload.NODES: nodes})

            if self._recursive:
                # do recursion here
                nodes_for_synthesis = []
                additional_source_nodes = []
                for node_with_score in nodes:
                    node_with_score, source_nodes = self._fetch_recursive_nodes(
                        node_with_score, query_bundle, level
                    )
                    nodes_for_synthesis.append(node_with_score)
                    additional_source_nodes.extend(source_nodes)
                response = query_engine.synthesize(
                    query_bundle, nodes_for_synthesis, additional_source_nodes
                )
            else:
                response = query_engine.synthesize(query_bundle, nodes)

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    def _fetch_recursive_nodes(
        self,
        node_with_score: NodeWithScore,
        query_bundle: QueryBundle,
        level: int,
    ) -> Tuple[NodeWithScore, List[NodeWithScore]]:
        """
        Fetch nodes.

        Uses existing node if it's not an index node.
        Otherwise fetch response from corresponding index.

        """
        if isinstance(node_with_score.node, IndexNode):
            index_node = node_with_score.node
            # recursive call
            response = self._query_index(query_bundle, index_node.index_id, level + 1)

            new_node = TextNode(text=str(response))
            new_node_with_score = NodeWithScore(
                node=new_node, score=node_with_score.score
            )
            return new_node_with_score, response.source_nodes
        else:
            return node_with_score, []

JSONalyze查询引擎 #

JSONalyze 查询引擎。

已弃用:请改用来自 llama-index-experimentalJSONalyzeQueryEngine

workflows/handler.py 中的源代码llama_index/core/query_engine/jsonalyze/jsonalyze_query_engine.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class JSONalyzeQueryEngine:
    """
    JSONalyze query engine.

    DEPRECATED: Use `JSONalyzeQueryEngine` from `llama-index-experimental` instead.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        raise DeprecationWarning(
            "JSONalyzeQueryEngine has been moved to `llama-index-experimental`.\n"
            "`pip install llama-index-experimental`\n"
            "`from llama_index.experimental.query_engine import JSONalyzeQueryEngine`\n"
            "Note that the JSONalyzeQueryEngine allows for arbitrary file creation, \n"
            "and should be used in a secure environment."
        )

知识图谱查询引擎 #

基类:EventBaseQueryEngine

知识图谱查询引擎。

用于调用知识图谱的查询引擎。

参数:

名称 类型 描述 默认
storage_context Optional[StorageContext]

要使用的存储上下文。

None
refresh_schema bool

是否刷新架构。

False
verbose bool

是否打印中间结果。

False
response_synthesizer Optional[BaseSynthesizer]

一个 BaseSynthesizer 对象。

None
**kwargs Any

额外的关键字参数。

{}
workflows/handler.py 中的源代码llama_index/core/query_engine/knowledge_graph_query_engine.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
@deprecated.deprecated(
    version="0.10.53",
    reason=(
        "KnowledgeGraphQueryEngine is deprecated. It is recommended to use "
        "the PropertyGraphIndex and associated retrievers instead."
    ),
)
class KnowledgeGraphQueryEngine(BaseQueryEngine):
    """
    Knowledge graph query engine.

    Query engine to call a knowledge graph.

    Args:
        storage_context (Optional[StorageContext]): A storage context to use.
        refresh_schema (bool): Whether to refresh the schema.
        verbose (bool): Whether to print intermediate results.
        response_synthesizer (Optional[BaseSynthesizer]):
            A BaseSynthesizer object.
        **kwargs: Additional keyword arguments.

    """

    def __init__(
        self,
        llm: Optional[LLM] = None,
        storage_context: Optional[StorageContext] = None,
        graph_query_synthesis_prompt: Optional[BasePromptTemplate] = None,
        graph_response_answer_prompt: Optional[BasePromptTemplate] = None,
        refresh_schema: bool = False,
        verbose: bool = False,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        **kwargs: Any,
    ):
        # Ensure that we have a graph store
        assert storage_context is not None, "Must provide a storage context."
        assert storage_context.graph_store is not None, (
            "Must provide a graph store in the storage context."
        )
        self._storage_context = storage_context
        self.graph_store = storage_context.graph_store

        self._llm = llm or Settings.llm

        # Get Graph schema
        self._graph_schema = self.graph_store.get_schema(refresh=refresh_schema)

        # Get graph store query synthesis prompt
        self._graph_query_synthesis_prompt = graph_query_synthesis_prompt

        self._graph_response_answer_prompt = (
            graph_response_answer_prompt or DEFAULT_KG_RESPONSE_ANSWER_PROMPT
        )
        self._verbose = verbose
        callback_manager = Settings.callback_manager
        self._response_synthesizer = response_synthesizer or get_response_synthesizer(
            llm=self._llm, callback_manager=callback_manager
        )

        super().__init__(callback_manager=callback_manager)

    def _get_prompts(self) -> Dict[str, Any]:
        """Get prompts."""
        return {
            "graph_query_synthesis_prompt": self._graph_query_synthesis_prompt,
            "graph_response_answer_prompt": self._graph_response_answer_prompt,
        }

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "graph_query_synthesis_prompt" in prompts:
            self._graph_query_synthesis_prompt = prompts["graph_query_synthesis_prompt"]
        if "graph_response_answer_prompt" in prompts:
            self._graph_response_answer_prompt = prompts["graph_response_answer_prompt"]

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {"response_synthesizer": self._response_synthesizer}

    def generate_query(self, query_str: str) -> str:
        """Generate a Graph Store Query from a query bundle."""
        # Get the query engine query string

        graph_store_query: str = self._llm.predict(
            self._graph_query_synthesis_prompt,
            query_str=query_str,
            schema=self._graph_schema,
        )

        return graph_store_query

    async def agenerate_query(self, query_str: str) -> str:
        """Generate a Graph Store Query from a query bundle."""
        # Get the query engine query string

        graph_store_query: str = await self._llm.apredict(
            self._graph_query_synthesis_prompt,
            query_str=query_str,
            schema=self._graph_schema,
        )

        return graph_store_query

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Get nodes for response."""
        graph_store_query = self.generate_query(query_bundle.query_str)
        if self._verbose:
            print_text(f"Graph Store Query:\n{graph_store_query}\n", color="yellow")
        logger.debug(f"Graph Store Query:\n{graph_store_query}")

        with self.callback_manager.event(
            CBEventType.RETRIEVE,
            payload={EventPayload.QUERY_STR: graph_store_query},
        ) as retrieve_event:
            # Get the graph store response
            graph_store_response = self.graph_store.query(query=graph_store_query)
            if self._verbose:
                print_text(
                    f"Graph Store Response:\n{graph_store_response}\n",
                    color="yellow",
                )
            logger.debug(f"Graph Store Response:\n{graph_store_response}")

            retrieve_event.on_end(payload={EventPayload.RESPONSE: graph_store_response})

        retrieved_graph_context: Sequence = self._graph_response_answer_prompt.format(
            query_str=query_bundle.query_str,
            kg_query_str=graph_store_query,
            kg_response_str=graph_store_response,
        )

        node = NodeWithScore(
            node=TextNode(
                text=retrieved_graph_context,
                metadata={
                    "query_str": query_bundle.query_str,
                    "graph_store_query": graph_store_query,
                    "graph_store_response": graph_store_response,
                    "graph_schema": self._graph_schema,
                },
            ),
            score=1.0,
        )
        return [node]

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Query the graph store."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            nodes: List[NodeWithScore] = self._retrieve(query_bundle)

            response = self._response_synthesizer.synthesize(
                query=query_bundle,
                nodes=nodes,
            )

            if self._verbose:
                print_text(f"Final Response: {response}\n", color="green")

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    async def _aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        graph_store_query = await self.agenerate_query(query_bundle.query_str)
        if self._verbose:
            print_text(f"Graph Store Query:\n{graph_store_query}\n", color="yellow")
        logger.debug(f"Graph Store Query:\n{graph_store_query}")

        with self.callback_manager.event(
            CBEventType.RETRIEVE,
            payload={EventPayload.QUERY_STR: graph_store_query},
        ) as retrieve_event:
            # Get the graph store response
            # TBD: This is a blocking call. We need to make it async.
            graph_store_response = self.graph_store.query(query=graph_store_query)
            if self._verbose:
                print_text(
                    f"Graph Store Response:\n{graph_store_response}\n",
                    color="yellow",
                )
            logger.debug(f"Graph Store Response:\n{graph_store_response}")

            retrieve_event.on_end(payload={EventPayload.RESPONSE: graph_store_response})

        retrieved_graph_context: Sequence = self._graph_response_answer_prompt.format(
            query_str=query_bundle.query_str,
            kg_query_str=graph_store_query,
            kg_response_str=graph_store_response,
        )

        node = NodeWithScore(
            node=TextNode(
                text=retrieved_graph_context,
                metadata={
                    "query_str": query_bundle.query_str,
                    "graph_store_query": graph_store_query,
                    "graph_store_response": graph_store_response,
                    "graph_schema": self._graph_schema,
                },
            ),
            score=1.0,
        )
        return [node]

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Query the graph store."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            nodes = await self._aretrieve(query_bundle)
            response = await self._response_synthesizer.asynthesize(
                query=query_bundle,
                nodes=nodes,
            )

            if self._verbose:
                print_text(f"Final Response: {response}\n", color="green")

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

generate_query #

generate_query(query_str: str) -> str

根据查询包生成图存储查询。

workflows/handler.py 中的源代码llama_index/core/query_engine/knowledge_graph_query_engine.py
125
126
127
128
129
130
131
132
133
134
135
def generate_query(self, query_str: str) -> str:
    """Generate a Graph Store Query from a query bundle."""
    # Get the query engine query string

    graph_store_query: str = self._llm.predict(
        self._graph_query_synthesis_prompt,
        query_str=query_str,
        schema=self._graph_schema,
    )

    return graph_store_query

agenerate_query async #

agenerate_query(query_str: str) -> str

根据查询包生成图存储查询。

workflows/handler.py 中的源代码llama_index/core/query_engine/knowledge_graph_query_engine.py
137
138
139
140
141
142
143
144
145
146
147
async def agenerate_query(self, query_str: str) -> str:
    """Generate a Graph Store Query from a query bundle."""
    # Get the query engine query string

    graph_store_query: str = await self._llm.apredict(
        self._graph_query_synthesis_prompt,
        query_str=query_str,
        schema=self._graph_schema,
    )

    return graph_store_query

简单多模态查询引擎 #

基类:EventBaseQueryEngine

简单的多模态检索器查询引擎。

假设检索到的文本上下文以及图像能够适配LLM的上下文窗口。

参数:

名称 类型 描述 默认
retriever MultiModalVectorIndexRetriever

一个检索器对象。

required
multi_modal_llm Optional[大语言模型]

一个大型语言模型。

None
text_qa_template Optional[BasePromptTemplate]

文本问答提示模板。

None
image_qa_template Optional[BasePromptTemplate]

图像问答提示模板。

None
node_postprocessors Optional[List[BaseNodePostprocessor]]

节点后处理器。

None
callback_manager Optional[CallbackManager]

一个回调管理器。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/multi_modal.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class SimpleMultiModalQueryEngine(BaseQueryEngine):
    """
    Simple Multi Modal Retriever query engine.

    Assumes that retrieved text context fits within context window of LLM, along with images.

    Args:
        retriever (MultiModalVectorIndexRetriever): A retriever object.
        multi_modal_llm (Optional[LLM]): An LLM model.
        text_qa_template (Optional[BasePromptTemplate]): Text QA Prompt Template.
        image_qa_template (Optional[BasePromptTemplate]): Image QA Prompt Template.
        node_postprocessors (Optional[List[BaseNodePostprocessor]]): Node Postprocessors.
        callback_manager (Optional[CallbackManager]): A callback manager.

    """

    def __init__(
        self,
        retriever: "MultiModalVectorIndexRetriever",
        multi_modal_llm: Optional[LLM] = None,
        text_qa_template: Optional[BasePromptTemplate] = None,
        image_qa_template: Optional[BasePromptTemplate] = None,
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
        callback_manager: Optional[CallbackManager] = None,
        **kwargs: Any,
    ) -> None:
        self._retriever = retriever
        if multi_modal_llm:
            self._multi_modal_llm = multi_modal_llm
        else:
            try:
                from llama_index.llms.openai import (
                    OpenAIResponses,
                )  # pants: no-infer-dep

                self._multi_modal_llm = OpenAIResponses(
                    model="gpt-4.1", max_output_tokens=1000
                )
            except ImportError as e:
                raise ImportError(
                    "`llama-index-llms-openai` package cannot be found. "
                    "Please install it by using `pip install `llama-index-llms-openai`"
                )
        self._text_qa_template = text_qa_template or DEFAULT_TEXT_QA_PROMPT
        self._image_qa_template = image_qa_template or DEFAULT_TEXT_QA_PROMPT

        self._node_postprocessors = node_postprocessors or []
        callback_manager = callback_manager or CallbackManager([])
        for node_postprocessor in self._node_postprocessors:
            node_postprocessor.callback_manager = callback_manager

        super().__init__(callback_manager)

    def _get_prompts(self) -> Dict[str, Any]:
        """Get prompts."""
        return {"text_qa_template": self._text_qa_template}

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {}

    def _apply_node_postprocessors(
        self, nodes: List[NodeWithScore], query_bundle: QueryBundle
    ) -> List[NodeWithScore]:
        for node_postprocessor in self._node_postprocessors:
            nodes = node_postprocessor.postprocess_nodes(
                nodes, query_bundle=query_bundle
            )
        return nodes

    def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        nodes = self._retriever.retrieve(query_bundle)
        return self._apply_node_postprocessors(nodes, query_bundle=query_bundle)

    async def aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        nodes = await self._retriever.aretrieve(query_bundle)
        return self._apply_node_postprocessors(nodes, query_bundle=query_bundle)

    def synthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        image_nodes, text_nodes = _get_image_and_text_nodes(nodes)
        context_str = "\n\n".join(
            [r.get_content(metadata_mode=MetadataMode.LLM) for r in text_nodes]
        )
        fmt_prompt = self._text_qa_template.format(
            context_str=context_str, query_str=query_bundle.query_str
        )

        blocks: List[Union[ImageBlock, TextBlock]] = [
            image_node_to_image_block(image_node.node)
            for image_node in image_nodes
            if isinstance(image_node.node, ImageNode)
        ]

        blocks.append(TextBlock(text=fmt_prompt))

        llm_response = self._multi_modal_llm.chat(
            [ChatMessage(role="user", blocks=blocks)]
        )
        return Response(
            response=llm_response.message.content,
            source_nodes=nodes,
            metadata={"text_nodes": text_nodes, "image_nodes": image_nodes},
        )

    def _get_response_with_images(
        self,
        prompt_str: str,
        image_nodes: List[NodeWithScore],
    ) -> RESPONSE_TYPE:
        assert all(isinstance(node.node, ImageNode) for node in image_nodes)

        fmt_prompt = self._image_qa_template.format(
            query_str=prompt_str,
        )

        blocks: List[Union[ImageBlock, TextBlock]] = [
            image_node_to_image_block(image_node.node)
            for image_node in image_nodes
            if isinstance(image_node.node, ImageNode)
        ]

        blocks.append(TextBlock(text=fmt_prompt))

        llm_response = self._multi_modal_llm.chat(
            [ChatMessage(role="user", blocks=blocks)]
        )
        return Response(
            response=llm_response.message.content,
            source_nodes=image_nodes,
            metadata={"image_nodes": image_nodes},
        )

    async def asynthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        image_nodes, text_nodes = _get_image_and_text_nodes(nodes)
        context_str = "\n\n".join(
            [r.get_content(metadata_mode=MetadataMode.LLM) for r in text_nodes]
        )
        fmt_prompt = self._text_qa_template.format(
            context_str=context_str, query_str=query_bundle.query_str
        )

        blocks: List[Union[ImageBlock, TextBlock]] = [
            image_node_to_image_block(image_node.node)
            for image_node in image_nodes
            if isinstance(image_node.node, ImageNode)
        ]

        blocks.append(TextBlock(text=fmt_prompt))

        llm_response = await self._multi_modal_llm.achat(
            [ChatMessage(role="user", blocks=blocks)]
        )
        return Response(
            response=llm_response.message.content,
            source_nodes=nodes,
            metadata={"text_nodes": text_nodes, "image_nodes": image_nodes},
        )

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            with self.callback_manager.event(
                CBEventType.RETRIEVE,
                payload={EventPayload.QUERY_STR: query_bundle.query_str},
            ) as retrieve_event:
                nodes = self.retrieve(query_bundle)

                retrieve_event.on_end(
                    payload={EventPayload.NODES: nodes},
                )

            response = self.synthesize(
                query_bundle,
                nodes=nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    def image_query(self, image_path: QueryType, prompt_str: str) -> RESPONSE_TYPE:
        """Answer a image query."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: str(image_path)}
        ) as query_event:
            with self.callback_manager.event(
                CBEventType.RETRIEVE,
                payload={EventPayload.QUERY_STR: str(image_path)},
            ) as retrieve_event:
                nodes = self._retriever.image_to_image_retrieve(image_path)

                retrieve_event.on_end(
                    payload={EventPayload.NODES: nodes},
                )

            image_nodes, _ = _get_image_and_text_nodes(nodes)
            response = self._get_response_with_images(
                prompt_str=prompt_str,
                image_nodes=image_nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            with self.callback_manager.event(
                CBEventType.RETRIEVE,
                payload={EventPayload.QUERY_STR: query_bundle.query_str},
            ) as retrieve_event:
                nodes = await self.aretrieve(query_bundle)

                retrieve_event.on_end(
                    payload={EventPayload.NODES: nodes},
                )

            response = await self.asynthesize(
                query_bundle,
                nodes=nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    @property
    def retriever(self) -> "MultiModalVectorIndexRetriever":
        """Get the retriever object."""
        return self._retriever

retriever property #

retriever: MultiModalVectorIndexRetriever

获取检索器对象。

image_query #

image_query(image_path: QueryType, prompt_str: str) -> RESPONSE_TYPE

回答一个图像查询。

workflows/handler.py 中的源代码llama_index/core/query_engine/multi_modal.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def image_query(self, image_path: QueryType, prompt_str: str) -> RESPONSE_TYPE:
    """Answer a image query."""
    with self.callback_manager.event(
        CBEventType.QUERY, payload={EventPayload.QUERY_STR: str(image_path)}
    ) as query_event:
        with self.callback_manager.event(
            CBEventType.RETRIEVE,
            payload={EventPayload.QUERY_STR: str(image_path)},
        ) as retrieve_event:
            nodes = self._retriever.image_to_image_retrieve(image_path)

            retrieve_event.on_end(
                payload={EventPayload.NODES: nodes},
            )

        image_nodes, _ = _get_image_and_text_nodes(nodes)
        response = self._get_response_with_images(
            prompt_str=prompt_str,
            image_nodes=image_nodes,
        )

        query_event.on_end(payload={EventPayload.RESPONSE: response})

    return response

多步查询引擎 #

基类:EventBaseQueryEngine

多步骤查询引擎。

该查询引擎可以在现有基础查询引擎之上运行,同时支持多步骤查询转换。

参数:

名称 类型 描述 默认
query_engine BaseQueryEngine

一个 BaseQueryEngine 对象。

required
query_transform StepDecomposeQueryTransform

一个 StepDecomposeQueryTransform 对象。

required
response_synthesizer Optional[BaseSynthesizer]

一个 BaseSynthesizer 对象。

None
num_steps Optional[int]

运行多步查询的步骤数量。

3
early_stopping bool

如果停止函数返回 True,是否提前停止。

True
index_summary str

索引的字符串摘要。

'None'
stop_fn Optional[Callable[[Dict], bool]]

一个停止函数,接收信息字典并返回布尔值。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/multistep_query_engine.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class MultiStepQueryEngine(BaseQueryEngine):
    """
    Multi-step query engine.

    This query engine can operate over an existing base query engine,
    along with the multi-step query transform.

    Args:
        query_engine (BaseQueryEngine): A BaseQueryEngine object.
        query_transform (StepDecomposeQueryTransform): A StepDecomposeQueryTransform
            object.
        response_synthesizer (Optional[BaseSynthesizer]): A BaseSynthesizer
            object.
        num_steps (Optional[int]): Number of steps to run the multi-step query.
        early_stopping (bool): Whether to stop early if the stop function returns True.
        index_summary (str): A string summary of the index.
        stop_fn (Optional[Callable[[Dict], bool]]): A stop function that takes in a
            dictionary of information and returns a boolean.

    """

    def __init__(
        self,
        query_engine: BaseQueryEngine,
        query_transform: StepDecomposeQueryTransform,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        num_steps: Optional[int] = 3,
        early_stopping: bool = True,
        index_summary: str = "None",
        stop_fn: Optional[Callable[[Dict], bool]] = None,
    ) -> None:
        self._query_engine = query_engine
        self._query_transform = query_transform
        self._response_synthesizer = response_synthesizer or get_response_synthesizer(
            callback_manager=self._query_engine.callback_manager
        )

        self._index_summary = index_summary
        self._num_steps = num_steps
        self._early_stopping = early_stopping
        # TODO: make interface to stop function better
        self._stop_fn = stop_fn or default_stop_fn
        # num_steps must be provided if early_stopping is False
        if not self._early_stopping and self._num_steps is None:
            raise ValueError("Must specify num_steps if early_stopping is False.")

        callback_manager = self._query_engine.callback_manager
        super().__init__(callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "response_synthesizer": self._response_synthesizer,
            "query_transform": self._query_transform,
        }

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            nodes, source_nodes, metadata = self._query_multistep(query_bundle)

            final_response = self._response_synthesizer.synthesize(
                query=query_bundle,
                nodes=nodes,
                additional_source_nodes=source_nodes,
            )
            final_response.metadata = metadata

            query_event.on_end(payload={EventPayload.RESPONSE: final_response})

        return final_response

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            nodes, source_nodes, metadata = self._query_multistep(query_bundle)

            final_response = await self._response_synthesizer.asynthesize(
                query=query_bundle,
                nodes=nodes,
                additional_source_nodes=source_nodes,
            )
            final_response.metadata = metadata

            query_event.on_end(payload={EventPayload.RESPONSE: final_response})

        return final_response

    def _combine_queries(
        self, query_bundle: QueryBundle, prev_reasoning: str
    ) -> QueryBundle:
        """Combine queries."""
        transform_metadata = {
            "prev_reasoning": prev_reasoning,
            "index_summary": self._index_summary,
        }
        return self._query_transform(query_bundle, metadata=transform_metadata)

    def _query_multistep(
        self, query_bundle: QueryBundle
    ) -> Tuple[List[NodeWithScore], List[NodeWithScore], Dict[str, Any]]:
        """Run query combiner."""
        prev_reasoning = ""
        cur_response = None
        should_stop = False
        cur_steps = 0

        # use response
        final_response_metadata: Dict[str, Any] = {"sub_qa": []}

        text_chunks = []
        source_nodes = []
        while not should_stop:
            if self._num_steps is not None and cur_steps >= self._num_steps:
                should_stop = True
                break
            elif should_stop:
                break

            updated_query_bundle = self._combine_queries(query_bundle, prev_reasoning)

            # TODO: make stop logic better
            stop_dict = {"query_bundle": updated_query_bundle}
            if self._stop_fn(stop_dict):
                should_stop = True
                break

            cur_response = self._query_engine.query(updated_query_bundle)

            # append to response builder
            cur_qa_text = (
                f"\nQuestion: {updated_query_bundle.query_str}\n"
                f"Answer: {cur_response!s}"
            )
            text_chunks.append(cur_qa_text)
            for source_node in cur_response.source_nodes:
                source_nodes.append(source_node)
            # update metadata
            final_response_metadata["sub_qa"].append(
                (updated_query_bundle.query_str, cur_response)
            )

            prev_reasoning += (
                f"- {updated_query_bundle.query_str}\n- {cur_response!s}\n"
            )
            cur_steps += 1

        nodes = [
            NodeWithScore(node=TextNode(text=text_chunk)) for text_chunk in text_chunks
        ]
        return nodes, source_nodes, final_response_metadata

Pandas查询引擎 #

Pandas 查询引擎。

DEPRECATED: Use PandasQueryEngine from llama-index-experimental instead.

workflows/handler.py 中的源代码llama_index/core/query_engine/pandas/pandas_query_engine.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class PandasQueryEngine:
    """
    Pandas query engine.

    DEPRECATED: Use `PandasQueryEngine` from `llama-index-experimental` instead.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        raise DeprecationWarning(
            "PandasQueryEngine has been moved to `llama-index-experimental`.\n"
            "`pip install llama-index-experimental`\n"
            "`from llama_index.experimental.query_engine import PandasQueryEngine`\n"
            "Note that the PandasQueryEngine allows for arbitrary code execution, \n"
            "and should be used in a secure environment."
        )

检索器查询引擎 #

基类:EventBaseQueryEngine

检索器查询引擎。

参数:

名称 类型 描述 默认
retriever BaseRetriever

一个检索器对象。

required
response_synthesizer Optional[BaseSynthesizer]

一个 BaseSynthesizer 对象。

None
callback_manager Optional[CallbackManager]

一个回调管理器。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/retriever_query_engine.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
class RetrieverQueryEngine(BaseQueryEngine):
    """
    Retriever query engine.

    Args:
        retriever (BaseRetriever): A retriever object.
        response_synthesizer (Optional[BaseSynthesizer]): A BaseSynthesizer
            object.
        callback_manager (Optional[CallbackManager]): A callback manager.

    """

    def __init__(
        self,
        retriever: BaseRetriever,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        self._retriever = retriever
        self._response_synthesizer = response_synthesizer or get_response_synthesizer(
            llm=Settings.llm,
            callback_manager=callback_manager or Settings.callback_manager,
        )

        self._node_postprocessors = node_postprocessors or []
        callback_manager = (
            callback_manager or self._response_synthesizer.callback_manager
        )
        for node_postprocessor in self._node_postprocessors:
            node_postprocessor.callback_manager = callback_manager
        super().__init__(callback_manager=callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {"response_synthesizer": self._response_synthesizer}

    @classmethod
    def from_args(
        cls,
        retriever: BaseRetriever,
        llm: Optional[LLM] = None,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
        callback_manager: Optional[CallbackManager] = None,
        # response synthesizer args
        response_mode: ResponseMode = ResponseMode.COMPACT,
        text_qa_template: Optional[BasePromptTemplate] = None,
        refine_template: Optional[BasePromptTemplate] = None,
        summary_template: Optional[BasePromptTemplate] = None,
        simple_template: Optional[BasePromptTemplate] = None,
        output_cls: Optional[Type[BaseModel]] = None,
        use_async: bool = False,
        streaming: bool = False,
        verbose: bool = False,
        **kwargs: Any,
    ) -> "RetrieverQueryEngine":
        """
        Initialize a RetrieverQueryEngine object.".

        Args:
            retriever (BaseRetriever): A retriever object.
            llm (Optional[LLM]): An instance of an LLM.
            response_synthesizer (Optional[BaseSynthesizer]): An instance of a response
                synthesizer.
            node_postprocessors (Optional[List[BaseNodePostprocessor]]): A list of
                node postprocessors.
            callback_manager (Optional[CallbackManager]): A callback manager.
            response_mode (ResponseMode): A ResponseMode object.
            text_qa_template (Optional[BasePromptTemplate]): A BasePromptTemplate
                object.
            refine_template (Optional[BasePromptTemplate]): A BasePromptTemplate object.
            summary_template (Optional[BasePromptTemplate]): A BasePromptTemplate object.
            simple_template (Optional[BasePromptTemplate]): A BasePromptTemplate object.
            output_cls (Optional[Type[BaseModel]]): The pydantic model to pass to the
                response synthesizer.
            use_async (bool): Whether to use async.
            streaming (bool): Whether to use streaming.
            verbose (bool): Whether to print verbose output.

        """
        llm = llm or Settings.llm

        response_synthesizer = response_synthesizer or get_response_synthesizer(
            llm=llm,
            text_qa_template=text_qa_template,
            refine_template=refine_template,
            summary_template=summary_template,
            simple_template=simple_template,
            response_mode=response_mode,
            output_cls=output_cls,
            use_async=use_async,
            streaming=streaming,
            verbose=verbose,
        )

        callback_manager = callback_manager or Settings.callback_manager

        return cls(
            retriever=retriever,
            response_synthesizer=response_synthesizer,
            callback_manager=callback_manager,
            node_postprocessors=node_postprocessors,
        )

    def _apply_node_postprocessors(
        self, nodes: List[NodeWithScore], query_bundle: QueryBundle
    ) -> List[NodeWithScore]:
        for node_postprocessor in self._node_postprocessors:
            nodes = node_postprocessor.postprocess_nodes(
                nodes, query_bundle=query_bundle
            )
        return nodes

    async def _async_apply_node_postprocessors(
        self, nodes: List[NodeWithScore], query_bundle: QueryBundle
    ) -> List[NodeWithScore]:
        for node_postprocessor in self._node_postprocessors:
            nodes = await node_postprocessor.apostprocess_nodes(
                nodes, query_bundle=query_bundle
            )
        return nodes

    def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        nodes = self._retriever.retrieve(query_bundle)
        return self._apply_node_postprocessors(nodes, query_bundle=query_bundle)

    async def aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        nodes = await self._retriever.aretrieve(query_bundle)
        return await self._async_apply_node_postprocessors(
            nodes, query_bundle=query_bundle
        )

    def with_retriever(self, retriever: BaseRetriever) -> "RetrieverQueryEngine":
        return RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=self._response_synthesizer,
            callback_manager=self.callback_manager,
            node_postprocessors=self._node_postprocessors,
        )

    def synthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        return self._response_synthesizer.synthesize(
            query=query_bundle,
            nodes=nodes,
            additional_source_nodes=additional_source_nodes,
        )

    async def asynthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        return await self._response_synthesizer.asynthesize(
            query=query_bundle,
            nodes=nodes,
            additional_source_nodes=additional_source_nodes,
        )

    @dispatcher.span
    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            nodes = self.retrieve(query_bundle)
            response = self._response_synthesizer.synthesize(
                query=query_bundle,
                nodes=nodes,
            )
            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    @dispatcher.span
    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            nodes = await self.aretrieve(query_bundle)

            response = await self._response_synthesizer.asynthesize(
                query=query_bundle,
                nodes=nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    @property
    def retriever(self) -> BaseRetriever:
        """Get the retriever object."""
        return self._retriever

retriever property #

retriever: BaseRetriever

获取检索器对象。

from_args classmethod #

from_args(retriever: BaseRetriever, llm: Optional[大语言模型] = None, response_synthesizer: Optional[BaseSynthesizer] = None, node_postprocessors: Optional[List[BaseNodePostprocessor]] = None, callback_manager: Optional[CallbackManager] = None, response_mode: ResponseMode = 紧凑型, text_qa_template: Optional[BasePromptTemplate] = None, refine_template: Optional[BasePromptTemplate] = None, summary_template: Optional[BasePromptTemplate] = None, simple_template: Optional[BasePromptTemplate] = None, output_cls: Optional[Type[BaseModel]] = None, use_async: bool = False, streaming: bool = False, verbose: bool = False, **kwargs: Any) -> RetrieverQueryEngine

初始化一个 RetrieverQueryEngine 对象。

参数:

名称 类型 描述 默认
retriever BaseRetriever

一个检索器对象。

required
llm Optional[大语言模型]

一个大型语言模型的实例。

None
response_synthesizer Optional[BaseSynthesizer]

响应合成器的一个实例。

None
node_postprocessors Optional[List[BaseNodePostprocessor]]

一个节点后处理器的列表。

None
callback_manager Optional[CallbackManager]

一个回调管理器。

None
response_mode ResponseMode

一个 ResponseMode 对象。

紧凑型
text_qa_template Optional[BasePromptTemplate]

一个 BasePromptTemplate 对象。

None
refine_template Optional[BasePromptTemplate]

一个BasePromptTemplate对象。

None
summary_template Optional[BasePromptTemplate]

一个BasePromptTemplate对象。

None
simple_template Optional[BasePromptTemplate]

一个BasePromptTemplate对象。

None
output_cls Optional[Type[BaseModel]]

传递给响应合成器的pydantic模型。

None
use_async bool

是否使用异步。

False
streaming bool

是否使用流式传输。

False
verbose bool

是否打印详细输出。

False
workflows/handler.py 中的源代码llama_index/core/query_engine/retriever_query_engine.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@classmethod
def from_args(
    cls,
    retriever: BaseRetriever,
    llm: Optional[LLM] = None,
    response_synthesizer: Optional[BaseSynthesizer] = None,
    node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
    callback_manager: Optional[CallbackManager] = None,
    # response synthesizer args
    response_mode: ResponseMode = ResponseMode.COMPACT,
    text_qa_template: Optional[BasePromptTemplate] = None,
    refine_template: Optional[BasePromptTemplate] = None,
    summary_template: Optional[BasePromptTemplate] = None,
    simple_template: Optional[BasePromptTemplate] = None,
    output_cls: Optional[Type[BaseModel]] = None,
    use_async: bool = False,
    streaming: bool = False,
    verbose: bool = False,
    **kwargs: Any,
) -> "RetrieverQueryEngine":
    """
    Initialize a RetrieverQueryEngine object.".

    Args:
        retriever (BaseRetriever): A retriever object.
        llm (Optional[LLM]): An instance of an LLM.
        response_synthesizer (Optional[BaseSynthesizer]): An instance of a response
            synthesizer.
        node_postprocessors (Optional[List[BaseNodePostprocessor]]): A list of
            node postprocessors.
        callback_manager (Optional[CallbackManager]): A callback manager.
        response_mode (ResponseMode): A ResponseMode object.
        text_qa_template (Optional[BasePromptTemplate]): A BasePromptTemplate
            object.
        refine_template (Optional[BasePromptTemplate]): A BasePromptTemplate object.
        summary_template (Optional[BasePromptTemplate]): A BasePromptTemplate object.
        simple_template (Optional[BasePromptTemplate]): A BasePromptTemplate object.
        output_cls (Optional[Type[BaseModel]]): The pydantic model to pass to the
            response synthesizer.
        use_async (bool): Whether to use async.
        streaming (bool): Whether to use streaming.
        verbose (bool): Whether to print verbose output.

    """
    llm = llm or Settings.llm

    response_synthesizer = response_synthesizer or get_response_synthesizer(
        llm=llm,
        text_qa_template=text_qa_template,
        refine_template=refine_template,
        summary_template=summary_template,
        simple_template=simple_template,
        response_mode=response_mode,
        output_cls=output_cls,
        use_async=use_async,
        streaming=streaming,
        verbose=verbose,
    )

    callback_manager = callback_manager or Settings.callback_manager

    return cls(
        retriever=retriever,
        response_synthesizer=response_synthesizer,
        callback_manager=callback_manager,
        node_postprocessors=node_postprocessors,
    )

重试指南查询引擎 #

基类:EventBaseQueryEngine

如果查询引擎评估失败,则使用评估器反馈进行重试。

参数:

名称 类型 描述 默认
query_engine BaseQueryEngine

一个查询引擎对象

required
guideline_evaluator GuidelineEvaluator

一个指南评估器对象

required
resynthesize_query bool

是否重新合成查询

False
max_retries int

最大重试次数

3
callback_manager Optional[CallbackManager]

一个回调管理器对象

None
workflows/handler.py 中的源代码llama_index/core/query_engine/retry_query_engine.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class RetryGuidelineQueryEngine(BaseQueryEngine):
    """
    Does retry with evaluator feedback
    if query engine fails evaluation.

    Args:
        query_engine (BaseQueryEngine): A query engine object
        guideline_evaluator (GuidelineEvaluator): A guideline evaluator object
        resynthesize_query (bool): Whether to resynthesize query
        max_retries (int): Maximum number of retries
        callback_manager (Optional[CallbackManager]): A callback manager object

    """

    def __init__(
        self,
        query_engine: BaseQueryEngine,
        guideline_evaluator: GuidelineEvaluator,
        resynthesize_query: bool = False,
        max_retries: int = 3,
        callback_manager: Optional[CallbackManager] = None,
        query_transformer: Optional[FeedbackQueryTransformation] = None,
    ) -> None:
        self._query_engine = query_engine
        self._guideline_evaluator = guideline_evaluator
        self.max_retries = max_retries
        self.resynthesize_query = resynthesize_query
        self.query_transformer = query_transformer or FeedbackQueryTransformation(
            resynthesize_query=self.resynthesize_query
        )
        super().__init__(callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "query_engine": self._query_engine,
            "guideline_evalator": self._guideline_evaluator,
        }

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        response = self._query_engine._query(query_bundle)
        assert not isinstance(response, AsyncStreamingResponse)
        if self.max_retries <= 0:
            return response
        typed_response = (
            response if isinstance(response, Response) else response.get_response()
        )
        query_str = query_bundle.query_str
        eval = self._guideline_evaluator.evaluate_response(query_str, typed_response)
        if eval.passing:
            logger.debug("Evaluation returned True.")
            return response
        else:
            logger.debug("Evaluation returned False.")
            new_query_engine = RetryGuidelineQueryEngine(
                self._query_engine,
                self._guideline_evaluator,
                self.resynthesize_query,
                self.max_retries - 1,
                self.callback_manager,
            )
            new_query = self.query_transformer.run(query_bundle, {"evaluation": eval})
            logger.debug("New query: %s", new_query.query_str)
            return new_query_engine.query(new_query)

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Not supported."""
        return self._query(query_bundle)

重试查询引擎 #

基类:EventBaseQueryEngine

如果查询引擎评估失败,将进行重试。

参数:

名称 类型 描述 默认
query_engine BaseQueryEngine

一个查询引擎对象

required
evaluator BaseEvaluator

一个评估器对象

required
max_retries int

最大重试次数

3
callback_manager Optional[CallbackManager]

一个回调管理器对象

None
workflows/handler.py 中的源代码llama_index/core/query_engine/retry_query_engine.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class RetryQueryEngine(BaseQueryEngine):
    """
    Does retry on query engine if it fails evaluation.

    Args:
        query_engine (BaseQueryEngine): A query engine object
        evaluator (BaseEvaluator): An evaluator object
        max_retries (int): Maximum number of retries
        callback_manager (Optional[CallbackManager]): A callback manager object

    """

    def __init__(
        self,
        query_engine: BaseQueryEngine,
        evaluator: BaseEvaluator,
        max_retries: int = 3,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        self._query_engine = query_engine
        self._evaluator = evaluator
        self.max_retries = max_retries
        super().__init__(callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {"query_engine": self._query_engine, "evaluator": self._evaluator}

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        response = self._query_engine._query(query_bundle)
        assert not isinstance(response, AsyncStreamingResponse)
        if self.max_retries <= 0:
            return response
        typed_response = (
            response if isinstance(response, Response) else response.get_response()
        )
        query_str = query_bundle.query_str
        eval = self._evaluator.evaluate_response(query_str, typed_response)
        if eval.passing:
            logger.debug("Evaluation returned True.")
            return response
        else:
            logger.debug("Evaluation returned False.")
            new_query_engine = RetryQueryEngine(
                self._query_engine, self._evaluator, self.max_retries - 1
            )
            query_transformer = FeedbackQueryTransformation()
            new_query = query_transformer.run(query_bundle, {"evaluation": eval})
            return new_query_engine.query(new_query)

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Not supported."""
        return self._query(query_bundle)

重试源查询引擎 #

基类:EventBaseQueryEngine

使用不同的源节点重试。

workflows/handler.py 中的源代码llama_index/core/query_engine/retry_source_query_engine.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class RetrySourceQueryEngine(BaseQueryEngine):
    """Retry with different source nodes."""

    def __init__(
        self,
        query_engine: RetrieverQueryEngine,
        evaluator: BaseEvaluator,
        llm: Optional[LLM] = None,
        max_retries: int = 3,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        """Run a BaseQueryEngine with retries."""
        self._query_engine = query_engine
        self._evaluator = evaluator
        self._llm = llm or Settings.llm
        self.max_retries = max_retries
        super().__init__(callback_manager=callback_manager or Settings.callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {"query_engine": self._query_engine, "evaluator": self._evaluator}

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        response = self._query_engine._query(query_bundle)
        assert not isinstance(response, AsyncStreamingResponse)
        if self.max_retries <= 0:
            return response
        typed_response = (
            response if isinstance(response, Response) else response.get_response()
        )
        query_str = query_bundle.query_str
        eval = self._evaluator.evaluate_response(query_str, typed_response)
        if eval.passing:
            logger.debug("Evaluation returned True.")
            return response
        else:
            logger.debug("Evaluation returned False.")
            # Test source nodes
            source_evals = [
                self._evaluator.evaluate(
                    query=query_str,
                    response=typed_response.response,
                    contexts=[source_node.get_content()],
                )
                for source_node in typed_response.source_nodes
            ]
            orig_nodes = typed_response.source_nodes
            assert len(source_evals) == len(orig_nodes)
            new_docs = []
            for node, eval_result in zip(orig_nodes, source_evals):
                if eval_result:
                    new_docs.append(Document(text=node.node.get_content()))
            if len(new_docs) == 0:
                raise ValueError("No source nodes passed evaluation.")
            new_index = SummaryIndex.from_documents(
                new_docs,
            )
            new_retriever_engine = RetrieverQueryEngine(new_index.as_retriever())
            new_query_engine = RetrySourceQueryEngine(
                new_retriever_engine,
                self._evaluator,
                self._llm,
                self.max_retries - 1,
            )
            return new_query_engine.query(query_bundle)

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Not supported."""
        return self._query(query_bundle)

检索器路由查询引擎 #

基类:EventBaseQueryEngine

基于检索器的路由查询引擎。

注意:此功能已弃用,请使用我们新的 ToolRetrieverRouterQueryEngine

使用检索器选择一组节点。每个节点将被转换为ToolMetadata对象,同时用于检索查询引擎,以形成QueryEngineTool。

注意:这是一个测试版功能。我们正在确定检索器与查询引擎之间的正确接口。

参数:

名称 类型 描述 默认
selector BaseSelector

一个根据每个候选者的元数据和查询从多个选项中选择其一的筛选器。

required
query_engine_tools Sequence[QueryEngineTool]

一系列候选查询引擎。它们必须被封装为工具,以便向选择器暴露元数据。

required
callback_manager Optional[CallbackManager]

一个回调管理器。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/router_query_engine.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
class RetrieverRouterQueryEngine(BaseQueryEngine):
    """
    Retriever-based router query engine.

    NOTE: this is deprecated, please use our new ToolRetrieverRouterQueryEngine

    Use a retriever to select a set of Nodes. Each node will be converted
    into a ToolMetadata object, and also used to retrieve a query engine, to form
    a QueryEngineTool.

    NOTE: this is a beta feature. We are figuring out the right interface
    between the retriever and query engine.

    Args:
        selector (BaseSelector): A selector that chooses one out of many options based
            on each candidate's metadata and query.
        query_engine_tools (Sequence[QueryEngineTool]): A sequence of candidate
            query engines. They must be wrapped as tools to expose metadata to
            the selector.
        callback_manager (Optional[CallbackManager]): A callback manager.

    """

    def __init__(
        self,
        retriever: BaseRetriever,
        node_to_query_engine_fn: Callable,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        self._retriever = retriever
        self._node_to_query_engine_fn = node_to_query_engine_fn
        super().__init__(callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        # NOTE: don't include tools for now
        return {"retriever": self._retriever}

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        nodes_with_score = self._retriever.retrieve(query_bundle)
        # TODO: for now we only support retrieving one node
        if len(nodes_with_score) > 1:
            raise ValueError("Retrieved more than one node.")

        node = nodes_with_score[0].node
        query_engine = self._node_to_query_engine_fn(node)
        return query_engine.query(query_bundle)

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        return self._query(query_bundle)

路由查询引擎 #

基类:EventBaseQueryEngine

路由查询引擎。

从多个候选查询引擎中选择一个来执行查询。

参数:

名称 类型 描述 默认
selector BaseSelector

一个根据每个候选者的元数据和查询从多个选项中选择其一的筛选器。

required
query_engine_tools Sequence[QueryEngineTool]

一系列候选查询引擎。它们必须被封装为工具,以便向选择器暴露元数据。

required
summarizer Optional[TreeSummarize]

树状摘要器,用于汇总子结果。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/router_query_engine.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class RouterQueryEngine(BaseQueryEngine):
    """
    Router query engine.

    Selects one out of several candidate query engines to execute a query.

    Args:
        selector (BaseSelector): A selector that chooses one out of many options based
            on each candidate's metadata and query.
        query_engine_tools (Sequence[QueryEngineTool]): A sequence of candidate
            query engines. They must be wrapped as tools to expose metadata to
            the selector.
        summarizer (Optional[TreeSummarize]): Tree summarizer to summarize sub-results.

    """

    def __init__(
        self,
        selector: BaseSelector,
        query_engine_tools: Sequence[QueryEngineTool],
        llm: Optional[LLM] = None,
        summarizer: Optional[TreeSummarize] = None,
        verbose: bool = False,
    ) -> None:
        self._llm = llm or Settings.llm
        self._selector = selector
        self._query_engines = [x.query_engine for x in query_engine_tools]
        self._metadatas = [x.metadata for x in query_engine_tools]
        self._summarizer = summarizer or TreeSummarize(
            llm=self._llm,
            summary_template=DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,
        )
        self._verbose = verbose

        super().__init__(callback_manager=Settings.callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        # NOTE: don't include tools for now
        return {"summarizer": self._summarizer, "selector": self._selector}

    @classmethod
    def from_defaults(
        cls,
        query_engine_tools: Sequence[QueryEngineTool],
        llm: Optional[LLM] = None,
        selector: Optional[BaseSelector] = None,
        summarizer: Optional[TreeSummarize] = None,
        select_multi: bool = False,
        **kwargs: Any,
    ) -> "RouterQueryEngine":
        llm = llm or Settings.llm

        selector = selector or get_selector_from_llm(llm, is_multi=select_multi)

        assert selector is not None

        return cls(
            selector,
            query_engine_tools,
            llm=llm,
            summarizer=summarizer,
            **kwargs,
        )

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            result = self._selector.select(self._metadatas, query_bundle)

            if len(result.inds) > 1:
                responses = []
                for i, engine_ind in enumerate(result.inds):
                    log_str = (
                        f"Selecting query engine {engine_ind}: {result.reasons[i]}."
                    )
                    logger.info(log_str)
                    if self._verbose:
                        print_text(log_str + "\n", color="pink")

                    selected_query_engine = self._query_engines[engine_ind]
                    responses.append(selected_query_engine.query(query_bundle))

                if len(responses) > 1:
                    final_response = combine_responses(
                        self._summarizer, responses, query_bundle
                    )
                else:
                    final_response = responses[0]
            else:
                try:
                    selected_query_engine = self._query_engines[result.ind]
                    log_str = f"Selecting query engine {result.ind}: {result.reason}."
                    logger.info(log_str)
                    if self._verbose:
                        print_text(log_str + "\n", color="pink")
                except ValueError as e:
                    raise ValueError("Failed to select query engine") from e

                final_response = selected_query_engine.query(query_bundle)

            # add selected result
            final_response.metadata = final_response.metadata or {}
            final_response.metadata["selector_result"] = result

            query_event.on_end(payload={EventPayload.RESPONSE: final_response})

        return final_response

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            result = await self._selector.aselect(self._metadatas, query_bundle)

            if len(result.inds) > 1:
                tasks = []
                for i, engine_ind in enumerate(result.inds):
                    log_str = (
                        f"Selecting query engine {engine_ind}: {result.reasons[i]}."
                    )
                    logger.info(log_str)
                    if self._verbose:
                        print_text(log_str + "\n", color="pink")
                    selected_query_engine = self._query_engines[engine_ind]
                    tasks.append(selected_query_engine.aquery(query_bundle))

                responses = run_async_tasks(tasks)
                if len(responses) > 1:
                    final_response = await acombine_responses(
                        self._summarizer, responses, query_bundle
                    )
                else:
                    final_response = responses[0]
            else:
                try:
                    selected_query_engine = self._query_engines[result.ind]
                    log_str = f"Selecting query engine {result.ind}: {result.reason}."
                    logger.info(log_str)
                    if self._verbose:
                        print_text(log_str + "\n", color="pink")
                except ValueError as e:
                    raise ValueError("Failed to select query engine") from e

                final_response = await selected_query_engine.aquery(query_bundle)

            # add selected result
            final_response.metadata = final_response.metadata or {}
            final_response.metadata["selector_result"] = result

            query_event.on_end(payload={EventPayload.RESPONSE: final_response})

        return final_response

工具检索路由查询引擎 #

基类:EventBaseQueryEngine

工具检索器路由查询引擎。

选择一组候选查询引擎来执行查询。

参数:

名称 类型 描述 默认
retriever ObjectRetriever

一个用于检索一组查询引擎工具的检索器。

required
summarizer Optional[TreeSummarize]

树状摘要器,用于汇总子结果。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/router_query_engine.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
class ToolRetrieverRouterQueryEngine(BaseQueryEngine):
    """
    Tool Retriever router query engine.

    Selects a set of candidate query engines to execute a query.

    Args:
        retriever (ObjectRetriever): A retriever that retrieves a set of
            query engine tools.
        summarizer (Optional[TreeSummarize]): Tree summarizer to summarize sub-results.

    """

    def __init__(
        self,
        retriever: ObjectRetriever[QueryEngineTool],
        llm: Optional[LLM] = None,
        summarizer: Optional[TreeSummarize] = None,
    ) -> None:
        llm = llm or Settings.llm
        self._summarizer = summarizer or TreeSummarize(
            llm=llm,
            summary_template=DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,
        )
        self._retriever = retriever

        super().__init__(Settings.callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        # NOTE: don't include tools for now
        return {"summarizer": self._summarizer}

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            query_engine_tools = self._retriever.retrieve(query_bundle)
            responses = []
            for query_engine_tool in query_engine_tools:
                query_engine = query_engine_tool.query_engine
                responses.append(query_engine.query(query_bundle))

            if len(responses) > 1:
                final_response = combine_responses(
                    self._summarizer, responses, query_bundle
                )
            else:
                final_response = responses[0]

            # add selected result
            final_response.metadata = final_response.metadata or {}
            final_response.metadata["retrieved_tools"] = query_engine_tools

            query_event.on_end(payload={EventPayload.RESPONSE: final_response})

        return final_response

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            query_engine_tools = self._retriever.retrieve(query_bundle)
            tasks = []
            for query_engine_tool in query_engine_tools:
                query_engine = query_engine_tool.query_engine
                tasks.append(query_engine.aquery(query_bundle))
            responses = run_async_tasks(tasks)
            if len(responses) > 1:
                final_response = await acombine_responses(
                    self._summarizer, responses, query_bundle
                )
            else:
                final_response = responses[0]

            # add selected result
            final_response.metadata = final_response.metadata or {}
            final_response.metadata["retrieved_tools"] = query_engine_tools

            query_event.on_end(payload={EventPayload.RESPONSE: final_response})

        return final_response

SQL连接查询引擎 #

基类:EventBaseQueryEngine

SQL 连接查询引擎。

该查询引擎能够将 SQL 数据库结果与另一个查询引擎进行"连接"。 它可以判断需要查询 SQL 数据库还是另一个查询引擎。 如果决定查询 SQL 数据库,它将首先查询 SQL 数据库, 并判断是否要用从另一个查询引擎检索到的结果来增强信息。

参数:

名称 类型 描述 默认
sql_query_tool QueryEngineTool

用于SQL数据库的查询引擎工具。 other_query_tool (QueryEngineTool): 其他查询引擎工具。

required
selector Optional[Union[LLMSingleSelector, PydanticSingleSelector]]

选择器使用。

None
sql_join_synthesis_prompt Optional[BasePromptTemplate]

用于SQL连接合成的提示模板。

None
sql_augment_query_transform Optional[SQLAugmentQueryTransform]

用于SQL增强的查询转换。

None
use_sql_join_synthesis bool

是否使用SQL连接合成。

True
callback_manager Optional[CallbackManager]

要使用的回调管理器。

None
verbose bool

是否打印中间结果。

True
workflows/handler.py 中的源代码llama_index/core/query_engine/sql_join_query_engine.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
class SQLJoinQueryEngine(BaseQueryEngine):
    """
    SQL Join Query Engine.

    This query engine can "Join" a SQL database results
    with another query engine.
    It can decide it needs to query the SQL database or the other query engine.
    If it decides to query the SQL database, it will first query the SQL database,
    whether to augment information with retrieved results from the other query engine.

    Args:
        sql_query_tool (QueryEngineTool): Query engine tool for SQL database.
            other_query_tool (QueryEngineTool): Other query engine tool.
        selector (Optional[Union[LLMSingleSelector, PydanticSingleSelector]]):
            Selector to use.
        sql_join_synthesis_prompt (Optional[BasePromptTemplate]):
            PromptTemplate to use for SQL join synthesis.
        sql_augment_query_transform (Optional[SQLAugmentQueryTransform]): Query
            transform to use for SQL augmentation.
        use_sql_join_synthesis (bool): Whether to use SQL join synthesis.
        callback_manager (Optional[CallbackManager]): Callback manager to use.
        verbose (bool): Whether to print intermediate results.

    """

    def __init__(
        self,
        sql_query_tool: QueryEngineTool,
        other_query_tool: QueryEngineTool,
        selector: Optional[Union[LLMSingleSelector, PydanticSingleSelector]] = None,
        llm: Optional[LLM] = None,
        sql_join_synthesis_prompt: Optional[BasePromptTemplate] = None,
        sql_augment_query_transform: Optional[SQLAugmentQueryTransform] = None,
        use_sql_join_synthesis: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = True,
        streaming: bool = False,
    ) -> None:
        """Initialize params."""
        super().__init__(callback_manager=callback_manager)
        # validate that the query engines are of the right type
        if not isinstance(
            sql_query_tool.query_engine,
            (BaseSQLTableQueryEngine, NLSQLTableQueryEngine),
        ):
            raise ValueError(
                "sql_query_tool.query_engine must be an instance of "
                "BaseSQLTableQueryEngine or NLSQLTableQueryEngine"
            )
        self._sql_query_tool = sql_query_tool
        self._other_query_tool = other_query_tool

        self._llm = llm or Settings.llm

        self._selector = selector or get_selector_from_llm(self._llm, is_multi=False)  # type: ignore
        assert isinstance(self._selector, (LLMSingleSelector, PydanticSingleSelector))

        self._sql_join_synthesis_prompt = (
            sql_join_synthesis_prompt or DEFAULT_SQL_JOIN_SYNTHESIS_PROMPT
        )
        self._sql_augment_query_transform = (
            sql_augment_query_transform or SQLAugmentQueryTransform(llm=self._llm)
        )
        self._use_sql_join_synthesis = use_sql_join_synthesis
        self._verbose = verbose
        self._streaming = streaming

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "selector": self._selector,
            "sql_augment_query_transform": self._sql_augment_query_transform,
        }

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {"sql_join_synthesis_prompt": self._sql_join_synthesis_prompt}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "sql_join_synthesis_prompt" in prompts:
            self._sql_join_synthesis_prompt = prompts["sql_join_synthesis_prompt"]

    def _query_sql_other(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Query SQL database + other query engine in sequence."""
        # first query SQL database
        sql_response = self._sql_query_tool.query_engine.query(query_bundle)
        if not self._use_sql_join_synthesis:
            return sql_response

        sql_query = (
            sql_response.metadata["sql_query"] if sql_response.metadata else None
        )
        if self._verbose:
            print_text(f"SQL query: {sql_query}\n", color="yellow")
            print_text(f"SQL response: {sql_response}\n", color="yellow")

        # given SQL db, transform query into new query
        new_query = self._sql_augment_query_transform(
            query_bundle.query_str,
            metadata={
                "sql_query": _format_sql_query(sql_query),
                "sql_query_response": str(sql_response),
            },
        )

        if self._verbose:
            print_text(
                f"Transformed query given SQL response: {new_query.query_str}\n",
                color="blue",
            )
        logger.info(f"> Transformed query given SQL response: {new_query.query_str}")
        if self._sql_augment_query_transform.check_stop(new_query):
            return sql_response

        other_response = self._other_query_tool.query_engine.query(new_query)
        if self._verbose:
            print_text(f"query engine response: {other_response}\n", color="pink")
        logger.info(f"> query engine response: {other_response}")

        if self._streaming:
            response_gen = self._llm.stream(
                self._sql_join_synthesis_prompt,
                query_str=query_bundle.query_str,
                sql_query_str=sql_query,
                sql_response_str=str(sql_response),
                query_engine_query_str=new_query.query_str,
                query_engine_response_str=str(other_response),
            )

            response_metadata = {
                **(sql_response.metadata or {}),
                **(other_response.metadata or {}),
            }
            source_nodes = other_response.source_nodes
            return StreamingResponse(
                response_gen,
                metadata=response_metadata,
                source_nodes=source_nodes,
            )
        else:
            response_str = self._llm.predict(
                self._sql_join_synthesis_prompt,
                query_str=query_bundle.query_str,
                sql_query_str=sql_query,
                sql_response_str=str(sql_response),
                query_engine_query_str=new_query.query_str,
                query_engine_response_str=str(other_response),
            )

            response_metadata = {
                **(sql_response.metadata or {}),
                **(other_response.metadata or {}),
            }
            source_nodes = other_response.source_nodes
            return Response(
                response_str,
                metadata=response_metadata,
                source_nodes=source_nodes,
            )

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Query and get response."""
        # TODO: see if this can be consolidated with logic in RouterQueryEngine
        metadatas = [self._sql_query_tool.metadata, self._other_query_tool.metadata]
        result = self._selector.select(metadatas, query_bundle)
        # pick sql query
        if result.ind == 0:
            if self._verbose:
                print_text(f"Querying SQL database: {result.reason}\n", color="blue")
            logger.info(f"> Querying SQL database: {result.reason}")
            return self._query_sql_other(query_bundle)
        elif result.ind == 1:
            if self._verbose:
                print_text(
                    f"Querying other query engine: {result.reason}\n", color="blue"
                )
            logger.info(f"> Querying other query engine: {result.reason}")
            return self._other_query_tool.query_engine.query(query_bundle)
        else:
            raise ValueError(f"Invalid result.ind: {result.ind}")

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        # TODO: make async
        return self._query(query_bundle)

SQL自动向量查询引擎 #

基类:EventSQLJoinQueryEngine

SQL + 向量索引自动检索查询引擎。

该查询引擎可以同时查询SQL数据库和向量数据库。它会首先判断需要查询SQL数据库还是向量存储。如果决定查询SQL数据库,它还会判断是否需要用向量存储的检索结果来增强信息。我们使用VectorIndexAutoRetriever来检索结果。

参数:

名称 类型 描述 默认
sql_query_tool QueryEngineTool

用于SQL数据库的查询引擎工具。

required
vector_query_tool QueryEngineTool

向量数据库的查询引擎工具。

required
selector Optional[Union[LLMSingleSelector, PydanticSingleSelector]]

选择器使用。

None
sql_vector_synthesis_prompt Optional[BasePromptTemplate]

用于SQL向量合成的提示词。

None
sql_augment_query_transform Optional[SQLAugmentQueryTransform]

用于SQL增强的查询转换。

None
use_sql_vector_synthesis bool

是否使用SQL向量合成。

True
callback_manager Optional[CallbackManager]

要使用的回调管理器。

None
verbose bool

是否打印中间结果。

True
workflows/handler.py 中的源代码llama_index/core/query_engine/sql_vector_query_engine.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
class SQLAutoVectorQueryEngine(SQLJoinQueryEngine):
    """
    SQL + Vector Index Auto Retriever Query Engine.

    This query engine can query both a SQL database
    as well as a vector database. It will first decide
    whether it needs to query the SQL database or vector store.
    If it decides to query the SQL database, it will also decide
    whether to augment information with retrieved results from the vector store.
    We use the VectorIndexAutoRetriever to retrieve results.

    Args:
        sql_query_tool (QueryEngineTool): Query engine tool for SQL database.
        vector_query_tool (QueryEngineTool): Query engine tool for vector database.
        selector (Optional[Union[LLMSingleSelector, PydanticSingleSelector]]):
            Selector to use.
        sql_vector_synthesis_prompt (Optional[BasePromptTemplate]):
            Prompt to use for SQL vector synthesis.
        sql_augment_query_transform (Optional[SQLAugmentQueryTransform]): Query
            transform to use for SQL augmentation.
        use_sql_vector_synthesis (bool): Whether to use SQL vector synthesis.
        callback_manager (Optional[CallbackManager]): Callback manager to use.
        verbose (bool): Whether to print intermediate results.

    """

    def __init__(
        self,
        sql_query_tool: QueryEngineTool,
        vector_query_tool: QueryEngineTool,
        selector: Optional[Union[LLMSingleSelector, PydanticSingleSelector]] = None,
        llm: Optional[LLM] = None,
        sql_vector_synthesis_prompt: Optional[BasePromptTemplate] = None,
        sql_augment_query_transform: Optional[SQLAugmentQueryTransform] = None,
        use_sql_vector_synthesis: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = True,
    ) -> None:
        """Initialize params."""
        # validate that the query engines are of the right type
        if not isinstance(
            sql_query_tool.query_engine,
            (BaseSQLTableQueryEngine, NLSQLTableQueryEngine),
        ):
            raise ValueError(
                "sql_query_tool.query_engine must be an instance of "
                "BaseSQLTableQueryEngine or NLSQLTableQueryEngine"
            )
        if not isinstance(vector_query_tool.query_engine, RetrieverQueryEngine):
            raise ValueError(
                "vector_query_tool.query_engine must be an instance of "
                "RetrieverQueryEngine"
            )
        if not isinstance(
            vector_query_tool.query_engine.retriever, VectorIndexAutoRetriever
        ):
            raise ValueError(
                "vector_query_tool.query_engine.retriever must be an instance "
                "of VectorIndexAutoRetriever"
            )

        sql_vector_synthesis_prompt = (
            sql_vector_synthesis_prompt or DEFAULT_SQL_VECTOR_SYNTHESIS_PROMPT
        )
        super().__init__(
            sql_query_tool,
            vector_query_tool,
            selector=selector,
            llm=llm,
            sql_join_synthesis_prompt=sql_vector_synthesis_prompt,
            sql_augment_query_transform=sql_augment_query_transform,
            use_sql_join_synthesis=use_sql_vector_synthesis,
            callback_manager=callback_manager,
            verbose=verbose,
        )

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "selector": self._selector,
            "sql_augment_query_transform": self._sql_augment_query_transform,
        }

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {"sql_join_synthesis_prompt": self._sql_join_synthesis_prompt}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "sql_join_synthesis_prompt" in prompts:
            self._sql_join_synthesis_prompt = prompts["sql_join_synthesis_prompt"]

    @classmethod
    def from_sql_and_vector_query_engines(
        cls,
        sql_query_engine: Union[BaseSQLTableQueryEngine, NLSQLTableQueryEngine],
        sql_tool_name: str,
        sql_tool_description: str,
        vector_auto_retriever: RetrieverQueryEngine,
        vector_tool_name: str,
        vector_tool_description: str,
        selector: Optional[Union[LLMSingleSelector, PydanticSingleSelector]] = None,
        **kwargs: Any,
    ) -> "SQLAutoVectorQueryEngine":
        """
        From SQL and vector query engines.

        Args:
            sql_query_engine (BaseSQLTableQueryEngine): SQL query engine.
            vector_query_engine (VectorIndexAutoRetriever): Vector retriever.
            selector (Optional[Union[LLMSingleSelector, PydanticSingleSelector]]):
                Selector to use.

        """
        sql_query_tool = QueryEngineTool.from_defaults(
            sql_query_engine, name=sql_tool_name, description=sql_tool_description
        )
        vector_query_tool = QueryEngineTool.from_defaults(
            vector_auto_retriever,
            name=vector_tool_name,
            description=vector_tool_description,
        )
        return cls(sql_query_tool, vector_query_tool, selector, **kwargs)

from_sql_and_vector_query_engines classmethod #

from_sql_and_vector_query_engines(sql_query_engine: Union[BaseSQLTableQueryEngine, NLSQLTableQueryEngine], sql_tool_name: str, sql_tool_description: str, vector_auto_retriever: RetrieverQueryEngine, vector_tool_name: str, vector_tool_description: str, selector: Optional[Union[LLMSingleSelector, PydanticSingleSelector]] = None, **kwargs: Any) -> SQLAutoVectorQueryEngine

来自SQL和向量查询引擎。

参数:

名称 类型 描述 默认
sql_query_engine BaseSQLTableQueryEngine

SQL查询引擎。

required
vector_query_engine VectorIndexAutoRetriever

向量检索器。

required
selector Optional[Union[LLMSingleSelector, PydanticSingleSelector]]

选择器使用。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/sql_vector_query_engine.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@classmethod
def from_sql_and_vector_query_engines(
    cls,
    sql_query_engine: Union[BaseSQLTableQueryEngine, NLSQLTableQueryEngine],
    sql_tool_name: str,
    sql_tool_description: str,
    vector_auto_retriever: RetrieverQueryEngine,
    vector_tool_name: str,
    vector_tool_description: str,
    selector: Optional[Union[LLMSingleSelector, PydanticSingleSelector]] = None,
    **kwargs: Any,
) -> "SQLAutoVectorQueryEngine":
    """
    From SQL and vector query engines.

    Args:
        sql_query_engine (BaseSQLTableQueryEngine): SQL query engine.
        vector_query_engine (VectorIndexAutoRetriever): Vector retriever.
        selector (Optional[Union[LLMSingleSelector, PydanticSingleSelector]]):
            Selector to use.

    """
    sql_query_tool = QueryEngineTool.from_defaults(
        sql_query_engine, name=sql_tool_name, description=sql_tool_description
    )
    vector_query_tool = QueryEngineTool.from_defaults(
        vector_auto_retriever,
        name=vector_tool_name,
        description=vector_tool_description,
    )
    return cls(sql_query_tool, vector_query_tool, selector, **kwargs)

子问题回答对 #

基类:EventBaseModel

子问题及其可选答案(如果已回答)的配对。

参数:

名称 类型 描述 默认
sub_q SubQuestion
required
answer str | None
None
sources List[NodeWithScore]

内置可变序列。

如果未提供参数,构造函数将创建一个新的空列表。 如果指定了参数,则它必须是一个可迭代对象。

<dynamic>
workflows/handler.py 中的源代码llama_index/core/query_engine/sub_question_query_engine.py
27
28
29
30
31
32
33
34
class SubQuestionAnswerPair(BaseModel):
    """
    Pair of the sub question and optionally its answer (if its been answered yet).
    """

    sub_q: SubQuestion
    answer: Optional[str] = None
    sources: List[NodeWithScore] = Field(default_factory=list)

子问题查询引擎 #

基类:EventBaseQueryEngine

子问题查询引擎。

一种查询引擎,能够将复杂查询(例如比较与对比)分解为多个子问题及其对应的目标查询引擎以供执行。 在执行完所有子问题后,所有响应将被收集并发送至响应合成器以生成最终响应。

参数:

名称 类型 描述 默认
question_gen BaseQuestionGenerator

一个用于生成子问题的模块,给定一个复杂问题及工具。

required
response_synthesizer BaseSynthesizer

用于生成最终响应的响应合成器

required
query_engine_tools Sequence[QueryEngineTool]

用于回答子问题的工具。

required
verbose bool

是否打印中间的问题和答案。 默认为 True

True
use_async bool

是否使用异步方式执行子问题。 默认为 True

False
workflows/handler.py 中的源代码llama_index/core/query_engine/sub_question_query_engine.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class SubQuestionQueryEngine(BaseQueryEngine):
    """
    Sub question query engine.

    A query engine that breaks down a complex query (e.g. compare and contrast) into
        many sub questions and their target query engine for execution.
        After executing all sub questions, all responses are gathered and sent to
        response synthesizer to produce the final response.

    Args:
        question_gen (BaseQuestionGenerator): A module for generating sub questions
            given a complex question and tools.
        response_synthesizer (BaseSynthesizer): A response synthesizer for
            generating the final response
        query_engine_tools (Sequence[QueryEngineTool]): Tools to answer the
            sub questions.
        verbose (bool): whether to print intermediate questions and answers.
            Defaults to True
        use_async (bool): whether to execute the sub questions with asyncio.
            Defaults to True

    """

    def __init__(
        self,
        question_gen: BaseQuestionGenerator,
        response_synthesizer: BaseSynthesizer,
        query_engine_tools: Sequence[QueryEngineTool],
        callback_manager: Optional[CallbackManager] = None,
        verbose: bool = True,
        use_async: bool = False,
    ) -> None:
        self._question_gen = question_gen
        self._response_synthesizer = response_synthesizer
        self._metadatas = [x.metadata for x in query_engine_tools]
        self._query_engines = {
            tool.metadata.name: tool.query_engine for tool in query_engine_tools
        }
        self._verbose = verbose
        self._use_async = use_async
        super().__init__(callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "question_gen": self._question_gen,
            "response_synthesizer": self._response_synthesizer,
        }

    @classmethod
    def from_defaults(
        cls,
        query_engine_tools: Sequence[QueryEngineTool],
        llm: Optional[LLM] = None,
        question_gen: Optional[BaseQuestionGenerator] = None,
        response_synthesizer: Optional[BaseSynthesizer] = None,
        verbose: bool = True,
        use_async: bool = True,
    ) -> "SubQuestionQueryEngine":
        callback_manager = Settings.callback_manager
        if len(query_engine_tools) > 0:
            callback_manager = query_engine_tools[0].query_engine.callback_manager

        llm = llm or Settings.llm
        if question_gen is None:
            try:
                from llama_index.question_gen.openai import (
                    OpenAIQuestionGenerator,
                )  # pants: no-infer-dep

                # try to use OpenAI function calling based question generator.
                # if incompatible, use general LLM question generator
                question_gen = OpenAIQuestionGenerator.from_defaults(llm=llm)

            except ImportError as e:
                raise ImportError(
                    "`llama-index-question-gen-openai` package cannot be found. "
                    "Please install it by using `pip install `llama-index-question-gen-openai`"
                )
            except ValueError:
                question_gen = LLMQuestionGenerator.from_defaults(llm=llm)

        synth = response_synthesizer or get_response_synthesizer(
            llm=llm,
            callback_manager=callback_manager,
            use_async=use_async,
        )

        return cls(
            question_gen,
            synth,
            query_engine_tools,
            callback_manager=callback_manager,
            verbose=verbose,
            use_async=use_async,
        )

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            sub_questions = self._question_gen.generate(self._metadatas, query_bundle)

            colors = get_color_mapping([str(i) for i in range(len(sub_questions))])

            if self._verbose:
                print_text(f"Generated {len(sub_questions)} sub questions.\n")

            if self._use_async:
                tasks = [
                    self._aquery_subq(sub_q, color=colors[str(ind)])
                    for ind, sub_q in enumerate(sub_questions)
                ]

                qa_pairs_all = run_async_tasks(tasks)
                qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)
            else:
                qa_pairs_all = [
                    self._query_subq(sub_q, color=colors[str(ind)])
                    for ind, sub_q in enumerate(sub_questions)
                ]

            # filter out sub questions that failed
            qa_pairs: List[SubQuestionAnswerPair] = list(filter(None, qa_pairs_all))

            nodes = [self._construct_node(pair) for pair in qa_pairs]

            source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]
            response = self._response_synthesizer.synthesize(
                query=query_bundle,
                nodes=nodes,
                additional_source_nodes=source_nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            sub_questions = await self._question_gen.agenerate(
                self._metadatas, query_bundle
            )

            colors = get_color_mapping([str(i) for i in range(len(sub_questions))])

            if self._verbose:
                print_text(f"Generated {len(sub_questions)} sub questions.\n")

            tasks = [
                self._aquery_subq(sub_q, color=colors[str(ind)])
                for ind, sub_q in enumerate(sub_questions)
            ]

            qa_pairs_all = await asyncio.gather(*tasks)
            qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)

            # filter out sub questions that failed
            qa_pairs: List[SubQuestionAnswerPair] = list(filter(None, qa_pairs_all))

            nodes = [self._construct_node(pair) for pair in qa_pairs]

            source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]
            response = await self._response_synthesizer.asynthesize(
                query=query_bundle,
                nodes=nodes,
                additional_source_nodes=source_nodes,
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    def _construct_node(self, qa_pair: SubQuestionAnswerPair) -> NodeWithScore:
        node_text = (
            f"Sub question: {qa_pair.sub_q.sub_question}\nResponse: {qa_pair.answer}"
        )
        return NodeWithScore(node=TextNode(text=node_text))

    async def _aquery_subq(
        self, sub_q: SubQuestion, color: Optional[str] = None
    ) -> Optional[SubQuestionAnswerPair]:
        try:
            with self.callback_manager.event(
                CBEventType.SUB_QUESTION,
                payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
            ) as event:
                question = sub_q.sub_question
                query_engine = self._query_engines[sub_q.tool_name]

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)

                response = await query_engine.aquery(question)
                response_text = str(response)

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)

                qa_pair = SubQuestionAnswerPair(
                    sub_q=sub_q, answer=response_text, sources=response.source_nodes
                )

                event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})

            return qa_pair
        except ValueError:
            logger.warning(f"[{sub_q.tool_name}] Failed to run {question}")
            return None

    def _query_subq(
        self, sub_q: SubQuestion, color: Optional[str] = None
    ) -> Optional[SubQuestionAnswerPair]:
        try:
            with self.callback_manager.event(
                CBEventType.SUB_QUESTION,
                payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
            ) as event:
                question = sub_q.sub_question
                query_engine = self._query_engines[sub_q.tool_name]

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)

                response = query_engine.query(question)
                response_text = str(response)

                if self._verbose:
                    print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)

                qa_pair = SubQuestionAnswerPair(
                    sub_q=sub_q, answer=response_text, sources=response.source_nodes
                )

                event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})

            return qa_pair
        except ValueError:
            logger.warning(f"[{sub_q.tool_name}] Failed to run {question}")
            return None

转换查询引擎 #

基类:EventBaseQueryEngine

转换查询引擎。

在将查询包传递给查询引擎之前,对其应用查询转换。

参数:

名称 类型 描述 默认
query_engine BaseQueryEngine

一个查询引擎对象。

required
query_transform BaseQueryTransform

一个查询转换对象。

required
transform_metadata Optional[dict]

传递给查询转换的元数据。

None
callback_manager Optional[CallbackManager]

一个回调管理器。

None
workflows/handler.py 中的源代码llama_index/core/query_engine/transform_query_engine.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class TransformQueryEngine(BaseQueryEngine):
    """
    Transform query engine.

    Applies a query transform to a query bundle before passing
        it to a query engine.

    Args:
        query_engine (BaseQueryEngine): A query engine object.
        query_transform (BaseQueryTransform): A query transform object.
        transform_metadata (Optional[dict]): metadata to pass to the
            query transform.
        callback_manager (Optional[CallbackManager]): A callback manager.

    """

    def __init__(
        self,
        query_engine: BaseQueryEngine,
        query_transform: BaseQueryTransform,
        transform_metadata: Optional[dict] = None,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        self._query_engine = query_engine
        self._query_transform = query_transform
        self._transform_metadata = transform_metadata
        super().__init__(callback_manager)

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {
            "query_transform": self._query_transform,
            "query_engine": self._query_engine,
        }

    def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        query_bundle = self._query_transform.run(
            query_bundle, metadata=self._transform_metadata
        )
        return self._query_engine.retrieve(query_bundle)

    def synthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        query_bundle = self._query_transform.run(
            query_bundle, metadata=self._transform_metadata
        )
        return self._query_engine.synthesize(
            query_bundle=query_bundle,
            nodes=nodes,
            additional_source_nodes=additional_source_nodes,
        )

    async def asynthesize(
        self,
        query_bundle: QueryBundle,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
    ) -> RESPONSE_TYPE:
        query_bundle = self._query_transform.run(
            query_bundle, metadata=self._transform_metadata
        )
        return await self._query_engine.asynthesize(
            query_bundle=query_bundle,
            nodes=nodes,
            additional_source_nodes=additional_source_nodes,
        )

    def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        query_bundle = self._query_transform.run(
            query_bundle, metadata=self._transform_metadata
        )
        return self._query_engine.query(query_bundle)

    async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
        """Answer a query."""
        query_bundle = self._query_transform.run(
            query_bundle, metadata=self._transform_metadata
        )
        return await self._query_engine.aquery(query_bundle)

选项: 成员:- NLSQLTableQueryEngine