Python SDK#

Client#

The LlamaDeploy Python client.

The client gives access to both the asyncio and non-asyncio APIs. To use the sync API, just call methods on client.sync.

Example usage:

from llama_deploy.client import Client

# Use the same client instance everywhere
client = Client()

async def an_async_function():
    status = await client.apiserver.status()

def normal_function():
    status = client.sync.apiserver.status()

Source code in llama_deploy/client/client.py
class Client(_BaseClient):
    """The LlamaDeploy Python client.

    The client gives access to both the asyncio and non-asyncio APIs. To access the sync
    API just use methods of `client.sync`.

    Example usage:
    ```py
    from llama_deploy.client import Client

    # Use the same client instance
    client = Client()

    async def an_async_function():
        status = await client.apiserver.status()

    def normal_function():
        status = client.sync.apiserver.status()
    ```
    """

    @property
    def sync(self) -> "_SyncClient":
        """Returns the sync version of the client API."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return _SyncClient(**self.model_dump())

        msg = "You cannot use the sync client within an async event loop - just await the async methods directly."
        raise RuntimeError(msg)

    @property
    def apiserver(self) -> ApiServer:
        """Returns the ApiServer model."""
        return ApiServer(client=self, id="apiserver")

    @property
    def core(self) -> Core:
        """Returns the Core model."""
        return Core(client=self, id="core")

sync property #

sync: _SyncClient

Returns the sync version of the client API.

apiserver property #

apiserver: ApiServer

Returns the ApiServer model.

core property #

core: Core

Returns the Core model.
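The sync property refuses to run inside an active event loop, as documented above. A minimal sketch of that behavior (assuming the llama_deploy package is installed; the import is kept inside the function so the snippet can be defined without it):

```python
import asyncio


def demo_sync_guard() -> str:
    """Show that accessing `client.sync` inside a running event loop raises.

    Sketch only: assumes `llama_deploy` is installed. The import lives
    inside the function so the file stays importable without it.
    """
    from llama_deploy.client import Client

    async def _inside_loop() -> str:
        client = Client()
        try:
            client.sync  # property access while a loop is running
        except RuntimeError as exc:
            return str(exc)
        return "no error"

    # Outside any running loop, `client.sync` would succeed instead.
    return asyncio.run(_inside_loop())
```

In async code, simply await the async methods directly instead of reaching for client.sync.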

API Server Functionality#

SessionCollection #

Bases: Collection

A model representing a collection of sessions for a given deployment.

Parameters:

    deployment_id (str): required
Source code in llama_deploy/client/models/apiserver.py
class SessionCollection(Collection):
    """A model representing a collection of session for a given deployment."""

    deployment_id: str

    async def delete(self, session_id: str) -> None:
        """Deletes the session with the provided `session_id`.

        Args:
            session_id: The id of the session that will be removed

        Raises:
            HTTPException: If the session couldn't be found with the id provided.
        """
        delete_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/sessions/delete"

        await self.client.request(
            "POST",
            delete_url,
            params={"session_id": session_id},
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )

    async def create(self) -> SessionDefinition:
        """"""
        create_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/sessions/create"

        r = await self.client.request(
            "POST",
            create_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )

        return SessionDefinition(**r.json())

    async def list(self) -> list[SessionDefinition]:
        """Returns a collection of all the sessions in the given deployment."""
        sessions_url = (
            f"{self.client.api_server_url}/deployments/{self.deployment_id}/sessions"
        )
        r = await self.client.request(
            "GET",
            sessions_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )

        return r.json()

delete async #

delete(session_id: str) -> None

Deletes the session with the provided session_id.

Parameters:

    session_id (str): The id of the session that will be removed. required

Raises:

    HTTPException: If the session couldn't be found with the id provided.

Source code in llama_deploy/client/models/apiserver.py
async def delete(self, session_id: str) -> None:
    """Deletes the session with the provided `session_id`.

    Args:
        session_id: The id of the session that will be removed

    Raises:
        HTTPException: If the session couldn't be found with the id provided.
    """
    delete_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/sessions/delete"

    await self.client.request(
        "POST",
        delete_url,
        params={"session_id": session_id},
        verify=not self.client.disable_ssl,
        timeout=self.client.timeout,
    )

create async #

create() -> SessionDefinition

Creates a new session in the given deployment.
Source code in llama_deploy/client/models/apiserver.py
async def create(self) -> SessionDefinition:
    """"""
    create_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/sessions/create"

    r = await self.client.request(
        "POST",
        create_url,
        verify=not self.client.disable_ssl,
        timeout=self.client.timeout,
    )

    return SessionDefinition(**r.json())

list async #

list() -> list[SessionDefinition]

Returns a collection of all the sessions in the given deployment.

Source code in llama_deploy/client/models/apiserver.py
async def list(self) -> list[SessionDefinition]:
    """Returns a collection of all the sessions in the given deployment."""
    sessions_url = (
        f"{self.client.api_server_url}/deployments/{self.deployment_id}/sessions"
    )
    r = await self.client.request(
        "GET",
        sessions_url,
        verify=not self.client.disable_ssl,
        timeout=self.client.timeout,
    )

    return r.json()
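The create, list, and delete methods above can be combined into a simple session lifecycle. A sketch only: deployment_id is a placeholder, a running API server is assumed, and the import is kept inside the function so the snippet can be defined without llama_deploy installed:

```python
import asyncio


async def session_lifecycle(deployment_id: str) -> None:
    """Create a session, list all sessions, then delete the new one.

    Sketch under assumptions: `deployment_id` names an existing
    deployment and an API server is reachable at the default URL.
    """
    from llama_deploy.client import Client

    client = Client()
    deployment = await client.apiserver.deployments.get(deployment_id)

    # Create a fresh session for this deployment.
    session = await deployment.sessions.create()
    print("created:", session.session_id)  # assumes SessionDefinition exposes session_id

    # `list()` returns the raw session definitions for the deployment.
    for s in await deployment.sessions.list():
        print("active session:", s)

    # Clean up the session we just created.
    await deployment.sessions.delete(session.session_id)
```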

Task #

Bases: Model

A model representing a task belonging to a given session in the given deployment.

Parameters:

    deployment_id (str): required
    session_id (str): required
Source code in llama_deploy/client/models/apiserver.py
class Task(Model):
    """A model representing a task belonging to a given session in the given deployment."""

    deployment_id: str
    session_id: str

    async def results(self) -> TaskResult:
        """Returns the result of a given task."""
        results_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/results"

        r = await self.client.request(
            "GET",
            results_url,
            verify=not self.client.disable_ssl,
            params={"session_id": self.session_id},
            timeout=self.client.timeout,
        )
        return TaskResult.model_validate(r.json())

    async def send_event(self, ev: Event, service_name: str) -> EventDefinition:
        """Sends a human response event."""
        url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

        serializer = JsonSerializer()
        event_def = EventDefinition(
            event_obj_str=serializer.serialize(ev), agent_id=service_name
        )

        r = await self.client.request(
            "POST",
            url,
            verify=not self.client.disable_ssl,
            params={"session_id": self.session_id},
            json=event_def.model_dump(),
            timeout=self.client.timeout,
        )
        return EventDefinition.model_validate(r.json())

    async def events(self) -> AsyncGenerator[dict[str, Any], None]:  # pragma: no cover
        """Returns a generator object to consume the events streamed from a service."""
        events_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

        while True:
            try:
                async with httpx.AsyncClient(
                    verify=not self.client.disable_ssl
                ) as client:
                    async with client.stream(
                        "GET", events_url, params={"session_id": self.session_id}
                    ) as response:
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            json_line = json.loads(line)
                            yield json_line
                        break  # Exit the function if successful
            except httpx.HTTPStatusError as e:
                if e.response.status_code != 404:
                    raise  # Re-raise if it's not a 404 error
                await asyncio.sleep(self.client.poll_interval)

results async #

results() -> TaskResult

Returns the result of a given task.

Source code in llama_deploy/client/models/apiserver.py
async def results(self) -> TaskResult:
    """Returns the result of a given task."""
    results_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/results"

    r = await self.client.request(
        "GET",
        results_url,
        verify=not self.client.disable_ssl,
        params={"session_id": self.session_id},
        timeout=self.client.timeout,
    )
    return TaskResult.model_validate(r.json())

send_event async #

send_event(ev: Event, service_name: str) -> EventDefinition

Sends a human response event.

Source code in llama_deploy/client/models/apiserver.py
async def send_event(self, ev: Event, service_name: str) -> EventDefinition:
    """Sends a human response event."""
    url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

    serializer = JsonSerializer()
    event_def = EventDefinition(
        event_obj_str=serializer.serialize(ev), agent_id=service_name
    )

    r = await self.client.request(
        "POST",
        url,
        verify=not self.client.disable_ssl,
        params={"session_id": self.session_id},
        json=event_def.model_dump(),
        timeout=self.client.timeout,
    )
    return EventDefinition.model_validate(r.json())

events async #

events() -> AsyncGenerator[dict[str, Any], None]

Returns a generator object to consume the events streamed from a service.

Source code in llama_deploy/client/models/apiserver.py
async def events(self) -> AsyncGenerator[dict[str, Any], None]:  # pragma: no cover
    """Returns a generator object to consume the events streamed from a service."""
    events_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

    while True:
        try:
            async with httpx.AsyncClient(
                verify=not self.client.disable_ssl
            ) as client:
                async with client.stream(
                    "GET", events_url, params={"session_id": self.session_id}
                ) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        json_line = json.loads(line)
                        yield json_line
                    break  # Exit the function if successful
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 404:
                raise  # Re-raise if it's not a 404 error
            await asyncio.sleep(self.client.poll_interval)
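A typical use of events() is to create a task without blocking and then consume its stream. A sketch with placeholder names: it assumes a running API server and that TaskDefinition is importable from llama_deploy.types; imports are kept inside the function so the snippet stays self-contained:

```python
import asyncio


async def stream_task_events(deployment_id: str, payload: str) -> None:
    """Create a task without waiting for it, then consume its events.

    Sketch under assumptions: `deployment_id` and `payload` are
    placeholders, and the `TaskDefinition` import path is assumed.
    """
    from llama_deploy.client import Client
    from llama_deploy.types import TaskDefinition

    client = Client()
    deployment = await client.apiserver.deployments.get(deployment_id)
    task = await deployment.tasks.create(TaskDefinition(input=payload))

    # `events()` keeps retrying on 404 until the stream endpoint is
    # ready, then yields each streamed line as a parsed JSON dict.
    async for event in task.events():
        print(event)
```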

TaskCollection #

Bases: Collection

A model representing a collection of tasks for a given deployment.

Parameters:

    deployment_id (str): required
Source code in llama_deploy/client/models/apiserver.py
class TaskCollection(Collection):
    """A model representing a collection of tasks for a given deployment."""

    deployment_id: str

    async def run(self, task: TaskDefinition) -> Any:
        """Runs a task and returns the results once it's done.

        Args:
            task: The definition of the task we want to run.
        """
        run_url = (
            f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/run"
        )
        if task.session_id:
            run_url += f"?session_id={task.session_id}"

        r = await self.client.request(
            "POST",
            run_url,
            verify=not self.client.disable_ssl,
            json=task.model_dump(),
            timeout=self.client.timeout,
        )

        return r.json()

    async def create(self, task: TaskDefinition) -> Task:
        """Runs a task returns it immediately, without waiting for the results."""
        create_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/create"

        r = await self.client.request(
            "POST",
            create_url,
            verify=not self.client.disable_ssl,
            json=task.model_dump(),
            timeout=self.client.timeout,
        )
        response_fields = r.json()

        model_class = self._prepare(Task)
        return model_class(
            client=self.client,
            deployment_id=self.deployment_id,
            id=response_fields["task_id"],
            session_id=response_fields["session_id"],
        )

    async def list(self) -> list[Task]:
        """Returns the list of tasks from this collection."""
        tasks_url = (
            f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks"
        )
        r = await self.client.request(
            "GET",
            tasks_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )
        task_model_class = self._prepare(Task)
        items = {
            "id": task_model_class(
                client=self.client,
                id=task_def.task_id,
                session_id=task_def.session_id,
                deployment_id=self.deployment_id,
            )
            for task_def in r.json()
        }
        model_class = self._prepare(TaskCollection)
        return model_class(
            client=self.client, deployment_id=self.deployment_id, items=items
        )

run async #

run(task: TaskDefinition) -> Any

Runs a task and returns the results once it's done.

Parameters:

    task (TaskDefinition): The definition of the task we want to run. required
Source code in llama_deploy/client/models/apiserver.py
async def run(self, task: TaskDefinition) -> Any:
    """Runs a task and returns the results once it's done.

    Args:
        task: The definition of the task we want to run.
    """
    run_url = (
        f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/run"
    )
    if task.session_id:
        run_url += f"?session_id={task.session_id}"

    r = await self.client.request(
        "POST",
        run_url,
        verify=not self.client.disable_ssl,
        json=task.model_dump(),
        timeout=self.client.timeout,
    )

    return r.json()

create async #

create(task: TaskDefinition) -> Task

Runs a task and returns it immediately, without waiting for the results.

Source code in llama_deploy/client/models/apiserver.py
async def create(self, task: TaskDefinition) -> Task:
    """Runs a task returns it immediately, without waiting for the results."""
    create_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/create"

    r = await self.client.request(
        "POST",
        create_url,
        verify=not self.client.disable_ssl,
        json=task.model_dump(),
        timeout=self.client.timeout,
    )
    response_fields = r.json()

    model_class = self._prepare(Task)
    return model_class(
        client=self.client,
        deployment_id=self.deployment_id,
        id=response_fields["task_id"],
        session_id=response_fields["session_id"],
    )

list async #

list() -> list[Task]

Returns the list of tasks from this collection.

Source code in llama_deploy/client/models/apiserver.py
async def list(self) -> list[Task]:
    """Returns the list of tasks from this collection."""
    tasks_url = (
        f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks"
    )
    r = await self.client.request(
        "GET",
        tasks_url,
        verify=not self.client.disable_ssl,
        timeout=self.client.timeout,
    )
    task_model_class = self._prepare(Task)
    items = {
        "id": task_model_class(
            client=self.client,
            id=task_def.task_id,
            session_id=task_def.session_id,
            deployment_id=self.deployment_id,
        )
        for task_def in r.json()
    }
    model_class = self._prepare(TaskCollection)
    return model_class(
        client=self.client, deployment_id=self.deployment_id, items=items
    )
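The difference between run() and create() is whether the call blocks on the result. A sketch contrasting the two, assuming a running API server and that TaskDefinition is importable from llama_deploy.types:

```python
import asyncio
import json


async def run_both_ways(deployment_id: str) -> None:
    """Contrast the blocking and non-blocking task APIs.

    Sketch under assumptions: `deployment_id` is a placeholder and
    the `TaskDefinition` import path is assumed.
    """
    from llama_deploy.client import Client
    from llama_deploy.types import TaskDefinition

    client = Client()
    deployment = await client.apiserver.deployments.get(deployment_id)
    task_def = TaskDefinition(input=json.dumps({"query": "hello"}))

    # Blocking: waits for completion and returns the result payload.
    result = await deployment.tasks.run(task_def)
    print("result:", result)

    # Non-blocking: returns a Task handle we can query later.
    task = await deployment.tasks.create(task_def)
    print("deferred result:", await task.results())
```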

Deployment #

Bases: Model

A model representing a deployment.

Source code in llama_deploy/client/models/apiserver.py
class Deployment(Model):
    """A model representing a deployment."""

    @property
    def tasks(self) -> TaskCollection:
        """Returns a collection of tasks from all the sessions in the given deployment."""

        model_class = self._prepare(TaskCollection)
        return model_class(client=self.client, deployment_id=self.id, items={})

    @property
    def sessions(self) -> SessionCollection:
        """Returns a collection of all the sessions in the given deployment."""

        coll_model_class = self._prepare(SessionCollection)
        return coll_model_class(client=self.client, deployment_id=self.id, items={})

tasks property #

tasks: TaskCollection

Returns a collection of tasks from all the sessions in the given deployment.

sessions property #

sessions: SessionCollection

Returns a collection of all the sessions in the given deployment.

DeploymentCollection #

Bases: Collection

A model representing a collection of deployments currently active.

Source code in llama_deploy/client/models/apiserver.py
class DeploymentCollection(Collection):
    """A model representing a collection of deployments currently active."""

    async def create(self, config: TextIO, reload: bool = False) -> Deployment:
        """Creates a new deployment from a deployment file.

        If `reload` is true, an existing deployment will be reloaded, otherwise
        an error will be raised.

        Example:
            ```
            with open("deployment.yml") as f:
                await client.apiserver.deployments.create(f)
            ```
        """
        create_url = f"{self.client.api_server_url}/deployments/create"

        files = {"config_file": config.read()}
        r = await self.client.request(
            "POST",
            create_url,
            files=files,
            params={"reload": reload},
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )

        model_class = self._prepare(Deployment)
        return model_class(client=self.client, id=r.json().get("name"))

    async def get(self, id: str) -> Deployment:
        """Gets a deployment by id."""
        get_url = f"{self.client.api_server_url}/deployments/{id}"
        # The current version of the apiserver doesn't return anything useful from this endpoint, so just ignore it
        await self.client.request(
            "GET",
            get_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )
        model_class = self._prepare(Deployment)
        return model_class(client=self.client, id=id)

    async def list(self) -> list[Deployment]:
        deployments_url = f"{self.client.api_server_url}/deployments/"
        r = await self.client.request("GET", deployments_url)
        model_class = self._prepare(Deployment)
        deployments = [model_class(client=self.client, id=name) for name in r.json()]
        return deployments

create async #

create(config: TextIO, reload: bool = False) -> Deployment

Creates a new deployment from a deployment file.

If reload is true, an existing deployment will be reloaded; otherwise an error will be raised.

Example
with open("deployment.yml") as f:
    await client.apiserver.deployments.create(f)
Source code in llama_deploy/client/models/apiserver.py
async def create(self, config: TextIO, reload: bool = False) -> Deployment:
    """Creates a new deployment from a deployment file.

    If `reload` is true, an existing deployment will be reloaded, otherwise
    an error will be raised.

    Example:
        ```
        with open("deployment.yml") as f:
            await client.apiserver.deployments.create(f)
        ```
    """
    create_url = f"{self.client.api_server_url}/deployments/create"

    files = {"config_file": config.read()}
    r = await self.client.request(
        "POST",
        create_url,
        files=files,
        params={"reload": reload},
        verify=not self.client.disable_ssl,
        timeout=self.client.timeout,
    )

    model_class = self._prepare(Deployment)
    return model_class(client=self.client, id=r.json().get("name"))

get async #

get(id: str) -> Deployment

Gets a deployment by id.

Source code in llama_deploy/client/models/apiserver.py
async def get(self, id: str) -> Deployment:
    """Gets a deployment by id."""
    get_url = f"{self.client.api_server_url}/deployments/{id}"
    # The current version of the apiserver doesn't return anything useful from this endpoint, so just ignore it
    await self.client.request(
        "GET",
        get_url,
        verify=not self.client.disable_ssl,
        timeout=self.client.timeout,
    )
    model_class = self._prepare(Deployment)
    return model_class(client=self.client, id=id)

ApiServer #

Bases: Model

A model representing the API Server instance.

Source code in llama_deploy/client/models/apiserver.py
class ApiServer(Model):
    """A model representing the API Server instance."""

    async def status(self) -> Status:
        """Returns the status of the API Server."""
        status_url = f"{self.client.api_server_url}/status/"

        try:
            r = await self.client.request(
                "GET",
                status_url,
                verify=not self.client.disable_ssl,
                timeout=self.client.timeout,
            )
        except httpx.ConnectError:
            return Status(
                status=StatusEnum.DOWN,
                status_message="API Server is down",
            )

        if r.status_code >= 400:
            body = r.json()
            return Status(status=StatusEnum.UNHEALTHY, status_message=r.text)

        description = "LlamaDeploy is up and running."
        body = r.json()
        deployments = body.get("deployments") or []
        if deployments:
            description += "\nActive deployments:"
            for d in deployments:
                description += f"\n- {d}"
        else:
            description += "\nCurrently there are no active deployments"

        return Status(
            status=StatusEnum.HEALTHY,
            status_message=description,
            deployments=deployments,
        )

    @property
    def deployments(self) -> DeploymentCollection:
        """Returns a collection of deployments currently active in the API Server."""
        model_class = self._prepare(DeploymentCollection)
        return model_class(client=self.client, items={})

deployments property #

deployments: DeploymentCollection

Returns a collection of deployments currently active in the API Server.

status async #

status() -> Status

Returns the status of the API Server.

Source code in llama_deploy/client/models/apiserver.py
async def status(self) -> Status:
    """Returns the status of the API Server."""
    status_url = f"{self.client.api_server_url}/status/"

    try:
        r = await self.client.request(
            "GET",
            status_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )
    except httpx.ConnectError:
        return Status(
            status=StatusEnum.DOWN,
            status_message="API Server is down",
        )

    if r.status_code >= 400:
        body = r.json()
        return Status(status=StatusEnum.UNHEALTHY, status_message=r.text)

    description = "LlamaDeploy is up and running."
    body = r.json()
    deployments = body.get("deployments") or []
    if deployments:
        description += "\nActive deployments:"
        for d in deployments:
            description += f"\n- {d}"
    else:
        description += "\nCurrently there are no active deployments"

    return Status(
        status=StatusEnum.HEALTHY,
        status_message=description,
        deployments=deployments,
    )
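Because a connection failure is reported as a DOWN status rather than raised, status() works well as a lightweight health check. A sketch using the sync client (assuming llama_deploy is installed; the StatusEnum import path is an assumption, and imports are kept inside the function so the snippet stays self-contained):

```python
def check_server() -> bool:
    """Health-check the API server from synchronous code.

    Sketch only: assumes `llama_deploy` is installed and the
    `StatusEnum` import path below is correct.
    """
    from llama_deploy.client import Client
    from llama_deploy.types.apiserver import StatusEnum  # import path is an assumption

    # A connection failure comes back as StatusEnum.DOWN rather than
    # raising, so this is safe to call unconditionally.
    status = Client().sync.apiserver.status()
    print(status.status_message)
    return status.status == StatusEnum.HEALTHY
```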

Control Plane Functionality#

Session #

Bases: Model

Source code in llama_deploy/client/models/core.py
class Session(Model):
    async def run(self, service_name: str, **run_kwargs: Any) -> str:
        """Implements the workflow-based run API for a session."""
        task_input = json.dumps(run_kwargs)
        task_def = TaskDefinition(input=task_input, agent_id=service_name)
        task_id = await self._do_create_task(task_def)

        # wait for task to complete, up to timeout seconds
        async def _get_result() -> str:
            while True:
                task_result = await self._do_get_task_result(task_id)

                if isinstance(task_result, TaskResult):
                    return task_result.result or ""
                await asyncio.sleep(self.client.poll_interval)

        return await asyncio.wait_for(_get_result(), timeout=self.client.timeout)

    async def run_nowait(self, service_name: str, **run_kwargs: Any) -> str:
        """Implements the workflow-based run API for a session, but does not wait for the task to complete."""

        task_input = json.dumps(run_kwargs)
        task_def = TaskDefinition(input=task_input, agent_id=service_name)
        task_id = await self._do_create_task(task_def)

        return task_id

    async def create_task(self, task_def: TaskDefinition) -> str:
        """Create a new task in this session.

        Args:
            task_def (Union[str, TaskDefinition]): The task definition or input string.

        Returns:
            str: The ID of the created task.
        """
        return await self._do_create_task(task_def)

    async def _do_create_task(self, task_def: TaskDefinition) -> str:
        """Async-only version of create_task, to be used internally from other methods."""
        task_def.session_id = self.id
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks"
        response = await self.client.request("POST", url, json=task_def.model_dump())
        return response.json()

    async def get_task_result(self, task_id: str) -> TaskResult | None:
        """Get the result of a task in this session if it has one.

        Args:
            task_id (str): The ID of the task to get the result for.

        Returns:
            Optional[TaskResult]: The result of the task if it has one, otherwise None.
        """
        return await self._do_get_task_result(task_id)

    async def _do_get_task_result(self, task_id: str) -> TaskResult | None:
        """Async-only version of get_task_result, to be used internally from other methods."""
        url = (
            f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/result"
        )
        response = await self.client.request("GET", url)
        data = response.json()
        return TaskResult(**data) if data else None

    async def get_tasks(self) -> list[TaskDefinition]:
        """Get all tasks in this session.

        Returns:
            list[TaskDefinition]: A list of task definitions in the session.
        """
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks"
        response = await self.client.request("GET", url)
        return [TaskDefinition(**task) for task in response.json()]

    async def send_event(self, service_name: str, task_id: str, ev: Event) -> None:
        """Send event to a Workflow service.

        Args:
            service_name (str): The name of the target Workflow service.
            task_id (str): The ID of the task the event belongs to.
            ev (Event): The event to be submitted to the workflow.

        Returns:
            None
        """
        serializer = JsonSerializer()
        event_def = EventDefinition(
            event_obj_str=serializer.serialize(ev), agent_id=service_name
        )

        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
        await self.client.request("POST", url, json=event_def.model_dump())

    async def send_event_def(self, task_id: str, ev_def: EventDefinition) -> None:
        """Send event to a Workflow service.

        Args:
            task_id (str): The ID of the task the event belongs to.
            ev_def (EventDefinition): The event definition to be submitted to the workflow.

        Returns:
            None
        """
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
        await self.client.request("POST", url, json=ev_def.model_dump())

    async def get_task_result_stream(
        self, task_id: str
    ) -> AsyncGenerator[dict[str, Any], None]:
        """Get the result of a task in this session if it has one.

        Args:
            task_id (str): The ID of the task to get the result for.

        Returns:
            AsyncGenerator[dict[str, Any], None]: A generator that yields the result of the task.
        """
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/result_stream"
        start_time = time.time()
        while True:
            try:
                async with httpx.AsyncClient(timeout=self.client.timeout) as client:
                    async with client.stream("GET", url) as response:
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            json_line = json.loads(line)
                            yield json_line
                        break  # Exit the function if successful
            except httpx.HTTPStatusError as e:
                if e.response.status_code != 404:
                    raise  # Re-raise if it's not a 404 error
                if (
                    self.client.timeout is None  # means no timeout, always poll
                    or time.time() - start_time < self.client.timeout
                ):
                    await asyncio.sleep(self.client.poll_interval)
                else:
                    raise TimeoutError(
                        f"Task result not available after waiting for {self.client.timeout} seconds"
                    )

run async #

run(service_name: str, **run_kwargs: Any) -> str

Implements the workflow-based run API for a session.

Source code in llama_deploy/client/models/core.py
async def run(self, service_name: str, **run_kwargs: Any) -> str:
    """Implements the workflow-based run API for a session."""
    task_input = json.dumps(run_kwargs)
    task_def = TaskDefinition(input=task_input, agent_id=service_name)
    task_id = await self._do_create_task(task_def)

    # wait for task to complete, up to timeout seconds
    async def _get_result() -> str:
        while True:
            task_result = await self._do_get_task_result(task_id)

            if isinstance(task_result, TaskResult):
                return task_result.result or ""
            await asyncio.sleep(self.client.poll_interval)

    return await asyncio.wait_for(_get_result(), timeout=self.client.timeout)
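
The poll-then-timeout structure of `run` can be sketched in isolation. Below is a minimal, self-contained illustration of the same pattern, where `fetch_result` is a hypothetical stand-in for the client's `_do_get_task_result` call:

```python
import asyncio

async def poll_for_result(fetch_result, poll_interval=0.01, timeout=1.0):
    """Poll fetch_result() until it returns a non-None value, as Session.run does."""
    async def _get_result():
        while True:
            result = await fetch_result()
            if result is not None:
                return result
            await asyncio.sleep(poll_interval)
    # wait_for bounds the total time spent polling, raising TimeoutError on expiry
    return await asyncio.wait_for(_get_result(), timeout=timeout)

async def main():
    attempts = 0
    async def fetch_result():
        nonlocal attempts
        attempts += 1
        return "done" if attempts >= 3 else None  # resolves on the third poll
    return await poll_for_result(fetch_result)

print(asyncio.run(main()))  # → done
```

A `timeout` of `None` would make `wait_for` wait indefinitely, matching the client's "no timeout, always poll" behavior.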

run_nowait async #

run_nowait(service_name: str, **run_kwargs: Any) -> str

Implements the workflow-based run API for a session, but does not wait for the task to complete.

Source code in llama_deploy/client/models/core.py
async def run_nowait(self, service_name: str, **run_kwargs: Any) -> str:
    """Implements the workflow-based run API for a session, but does not wait for the task to complete."""

    task_input = json.dumps(run_kwargs)
    task_def = TaskDefinition(input=task_input, agent_id=service_name)
    task_id = await self._do_create_task(task_def)

    return task_id

create_task async #

create_task(task_def: TaskDefinition) -> str

Creates a new task in this session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task_def` | `TaskDefinition` | The task definition. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `str` | `str` | The ID of the created task. |

Source code in llama_deploy/client/models/core.py
async def create_task(self, task_def: TaskDefinition) -> str:
    """Create a new task in this session.

    Args:
        task_def (TaskDefinition): The task definition.

    Returns:
        str: The ID of the created task.
    """
    return await self._do_create_task(task_def)
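
As `run` shows above, the `input` field of a `TaskDefinition` is a plain JSON string built from the run kwargs. A small sketch of that round trip (the field values here are illustrative):

```python
import json

# Keyword arguments destined for a workflow run are packed into a JSON string,
# which is what TaskDefinition(input=...) expects (values are illustrative).
run_kwargs = {"query": "What is LlamaDeploy?", "max_results": 3}
task_input = json.dumps(run_kwargs)

# The receiving workflow can recover the original kwargs losslessly:
assert json.loads(task_input) == run_kwargs
print(task_input)
```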

get_task_result async #

get_task_result(task_id: str) -> TaskResult | None

Gets the result of a task in this session, if it has one.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task_id` | `str` | The ID of the task to get the result for. | required |

Returns:

| Type | Description |
| --- | --- |
| `TaskResult \| None` | The result of the task if it has one, otherwise `None`. |

Source code in llama_deploy/client/models/core.py
async def get_task_result(self, task_id: str) -> TaskResult | None:
    """Get the result of a task in this session if it has one.

    Args:
        task_id (str): The ID of the task to get the result for.

    Returns:
        Optional[TaskResult]: The result of the task if it has one, otherwise None.
    """
    return await self._do_get_task_result(task_id)

get_tasks async #

get_tasks() -> list[TaskDefinition]

Gets all tasks in this session.

Returns:

| Type | Description |
| --- | --- |
| `list[TaskDefinition]` | A list of task definitions in the session. |

Source code in llama_deploy/client/models/core.py
async def get_tasks(self) -> list[TaskDefinition]:
    """Get all tasks in this session.

    Returns:
        list[TaskDefinition]: A list of task definitions in the session.
    """
    url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks"
    response = await self.client.request("GET", url)
    return [TaskDefinition(**task) for task in response.json()]

send_event async #

send_event(service_name: str, task_id: str, ev: Event) -> None

Sends an event to a Workflow service.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `service_name` | `str` | The name of the target Workflow service. | required |
| `task_id` | `str` | The ID of the task the event belongs to. | required |
| `ev` | `Event` | The event to be submitted to the workflow. | required |

Returns:

| Type | Description |
| --- | --- |
| `None` | |

Source code in llama_deploy/client/models/core.py
async def send_event(self, service_name: str, task_id: str, ev: Event) -> None:
    """Send event to a Workflow service.

    Args:
        service_name (str): The name of the target Workflow service.
        task_id (str): The ID of the task the event belongs to.
        ev (Event): The event to be submitted to the workflow.

    Returns:
        None
    """
    serializer = JsonSerializer()
    event_def = EventDefinition(
        event_obj_str=serializer.serialize(ev), agent_id=service_name
    )

    url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
    await self.client.request("POST", url, json=event_def.model_dump())
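
The payload posted above pairs a serialized event with the target service name. A rough sketch of that shape, using a dataclass and plain `json` as stand-ins for a real workflow `Event` and llama_deploy's `JsonSerializer` (both stand-ins are illustrative assumptions):

```python
import json
from dataclasses import dataclass, asdict

# Stand-in for a workflow Event; the real client serializes events with
# llama_deploy's JsonSerializer, but plain JSON shows the payload shape.
@dataclass
class ProgressEvent:
    msg: str

def build_event_payload(ev, service_name: str) -> dict:
    """Mirror EventDefinition: a serialized event plus the target service name."""
    return {
        "event_obj_str": json.dumps(asdict(ev)),
        "agent_id": service_name,
    }

payload = build_event_payload(ProgressEvent(msg="step 1 done"), "my_workflow")
print(payload["agent_id"])  # → my_workflow
```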

send_event_def async #

send_event_def(task_id: str, ev_def: EventDefinition) -> None

Sends an event to a Workflow service using a raw EventDefinition.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task_id` | `str` | The ID of the task the event belongs to. | required |
| `ev_def` | `EventDefinition` | The event definition to be submitted to the workflow. | required |

Returns:

| Type | Description |
| --- | --- |
| `None` | |

Source code in llama_deploy/client/models/core.py
async def send_event_def(self, task_id: str, ev_def: EventDefinition) -> None:
    """Send event to a Workflow service.

    Args:
        task_id (str): The ID of the task the event belongs to.
        ev_def (EventDefinition): The event definition to be submitted to the workflow.

    Returns:
        None
    """
    url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
    await self.client.request("POST", url, json=ev_def.model_dump())

get_task_result_stream async #

get_task_result_stream(task_id: str) -> AsyncGenerator[dict[str, Any], None]

Gets the result of a task in this session if it has one, streamed as it becomes available.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task_id` | `str` | The ID of the task to get the result for. | required |

Returns:

| Type | Description |
| --- | --- |
| `AsyncGenerator[dict[str, Any], None]` | A generator that yields the result of the task. |

Source code in llama_deploy/client/models/core.py
async def get_task_result_stream(
    self, task_id: str
) -> AsyncGenerator[dict[str, Any], None]:
    """Get the result of a task in this session if it has one.

    Args:
        task_id (str): The ID of the task to get the result for.

    Returns:
        AsyncGenerator[dict[str, Any], None]: A generator that yields the result of the task.
    """
    url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/result_stream"
    start_time = time.time()
    while True:
        try:
            async with httpx.AsyncClient(timeout=self.client.timeout) as client:
                async with client.stream("GET", url) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        json_line = json.loads(line)
                        yield json_line
                    break  # Exit the function if successful
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 404:
                raise  # Re-raise if it's not a 404 error
            if (
                self.client.timeout is None  # means no timeout, always poll
                or time.time() - start_time < self.client.timeout
            ):
                await asyncio.sleep(self.client.poll_interval)
            else:
                raise TimeoutError(
                    f"Task result not available after waiting for {self.client.timeout} seconds"
                )
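
The stream above is newline-delimited JSON: each line from `aiter_lines()` is parsed independently and yielded. The consumption pattern can be shown with a fake line source standing in for the HTTP response (the payload contents are illustrative):

```python
import asyncio
import json

async def fake_line_stream():
    """Stand-in for response.aiter_lines(): yields newline-delimited JSON."""
    for line in ['{"step": 1}', '{"step": 2}', '{"result": "ok"}']:
        yield line

async def collect_stream():
    events = []
    async for line in fake_line_stream():
        events.append(json.loads(line))  # each line is one JSON document
    return events

print(asyncio.run(collect_stream()))  # → [{'step': 1}, {'step': 2}, {'result': 'ok'}]
```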

SessionCollection #

Bases: Collection

Source code in llama_deploy/client/models/core.py
class SessionCollection(Collection):
    async def list(self) -> list[Session]:  # type: ignore
        """Returns a list of all the sessions in the collection."""
        sessions_url = f"{self.client.control_plane_url}/sessions"
        response = await self.client.request("GET", sessions_url)
        sessions = []
        model_class = self._prepare(Session)
        for id, session_def in response.json().items():
            sessions.append(model_class(client=self.client, id=id))
        return sessions

    async def create(self) -> Session:
        """Creates a new session and returns a Session object.

        Returns:
            Session: A Session object representing the newly created session.
        """
        return await self._create()

    async def _create(self) -> Session:
        """Async-only version of create, to be used internally from other methods."""
        create_url = f"{self.client.control_plane_url}/sessions/create"
        response = await self.client.request("POST", create_url)
        session_id = response.json()
        model_class = self._prepare(Session)
        return model_class(client=self.client, id=session_id)

    async def get(self, id: str) -> Session:
        """Gets a session by ID.

        Args:
            id: The ID of the session to get.

        Returns:
            Session: A Session object representing the specified session.

        Raises:
            ValueError: If the session does not exist.
        """
        return await self._get(id)

    async def _get(self, id: str) -> Session:
        """Async-only version of get, to be used internally from other methods."""

        get_url = f"{self.client.control_plane_url}/sessions/{id}"
        await self.client.request("GET", get_url)
        model_class = self._prepare(Session)
        return model_class(client=self.client, id=id)

    async def get_or_create(self, id: str) -> Session:
        """Gets a session by ID, or creates a new one if it doesn't exist.

        Returns:
            Session: A Session object representing the specified session.
        """
        try:
            return await self._get(id)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return await self._create()
            raise e

    async def delete(self, session_id: str) -> None:
        """Deletes a session by ID.

        Args:
            session_id: The ID of the session to delete.
        """
        delete_url = f"{self.client.control_plane_url}/sessions/{session_id}/delete"
        await self.client.request("POST", delete_url)

list async #

list() -> list[Session]

Returns a list of all the sessions in the collection.

Source code in llama_deploy/client/models/core.py
async def list(self) -> list[Session]:  # type: ignore
    """Returns a list of all the sessions in the collection."""
    sessions_url = f"{self.client.control_plane_url}/sessions"
    response = await self.client.request("GET", sessions_url)
    sessions = []
    model_class = self._prepare(Session)
    for id, session_def in response.json().items():
        sessions.append(model_class(client=self.client, id=id))
    return sessions

create async #

create() -> Session

Creates a new session and returns a Session object.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Session` | `Session` | A Session object representing the newly created session. |

Source code in llama_deploy/client/models/core.py
async def create(self) -> Session:
    """Creates a new session and returns a Session object.

    Returns:
        Session: A Session object representing the newly created session.
    """
    return await self._create()

get async #

get(id: str) -> Session

Gets a session by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `id` | `str` | The ID of the session to get. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Session` | `Session` | A Session object representing the specified session. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the session does not exist. |

Source code in llama_deploy/client/models/core.py
async def get(self, id: str) -> Session:
    """Gets a session by ID.

    Args:
        id: The ID of the session to get.

    Returns:
        Session: A Session object representing the specified session.

    Raises:
        ValueError: If the session does not exist.
    """
    return await self._get(id)

get_or_create async #

get_or_create(id: str) -> Session

Gets a session by ID, or creates a new one if it doesn't exist.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Session` | `Session` | A Session object representing the specified session. |

Source code in llama_deploy/client/models/core.py
async def get_or_create(self, id: str) -> Session:
    """Gets a session by ID, or creates a new one if it doesn't exist.

    Returns:
        Session: A Session object representing the specified session.
    """
    try:
        return await self._get(id)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            return await self._create()
        raise e
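
The get-or-create fallback above catches `httpx.HTTPStatusError` and only creates on a 404, re-raising anything else. The same pattern can be sketched without a server, using a hypothetical `NotFound` exception and a dict as the session store:

```python
# A minimal sketch of the get-or-create fallback; NotFound stands in for an
# httpx.HTTPStatusError with response.status_code == 404.
class NotFound(Exception):
    status_code = 404

def get_or_create(store: dict, key: str) -> str:
    def _get(k):
        if k not in store:
            raise NotFound()
        return store[k]
    try:
        return _get(key)
    except NotFound:
        store[key] = f"session-{key}"  # create on 404; other errors propagate
        return store[key]

store = {}
print(get_or_create(store, "abc"))  # creates a new entry
print(get_or_create(store, "abc"))  # returns the existing entry
```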

delete async #

delete(session_id: str) -> None

Deletes a session by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The ID of the session to delete. | required |

Source code in llama_deploy/client/models/core.py
async def delete(self, session_id: str) -> None:
    """Deletes a session by ID.

    Args:
        session_id: The ID of the session to delete.
    """
    delete_url = f"{self.client.control_plane_url}/sessions/{session_id}/delete"
    await self.client.request("POST", delete_url)

Service #

Bases: Model

Source code in llama_deploy/client/models/core.py
class Service(Model):
    pass

ServiceCollection #

Bases: Collection

Source code in llama_deploy/client/models/core.py
class ServiceCollection(Collection):
    async def list(self) -> list[Service]:  # type: ignore
        """Returns a list containing all the services registered with the control plane.

        Returns:
            list[Service]: List of services registered with the control plane.
        """
        services_url = f"{self.client.control_plane_url}/services"
        response = await self.client.request("GET", services_url)
        services = []
        model_class = self._prepare(Service)

        for name, service in response.json().items():
            services.append(model_class(client=self.client, id=name))

        return services

    async def register(self, service: ServiceDefinition) -> Service:
        """Registers a service with the control plane.

        Args:
            service: Definition of the Service to register.
        """
        register_url = f"{self.client.control_plane_url}/services/register"
        await self.client.request("POST", register_url, json=service.model_dump())
        model_class = self._prepare(Service)
        s = model_class(id=service.service_name, client=self.client)
        self.items[service.service_name] = s
        return s

    async def deregister(self, service_name: str) -> None:
        """Deregisters a service from the control plane.

        Args:
            service_name: The name of the Service to deregister.
        """
        deregister_url = f"{self.client.control_plane_url}/services/deregister"
        await self.client.request(
            "POST",
            deregister_url,
            params={"service_name": service_name},
        )

list async #

list() -> list[Service]

Returns a list containing all the services registered with the control plane.

Returns:

| Type | Description |
| --- | --- |
| `list[Service]` | List of services registered with the control plane. |

Source code in llama_deploy/client/models/core.py
async def list(self) -> list[Service]:  # type: ignore
    """Returns a list containing all the services registered with the control plane.

    Returns:
        list[Service]: List of services registered with the control plane.
    """
    services_url = f"{self.client.control_plane_url}/services"
    response = await self.client.request("GET", services_url)
    services = []
    model_class = self._prepare(Service)

    for name, service in response.json().items():
        services.append(model_class(client=self.client, id=name))

    return services
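
The control plane returns services as a JSON object keyed by service name, and `list` builds one `Service` model per key. A dict stand-in shows the shape (the payload contents are illustrative):

```python
# Stand-in for response.json() from GET /services: keys are service names.
response_json = {
    "echo_workflow": {"service_name": "echo_workflow"},
    "rag_workflow": {"service_name": "rag_workflow"},
}

# One Service model is created per key; the values are not used for the id.
service_names = [name for name in response_json]
print(service_names)  # → ['echo_workflow', 'rag_workflow']
```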

register async #

register(service: ServiceDefinition) -> Service

Registers a service with the control plane.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `service` | `ServiceDefinition` | Definition of the Service to register. | required |

Source code in llama_deploy/client/models/core.py
async def register(self, service: ServiceDefinition) -> Service:
    """Registers a service with the control plane.

    Args:
        service: Definition of the Service to register.
    """
    register_url = f"{self.client.control_plane_url}/services/register"
    await self.client.request("POST", register_url, json=service.model_dump())
    model_class = self._prepare(Service)
    s = model_class(id=service.service_name, client=self.client)
    self.items[service.service_name] = s
    return s

deregister async #

deregister(service_name: str) -> None

Deregisters a service from the control plane.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `service_name` | `str` | The name of the Service to deregister. | required |

Source code in llama_deploy/client/models/core.py
async def deregister(self, service_name: str) -> None:
    """Deregisters a service from the control plane.

    Args:
        service_name: The name of the Service to deregister.
    """
    deregister_url = f"{self.client.control_plane_url}/services/deregister"
    await self.client.request(
        "POST",
        deregister_url,
        params={"service_name": service_name},
    )

Core #

Bases: Model

Source code in llama_deploy/client/models/core.py
class Core(Model):
    @property
    def services(self) -> ServiceCollection:
        """Returns a collection containing all the services registered with the control plane.

        Returns:
            ServiceCollection: Collection of services registered with the control plane.
        """
        model_class = self._prepare(ServiceCollection)
        return model_class(client=self.client, items={})

    @property
    def sessions(self) -> SessionCollection:
        """Returns a collection to access all the sessions registered with the control plane.

        Returns:
            SessionCollection: Collection of sessions registered with the control plane.
        """
        model_class = self._prepare(SessionCollection)
        return model_class(client=self.client, items={})

services property #

services: ServiceCollection

Returns a collection containing all the services registered with the control plane.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `ServiceCollection` | `ServiceCollection` | Collection of services registered with the control plane. |

sessions property #

sessions: SessionCollection

Returns a collection to access all the sessions registered with the control plane.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `SessionCollection` | `SessionCollection` | Collection of sessions registered with the control plane. |
