GPT 动作库 - Snowflake 中间件

Aug 14, 2024
Open in Github

本指南详细介绍了如何将ChatGPT与Snowflake数据仓库连接,目的是将SQL查询返回给ChatGPT以用于数据分析。GPT需要一个与中间件(如Azure函数)交互的操作,以便该操作能够正确格式化来自Snowflake的响应,以便在Python笔记本环境中使用。数据必须以文件形式返回,因此中间件函数应将SQL响应转换为CSV/Excel文件,大小不超过10MB。

本文档将概述中间件函数GPT操作。有关设置中间件函数本身的详细信息,请参阅GPT操作库(中间件)- Azure函数。您可以将此Snowflake中间件操作与直接操作Snowflake的操作结合使用,以启用一个可以在执行之前形成和测试SQL查询的GPT。

现有的Snowflake客户可以利用这些指南从他们的数据仓库中查询数据,并将这些数据加载到数据分析Python环境中以获取进一步的洞察。这使得ChatGPT驱动的分析成为可能,例如可视化数据集、识别模式/异常,或识别数据清洗的空白。这个GPT可以用于从相对较小的数据集中驱动业务决策,或通过AI探索数据子集以生成假设,同时在你探索BI工具中的整体数据集时节省时间和金钱,同时识别以前未见过的模式。

在开始之前,请确保在您的应用程序环境中完成以下步骤:

  • 配置一个Snowflake数据仓库
  • 确保通过ChatGPT认证进入Snowflake的用户具有访问数据库、模式和表的必要角色

此外,在Azure Function App中创建应用程序之前,您需要一种处理用户身份验证的方法。您需要在Azure Entra ID中设置一个OAuth应用程序注册,该注册可以与Snowflake的外部OAuth安全集成链接。Snowflake的外部OAuth安全集成允许外部系统颁发访问令牌,Snowflake可以使用这些令牌来确定访问级别。在这种情况下,外部令牌提供者是Azure Entra ID。由于ChatGPT将连接到Azure而不是Snowflake,GPT用户的OAuth令牌将由Azure根据其在Entra ID中的用户进行配置。因此,您需要一种方法将Snowflake中的用户映射到Azure中的相应用户。

以下是Azure端和Snowflake端所需的所有步骤。

我们将设置一个新的应用注册,在Azure中配置将使用的必要Snowflake范围,并检索在Snowflake和ChatGPT中都将需要的所有OAuth配置参数。本节将全部在Azure中进行,以便在下一节中,您将拥有在Snowflake端配置时链接到此应用注册的必要信息。

  1. 导航到Microsoft Azure Portal并进行身份验证。
  2. 导航到 Azure Entra ID(以前称为 Active Directory)。
  3. 点击管理下的应用注册
  4. 点击新注册
  5. 输入 Snowflake GPT OAuth Client,或类似的值作为 名称
  6. 验证支持的账户类型设置为单租户
  7. 暂时忽略重定向URI。在配置GPT时,您将回到这里。
  8. 点击注册
  9. 记下Essentials下的Directory (tenant) ID (TENANT_ID)。你将使用它来生成你的AZURE_AD_ISSUERAZURE_AD_JWS_KEY_ENDPOINT
    • AZURE_AD_ISSUERhttps://sts.windows.net/TENANT_ID/
    • AZURE_AD_JWS_KEY_ENDPOINThttps://login.microsoftonline.com/TENANT_ID/discovery/v2.0/keys
  10. 点击概述界面中的端点
  11. 在右侧,注意OAuth 2.0 授权端点 (v2)AZURE_AD_OAUTH_AUTHORIZATION_ENDPOINTOAuth 2.0 令牌端点 (v2)AZURE_AD_OAUTH_TOKEN_ENDPOINT
    • 端点应类似于 https://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/authorizationhttps://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/token
  12. 点击管理下的**公开API**。
  13. 点击Application ID URI旁边的Set链接来设置Application ID URI
    • Application ID URI必须在您组织的目录中是唯一的,例如https://your.company.com/4d2a8c2b-a5f4-4b86-93ca-294185f45f2e。在后续的配置步骤中,此值将被称为
  14. 要为OAuth流程添加Snowflake角色作为OAuth范围,其中编程客户端代表用户操作,请点击添加范围以添加代表Snowflake角色的范围。
    • 通过使用session:scope:前缀输入Snowflake角色的名称来输入范围。例如,对于Snowflake Analyst角色,输入session:scope:analyst
    • 选择谁可以同意。
    • 输入范围的显示名称(例如:账户管理员)。
    • 输入范围的描述(例如:可以管理Snowflake账户)。
    • 点击添加范围
    • 将范围保存为AZURE_AD_SCOPE。它应该是您的应用程序ID URI范围名称的拼接。
  15. 概览部分,从应用程序(客户端)ID字段中复制ClientID。在接下来的步骤中,这将被称为OAUTH_CLIENT_ID
  16. 点击证书和机密,然后点击新建客户端机密
  17. 添加秘密的描述。
  18. 选择730天(24个月)。出于测试目的,请选择不会很快过期的密钥。
  19. 点击添加。复制密钥。在接下来的步骤中,这将被称为OAUTH_CLIENT_SECRET
  20. 对于将代表用户请求访问令牌的编程客户端,按以下方式配置应用程序的委派权限。
    • 点击API 权限
    • 点击添加权限
    • 点击我的 API
    • 点击您在在 Azure AD 中配置 OAuth 资源中创建的Snowflake OAuth 资源
    • 点击委派权限框。
    • 勾选与您希望授予此客户端的应用程序中定义的范围相关的权限。
    • 点击添加权限
    • 点击授予管理员同意按钮以授予客户端权限。请注意,出于测试目的,权限是这样配置的。然而,在生产环境中,不建议以这种方式授予权限。
    • 点击

