资源
声明要注入到步骤函数中的资源。
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
factory
|
Callable[..., T]
|
返回资源实例的函数。可能是异步的。 |
必填 |
cache
|
bool
|
如果为 True,则在各步骤间复用已生成的资源。默认为 True。 |
True
|
返回:
| 类型 | 描述 |
|---|---|
_Resource[T]
|
_Resource[T]: 一个资源描述符,将在 |
示例:
from typing import Annotated
from workflows.resource import Resource
def get_memory(**kwargs) -> Memory:
return Memory.from_defaults("user123", token_limit=60000)
class MyWorkflow(Workflow):
@step
async def first(
self,
ev: StartEvent,
memory: Annotated[Memory, Resource(get_memory)],
) -> StopEvent:
await memory.aput(...)
return StopEvent(result="ok")
workflows/handler.py 中的源代码workflows/resource.py
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | |