autogen_ext.code_executors.jupyter#

class JupyterCodeExecutor(kernel_name: str = 'python3', timeout: int = 60, output_dir: Path = Path('.'))[源代码]#

基类: CodeExecutor, Component[JupyterCodeExecutorConfig]

一个代码执行器类,使用jupyter/nbclient来有状态地执行代码。

危险

这将在本地机器上执行代码。如果与LLM生成的代码一起使用,应谨慎操作。

直接使用它的示例:

import asyncio
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor


async def main() -> None:
    async with JupyterCodeExecutor() as executor:
        cancel_token = CancellationToken()
        code_blocks = [CodeBlock(code="print('hello world!')", language="python")]
        code_result = await executor.execute_code_blocks(code_blocks, cancel_token)
        print(code_result)


asyncio.run(main())

使用PythonCodeExecutionTool的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.tools.code_execution import PythonCodeExecutionTool


async def main() -> None:
    async with JupyterCodeExecutor() as executor:
        tool = PythonCodeExecutionTool(executor)
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        agent = AssistantAgent("assistant", model_client=model_client, tools=[tool])
        result = await agent.run(task="What is the 10th Fibonacci number? Use Python to calculate it.")
        print(result)


asyncio.run(main())

在CodeExecutorAgent中使用它的示例:

import asyncio
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor
from autogen_core import CancellationToken


async def main() -> None:
    async with JupyterCodeExecutor() as executor:
        code_executor_agent = CodeExecutorAgent("code_executor", code_executor=executor)
        task = TextMessage(
            content='''Here is some code
    ```python
    print('Hello world')
    ```
    ''',
            source="user",
        )
        response = await code_executor_agent.on_messages([task], CancellationToken())
        print(response.chat_message)


asyncio.run(main())
Parameters:
  • kernel_name (str) – 使用的内核名称。默认情况下为“python3”。

  • timeout (int) – 代码执行的超时时间,默认值为60。

  • output_dir (Path) – 保存输出文件的目录,默认为“.”。

component_config_schema#

别名 JupyterCodeExecutorConfig

component_provider_override: ClassVar[str | None] = 'autogen_ext.code_executors.jupyter.JupyterCodeExecutor'#

覆盖组件的提供者(provider)字符串。应使用此属性来防止内部模块名称成为模块名称的一部分。

async execute_code_blocks(code_blocks: list[CodeBlock], cancellation_token: CancellationToken) → JupyterCodeResult[源代码]#

执行代码块并返回结果。

Parameters:

code_blocks (list[CodeBlock]) – 要执行的代码块。

Returns:

JupyterCodeResult - 代码执行的结果。

async restart() → None[源代码]#

重新启动代码执行器。

async start() → None[源代码]#
async stop() → None[源代码]#

停止内核。

class JupyterCodeResult(exit_code: int, output: str, output_files: list[Path])[源代码]#

基类: CodeResult

一个用于Jupyter代码执行器的代码结果类。

output_files: list[Path]#