在Azure Entra ID中完成应用注册后,下一步是通过外部OAuth安全集成将该应用注册链接到Snowflake。安全集成的external_oauth_audience_list参数必须与您在配置Azure Entra ID时指定的应用程序ID URI匹配。

发行者JWS密钥端点也将来自前面步骤中收集的值。用户映射属性可以设置为EMAIL_ADDRESSLOGIN_NAME,这是如何将用户的Microsoft登录凭证映射到他们在Snowflake中的用户,以确保Snowflake中的权限由颁发给ChatGPT的访问令牌所尊重。

CREATE OR REPLACE SECURITY INTEGRATION AZURE_OAUTH_INTEGRATION
  TYPE = EXTERNAL_OAUTH
  ENABLED = TRUE
  EXTERNAL_OAUTH_TYPE = 'AZURE'
  EXTERNAL_OAUTH_ISSUER = '<AZURE_AD_ISSUER>'
  EXTERNAL_OAUTH_JWS_KEYS_URL = '<AZURE_AD_JWS_KEY_ENDPOINT>'
  EXTERNAL_OAUTH_AUDIENCE_LIST = ('<SNOWFLAKE_APPLICATION_ID_URI>')
  EXTERNAL_OAUTH_TOKEN_USER_MAPPING_CLAIM = 'upn'
  EXTERNAL_OAUTH_SNOWFLAKE_USER_MAPPING_ATTRIBUTE = 'EMAIL_ADDRESS';

确保你在Azure环境中完成以下步骤:

  • Azure Portal 或 VS Code,具有创建 Azure Function Apps 和 Azure Entra 应用注册的权限
  • 本指南中有一个详细部分,涉及部署和设计所需的函数,以包装来自Snowflake的响应,以便将查询结果作为CSV返回给ChatGPT。Azure Function App允许您的GPT摄取更大的数据集,因为ChatGPT可以从文件响应中摄取更多数据,而不是从application/json有效负载中。此外,这些数据集仅适用于以CSV文件格式响应的数据分析(即代码解释器)。

现在我们已经创建了GPT并处理了Azure/Snowflake身份验证,我们可以创建Azure Function App本身来执行SQL查询并处理响应格式,使GPT能够将结果下载为CSV以用于数据分析。

请遵循此Azure Cookbook指南以获取部署Azure Function App的更多详细信息。下面您将找到要添加到函数中的示例代码。

此代码旨在提供方向性指导 - 虽然它应该可以开箱即用,但您应根据您的GPT和IT设置的具体需求进行定制。

您需要在您的 Azure 函数应用中设置以下流程:

  • 从HTTP请求中提取令牌并使用它连接到Snowflake
  • 执行SQL查询并将结果写入CSV
  • 暂时将该CSV存储在Blob Storage*中
  • 生成一个预签名的URL以安全地访问该CSV文件*
  • 响应一个openaiFileResponse

