2024年8月14日

GPT 操作库 - Snowflake 中间件

本指南详细介绍了如何将ChatGPT与Snowflake数据仓库连接,目的是向ChatGPT返回SQL查询以用于数据分析。该GPT需要一个与中间件(如Azure函数)交互的动作,以便该动作能正确格式化来自Snowflake的响应,供Python笔记本环境使用。数据必须以文件形式返回,因此中间件函数应将SQL响应转换为CSV/Excel文件,大小不超过10MB。

本文档将概述中间件函数GPT操作。有关设置中间件函数本身的信息,请参阅GPT Actions库(中间件) - Azure Functions。您可以将这个Snowflake中间件操作与直接访问Snowflake的操作结合使用,从而创建一个能够在执行SQL查询前构建并测试查询的GPT。

现有Snowflake客户可以利用这些指南从其数据仓库查询数据,并将这些数据加载到Python数据分析环境中以获取更深入的洞察。这支持了由ChatGPT驱动的分析功能,例如可视化数据集、识别模式/异常,或发现数据清洗的缺口。该GPT可用于基于相对较小的数据集推动业务决策,或通过AI探索数据子集以生成假设,同时您在BI工具中探索整体数据集,从而节省时间和成本,并识别出以往未发现的模式。

在开始之前,请确保您已在应用程序环境中完成以下步骤:

  • 配置一个Snowflake数据仓库
  • 确保通过ChatGPT验证进入Snowflake的用户拥有必要的角色权限,可以访问数据库、模式和表

此外,在Azure Function App中创建应用程序之前,您需要一种处理用户身份验证的方法。您需要在Azure Entra ID中设置一个OAuth应用注册,该注册可以与Snowflake的外部OAuth安全集成相关联。Snowflake的外部OAuth安全集成允许外部系统颁发访问令牌,Snowflake可以使用这些令牌来确定访问级别。在这种情况下,外部令牌提供者是Azure Entra ID。由于ChatGPT将连接到Azure而不是Snowflake,GPT用户的OAuth令牌将由Azure根据其在Entra ID中的关联用户进行配置。因此,您需要一种方法将Snowflake中的用户映射到Azure中的相应用户。

以下是Azure端和Snowflake端所需的所有必要步骤。

我们将创建一个新的应用注册,在Azure中配置将使用的必要Snowflake权限范围,并获取在Snowflake和ChatGPT中所需的所有OAuth配置参数。本节内容都将在Azure中完成,以便在下一节中,当在Snowflake端进行配置时,您将拥有链接到此应用注册所需的必要信息。

  1. 导航至Microsoft Azure Portal并进行身份验证。
  2. 导航到Azure Entra ID(原Active Directory)。
  3. 管理下点击应用注册
  4. 点击新注册
  5. 输入Snowflake GPT OAuth Client或类似值作为名称
  6. 确认支持的账户类型设置为单租户
  7. 暂时忽略重定向URI。在配置您的GPT时,您会回来处理这个
  8. 点击注册
  9. Note down the Directory (tenant) ID (TENANT_ID) under Essentials. You will use this to generate your AZURE_AD_ISSUER and AZURE_AD_JWS_KEY_ENDPOINT.
    • AZURE_AD_ISSUERhttps://sts.windows.net/TENANT_ID/
    • AZURE_AD_JWS_KEY_ENDPOINThttps://login.microsoftonline.com/TENANT_ID/discovery/v2.0/keys
  10. 概览界面中点击端点
  11. On the right-hand side, note the OAuth 2.0 authorization endpoint (v2) as the AZURE_AD_OAUTH_AUTHORIZATION_ENDPOINT and OAuth 2.0 token endpoint (v2) as the AZURE_AD_OAUTH_TOKEN_ENDPOINT.
    • 端点应该类似于 https://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/authorizationhttps://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/token
  12. 管理下点击**公开API**。
  13. Click on the Set link next to Application ID URI to set the Application ID URI.
    • Application ID URI 必须在您组织的目录中保持唯一,例如 https://your.company.com/4d2a8c2b-a5f4-4b86-93ca-294185f45f2e。在后续配置步骤中,该值将被引用为
  14. To add a Snowflake Role as an OAuth scope for OAuth flows where the programmatic client acts on behalf of a user, click on Add a scope to add a scope representing the Snowflake role.
    • 输入作用域时,使用Snowflake角色名称并添加session:scope:前缀。例如,对于Snowflake Analyst角色,输入session:scope:analyst
    • 选择谁可以同意。
    • 输入范围的显示名称(例如:账户管理员)。
    • 输入范围的描述(例如:可以管理Snowflake账户)。
    • 点击添加范围
    • 将作用域保存为AZURE_AD_SCOPE。它应该是您的应用程序ID URI作用域名称的连接
  15. 概述部分,从应用程序(客户端) ID字段复制ClientID。在后续步骤中,这将被称为OAUTH_CLIENT_ID
  16. 点击证书和机密,然后点击新建客户端机密
  17. 添加秘密的描述。
  18. 选择730天(24个月)。出于测试目的,请选择不会很快过期的密钥。
  19. 点击添加。复制密钥。在后续步骤中,这将被称为OAUTH_CLIENT_SECRET
  20. For programmatic clients that will request an Access Token on behalf of a user, configure Delegated permissions for Applications as follows.
    • 点击API权限
    • 点击添加权限
    • 点击我的API
    • 点击你在在Azure AD中配置OAuth资源中创建的Snowflake OAuth资源
    • 点击委托权限框。
    • 检查与您希望授予此客户端的应用程序中定义的范围相关的权限。
    • 点击添加权限
    • 点击授予管理员同意按钮,向客户端授予权限。请注意,出于测试目的,权限是这样配置的。但在生产环境中,不建议以这种方式授予权限。
    • 点击

