Title: Snowflake Locale: zh URL: https://sensorswave.com/docs/data-center/pipeline/destinations/snowflake/ Description: 配置 Snowflake 导出管道,将事件数据和用户数据定期同步到 Snowflake 数据仓库 Snowflake 连接器将 Sensors Wave 中的事件数据和用户数据定期批量同步到您的 Snowflake 数据仓库,帮助数据团队在熟悉的 SQL 环境中进行深度分析,并与其他业务数据进行联合查询。 典型使用场景: - 将用户行为事件同步到 Snowflake,与订单、CRM 数据进行 JOIN 分析 - 将用户画像数据导出,结合 dbt 建模构建数据集市 - 保留 Sensors Wave 中的历史事件数据,满足合规或长期分析需求 Snowflake 连接器支持两种导出类型: - **事件数据导出**:将用户行为事件增量同步到 `SENSORSWAVE_EVENTS` 表(默认,可在创建时自定义) - **用户数据导出**:将用户画像属性增量同步到 `SENSORSWAVE_USERS` 表(默认,可在创建时自定义) 每种导出类型需要分别创建一条数据流。 ## 前提条件 在 Sensors Wave 中创建数据流之前,需要在您的 Snowflake 账户中完成以下准备工作。建议为 Sensors Wave 创建专用的角色、用户和数据库,避免权限过大带来安全风险。 ### 第 1 步:创建虚拟仓库 如果已有可用的虚拟仓库,可跳过此步骤。 ```sql CREATE WAREHOUSE SENSORSWAVE_WH WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 600 AUTO_RESUME = TRUE INITIALLY_SUSPENDED = TRUE; ``` ### 第 2 步:创建目标数据库和 Schema ```sql CREATE DATABASE SENSORSWAVE_DB; CREATE SCHEMA SENSORSWAVE_DB.SENSORSWAVE_SCHEMA; ``` ### 第 3 步:创建专用角色并授权 ```sql CREATE ROLE SENSORSWAVE_ROLE; GRANT USAGE ON WAREHOUSE SENSORSWAVE_WH TO ROLE SENSORSWAVE_ROLE; GRANT USAGE ON DATABASE SENSORSWAVE_DB TO ROLE SENSORSWAVE_ROLE; GRANT USAGE ON SCHEMA SENSORSWAVE_DB.SENSORSWAVE_SCHEMA TO ROLE SENSORSWAVE_ROLE; GRANT CREATE TABLE ON SCHEMA SENSORSWAVE_DB.SENSORSWAVE_SCHEMA TO ROLE SENSORSWAVE_ROLE; ``` ### 第 4 步:生成 RSA 密钥对 Snowflake 连接器使用密钥对认证(Key Pair Authentication),不支持用户名密码方式。在本地终端执行以下命令生成密钥对: ```bash # 生成 2048 位 RSA 私钥 openssl genrsa -out sensorswave_rsa_key.p8 208 # 提取公钥 openssl rsa -in sensorswave_rsa_key.p8 -pubout -out sensorswave_rsa_key.pub ``` 提取公钥内容(去除 PEM 头尾行和换行符)用于下一步: ```bash grep -v "PUBLIC KEY" sensorswave_rsa_key.pub | tr -d '\n' ``` > **提示**:如需为私钥设置密码保护,在生成命令中添加 `-aes256` 参数,例如:`openssl genrsa -aes256 -out sensorswave_rsa_key.p8 2048`。若设置了密码,需在「配置参考」的 `Private Key Passphrase` 字段中填写该密码。 ### 第 5 步:创建用户并配置公钥 ```sql CREATE USER SENSORSWAVE_USER DEFAULT_ROLE = SENSORSWAVE_ROLE DEFAULT_WAREHOUSE = SENSORSWAVE_WH RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...'; -- 替换为您的公钥内容(不含头尾行) GRANT ROLE SENSORSWAVE_ROLE TO USER SENSORSWAVE_USER; ``` > **注意**:私钥文件(`sensorswave_rsa_key.p8`)是敏感凭证,请妥善保管。Sensors Wave 会对私钥进行加密存储,保存后不再回显。 ## 创建导出管道 完成 Snowflake 侧的准备工作后,在 Sensors Wave 中创建数据流。 1. 进入**数据中心 → 数据流**,点击**新建数据流**按钮。 2. 在弹出的对话框中,选择**导出数据流**,点击**下一步**。 3. 选择**数据源类型**: - **事件数据**:同步用户行为事件 - **用户数据**:同步用户信息与属性 4. 填写 Snowflake 连接配置(详见下方「配置参考」)。 5. 点击**测试连接**,等待连接验证通过。 6. 配置**执行频率**(详见下方「执行频率」)。 7. 如需仅同步特定事件,在**高级选项**中配置事件过滤(仅事件数据导出支持)。 8. 点击**保存**,数据流将立即启动,并自动在 Snowflake 中创建目标表。 ## 配置参考 ### 连接配置 | 字段 | 必填 | 说明 | 示例 | |------|------|------|------| | `Account URL` | 是 | Snowflake 账户 URL | `https://abc12345.us-east-1.snowflakecomputing.com` | | `User` | 是 | Snowflake 用户名 | `SENSORSWAVE_USER` | | `Private Key` | 是 | PEM 格式私钥内容(包含头尾行)| `-----BEGIN RSA PRIVATE KEY-----\n...` | | `Private Key Passphrase` | 否 | 加密私钥时设置的密码;未加密则留空 | — | | `Database` | 是 | 目标数据库名称 | `SENSORSWAVE_DB` | | `Warehouse` | 是 | 计算仓库名称 | `SENSORSWAVE_WH` | | `Schema` | 是 | 目标 Schema | `SENSORSWAVE_SCHEMA` | | `Table` | 是 | 目标表名,可自定义;默认为 `SENSORSWAVE_EVENTS`(事件)或 `SENSORSWAVE_USERS`(用户),保存后系统自动创建 | `SENSORSWAVE_EVENTS` | | `Role` | 否 | 使用的 Snowflake 角色;留空则使用用户默认角色 | `SENSORSWAVE_ROLE` | ### 执行频率 **间隔执行**:每隔固定时长(1–23 小时)触发,从数据流创建时刻开始计算。 | 选项 | 适用场景 | |------|---------| | 每 1 小时(默认) | 适合大多数分析场景 | | 每 2 小时 | 数据量较大时降低频率 | | 每 6 小时 | 日常增量同步 | | 每 12 小时 | 用户数据等更新频率较低的场景 | | 每 24 小时 | 每日批量导出 | **定时执行**:在指定时区的固定时刻触发,适合需要在业务低峰期执行的场景。选择时区和执行时刻后,系统将每天在该时刻触发一次。 ### 时区配置 定时执行模式下需指定时区。时区配置遵循以下优先级: > **用户配置时区 > 项目时区 > 系统默认时区(国内集群为 `Asia/Shanghai`,海外集群为 `UTC`)** 如果不配置时区,系统将使用项目级别的时区设置。 ## 数据模型 ### 事件表(默认 `SENSORSWAVE_EVENTS`) 事件数据导出到此表,采用追加写入(Append-only)模式。以下以默认表名为例。 | 列名 | 类型 | 说明 | |------|------|------| | `time` | `TIMESTAMP_NTZ` | 事件发生时间(SensorsWave 服务器时区的本地时间) | | `distinct_id` | `VARCHAR(128)` | 客户端用户唯一标识 | | `trace_id` | `VARCHAR(128)` | 请求追踪 ID | | `ssid` | `BIGINT` | 服务端用户唯一标识(SSID) | | `anon_id` | `VARCHAR(128)` | 匿名 ID | | `login_id` | `VARCHAR(128)` | 登录 ID | | `event` | `VARCHAR(128)` | 事件名称,如 `$pageview`、`purchase` | | `properties` | `VARIANT` | 事件属性 JSON | | `user_properties` | `VARIANT` | 事件触发时的用户属性快照 JSON | | `received_at` | `TIMESTAMP_NTZ` | 服务端接收时间(SensorsWave 服务器时区的本地时间) | 查询 `VARIANT` 类型属性的示例: ```sql -- 查询事件属性中的 order_id 字段 SELECT time, event, properties:order_id::STRING AS order_id, properties:total_amount::FLOAT AS total_amount FROM SENSORSWAVE_EVENTS WHERE event = 'purchase' AND time >= '2026-04-01' LIMIT 100; ``` ### 用户表(默认 `SENSORSWAVE_USERS`) 用户数据导出到此表,采用 MERGE 写入模式(按 `ssid` 匹配,存在则更新、不存在则插入)。 | 列名 | 类型 | 说明 | |------|------|------| | `ssid` | `BIGINT` | 服务端用户唯一标识,主键 | | `login_id` | `VARCHAR(128)` | 登录 ID | | `anon_id` | `VARCHAR(128)` | 匿名 ID | | `properties` | `VARIANT` | 用户属性 JSON,包含所有用户画像属性 | | `created_at` | `TIMESTAMP_NTZ` | 用户首次创建时间 | | `updated_at` | `TIMESTAMP_NTZ` | 用户属性最近更新时间 | 查询 `VARIANT` 类型用户属性的示例: ```sql -- 查询用户属性中的会员等级 SELECT ssid, login_id, properties:membership_level::STRING AS membership_level, updated_at FROM SENSORSWAVE_USERS WHERE properties:membership_level IS NOT NULL LIMIT 100; ``` ## 事件过滤 事件过滤仅适用于**事件数据导出**,支持以下两种模式: - **包含事件(白名单)**:只导出列表中指定的事件,其余事件不导出 - **排除事件(黑名单)**:导出所有事件,但跳过列表中指定的事件 在**高级选项**中选择过滤模式并填写事件名称列表(如 `$pageview`、`purchase`)。 > **注意**:事件过滤修改后,在当前同步窗口完成后的下一个窗口生效,不会重新处理已导出的数据。 ## 测试连接 点击**测试连接**后,系统将按以下步骤验证 Snowflake 配置: 1. **建立连接**:验证 `Account URL`、用户名和私钥是否有效 2. **执行 SQL**:验证用户具有基本查询权限 3. **检查仓库**:验证指定的 `Warehouse` 是否存在且用户有权限使用 4. **检查数据库**:验证指定的 `Database` 是否存在且用户有权限访问 5. **检查 Schema**:验证指定的 `Schema` 是否存在且用户有权限访问,并确认具有建表权限 每一步的验证结果会实时显示,方便快速定位问题。 ### 常见连接错误 | 错误提示 | 可能原因 | 解决方法 | |---------|---------|---------| | 连接建立失败 | `Account URL` 格式错误或私钥不匹配 | 检查 URL 格式,确认使用了正确的私钥文件 | | Warehouse 不存在 | 仓库名称拼写错误或仓库未创建 | 确认仓库名称大小写,在 Snowflake 中检查仓库是否存在 | | 无建表权限 | 角色缺少 `CREATE TABLE` 权限 | 执行 `GRANT CREATE TABLE ON SCHEMA ... TO ROLE ...` | | 私钥格式错误 | 粘贴私钥时格式被破坏 | 确认私钥包含完整的头尾行(`-----BEGIN RSA PRIVATE KEY-----`) | ## 管道管理与监控 ### 管道状态 数据流的生命周期状态如下: ``` 新建 → 运行中 → 已停止(不可逆) ``` > **注意**:停止数据流是不可逆操作。停止后,所有调度将终止,不再产生新的同步任务。如需恢复导出,必须重新创建数据流。历史运行记录在停止后仍可查看。 ### 查看运行记录 在数据流详情页面的**运行记录**标签页,可以查看每次同步任务的执行情况: - **状态**:运行中(Running)/ 成功(Success)/ 失败(Failed) - **数据范围**:本次同步覆盖的时间窗口 - **导出行数**:本次成功写入 Snowflake 的行数 - **执行时间**:任务开始和结束时间 - **日志**:点击单条记录可展开查看详细执行日志 ### 查看统计指标 **指标**标签页展示数据流的整体运行状态(默认最近 30 天): - 成功 / 失败运行次数趋势图 - 每日导出行数趋势图 - 支持自定义时间范围筛选 ## 历史数据回填 ### 什么是回填 数据流创建后,增量同步仅处理从创建时刻起的新数据。如果需要将创建时刻之前的历史数据也导入 Snowflake,需要使用回填(Backfill)功能。 ### 操作步骤 1. 进入数据流详情页,切换到**回填**标签页。 2. 点击**发起回填**,选择需要补充的日期范围(开始日期和结束日期)。 3. 确认预计执行的窗口数后,点击**确认执行**。 回填使用项目时区切分自然日窗口,串行执行。 ### 回填与增量同步的关系 - 回填运行期间,该数据流的增量定时同步会**自动暂停**,不会丢失数据——回填完成或取消后,增量同步从上次的位置继续 - 同一数据流同时只允许一个回填任务在运行 - 回填中断(如服务重启)后,系统自动从上次完成的窗口继续,已完成的窗口不会重复执行 ### 取消回填 在回填进行中,可点击**取消**终止当前回填。取消操作在当前窗口执行完成后生效,不会中断正在进行的窗口。 ## 投递语义与数据一致性 | 数据类型 | 写入方式 | 投递语义 | |---------|---------|---------| | 事件数据 | `COPY INTO`(追加) | at-least-once | | 用户数据 | `MERGE INTO`(按 `ssid` 匹配) | exactly-once | ## 时区处理 ### 时间列的时区 Snowflake 中所有时间列(`time`、`received_at`、`created_at`、`updated_at`)使用 `TIMESTAMP_NTZ` 类型(不含时区信息),存储的是**SensorsWave 服务器时区的本地时间**。国内集群为 `Asia/Shanghai`(UTC+8),海外集群为 `UTC`。 **示例(国内集群)**:服务端时区为 `Asia/Shanghai`(UTC+8),某事件发生在北京时间 2026-04-13 15:30:00,则 Snowflake 中存储的值为 `2026-04-13 15:30:00`。 ### 时区影响的配置项 | 配置项 | 时区影响 | |-------|---------| | 定时执行的触发时刻 | 按配置的时区计算(如配置 `Asia/Shanghai` 08:00,则每天北京时间 8 点触发) | | 回填日期范围 | 使用项目时区的自然日(0:00–24:00)切分窗口 | | Snowflake 中的时间列值 | 固定为SensorsWave 服务器时区,与用户配置时区无关 | ## 注意事项 - **停止管道不可逆**:停止后需重新创建数据流,历史运行记录保留 - **私钥格式要求**:`Private Key` 必须为完整的 PEM 格式,包含 `-----BEGIN RSA PRIVATE KEY-----` 和 `-----END RSA PRIVATE KEY-----` 头尾行 - **目标表自动创建**:保存数据流时,系统自动在 Snowflake 中创建目标表;无需提前手动建表 - **回填期间增量暂停**:回填运行期间定时增量同步暂停,回填完成后自动恢复,数据不丢失 - **时间列为本地时间**:Snowflake 中的时间列存储SensorsWave 服务器时区的本地时间,不是 UTC;跨时区分析时注意换算 - **事件重复行**:`SENSORSWAVE_EVENTS` 原始表中可能出现少量重复行。 **发生原因**:增量同步从实时数据流读取,回填从历史存储读取。两个数据源各自独立,在时间边界处无法精确对齐,因此回填覆盖的时间范围如果与增量同步已处理的范围存在交集,同一条事件可能被写入两次。用户数据通过 MERGE 写入,不存在此问题。 **影响范围**:仅影响事件表,且仅在回填覆盖范围与已增量同步的范围重叠时发生。 **建议**:建议在 Snowflake 中创建去重视图,作为日常查询的默认入口: ```sql CREATE VIEW SENSORSWAVE_EVENTS_DEDUP AS SELECT * EXCLUDE (rn) FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY time, event, distinct_id, trace_id ORDER BY received_at DESC NULLS LAST ) AS rn FROM SENSORSWAVE_EVENTS ) WHERE rn = 1; ``` ## 常见问题 **Q:测试连接失败,提示「无建表权限」,如何处理?** 检查是否已执行 `GRANT CREATE TABLE ON SCHEMA ... TO ROLE ...`,并确认数据流使用的角色与授权的角色一致。 **Q:数据流已在「运行中」,能否修改连接配置或事件过滤?** 支持修改。进入数据流详情,点击**编辑**修改配置。修改在当前同步窗口完成后的下一个窗口生效;调度频率修改后立即生效。 **Q:回填数据和增量同步数据在事件表中有重复怎么办?** 使用上方「注意事项」中的去重视图 `SENSORSWAVE_EVENTS_DEDUP` 进行日常查询,或按 `time`、`event`、`distinct_id`、`trace_id` 四个字段组合使用 `ROW_NUMBER()` 去重,保留 `received_at` 最新的记录。 **Q:`properties` 和 `user_properties` 中的 `VARIANT` 数据如何查询?** 使用 Snowflake 的冒号(`:`)路径语法访问嵌套字段,并通过 `::TYPE` 进行类型转换: ```sql -- 访问 properties 中的 page_url 字段(字符串类型) SELECT properties:page_url::STRING FROM SENSORSWAVE_EVENTS; -- 访问数值类型字段 SELECT properties:item_count::INT FROM SENSORSWAVE_EVENTS; ``` **Q:Snowflake 中的 `time` 列是什么时区?** `time` 列使用 `TIMESTAMP_NTZ` 类型,存储的是SensorsWave 服务器时区的本地时间,不含时区信息。国内集群为 `Asia/Shanghai`(UTC+8),海外集群为 `UTC`。如需转换为其他时区,请根据您的集群实际时区调整,例如(国内集群): ```sql CONVERT_TIMEZONE('Asia/Shanghai', 'America/New_York', time) ``` --- **最后更新时间**:2026 年 4 月 13 日