*如果您使用文件流选项而不是URL选项将文件返回给您的GPT,则可能不需要这些步骤。更多内容见下文。

确保您已安装并将必要的库导入到脚本中。除了Python标准库外,此示例脚本还利用了以下内容:

import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions, ContentSettings
import snowflake.connector
import jwt    # pyjwt for token decoding

要连接到Snowflake,您需要从授权头中提取从Azure Entra ID分配的访问令牌,并在连接到Snowflake服务器时使用该令牌。

在这个例子中,Snowflake 用户名是电子邮件地址,这简化了从 HTTP 访问令牌中提取的 Entra ID 用户与连接所需的 Snowflake 用户 ID 的映射。如果您的组织不是这种情况,您可以在 Python 应用程序中将电子邮件地址映射到 Snowflake 用户 ID。

我的应用程序是为了与单个Snowflake账户(即ab12345.eastus2.azure)和仓库进行接口而构建的。如果您需要访问多个账户或仓库,您可以考虑在您的GPT操作参数中传递这些参数,以便您可以从HTTP请求中提取它们。

# Extract the token from the Authorization header
auth_header = req.headers.get('Authorization')
token_type, token = auth_header.split()

try:
    # Extract email address from token to use for Snowflake user mapping
    # If Snowflake usernames are not emails, then identify the username accordingly
    decoded_token = jwt.decode(token, options={"verify_signature": False})
    email = decoded_token.get('upn') 
    
    conn = snowflake.connector.connect(
        user=email, # Snowflake username, i.e., user's email in my example
        account=SNOWFLAKE_ACCOUNT, # Snowflake account, i.e., ab12345.eastus2.azure
        authenticator="oauth",
        token=token,
        warehouse=SNOWFLAKE_WAREHOUSE # Replace with Snowflake warehouse
    )
    logging.info("Successfully connected to Snowflake.")
except Exception as e:
    logging.error(f"Failed to connect to Snowflake: {e}")

一旦连接到Snowflake,您需要执行查询并将结果存储到CSV中。虽然Snowflake中的角色应该可以防止任何有害查询的可能性,但您可能希望在应用程序中清理您的查询(如下未包含),就像您处理任何其他程序化SQL查询执行一样。

# Extract SQL query from request parameters or body
sql_query = req.params.get('sql_query')

try:
    # Use the specified warehouse
    cursor = conn.cursor()

    # Execute the query
    cursor.execute(sql_query)
    results = cursor.fetchall()
    column_names = [desc[0] for desc in cursor.description]
    logger.info(f"Query executed successfully: {sql_query}")

    # Convert results to CSV
    csv_file_path = write_results_to_csv(results, column_names)
except Exception as e:
    logger.error(f"Error executing query or processing data: {e}")


def write_results_to_csv(results, column_names):
    try:
        # Create a temporary file
        temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', newline='')
        csv_writer = csv.writer(temp_file)
        csv_writer.writerow(column_names)  # Write the column headers
        csv_writer.writerows(results)      # Write the data rows
        temp_file.close()  # Close the file to flush the contents
        return temp_file.name  # Return file path
    except Exception as e:
        logger.error(f"Error writing results to CSV: {e}")

有两种方法可以将文件返回给ChatGPT进行处理。您可以在openaiFileResponse列表响应中流式传输base64编码的数据以及mimeType和文件名,或者您可以返回URL列表。在本解决方案中,我们将重点介绍后者。

为此,您需要将CSV上传到Azure Blob Storage,并返回一个预签名URL,以便在ChatGPT中安全地访问该文件。需要注意的是,为了在ChatGPT中下载URL,您需要确保该URL包含content_type和content_disposition,如下例所示。如果您想检查URL是否具有必要的头信息,您可以在任何终端中使用curl -I

您需要按照这里的说明获取Azure存储桶的连接字符串。