在Azure Entra ID中完成应用注册后,下一步是通过外部OAuth安全集成将该应用注册与Snowflake关联。安全集成的external_oauth_audience_list参数必须与配置Azure Entra ID时指定的应用程序ID URI相匹配。

Issuer(颁发者)和JWS Keys endpoint(JWS密钥端点)也将来自前几个步骤中收集的值。User Mapping Attribute(用户映射属性)可以设置为EMAIL_ADDRESSLOGIN_NAME,这将决定如何将用户的Microsoft登录凭证映射到他们在Snowflake中的用户,以确保颁发给ChatGPT的访问令牌能够遵循Snowflake中的权限设置。

CREATE OR REPLACE SECURITY INTEGRATION AZURE_OAUTH_INTEGRATION
  TYPE = EXTERNAL_OAUTH
  ENABLED = TRUE
  EXTERNAL_OAUTH_TYPE = 'AZURE'
  EXTERNAL_OAUTH_ISSUER = '<AZURE_AD_ISSUER>'
  EXTERNAL_OAUTH_JWS_KEYS_URL = '<AZURE_AD_JWS_KEY_ENDPOINT>'
  EXTERNAL_OAUTH_AUDIENCE_LIST = ('<SNOWFLAKE_APPLICATION_ID_URI>')
  EXTERNAL_OAUTH_TOKEN_USER_MAPPING_CLAIM = 'upn'
  EXTERNAL_OAUTH_SNOWFLAKE_USER_MAPPING_ATTRIBUTE = 'EMAIL_ADDRESS';

确保在您的Azure环境中完成以下步骤:

  • Azure Portal 或 VS Code,需具备创建 Azure Function Apps 和 Azure Entra 应用注册的权限
  • 本指南中有一个详细章节涉及部署和设计所需函数,用于封装Snowflake的响应以便将查询结果以CSV格式返回给ChatGPT。Azure Function App允许您的GPT处理更大数据集,因为ChatGPT从文件响应中能比从application/json负载中获取更多数据。此外,这些数据集仅适用于数据分析(即代码解释器),且响应需格式化为CSV文件。

现在我们已经创建了GPT并处理了Azure/Snowflake身份验证,接下来可以创建Azure Function App本身来执行SQL查询并处理响应格式,使GPT能够将结果下载为CSV文件用于数据分析。

有关部署Azure函数应用的更多详细信息,请参阅Azure Cookbook指南。下方您将找到可添加到函数中的示例代码。

这段代码旨在提供方向性指导 - 虽然它应该可以直接使用,但您应根据您的GPT和IT设置的具体需求进行定制。

