模型组成

模型组合允许您结合多个模型来构建复杂的AI应用程序,如RAG和AI代理。BentoML提供了简单的Service APIs,用于创建需要模型协同工作的工作流——无论是顺序(一个接一个)还是并行(同时)工作。

当您需要时,您可能希望在 BentoML 中使用模型组合:

  • 使用不同的模型一起处理不同类型的数据(例如,图像和文本)

  • 通过结合多个模型的结果来提高准确性和性能

  • 在专用硬件上运行不同的模型(例如,GPU和CPU)

  • 使用专门的模型或服务编排预处理、推理和后处理等顺序步骤

另请参阅

欲了解更多信息,请参阅博客文章 A Guide to Model Composition

示例

BentoML 中的模型组合可以涉及单个或多个 Services,具体取决于您的应用程序。

对于每个服务,您可以在@bentoml.service装饰器中使用resources来配置部署所需的资源,例如GPU。请注意,此字段仅在BentoCloud上生效

在一个服务中运行多个模型

您可以在同一硬件设备上运行多个模型,并为它们暴露单独或组合的API。

import bentoml
from bentoml.models import HuggingFaceModel
from transformers import pipeline
from typing import List

# Run two models in the same Service on the same hardware device
@bentoml.service(
    resources={"gpu": 1, "memory": "4GiB"},
    traffic={"timeout": 20},
)
class MultiModelService:
    # Retrieve model references from HF by specifying its HF ID
    model_a_path = HuggingFaceModel("FacebookAI/roberta-large-mnli")
    model_b_path = HuggingFaceModel("distilbert/distilbert-base-uncased")

    def __init__(self) -> None:
        # Initialize pipelines for each model
        self.pipeline_a = pipeline(task="zero-shot-classification", model=self.model_a_path, hypothesis_template="This text is about {}")
        self.pipeline_b = pipeline(task="sentiment-analysis", model=self.model_b_path)

    # Define an API for data processing with model A
    @bentoml.api
    def process_a(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> dict:
        return self.pipeline_a(input_data, labels)

    # Define an API for data processing with model B
    @bentoml.api
    def process_b(self, input_data: str) -> dict:
        return self.pipeline_b(input_data)[0]

    # Define an API endpoint that combines the processing of both models
    @bentoml.api
    def combined_process(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> dict:
        classification = self.pipeline_a(input_data, labels)
        sentiment = self.pipeline_b(input_data)[0]
        return {
            "classification": classification,
            "sentiment": sentiment
        }

注意

HuggingFaceModel 函数返回下载的模型路径作为字符串。您必须传入 Hugging Face 上显示的模型 ID(例如,HuggingFaceModel("FacebookAI/roberta-large-mnli"))。详情请参阅 加载和管理模型

在单独的服务中独立运行和扩展多个模型

当您的模型需要独立扩展或不同的硬件时,将它们拆分为单独的服务。

顺序

你可以让模型按顺序工作,其中一个模型的输出成为另一个模型的输入。这对于创建需要在用于预测之前对数据进行预处理的管道非常有用。

import bentoml
from bentoml.models import HuggingFaceModel
from transformers import pipeline
from typing import Dict, Any


@bentoml.service(resources={"cpu": "2", "memory": "2Gi"})
class PreprocessingService:
    model_a_path = HuggingFaceModel("distilbert/distilbert-base-uncased")

    def __init__(self) -> None:
        # Initialize pipeline for model A
        self.pipeline_a = pipeline(task="text-classification", model=self.model_a_path)

    @bentoml.api
    def preprocess(self, input_data: str) -> Dict[str, Any]:
        # Dummy preprocessing steps
        return self.pipeline_a(input_data)[0]


@bentoml.service(resources={"gpu": 1, "memory": "4Gi"})
class InferenceService:
    model_b_path = HuggingFaceModel("distilbert/distilroberta-base")
    preprocessing_service = bentoml.depends(PreprocessingService)

    def __init__(self) -> None:
        # Initialize pipeline for model B
        self.pipeline_b = pipeline(task="text-classification", model=self.model_b_path)

    @bentoml.api
    async def predict(self, input_data: str) -> Dict[str, Any]:
        # Dummy inference on preprocessed data
        # Implement your custom logic here
        preprocessed_data = await self.preprocessing_service.to_async.preprocess(input_data)
        final_result = self.pipeline_b(input_data)[0]
        return {
            "preprocessing_result": preprocessed_data,
            "final_result": final_result
        }

您使用 bentoml.depends 从一个服务访问另一个服务。它接受依赖的服务类作为参数,并允许您调用其可用的函数。详情请参见 运行分布式服务

您使用服务的.to_async属性将同步方法转换为异步方法。请注意,不建议在异步上下文中直接调用同步阻塞函数,因为它可能会阻塞事件循环。

并发

您可以同时运行多个独立的模型,然后将它们的结果结合起来。这对于集成模型非常有用,您希望聚合来自不同模型的预测以提高准确性。

import asyncio
import bentoml
from bentoml.models import HuggingFaceModel
from transformers import pipeline
from typing import Dict, Any, List

@bentoml.service(resources={"gpu": 1, "memory": "4Gi"})
class ModelAService:
    model_a_path = HuggingFaceModel("FacebookAI/roberta-large-mnli")

    def __init__(self) -> None:
        # Initialize pipeline for model A
        self.pipeline_a = pipeline(task="zero-shot-classification", model=self.model_a_path, hypothesis_template="This text is about {}")

    @bentoml.api
    def predict(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> Dict[str, Any]:
        # Dummy preprocessing steps
        return self.pipeline_a(input_data, labels)

@bentoml.service(resources={"gpu": 1, "memory": "4Gi"})
class ModelBService:
    model_b_path = HuggingFaceModel("distilbert/distilbert-base-uncased")

    def __init__(self) -> None:
        # Initialize pipeline for model B
        self.pipeline_b = pipeline(task="sentiment-analysis", model=self.model_b_path)

    @bentoml.api
    def predict(self, input_data: str) -> Dict[str, Any]:
        # Dummy preprocessing steps
        return self.pipeline_b(input_data)[0]

@bentoml.service(resources={"cpu": "4", "memory": "8Gi"})
class EnsembleService:
    service_a = bentoml.depends(ModelAService)
    service_b = bentoml.depends(ModelBService)

    @bentoml.api
    async def ensemble_predict(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> Dict[str, Any]:
        result_a, result_b = await asyncio.gather(
            self.service_a.to_async.predict(input_data, labels),
            self.service_b.to_async.predict(input_data)
        )
        # Dummy aggregation
        return {
            "zero_shot_classification": result_a,
            "sentiment_analysis": result_b
        }

推理图

您可以创建更复杂的工作流程,结合并行和顺序处理。

import asyncio
import typing as t

import transformers

import bentoml

MAX_LENGTH = 128
NUM_RETURN_SEQUENCE = 1

@bentoml.service(
    resources={"gpu": 1, "memory": "4Gi"}
)
class GPT2:
    def __init__(self):
        self.generation_pipeline_1 = transformers.pipeline(
            task="text-generation",
            model="gpt2",
        )

    @bentoml.api
    def generate(self, sentence: str) -> t.List[t.Any]:
        return self.generation_pipeline_1(sentence)

@bentoml.service(
    resources={"gpu": 1, "memory": "4Gi"}
)
class DistilGPT2:
    def __init__(self):
        self.generation_pipeline_2 = transformers.pipeline(
            task="text-generation",
            model="distilgpt2",
        )

    @bentoml.api
    def generate(self, sentence: str) -> t.List[t.Any]:
        return self.generation_pipeline_2(sentence)

@bentoml.service(
    resources={"cpu": "2", "memory": "2Gi"}
)
class BertBaseUncased:
    def __init__(self):
        self.classification_pipeline = transformers.pipeline(
            task="text-classification",
            model="bert-base-uncased",
            tokenizer="bert-base-uncased",
        )

    @bentoml.api
    def classify_generated_texts(self, sentence: str) -> float | str:
        score = self.classification_pipeline(sentence)[0]["score"]  # type: ignore
        return score

@bentoml.service(
    resources={"cpu": "4", "memory": "8Gi"}
)
class InferenceGraph:
    gpt2_generator = bentoml.depends(GPT2)
    distilgpt2_generator = bentoml.depends(DistilGPT2)
    bert_classifier = bentoml.depends(BertBaseUncased)

    @bentoml.api
    async def generate_score(
        self, original_sentence: str = "I have an idea!"
    ) -> t.List[t.Dict[str, t.Any]]:
        generated_sentences = [  # type: ignore
            result[0]["generated_text"]
            for result in await asyncio.gather(  # type: ignore
                self.gpt2_generator.to_async.generate(  # type: ignore
                    original_sentence,
                    max_length=MAX_LENGTH,
                    num_return_sequences=NUM_RETURN_SEQUENCE,
                ),
                self.distilgpt2_generator.to_async.generate(  # type: ignore
                    original_sentence,
                    max_length=MAX_LENGTH,
                    num_return_sequences=NUM_RETURN_SEQUENCE,
                ),
            )
        ]

        results = []
        for sentence in generated_sentences:  # type: ignore
            score = await self.bert_classifier.to_async.classify_generated_texts(
                sentence
            )  # type: ignore
            results.append(
                {
                    "generated": sentence,
                    "score": score,
                }
            )

        return results

此示例创建了一个工作流程,该流程:

  1. 将文本提示作为输入

  2. 使用GPT2和DistilGPT2并行生成新文本

  3. 使用BERT依次对每个生成的文本响应进行评分

  4. 返回生成的文本及其分数

注意

在某些情况下,您可能希望直接将一个LLM的输出流式传输到另一个LLM作为输入,以构建一个复合LLM系统。这在BentoML中尚不支持,但已在路线图中。如果您对这个话题感兴趣,欢迎加入我们在BentoML Slack社区的讨论,或者在GitHub上提出问题