step(*args: Any, workflow: Optional[Type[Workflow]] = None, pass_context: bool = False, num_workers: int = 4, retry_policy: Optional[RetryPolicy] = None) -> Callable
用于将方法和函数标记为工作流步骤的装饰器。
装饰器在导入时被评估,但我们需要等到运行时才能启动通信通道。因此,我们暂时将本步骤将要消费的事件列表存储在函数对象本身中。
参数:
名称 |
类型 |
描述 |
默认值 |
workflow
|
Optional[Type[Workflow]]
|
工作流类,装饰步骤将被添加到其中。仅在自由函数上使用装饰器而非类方法时才需要。
|
None
|
num_workers
|
int
|
处理装饰步骤事件的工人数量。默认值在大多数情况下都适用。
|
4
|
retry_policy
|
Optional[RetryPolicy]
|
|
None
|
Source code in llama-index-core/llama_index/core/workflow/decorators.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 | def step(
*args: Any,
workflow: Optional[Type["Workflow"]] = None,
pass_context: bool = False,
num_workers: int = 4,
retry_policy: Optional[RetryPolicy] = None,
) -> Callable:
"""
Decorator used to mark methods and functions as workflow steps.
Decorators are evaluated at import time, but we need to wait for
starting the communication channels until runtime. For this reason,
we temporarily store the list of events that will be consumed by this
step in the function object itself.
Args:
workflow: Workflow class to which the decorated step will be added. Only needed when using the
decorator on free functions instead of class methods.
num_workers: The number of workers that will process events for the decorated step. The default
value works most of the times.
retry_policy: The policy used to retry a step that encountered an error while running.
"""
def decorator(func: Callable) -> Callable:
if not isinstance(num_workers, int) or num_workers <= 0:
raise WorkflowValidationError(
"num_workers must be an integer greater than 0"
)
# This will raise providing a message with the specific validation failure
spec = inspect_signature(func)
validate_step_signature(spec)
event_name, accepted_events = next(iter(spec.accepted_events.items()))
# store the configuration in the function object
func.__step_config = StepConfig( # type: ignore[attr-defined]
accepted_events=accepted_events,
event_name=event_name,
return_types=spec.return_types,
context_parameter=spec.context_parameter,
num_workers=num_workers,
requested_services=spec.requested_services or [],
retry_policy=retry_policy,
resources=spec.resources,
)
# If this is a free function, call add_step() explicitly.
if is_free_function(func.__qualname__):
if workflow is None:
msg = f"To decorate {func.__name__} please pass a workflow class to the @step decorator."
raise WorkflowValidationError(msg)
workflow.add_step(func)
return func
if len(args):
# The decorator was used without parentheses, like `@step`
func = args[0]
decorator(func)
return func
return decorator
|