def upload_csv_to_azure(file_path, container_name, blob_name, connect_str):
    try:
        # Create the BlobServiceClient object which will be used to create a container client
        blob_service_client = BlobServiceClient.from_connection_string(connect_str)
        
        # Create a blob client using the local file name as the name for the blob
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        # Upload the file with specified content settings
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True, content_settings=ContentSettings(
                content_type='text/csv',
                content_disposition=f'attachment; filename="{blob_name}"'
            ))
        logger.info(f"Successfully uploaded {file_path} to {container_name}/{blob_name}")

        # Generate a SAS token for the blob
        sas_token = generate_blob_sas(
            account_name=blob_service_client.account_name,
            container_name=container_name,
            blob_name=blob_name,
            account_key=blob_service_client.credential.account_key,
            permission=BlobSasPermissions(read=True),
            expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1)  # Token valid for 1 hour
        )

        # Generate a presigned URL using the SAS token
        url = f"https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"
        logger.info(f"Generated presigned URL: {url}")

        return url
    except Exception as e:
        logger.error(f"Error uploading file to Azure Blob Storage: {e}")
        raise

最后,您需要适当地格式化响应,以指示ChatGPT将该响应作为文件或一系列文件进行处理。openaiFileResponse是一个列表,最多可以包含10个URL(如果使用内联选项,则可以是base64编码)。

# Format the response so ChatGPT treats it as a file
response = {
    'openaiFileResponse': [csv_url]
}
cursor.close()
conn.close()
return func.HttpResponse(
    json.dumps(response), 
    status_code=200
)

这个应用程序有很多移动的部分,因此测试你的Azure Function App可能很重要。ChatGPT可能是一个困难的测试场地,因为请求和响应有时可能比调试所需的更加不透明。通过cURL或Postman从更受控的环境中调用HTTP请求来初始测试你的应用程序,将使你更容易调试和分类问题。一旦你确定在这些工具中响应是按预期返回的,你就可以开始构建你的GPT了。

一旦你创建了一个自定义GPT,可以在指令面板中使用下面的文本作为灵感。有问题吗?查看入门示例以更详细地了解这一步的工作原理。

重要的是,ChatGPT 理解您的表结构以正确形成 SQL 查询。有不同的方法可以做到这一点,本指令集代表了最直接的方式。我们正在努力发布针对您可能希望构建的不同版本的 Snowflake GPT 的额外指令,以便能够处理多个不同的表、模式和数据库,甚至能够动态学习那些随时间变化的模式。

以下是一些在使用单一模式和表时的基本说明。此GPT已针对单一用例(分析2013年1月从纽约出发的航班数据)进行了优化,这使得最简单的说明能够提供最可靠的GPT性能。

您是编写SQL查询以从Snowflake获取数据的专家。您帮助用户将他们的提示转换为SQL查询。任何关于航班数据的问题都将转换为一个针对FLIGHTS.PUBLIC.JAN_2013_NYC表的Snowflake SQL查询。将任何查询传递给"sql_query"参数。

表的模式包括

ID	NUMBER	A unique identifier for each flight
YEAR	NUMBER	The year of the flight
MONTH	NUMBER	The month of the flight
DAY		NUMBER	The day of the month on which the flight departed
DEP_TIME	NUMBER	The actual departure time of the flight
SCHED_DEP_TIME	NUMBER	The scheduled departure time of the flight
DEP_DELAY	NUMBER	The departure delay in minutes (negative values indicate early departures)
ARR_TIME	NUMBER	The actual arrival time of the flight
SCHED_ARR_TIME	NUMBER	The scheduled arrival time of the flight
ARR_DELAY	NUMBER	The arrival delay in minutes (negative values indicate early arrivals)
CARRIER_CODE	TEXT	The carrier code of the airline
FLIGHT	NUMBER	The flight number
TAILNUM	TEXT	The aircraft tail number
ORIGIN_AIRPORT_CODE	TEXT	The origin airport code
DEST_AIRPORT_CODE	TEXT	The destination airport code
AIR_TIME	NUMBER	The total airtime of the flight in minutes
DISTANCE	NUMBER	The distance traveled by the flight in miles
HOUR	NUMBER	The hour part of the scheduled departure time
MINUTE	NUMBER	The minute part of the scheduled departure time
TIME_HOUR	NUMBER	The time at which the flight departed (rounded to the nearest hour)
CARRIER_NAME	TEXT	The full name of the airline carrier
ORIGIN_AIRPORT_NAME	TEXT	The full name of the origin airport
ORIGIN_REGION	TEXT	The region code of the origin airport
ORIGIN_MUNICIPALITY	TEXT	The city where the origin airport is located
ORIGIN_COORDINATES	TEXT	The geographical coordinates of the origin airport
DEST_AIRPORT_NAME	TEXT	The full name of the destination airport
DEST_REGION	TEXT	The region code of the destination airport
DEST_MUNICIPALITY	TEXT	The city where the destination airport is located
DEST_COORDINATES	TEXT	The geographical coordinates of the destination airport