你需要在Azure Function App中设置以下流程:

  • 从HTTP请求中提取令牌并使用它连接到Snowflake
  • 执行SQL查询并将结果写入CSV文件
  • 将该CSV临时存储在Blob Storage*中
  • 生成一个预签名URL以安全访问该CSV文件*
  • 响应返回一个openaiFileResponse

*如果使用file stream选项而非url选项将文件返回给您的GPT,可能不需要这些步骤。更多详情见下文。

确保已安装必要的库并将其导入脚本中。除了Python标准库外,此示例脚本还使用了以下内容:

import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions, ContentSettings
import snowflake.connector
import jwt    # pyjwt for token decoding

要连接到Snowflake,您需要从授权标头中提取Azure Entra ID分配的访问令牌,并在连接到Snowflake服务器时使用该令牌。

在这个示例中,Snowflake用户名是电子邮件地址,这简化了从HTTP访问令牌提取的Entra ID用户与连接所需的Snowflake用户ID之间的映射。如果您的组织不符合这种情况,可以在Python应用程序中将电子邮件地址映射到Snowflake用户ID。

我的应用程序设计为与单个Snowflake账户(例如ab12345.eastus2.azure)和数据仓库对接。如果您需要访问多个账户或仓库,可以考虑在GPT操作参数中传递这些参数,以便从HTTP请求中提取它们。

# Extract the token from the Authorization header
auth_header = req.headers.get('Authorization')
token_type, token = auth_header.split()

try:
    # Extract email address from token to use for Snowflake user mapping
    # If Snowflake usernames are not emails, then identify the username accordingly
    decoded_token = jwt.decode(token, options={"verify_signature": False})
    email = decoded_token.get('upn') 
    
    conn = snowflake.connector.connect(
        user=email, # Snowflake username, i.e., user's email in my example
        account=SNOWFLAKE_ACCOUNT, # Snowflake account, i.e., ab12345.eastus2.azure
        authenticator="oauth",
        token=token,
        warehouse=SNOWFLAKE_WAREHOUSE # Replace with Snowflake warehouse
    )
    logging.info("Successfully connected to Snowflake.")
except Exception as e:
    logging.error(f"Failed to connect to Snowflake: {e}")

连接到Snowflake后,您需要执行查询并将结果存储为CSV文件。虽然Snowflake中的角色设置应能防止任何有害查询的可能性,但您可能仍希望在应用程序中对查询进行清理(如下未包含),就像处理其他编程式SQL查询执行一样。

# Extract SQL query from request parameters or body
sql_query = req.params.get('sql_query')

try:
    # Use the specified warehouse
    cursor = conn.cursor()

    # Execute the query
    cursor.execute(sql_query)
    results = cursor.fetchall()
    column_names = [desc[0] for desc in cursor.description]
    logger.info(f"Query executed successfully: {sql_query}")

    # Convert results to CSV
    csv_file_path = write_results_to_csv(results, column_names)
except Exception as e:
    logger.error(f"Error executing query or processing data: {e}")


def write_results_to_csv(results, column_names):
    try:
        # Create a temporary file
        temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', newline='')
        csv_writer = csv.writer(temp_file)
        csv_writer.writerow(column_names)  # Write the column headers
        csv_writer.writerows(results)      # Write the data rows
        temp_file.close()  # Close the file to flush the contents
        return temp_file.name  # Return file path
    except Exception as e:
        logger.error(f"Error writing results to CSV: {e}")

有两种方法可以将文件返回给ChatGPT进行处理。您可以在openaiFileResponse列表响应中流式传输带有mimeType和文件名的base64编码数据,或者返回一个URL列表。在本解决方案中,我们将重点讨论后者。

为此,您需要将CSV文件上传至Azure Blob存储,并返回一个预签名URL以便在ChatGPT中安全访问该文件。需要注意的是,要在ChatGPT中下载URL,您必须确保该URL包含content_type和content_disposition参数,如下例所示。若想检查某个URL是否包含必要的头部信息,您可以在任何终端中使用curl -I 命令。

你需要按照此处的说明获取Azure存储桶的连接字符串。

