基类:BaseComponent, DispatcherSpanMixin, ABC
参数:
| 名称 |
类型 |
描述 |
默认 |
callback_manager
|
CallbackManager
|
处理 LlamaIndex 内事件回调的回调管理器。
回调管理器提供了一种在事件开始/结束时调用处理程序的方法。
此外,回调管理器会追踪当前的事件堆栈。
它通过使用几个关键属性来实现这一点。
- trace_stack - 当前尚未结束的事件堆栈。
当事件结束时,它会被从堆栈中移除。
由于这是一个上下文变量,它对每个线程/任务都是唯一的。
- trace_map - 事件ID到其子事件的映射。
在事件开始时,追踪堆栈的底部被用作追踪映射中的当前父事件。
- trace_id - 当前追踪的简单名称,通常表示入口点(查询、索引构建、插入等)。
参数:
handlers (List[BaseCallbackHandler]): 要使用的处理器列表。
用法:
使用回调管理器事件(CBEventType.QUERY)作为事件:
事件开始(载荷={键, 值})
...
事件结束(载荷={键, 值})
|
<dynamic>
|
workflows/handler.py 中的源代码llama_index/core/postprocessor/types.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 | class BaseNodePostprocessor(BaseComponent, DispatcherSpanMixin, ABC):
model_config = ConfigDict(arbitrary_types_allowed=True)
callback_manager: CallbackManager = Field(
default_factory=CallbackManager, exclude=True
)
def _get_prompts(self) -> PromptDictType:
"""Get prompts."""
# set by default since most postprocessors don't require prompts
return {}
def _update_prompts(self, prompts: PromptDictType) -> None:
"""Update prompts."""
def _get_prompt_modules(self) -> PromptMixinType:
"""Get prompt modules."""
return {}
# implement class_name so users don't have to worry about it when extending
@classmethod
def class_name(cls) -> str:
return "BaseNodePostprocessor"
def postprocess_nodes(
self,
nodes: List[NodeWithScore],
query_bundle: Optional[QueryBundle] = None,
query_str: Optional[str] = None,
) -> List[NodeWithScore]:
"""Postprocess nodes."""
if query_str is not None and query_bundle is not None:
raise ValueError("Cannot specify both query_str and query_bundle")
elif query_str is not None:
query_bundle = QueryBundle(query_str)
else:
pass
return self._postprocess_nodes(nodes, query_bundle)
@abstractmethod
def _postprocess_nodes(
self,
nodes: List[NodeWithScore],
query_bundle: Optional[QueryBundle] = None,
) -> List[NodeWithScore]:
"""Postprocess nodes."""
async def apostprocess_nodes(
self,
nodes: List[NodeWithScore],
query_bundle: Optional[QueryBundle] = None,
query_str: Optional[str] = None,
) -> List[NodeWithScore]:
"""Postprocess nodes (async)."""
if query_str is not None and query_bundle is not None:
raise ValueError("Cannot specify both query_str and query_bundle")
elif query_str is not None:
query_bundle = QueryBundle(query_str)
else:
pass
return await self._apostprocess_nodes(nodes, query_bundle)
async def _apostprocess_nodes(
self,
nodes: List[NodeWithScore],
query_bundle: Optional[QueryBundle] = None,
) -> List[NodeWithScore]:
"""Postprocess nodes (async)."""
return await asyncio.to_thread(self._postprocess_nodes, nodes, query_bundle)
|
postprocess_nodes
后处理节点。
workflows/handler.py 中的源代码llama_index/core/postprocessor/types.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48 | def postprocess_nodes(
self,
nodes: List[NodeWithScore],
query_bundle: Optional[QueryBundle] = None,
query_str: Optional[str] = None,
) -> List[NodeWithScore]:
"""Postprocess nodes."""
if query_str is not None and query_bundle is not None:
raise ValueError("Cannot specify both query_str and query_bundle")
elif query_str is not None:
query_bundle = QueryBundle(query_str)
else:
pass
return self._postprocess_nodes(nodes, query_bundle)
|
apostprocess_nodes
async
后处理节点(异步)。
workflows/handler.py 中的源代码llama_index/core/postprocessor/types.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71 | async def apostprocess_nodes(
self,
nodes: List[NodeWithScore],
query_bundle: Optional[QueryBundle] = None,
query_str: Optional[str] = None,
) -> List[NodeWithScore]:
"""Postprocess nodes (async)."""
if query_str is not None and query_bundle is not None:
raise ValueError("Cannot specify both query_str and query_bundle")
elif query_str is not None:
query_bundle = QueryBundle(query_str)
else:
pass
return await self._apostprocess_nodes(nodes, query_bundle)
|