Continuous monitoring pipeline#


This tutorial describes an advanced use case of running flows in Azure ML pipelines.
Detailed explanations of the prerequisites and principles can be found in the article mentioned above. Continuous monitoring is necessary for maintaining the quality, performance and efficiency of Generative AI applications,
as these factors directly impact the user experience and operating costs.

We will run an evaluation on a basic chat flow, then aggregate the results to export and visualize the metrics.
The flows used in this pipeline are described below:

continuous_monitoring_pipeline.png

1.1 Import the required libraries#

Install the required python packages#

Make sure the version of 'azure-ai-ml' is higher than 1.12.0.
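
A quick way to verify this before continuing (a small check, not part of the original tutorial):

from importlib.metadata import version

print(version("azure-ai-ml"))  # expect 1.12.0 or later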

# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, load_component, Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
from datetime import datetime

1.2 Configure credential#

try:
    credential = DefaultAzureCredential()
    # Check if the given credential can get a token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential does not work
    credential = InteractiveBrowserCredential()

1.3 Get a handle to the workspace#

# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)
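
MLClient.from_config reads the workspace details from a config.json in the current or a parent directory. If you don't have one, the client can also be constructed explicitly; a minimal sketch with placeholder values:

# Alternative: construct the client explicitly (replace the placeholders with your own values)
# ml_client = MLClient(
#     credential=credential,
#     subscription_id="<SUBSCRIPTION_ID>",
#     resource_group_name="<RESOURCE_GROUP>",
#     workspace_name="<WORKSPACE_NAME>",
# )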

1.4.1 Validate the basic chat flow#

Install the packages required by the flow#

%pip install -r ../../../flows/chat/chat-basic/requirements.txt

!pf flow validate --source ../../../flows/chat/chat-basic

1.4.2 Validate the QnA RAG evaluation flow#

Install the packages required by the flow#

%pip install -r ../../../flows/evaluation/eval-qna-rag-metrics/requirements.txt

!pf flow validate --source ../../../flows/evaluation/eval-qna-rag-metrics

1.4.3 Validate the perceived intelligence evaluation flow#

Install the packages required by the flow#

%pip install -r ../../../flows/evaluation/eval-perceived-intelligence/requirements.txt

!pf flow validate --source ../../../flows/evaluation/eval-perceived-intelligence

1.4.4 Validate the summarization flow#

%pip install -r ./flows/standard/simple-summarization/requirements.txt

!pf flow validate --source ./flows/standard/simple-summarization

1.4.5 Validate the summarization evaluation flow#

%pip install -r ../../../flows/evaluation/eval-summarization/requirements.txt

!pf flow validate --source ../../../flows/evaluation/eval-summarization

2. Load the chat flow as a component#

chat_flow_component = load_component("../../../flows/chat/chat-basic/flow.dag.yaml")

2.1 Load the QnA RAG evaluation flow as a component#

eval_qna_rag_metrics_component = load_component(
    "../../../flows/evaluation/eval-qna-rag-metrics/flow.dag.yaml"
)

2.2 Load the perceived intelligence evaluation flow as a component#

eval_perceived_intelligence_component = load_component(
    "../../../flows/evaluation/eval-perceived-intelligence/flow.dag.yaml"
)

2.3 Load the summarization flow as a component#

simple_summarization_component = load_component(
    "./flows/standard/simple-summarization/flow.dag.yaml"
)

2.4 Load the summarization evaluation flow as a component#

eval_summarization_component = load_component(
    "../../../flows/evaluation/eval-summarization/flow.dag.yaml"
)

2.5 Load the Parquet converter#

The Parquet converter is a command component that aggregates the outputs of the evaluation nodes and logs the metrics to the ML pipeline.

convert_parquet.png

convert_parquet_component = load_component(
    "./components/convert_parquet/convert_parquet.yaml"
)
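
The converter's implementation lives in ./components/convert_parquet/ and is not reproduced here. As a rough illustration of the idea (assumptions: each evaluation output folder contains a JSONL file named parallel_run_step.jsonl, and pandas and mlflow are available in the component environment), its aggregation logic could look like this:

import pandas as pd
import mlflow

def aggregate(eval_output_folder: str, out_path: str):
    # Flow outputs of a parallel run step are written as JSONL; load them into a DataFrame
    df = pd.read_json(f"{eval_output_folder}/parallel_run_step.jsonl", lines=True)
    # Export the raw rows as Parquet for downstream visualization
    df.to_parquet(out_path)
    # Log the mean of every numeric metric column to the pipeline job
    for col in df.select_dtypes("number").columns:
        mlflow.log_metric(col, df[col].mean())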

3.1 Declare the input and output#

data_input = Input(
    path="./data/monitoring_dataset.jsonl",
    type=AssetTypes.URI_FILE,
    mode="mount",
)
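
The flow nodes below map columns of this dataset to flow inputs with the ${data.<column_name>} pattern, so the file needs chat_history, question, documents and context columns. A single line of monitoring_dataset.jsonl might look like this (a hypothetical record, not taken from the actual dataset):

{"chat_history": [], "question": "How do I enable monitoring?", "documents": "[retrieved documents]", "context": "[retrieved context]"}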

eval_results_output = Output(
    # Provide custom flow output file path if needed
    # path="azureml://datastores/<data_store_name>/paths/<path>",
    type=AssetTypes.URI_FOLDER,
    # rw_mount is suggested for flow output
    mode="rw_mount",
)

3.2.1 Define and run the pipeline with the flow components#

# Define the pipeline as a function
@pipeline()
def continuous_monitoring(
    # Function inputs will be treated as pipeline input data or parameters.
    # Pipeline input could be linked to step inputs to pass data between steps.
    # Users are not required to define pipeline inputs.
    # With pipeline inputs, users can provide different data or values when triggering different pipeline runs.
    pipeline_input_data: Input,
    parallel_node_count: int = 1,
):
    # Declare the pipeline step 'chat_flow_node' by using the flow component
    chat_flow_node = chat_flow_component(
        # Bind the pipeline input data to the port 'data' of the flow component
        # If you don't have a pipeline input, you can pass the 'data_input' object directly to the 'data' port.
        # But with this approach, you can't provide different data when you trigger different pipeline runs.
        data=pipeline_input_data,
        # Declare which column of input data should be mapped to flow input
        # the value pattern follows ${data.<column_name_from_data_input>}
        chat_history="${data.chat_history}",
        question="${data.question}",
        # Provide the connection values of the flow component
        # The value of connection and deployment_name should align with your workspace connection settings.
        connections={
            "chat": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            }
        },
    )

    # Provide run settings of your flow component
    # Only 'compute' is required; other settings keep their default values if not provided.
    # If the workspace was created with Azure AI Studio and is inside a hub,
    # a Compute Cluster cannot be used; use serverless compute instead.
    chat_flow_node.environment_variables = {
        "PF_INPUT_FORMAT": "jsonl",
    }
    chat_flow_node.compute = "serverless"
    chat_flow_node.resources = {"instance_count": parallel_node_count}
    chat_flow_node.mini_batch_size = 5
    chat_flow_node.max_concurrency_per_instance = 2
    chat_flow_node.retry_settings = {
        "max_retries": 1,
        "timeout": 1200,
    }
    chat_flow_node.error_threshold = -1
    chat_flow_node.mini_batch_error_threshold = -1
    chat_flow_node.logging_level = "DEBUG"

    # QnA RAG Metrics Evaluation Node
    eval_qna_rag_metrics_node = eval_qna_rag_metrics_component(
        # Bind the pipeline input data to the port 'data' of the flow component
        # If you don't have a pipeline input, you can pass the 'data_input' object directly to the 'data' port.
        # But with this approach, you can't provide different data when you trigger different pipeline runs.
        data=pipeline_input_data,
        # run_outputs connects the output of a previous node
        run_outputs=chat_flow_node.outputs.flow_outputs,
        # Declare which column of input data should be mapped to flow input
        # the value pattern follows ${data.<column_name_from_data_input>}
        documents="${data.documents}",
        question="${data.question}",
        # Declare which column of previous node output should be mapped to flow input
        # the value pattern follows ${run.outputs.<column_name_from_run_outputs>}
        answer="${run.outputs.answer}",
        # Provide the connection values of the flow component
        # The value of connection and deployment_name should align with your workspace connection settings.
        connections={
            "gpt_groundedness": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
            "gpt_relevance": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
            "gpt_retrieval_score": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
        },
    )

    # Provide run settings of your flow component
    # Only 'compute' is required; other settings keep their default values if not provided.
    # If the workspace was created with Azure AI Studio and is inside a hub,
    # a Compute Cluster cannot be used; use serverless compute instead.
    eval_qna_rag_metrics_node.environment_variables = {
        "PF_INPUT_FORMAT": "jsonl",
    }
    eval_qna_rag_metrics_node.compute = "serverless"
    eval_qna_rag_metrics_node.resources = {"instance_count": parallel_node_count}
    eval_qna_rag_metrics_node.mini_batch_size = 5
    eval_qna_rag_metrics_node.max_concurrency_per_instance = 2
    eval_qna_rag_metrics_node.retry_settings = {
        "max_retries": 1,
        "timeout": 1200,
    }
    eval_qna_rag_metrics_node.error_threshold = -1
    eval_qna_rag_metrics_node.mini_batch_error_threshold = -1
    eval_qna_rag_metrics_node.logging_level = "DEBUG"

    # Perceived Intelligence Evaluation Node
    eval_perceived_intelligence_node = eval_perceived_intelligence_component(
        # Bind the pipeline input data to the port 'data' of the flow component
        # If you don't have a pipeline input, you can pass the 'data_input' object directly to the 'data' port.
        # But with this approach, you can't provide different data when you trigger different pipeline runs.
        data=pipeline_input_data,
        # run_outputs connects the output of a previous node
        run_outputs=chat_flow_node.outputs.flow_outputs,
        # Declare which column of input data should be mapped to flow input
        # the value pattern follows ${data.<column_name_from_data_input>}
        question="${data.question}",
        context="${data.context}",
        # Declare which column of previous node output should be mapped to flow input
        # the value pattern follows ${run.outputs.<column_name_from_run_outputs>}
        answer="${run.outputs.answer}",
        # Provide the connection values of the flow component
        # The value of connection and deployment_name should align with your workspace connection settings.
        connections={
            "gpt_perceived_intelligence": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            }
        },
    )

    # Provide run settings of your flow component
    # Only 'compute' is required; other settings keep their default values if not provided.
    # If the workspace was created with Azure AI Studio and is inside a hub,
    # a Compute Cluster cannot be used; use serverless compute instead.
    eval_perceived_intelligence_node.environment_variables = {
        "PF_INPUT_FORMAT": "jsonl",
    }
    eval_perceived_intelligence_node.compute = "serverless"
    eval_perceived_intelligence_node.resources = {"instance_count": parallel_node_count}
    eval_perceived_intelligence_node.mini_batch_size = 5
    eval_perceived_intelligence_node.max_concurrency_per_instance = 2
    eval_perceived_intelligence_node.retry_settings = {
        "max_retries": 1,
        "timeout": 1200,
    }
    eval_perceived_intelligence_node.error_threshold = -1
    eval_perceived_intelligence_node.mini_batch_error_threshold = -1
    eval_perceived_intelligence_node.logging_level = "DEBUG"

    # Summarization Node
    simple_summarization_node = simple_summarization_component(
        # Bind the pipeline input data to the port 'data' of the flow component
        # If you don't have a pipeline input, you can pass the 'data_input' object directly to the 'data' port.
        # But with this approach, you can't provide different data when you trigger different pipeline runs.
        data=pipeline_input_data,
        # run_outputs connects the output of a previous node
        run_outputs=chat_flow_node.outputs.flow_outputs,
        # Declare which column of previous node output should be mapped to flow input
        # the value pattern follows ${run.outputs.<column_name_from_run_outputs>}
        answer="${run.outputs.answer}",
        # Provide the connection values of the flow component
        # The value of connection and deployment_name should align with your workspace connection settings.
        connections={
            "summarize_text_content": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            }
        },
    )

    # Provide run settings of your flow component
    # Only 'compute' is required; other settings keep their default values if not provided.
    # If the workspace was created with Azure AI Studio and is inside a hub,
    # a Compute Cluster cannot be used; use serverless compute instead.
    simple_summarization_node.environment_variables = {
        "PF_INPUT_FORMAT": "jsonl",
    }
    simple_summarization_node.compute = "serverless"
    simple_summarization_node.resources = {"instance_count": parallel_node_count}
    simple_summarization_node.mini_batch_size = 5
    simple_summarization_node.max_concurrency_per_instance = 2
    simple_summarization_node.retry_settings = {
        "max_retries": 1,
        "timeout": 1200,
    }
    simple_summarization_node.error_threshold = -1
    simple_summarization_node.mini_batch_error_threshold = -1
    simple_summarization_node.logging_level = "DEBUG"

    # Summarization Evaluation Node
    eval_summarization_node = eval_summarization_component(
        # Bind the summarization node's output to the port 'data' of the flow component,
        # so this evaluation runs over the generated summaries rather than the pipeline input data.
        data=simple_summarization_node.outputs.flow_outputs,
        # run_outputs connects the output of a previous node
        run_outputs=chat_flow_node.outputs.flow_outputs,
        # Declare which column of input data should be mapped to flow input
        # the value pattern follows ${data.<column_name_from_data_input>}
        summary="${data.summary}",
        # Declare which column of previous node output should be mapped to flow input
        # the value pattern follows ${run.outputs.<column_name_from_run_outputs>}
        document="${run.outputs.answer}",
        # Provide the connection values of the flow component
        # The value of connection and deployment_name should align with your workspace connection settings.
        connections={
            "score_fluency": {
                "connection": "azure_open_ai_connection",
            },
            "score_consistency": {
                "connection": "azure_open_ai_connection",
            },
            "score_relevance": {
                "connection": "azure_open_ai_connection",
            },
            "score_coherence": {
                "connection": "azure_open_ai_connection",
            },
        },
    )

    # Provide run settings of your flow component
    # Only 'compute' is required; other settings keep their default values if not provided.
    # If the workspace was created with Azure AI Studio and is inside a hub,
    # a Compute Cluster cannot be used; use serverless compute instead.
    eval_summarization_node.environment_variables = {
        "PF_INPUT_FORMAT": "jsonl",
    }
    eval_summarization_node.compute = "serverless"
    eval_summarization_node.resources = {"instance_count": parallel_node_count}
    eval_summarization_node.mini_batch_size = 5
    eval_summarization_node.max_concurrency_per_instance = 2
    eval_summarization_node.retry_settings = {
        "max_retries": 1,
        "timeout": 1200,
    }
    eval_summarization_node.error_threshold = -1
    eval_summarization_node.mini_batch_error_threshold = -1
    eval_summarization_node.logging_level = "DEBUG"

    convert_parquet_node = convert_parquet_component(
        # Bind the evaluation nodes outputs to the command component's input
        eval_qna_rag_metrics_output_folder=eval_qna_rag_metrics_node.outputs.flow_outputs,
        eval_perceived_intelligence_output_folder=eval_perceived_intelligence_node.outputs.flow_outputs,
        eval_summarization_output_folder=eval_summarization_node.outputs.flow_outputs,
    )

    # Provide run settings of your command component
    # Only 'compute' is required; other settings keep their default values if not provided.
    # If the workspace was created with Azure AI Studio and is inside a hub,
    # a Compute Cluster cannot be used; use serverless compute instead.
    convert_parquet_node.compute = "serverless"
    # Function return will be treated as pipeline output. This is not required.
    return {
        "eval_results_output_folder": convert_parquet_node.outputs.eval_results_output
    }


# create pipeline instance
pipeline_job_def = continuous_monitoring(pipeline_input_data=data_input)
pipeline_job_def.outputs.eval_results_output_folder = eval_results_output
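
Optionally, you can print the job definition to review the generated pipeline YAML before submitting:

print(pipeline_job_def)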

3.2.2 Submit the job#

# Submit the pipeline job to your workspace
pipeline_job_run = ml_client.jobs.create_or_update(
    pipeline_job_def, experiment_name="Continuous Monitoring"
)
pipeline_job_run

ml_client.jobs.stream(pipeline_job_run.name)
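
After the run completes, the aggregated results can be downloaded for local inspection; a sketch, assuming the job finished successfully and the output name matches the pipeline definition above:

# Download the pipeline output produced by the convert_parquet node
ml_client.jobs.download(
    name=pipeline_job_run.name,
    output_name="eval_results_output_folder",
    download_path="./eval_results",
)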

4.1 Next step - Set up a schedule for your pipeline#

from datetime import datetime
from azure.ai.ml.entities import JobSchedule, RecurrenceTrigger, RecurrencePattern
from azure.ai.ml.constants import TimeZone

schedule_name = "simple_sdk_create_schedule_recurrence"
schedule_start_time = datetime.utcnow()

recurrence_trigger = RecurrenceTrigger(
    frequency="day",  # could accept "hour", "minute", "day", "week", "month"
    interval=1,
    schedule=RecurrencePattern(hours=10, minutes=[0, 1]),
    start_time=schedule_start_time,
    time_zone=TimeZone.UTC,
)

job_schedule = JobSchedule(
    name=schedule_name,
    trigger=recurrence_trigger,
    # Declare the pipeline job to be scheduled. Here we use the pipeline job created in the previous example.
    create_job=pipeline_job_def,
)
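
The RecurrencePattern above fires every day at 10:00 and 10:01 UTC. The same daily trigger can also be expressed in cron syntax; a sketch using CronTrigger:

from azure.ai.ml.entities import CronTrigger

# Equivalent daily trigger at 10:00 UTC (cron fields: minute hour day month weekday)
cron_trigger = CronTrigger(
    expression="0 10 * * *",
    start_time=schedule_start_time,
    time_zone=TimeZone.UTC,
)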

Start the schedule#

job_schedule = ml_client.schedules.begin_create_or_update(
    schedule=job_schedule
).result()
print(job_schedule)
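
To confirm the schedule was created, you can list the schedules in the workspace:

for s in ml_client.schedules.list():
    print(s.name, s.is_enabled)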

Disable or shut down a running schedule#

job_schedule = ml_client.schedules.begin_disable(name=schedule_name).result()
job_schedule.is_enabled
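
A disabled schedule can also be removed entirely; a sketch:

ml_client.schedules.begin_delete(name=schedule_name).result()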

4.2 Next step - Deploy the pipeline to an endpoint#

from azure.ai.ml.entities import BatchEndpoint, PipelineComponentBatchDeployment

# from azure.ai.ml.entities import ModelBatchDeployment, ModelBatchDeploymentSettings, Model, AmlCompute, Data, BatchRetrySettings, CodeConfiguration, Environment, Data
# from azure.ai.ml.constants import BatchDeploymentOutputAction


endpoint_name = "hello-my-pipeline-endpoint"
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="A hello world endpoint for pipeline",
)

ml_client.batch_endpoints.begin_create_or_update(endpoint).result()
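
With the endpoint in place, the pipeline job can be attached to it as a batch deployment. The sketch below reuses the pipeline job submitted earlier as the job definition; the deployment name and settings are placeholders:

deployment = PipelineComponentBatchDeployment(
    name="continuous-monitoring-dpl",
    description="A deployment running the continuous monitoring pipeline.",
    endpoint_name=endpoint_name,
    job_definition=pipeline_job_run,
    settings={"continue_on_step_failure": False},
)
ml_client.batch_deployments.begin_create_or_update(deployment).result()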