def upload_csv_to_azure(file_path, container_name, blob_name, connect_str):
    try:
        # Create the BlobServiceClient object which will be used to create a container client
        blob_service_client = BlobServiceClient.from_connection_string(connect_str)
        
        # Create a blob client using the local file name as the name for the blob
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        # Upload the file with specified content settings
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True, content_settings=ContentSettings(
                content_type='text/csv',
                content_disposition=f'attachment; filename="{blob_name}"'
            ))
        logger.info(f"Successfully uploaded {file_path} to {container_name}/{blob_name}")

        # Generate a SAS token for the blob
        sas_token = generate_blob_sas(
            account_name=blob_service_client.account_name,
            container_name=container_name,
            blob_name=blob_name,
            account_key=blob_service_client.credential.account_key,
            permission=BlobSasPermissions(read=True),
            expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1)  # Token valid for 1 hour
        )

        # Generate a presigned URL using the SAS token
        url = f"https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"
        logger.info(f"Generated presigned URL: {url}")

        return url
    except Exception as e:
        logger.error(f"Error uploading file to Azure Blob Storage: {e}")
        raise

最后,您需要适当格式化响应,以指示ChatGPT将该响应作为文件或一系列文件进行处理。openaiFileResponse是一个列表,最多可包含10个URL(如果使用inline选项,则可以是base64编码)。

# Format the response so ChatGPT treats it as a file
response = {
    'openaiFileResponse': [csv_url]
}
cursor.close()
conn.close()
return func.HttpResponse(
    json.dumps(response), 
    status_code=200
)

该应用程序涉及许多动态组件,因此测试您的Azure Function App可能非常重要。鉴于ChatGPT的请求和响应有时可能不够透明,不利于调试,它可能是一个较难使用的测试环境。通过cURL或Postman从更可控的环境发起HTTP请求来对应用程序进行初始测试,将使您更容易调试和排查问题。一旦确认这些工具中的响应符合预期,您就可以开始构建GPT了。

创建自定义GPT后,可以参考以下文本作为指令面板的灵感。有问题吗?查看入门示例了解该步骤的详细操作。

为了让ChatGPT正确构建SQL查询,理解您的表结构非常重要。实现这一目标有多种方法,本指令集代表了最直接的途径。我们正在努力发布更多针对不同版本Snowflake GPT的指导说明,这些说明适用于您可能需要构建的、用于处理多个不同表、模式和数据库的场景,甚至能够动态学习那些随时间变化的模式结构。

以下是处理单一模式和表时的一些基本说明。此GPT已针对单一用例(分析2013年1月纽约出发的航班数据)进行了优化,这使得最简单的指令即可提供最可靠的GPT性能。

您是编写SQL查询从Snowflake获取数据的专家。您帮助用户将他们的提示转换为SQL查询。任何关于航班数据的问题都将转换为针对FLIGHTS.PUBLIC.JAN_2013_NYC表的Snowflake SQL查询。请将所有查询传入"sql_query"参数

该表的架构包括

ID	NUMBER	A unique identifier for each flight
YEAR	NUMBER	The year of the flight
MONTH	NUMBER	The month of the flight
DAY		NUMBER	The day of the month on which the flight departed
DEP_TIME	NUMBER	The actual departure time of the flight
SCHED_DEP_TIME	NUMBER	The scheduled departure time of the flight
DEP_DELAY	NUMBER	The departure delay in minutes (negative values indicate early departures)
ARR_TIME	NUMBER	The actual arrival time of the flight
SCHED_ARR_TIME	NUMBER	The scheduled arrival time of the flight
ARR_DELAY	NUMBER	The arrival delay in minutes (negative values indicate early arrivals)
CARRIER_CODE	TEXT	The carrier code of the airline
FLIGHT	NUMBER	The flight number
TAILNUM	TEXT	The aircraft tail number
ORIGIN_AIRPORT_CODE	TEXT	The origin airport code
DEST_AIRPORT_CODE	TEXT	The destination airport code
AIR_TIME	NUMBER	The total airtime of the flight in minutes
DISTANCE	NUMBER	The distance traveled by the flight in miles
HOUR	NUMBER	The hour part of the scheduled departure time
MINUTE	NUMBER	The minute part of the scheduled departure time
TIME_HOUR	NUMBER	The time at which the flight departed (rounded to the nearest hour)
CARRIER_NAME	TEXT	The full name of the airline carrier
ORIGIN_AIRPORT_NAME	TEXT	The full name of the origin airport
ORIGIN_REGION	TEXT	The region code of the origin airport
ORIGIN_MUNICIPALITY	TEXT	The city where the origin airport is located
ORIGIN_COORDINATES	TEXT	The geographical coordinates of the origin airport
DEST_AIRPORT_NAME	TEXT	The full name of the destination airport
DEST_REGION	TEXT	The region code of the destination airport
DEST_MUNICIPALITY	TEXT	The city where the destination airport is located
DEST_COORDINATES	TEXT	The geographical coordinates of the destination airport

