开发者生态
morning
探索 Snowflake 与 Postgres 之间的双向数据流动模式 | 技术实践
2026-06-23
1 阅读
Tom Manfredi
2026年,智能体将在企业级应用中取得哪些突破? 点击下载《2026年AI与数据发展预测》白皮书,获悉专家一手前瞻,先拥抱新的工作方式!现代数据架构越来越要求事务型与分析工作型负载能够无缝共存。PostgreSQL仍然是事务型的核心基础设施——支撑电商订单处理、实时库存系统以及面向客户的API——而Snowflake则将所有分析型数据与AI封装在一起的基础平台。提出的挑战:如何让数据在这两个系统之间稳定流动,同时将延迟与运维开销降到最低。历史上,要打通 OLTP 与 OLAP 系统,需要整合外部 ETL 工具、管理云存储桶、配置 IAM 角色,并维护脆弱的 CDC 数据管道。团队花在基础设施“打通”上的时间,往往超过数据中产生价值的时间。本文的目标是探索如何利用 Snowflake的一些最新的创新能力,在 Postgres 与 Snowflake 之间支持五种关键的数据流动模式。如下所示: 有哪些变化? 近期 Snowflake 的一些产品能力显着缓解了这个问题: Snowflake Postgres(PuPr):一种完全托管的 PostgreSQL 服务,手动运行在 Snowflake 生态系统中。它消除了外部 PostgreSQL 托管的需求,同时与 Snowflake 数据平台实现了群集成;pg_lake(PuPr):PostgreSQL的扩展能力,允许在 Postgres 内部直接创建 Apache Iceberg 表。在 Postgres 中写入的数据,可以通过共享 Iceberg 元数据被 Snowflake 直接查询——消耗文件导出、消耗中转储、消耗 ETL 管道;pg_incremental(PuPr):用于调度式增量同步的扩展组件。与 pg_lake 结合使用时,可以实现轻量级 CDC,只同步更新发生的数据行;Snowflake 托管 Iceberg存储(PuPr):Postgres 管理的 Iceberg 使用 Snowflake 内部存储,并通过托管依赖访问。内部 S3、内部 IAM、内部存储集成配置;Openflow(正式发布):Snowflake 的托管数据集成平台(基于 Apache NiFi 构建),提供预置的 CDC 连接能力,包括基于 PostgreSQL WAL 的更新集群,以及通过 Snowpipe Streaming 进行数据传输。这些能力共同构建了一个完整的数据流动能力体系——从简单的批量加载,到实时CDC——全部在一个统一平台内完成。这里中的所有模式都使用ORDERS作为示例数据源。该表模拟典型电商订单生命周期,约28000行数据。本例中数据来源于SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.Orders。CREATE TABLE cdc_demo.orders ( order_id BIGINT PRIMARY KEY、customer_id BIGINT、order_status VARCHAR(1)、total_price DECIMAL(15,2)、order_date DATE、order_priority VARCHAR(15)、店员 VARCHAR(15)、ship_priority INTEGER、comment VARCHAR(79)、created_at TIMESTAMP DEFAULT NOW()、updated_at TIMESTAMP DEFAULT NOW() ); 模式1:批量数据流动——Postgres到Snowflake业务场景:某零售公司在PostgreSQL电商系统中全天处理订单。每天晚上,分析团队在Snowflake中获取所有订单需要的完整快照,用于报表分析、需求预测以及财务对账;技术方案:在Postgres中创建Iceberg表。“USING Iceberg”子句允许pg_lake在Postgres内写入并创建Iceberg表。然后通过INSERT/SELECT将订单数据写入该表。在 Snowflake 侧,创建目录集成(Catalog Integration)后再创建 Iceberg 表。这些操作属于元数据层操作,不涉及实际数据搬运。当 Iceberg 表创建完成后,即可在 Snowflake 中直接查询。步骤 1:在 Postgres 中启用 pg_lake -- 连接到 Snowflake Postgres 实例 CREATE EXTENSION IF NOT EXISTS pg_lake CASCADE; 步骤 2:创建 Iceberg表并批量加载数据 -- 创建 Iceberg 表并将所有订单批量加载到其中 CREATE TABLE cdc_demo.orders_iceberg ( order_id BIGINT, customer_id BIGINT, order_status VARCHAR(1),total_price DECIMAL(15,2), order_date DATE, order_priority VARCHAR(15), clerk VARCHAR(15), Ship_priority INTEGER, comment VARCHAR(79),created_atTIMESTAMP,updated_atTIMESTAMP)USINGiceberg; --从源表批量加载 INSERT INTO cdc_demo.orders_iceberg SELECT * FROM cdc_demo.orders; 步骤3:在Snowflake中创建目录集成 --在Snowflake中:创建一个指向Postgres实例的目录集成 CREATE OR REPLACE CATALOG INTEGRATION pg_orders_catalog CATALOG_SOURCE = SNOWFLAKE_POSTGRES TABLE_FORMAT = ICEBERG CATALOG_NAMESPACE = 'cdc_demo' REST_CONFIG = ( POSTGRES_INSTANCE = 'Snowflake_Postgres_Demo' CATALOG_NAME = 'postgres' ACCESS_DELEGATION_MODE = VENDED_CREDENTIALS ) 已启用= TRUE; 步骤 4:在 Snowflake 创建 Iceberg 表 -- 创建引用 Postgres 管理的 Iceberg 数据的 Snowflake Iceberg 表 CREATE OR REPLACE ICEBERG TABLEorders_iceberg CATALOG = 'pg_orders_catalog' CATALOG_TABLE_NAME = 'orders_iceberg' CATALOG_NAMESPACE = 'cdc_demo' AUTO_REFRESH = TRUE; 5:查询验证数据 SELECT COUNT(*) FROMorders_iceberg; SELECT order_status, COUNT(*), SUM(total_price