当用户请求关于航班的数据时,执行以下操作:

  1. 使用executeSQL操作向Azure函数端点发送POST请求
  2. 接收作为Action响应的一部分返回的文件。将其显示为电子表格
  3. 对文件进行分析并提供用户所需的必要信息

用户希望询问有关代码解释器中数据的问题,因此请使用它来从您提取的数据集中获取任何数据分析见解。

一旦你创建了一个自定义GPT,请在操作面板中复制以下文本,将占位符值替换为你的具体函数详细信息,并根据你在Azure函数应用中构建的任何额外输入更新你的参数。

有问题吗?查看入门示例以详细了解此步骤的工作原理。

openapi: 3.1.0
info:
  title: Snowflake GPT API
  description: API to execute SQL queries on Snowflake and get the results as a CSV file URL.
  version: 1.0.0
servers:
  - url: https://<server-name>.azurewebsites.net
    description: Azure Function App server running Snowflake integration application
paths:
  /api/<function_name>?code=<code>:
    post:
      operationId: executeSQL
      summary: Executes a SQL query on Snowflake and returns the result file URL as a CSV.
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                sql_query:
                  type: string
                  description: The SQL query to be executed on Snowflake.
              required:
                - sql_query
      responses:
        '200':
          description: Successfully executed the query.
          content:
            application/json:
              schema:
                type: object
                properties:
                  openaiFileResponse:
                    type: array
                    items:
                      type: string
                      format: uri
                    description: Array of URLs pointing to the result files.
        '401':
          description: Unauthorized. Missing or invalid authentication token.
        '400':
          description: Bad Request. The request was invalid or cannot be otherwise served.
        '500':
          description: Internal Server Error. An error occurred on the server.
components:
  schemas: {} 
  • 返回给ChatGPT的文件大小限制为10MB。如果返回的文件大于此大小,您的请求可能会失败。如果您发现遇到这些限制,请确保在SQL命令中包含LIMIT。
  • 为什么首先需要Azure Function App? ChatGPT的数据分析功能(又称代码解释器)依赖于一个与模型的上下文窗口分离的安全Python环境。目前,传递给数据分析的数据必须通过上传文件来完成。返回数据的GPT操作必须将该数据作为CSV或其他数据文件类型返回。为了通过GPT操作返回文件,响应必须包装在openaiFileResponse对象中。这需要自定义代码来正确格式化响应。
  • 我的公司使用的云服务提供商与Azure不同。 对于通过GPT操作将其他中间件功能连接到ChatGPT,请参考其他AWSGCP中间件手册。您可以使用本手册中讨论的概念来建议在构建中间件应用程序时的考虑事项,但将该中间件连接到Snowflake可能因不同的云服务提供商而有所不同。例如,Snowflake构建了一个专门用于与Azure Entra ID链接的外部OAuth集成。
  • 如何限制我的GPT可以访问的数据集? 限制ChatGPT在Snowflake中的访问范围可能很重要。有几种方法可以做到这一点:
    • Snowflake角色可以限制谁可以访问哪些表,并且将由Azure Entra ID提供的GPT用户的访问令牌所尊重
    • 在您的中间件函数中,您可以添加健全性检查,以验证访问的表是否被该应用程序批准
    • 您可能希望生成一个全新的数据库/仓库,专门用于与ChatGPT集成,并清除任何敏感信息,如PII。
  • Schema 调用了错误的仓库或数据集: 如果 ChatGPT 调用了错误的仓库或数据库,考虑更新您的指令,使其更明确地说明 (a) 应该调用哪个仓库/数据库,或者 (b) 在运行查询之前要求用户提供这些确切的详细信息。

有没有您希望我们优先考虑的集成?我们的集成中有错误吗?在我们的github上提交一个PR或问题,我们会看一看。