当用户询问航班相关数据时,请执行以下操作:

  1. 使用executeSQL操作向Azure函数端点发送POST请求
  2. 接收作为Action响应返回的文件,并将其显示为电子表格
  3. 对文件进行分析,并提供用户所请求的必要信息

用户希望询问有关代码解释器中数据的问题,因此请利用您提取的数据集进行任何数据分析洞察。

创建自定义GPT后,请将以下文本复制到操作面板中,将占位值替换为您的具体函数详情,并根据您在Azure Function App中构建的任何额外输入更新参数。

有问题吗?查看入门示例了解此步骤的详细工作原理。

openapi: 3.1.0
info:
  title: Snowflake GPT API
  description: API to execute SQL queries on Snowflake and get the results as a CSV file URL.
  version: 1.0.0
servers:
  - url: https://<server-name>.azurewebsites.net
    description: Azure Function App server running Snowflake integration application
paths:
  /api/<function_name>?code=<code>:
    post:
      operationId: executeSQL
      summary: Executes a SQL query on Snowflake and returns the result file URL as a CSV.
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                sql_query:
                  type: string
                  description: The SQL query to be executed on Snowflake.
              required:
                - sql_query
      responses:
        '200':
          description: Successfully executed the query.
          content:
            application/json:
              schema:
                type: object
                properties:
                  openaiFileResponse:
                    type: array
                    items:
                      type: string
                      format: uri
                    description: Array of URLs pointing to the result files.
        '401':
          description: Unauthorized. Missing or invalid authentication token.
        '400':
          description: Bad Request. The request was invalid or cannot be otherwise served.
        '500':
          description: Internal Server Error. An error occurred on the server.
components:
  schemas: {} 
  • 返回给ChatGPT的文件大小限制为10MB。如果返回的文件超过此大小,您的请求可能会失败。如果您发现遇到这些限制,请确保在SQL命令中包含LIMIT限制。
  • 为什么首先需要Azure Function App? ChatGPT的数据分析功能(又称代码解释器)依赖于一个与模型上下文窗口隔离的安全Python环境。目前传递给数据分析的数据必须通过上传文件的方式完成。GPT操作返回数据时,必须以CSV或其他数据文件类型返回。为了通过GPT操作返回文件,响应必须封装在openaiFileResponse对象中。这需要自定义代码来正确格式化响应。
  • 我的公司使用的云服务提供商不是Azure。 如需通过GPT操作将其他中间件功能连接到ChatGPT,请参考其他AWSGCP中间件操作指南。您可以使用本指南中讨论的概念来建议构建中间件应用时的注意事项,但将该中间件连接到Snowflake可能因云服务提供商而异。例如,Snowflake专门构建了与Azure Entra ID链接的外部OAuth集成。
  • How do I limit the datasets that my GPT has access to? It can be imporant to limit the scope of access ChatGPT has within Snowflake. There are a few ways to do this:
    • Snowflake角色可以限制用户对哪些表的访问权限,并且Azure Entra ID提供的GPT用户访问令牌会遵守这些限制
    • 在您的中间件函数中,可以添加健全性检查来验证访问的表是否已获得该应用程序的批准
    • 您可能需要生成一个全新的数据库/数据仓库,专门用于与ChatGPT集成,并清除所有敏感信息,如个人身份信息(PII)。
  • Schema调用了错误的仓库或数据集: 如果ChatGPT调用了错误的仓库或数据库,建议更新您的指令,使其更明确地指出(a)应该调用哪个仓库/数据库,或者(b)要求在运行查询前用户必须提供这些确切细节

是否有您希望我们优先考虑的集成方案?我们的集成是否存在错误?请在GitHub上提交PR或问题,我们会尽快查看。