开源湖仓框架LakeSoul支持Flink CDC整库同步 您所在的位置:网站首页 阅读开源app书源网址是什么 开源湖仓框架LakeSoul支持Flink CDC整库同步

开源湖仓框架LakeSoul支持Flink CDC整库同步

2023-05-08 18:49| 来源: 网络整理| 查看: 265

首先,附上Github连接

LakeSoul:https://github.com/meta-soul/LakeSoul

一、导语

LakeSoul 是由数元灵科技研发的云原生湖仓一体框架,具备高可扩展的元数据管理、ACID 事务、高效灵活的 upsert 操作、Schema 演进和批流一体化处理等特性。LakeSoul Flink CDC Sink 支持从 MySQL 数据源整库同步到 LakeSoul,能够支持自动建表、自动 Schema 变更、Exactly Once 语义等。

本篇文章将介绍Flink CDC的功能优势以及LakeSoul Flink CDC整库同步的使用教程,并完整地演示:将一个 MySQL 库整库同步到 LakeSoul 中,涵盖自动建表、DDL 变更等操作。

二、Flink CDC

1.CDC简介

CDC 是变更数据捕获(Change Data Capture)的简称,它可以监测并捕获源数据库的增量变动记录,同步到一个或多个数据目的(Sink)。可捕获的数据库变动包括数据或数据表的插入(INSERT)、更新(UPDATE)、删除(DELETE)等等,将这些变更按发生的顺序完整记录下来并写入到消息中间件中。

LakeSoul 提供了一套独立的 CDC 语义表达规范,通过表属性设置一个 CDC Op 列,即可表示每条数据的操作类型,在后续 Merge 时会自动根据操作语义进行合并。可以通过 Debezium、Canal、Flink 等将 CDC 数据转换后导入 LakeSoul。

2.为什么使用Flink CDC

最初LakeSoul框架没有使用Flink CDC,而是通过 Debezium将 CDC 数据转换后导入 LakeSoul:

整体上可以分为一下几个流程:

1. 对接 Mysql 和 kafka

2. 创建 Debezium CDC 同步任务

3. 使用 Spark Streaming,消费 Kafka 数据并同步更新至 LakeSoul

可以看出通过Debezium将 CDC 数据转换后导入LakeSoul整体的处理链路较长,需要用到的组件也比较多。Debezium是通过Kafka Streams 实现的 CDC 功能,而LakeSoul现在使用的是Flink CDC 模块,可以跳过 Debezium 和 Kafka 的中转,使用flink-cdc-connectors对上游数据源的变动进行直接的订阅处理。

Flink社区开发的flink-cdc-connectors 组件是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。flink-cdc-connectors可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。flink-cdc-connectors可以直接在Flink中以非约束模式使用,而不需要使用类似kafka的中间件来转数据。

Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析,从内部实现上讲,flink-cdc-connectors组件内置了一套Debezium和Kafka组件,但这个细节对用户屏蔽。

可以看到通过Flink将 CDC 数据转换后导入 LakeSoul如下图所示,数据不再通过kafka进行同步,简化了整体架构:

LakeSoul Flink 作业启动后初始化阶段,首先会读取配置的 MySQL DB 中的所有表(排除掉不需要同步的表)。对每一个表,首先判断在 LakeSoul 中是否存在,如果不存在则自动创建一个 LakeSoul 表,其 Schema 与 MySQL 对应表一致。完成初始化后,会读取所有表的 CDC Stream,以 Upsert 的方式写入到对应的各个 LakeSoul 表中。在同步期间如果 MySQL 表发生 DDL Schema 变更,则该变更也会同样应用到对应的 LakeSoul 表中。下面将完整地讲解LakeSoul Flink CDC整库同步的使用教程。

三、LakeSoul Flink CDC整库同步

1. 准备环境

1.1 启动一个本地MySQL数据库

推荐使用 MySQL Docker 镜像来快速启动一个 MySQL 数据库实例:

cd docker/lakesoul-docker-compose-env/

docker run--name lakesoul-test-mysql -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=test_cdc -p 3306:3306 -d mysql:8

1.2 配置 LakeSoul 元数据库以及 Spark 环境

1.2.1 启动一个 PostgreSQL 数据库并进行初始化

可以通过docker使用下面命令快速搭建一个pg数据库:

docker run -d --name lakesoul-test-pg -p 5432:5432 -e POSTGRES_USER=lakesoul_test -e POSTGRES_PASSWORD=lakesoul_test -e POSTGRES_DB=lakesoul_test -d swr.cn-north-4.myhuaweicloud.com/dmetasoul-repo/postgres:14.5

进行PG数据库初始化,进入docker容器中,将meta_init.sql拷贝到容器,可通过docker ps命令查看容器id:

docker exec -it 容器id /bin/bash

docker cp script/meta_init.sql 容器id:script/meta_init.sql

在docker容器中执行:

PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql

1.2.2 安装 Spark 环境

由于 Apache Spark 官方的下载安装包不包含 hadoop-cloud 以及 AWS S3 等依赖,我们提供了一个 Spark 安装包,其中包含了 hadoop cloud 、s3 等必要的依赖:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-lakesoul-8e167b33.tgz

wget https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-hadoop-3.3.5.tgz

tar xf spark-3.3.2-bin-hadoop-3.3.5.tgz

export SPARK_HOME=${PWD}/spark-3.3.2-bin-dmetasoul

如果是生产部署,推荐下载不打包 Hadoop 的 Spark 安装包:

https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-without-hadoop.tgz

并参考 https://spark.apache.org/docs/latest/hadoop-provided.html这篇文档使用集群环境中的 Hadoop 依赖和配置。

LakeSoul 发布 jar 包可以从 GitHub Releases 页面下载:https://github.com/meta-soul/LakeSoul/releases。

wget https://github.com/meta-soul/LakeSoul/releases/download/v2.2.0/lakesoul-spark-2.2.0-spark-3.3.jar -P $SPARK_HOME/jars

下载后请将 jar 包放到 Spark 安装目录下的 jars 目录中

如果访问 Github 有问题,也可以从如下链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-spark-2.2.0-spark-3.3.jar

从 2.1.0 版本起,LakeSoul 自身的依赖已经通过 shade 方式打包到一个 jar 包中。之前的版本是多个 jar 包以 tar.gz 压缩包的形式发布。

1.2.3 为 LakeSoul 增加 PG 数据库配置

默认情况下,pg数据库连接到本地数据库,配置信息如下:

lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver

lakesoul.pg.url=jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified

lakesoul.pg.username=lakesoul_test

lakesoul.pg.password=lakesoul_test

自定义 PG 数据库配置信息,需要在程序启动前增加一个环境变量 lakesoul_home,将配置文件信息引入进来。假如 PG 数据库配置信息文件路径名为:/opt/soft/pg.property,则在程序启动前需要添加这个环境变量:

export lakesoul_home=/opt/soft/pg.property

用户可以在这里自定义数据库配置信息,这样用户自定义 PG DB 的配置信息就会在 Spark 作业中生效。

1.2.4 启动Spark环境

启动一个 spark-sql SQL 交互式查询命令行环境:

$SPARK_HOME/bin/spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --conf spark.sql.warehouse.dir=/tmp/lakesoul --conf spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10

这里启动 Spark 本地任务,增加了两个选项:

1. spark.sql.warehouse.dir=/tmp/lakesoul 设置这个参数是因为 Spark SQL 中默认表保存位置,需要和 Flink 作业产出目录设置为同一个目录。

2. spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10 设置这个参数是因为 LakeSoul 在 Spark 中缓存了元数据信息,设置一个较小的缓存过期时间方便查询到最新的数据。

启动 Spark SQL 命令行后,可以执行:

SHOW DATABASES;

SHOW TABLES IN default;

可以看到 LakeSoul 中目前只有一个 default database,其中也没有表。

1.3 预先在 MySQL 中创建一张表并写入数据

1. 安装mycli

pip install mycli

2. 启动 mycli 并连接 MySQL 数据库

mycli mysql://root@localhost:3306/test_cdc -p root

3. 创建表并写入数据

CREATE TABLE mysql_test_1 (id INT PRIMARY KEY, name VARCHAR(255), type SMALLINT);

INSERT INTO mysql_test_1 VALUES (1, 'Bob', 10);

SELECT * FROM mysql_test_1;

2. 启动同步作业

2.1 启动一个本地的 Flink Cluster

可以从 Flink 下载页面下载 Flink 1.14.5,也可以从我们的国内镜像地址下载(与Apache官网完全相同)。

解压下载的 Flink 安装包:

tar xf flink-1.14.5-bin-scala_2.12.tgz

export FLINK_HOME=${PWD}/flink-1.14.5

然后启动一个本地的 Flink Cluster:

$FLINK_HOME/bin/start-cluster.sh

可以打开 http://localhost:8081 查看 Flink 本地 cluster 是否已经正常启动:

2.2 提交 LakeSoul Flink CDC Sink 作业

向上面启动的 Flink cluster 提交一个 LakeSoul Flink CDC Sink 作业:

./bin/flink run-ys1-yjm1G-ytm2G \

-corg.apache.flink.lakesoul.entry.MysqlCdc \

  lakesoul-flink-2.2.0-flink-1.14.jar \

--source_db.host localhost \

--source_db.port3306\

--source_db.db_name test_cdc \

--source_db.user root \

--source_db.password root \

--source.parallelism1\

--sink.parallelism1\

--warehouse_pathfile:/tmp/lakesoul \

--flink.checkpoint file:/tmp/flink/chk \

--flink.savepoint file:/tmp/flink/svp \

--job.checkpoint_interval10000\

--server_time_zoneUTC

其中 lakesoul-flink 的 jar 包可以从 Github Release 页面下载。如果访问 Github 有问题,也可以通过这个链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-flink-2.2.0-flink-1.14.jar

在 http://localhost:8081 Flink 作业页面中,点击 Running Job,进入查看 LakeSoul 作业是否已经处于 Running 状态。

可以点击进入作业页面,此时应该可以看到已经同步了一条数据:

2.3 使用 Spark SQL 读取 LakeSoul 表中已经同步的数据

在 Spark SQL Shell 中执行:

SHOW DATABASES;

SHOW TABLES IN test_cdc;

DESC test_cdc.mysql_test_1;

SELECT * FROM test_cdc.mysql_test_1;

可以看到每条语句的运行结果,即 LakeSoul 中自动新建了一个test_cdcdatabase,其中自动新建了一张mysql_test_1表,表的字段、主键与 MySQL 相同(多了一个 rowKinds 列)

LakeSoul 使用一个额外的操作列(列名可以自定义)来记录 CDC 的操作类型,可以支持从 Debezium, canal 和 Flink CDC 中导入 CDC 数据。

创建 LakeSoul CDC 表,需要添加一个表属性 lakesoul_cdc_change_column 来配置 CDC 变更类型的列名。这一列需要是 string 类型,包含三种取值之一: update, insert, delete.

在 LakeSoul 读数据自动合并时,会保留最新的 insert、update 数据,并自动过滤掉 delete 的行。

2.4 MySQL 中执行 Update 后观察同步情况

在 mycli 中执行更新:

UPDATE mysql_test_1 SET name='Peter' WHERE id=1;

然后在 LakeSoul 中再次读取:

SELECT * FROM test_cdc.mysql_test_1;

可以看到已经读到了最新的数据:

2.5 MySQL 中执行 DDL 后观察同步情况,并读取新、旧数据

在 mycli 中修改表的结构:

ALTER TABLE mysql_test_1 ADD COLUMN new_col FLOAT;

即在最后新增一列,默认为 null。在 mycli 中验证执行结果:

此时,LakeSoul 中已经同步了表结构,我们可以在 spark-sql 中查看表结构:

DESC test_cdc.mysql_test_1;

这时,从 LakeSoul 中读取数据,新增列同样为 null:

SELECT * FROM test_cdc.mysql_test_1;

向 MySQL 中新插入一条数据:

INSERT INTO mysql_test_1 VALUES (2,'Alice',20,9.9);

从 LakeSoul 中再次读取:

从 MySQL 中删除一条数据:

delete from mysql_test_1 where id=1;

从 LakeSoul 中读取:

可以看到 LakeSoul 每次都读取到了同步后的结果,与 MySQL 中完全一致。

2.6 MySQL 中新建表后观察同步情况

在 MySQL 中新建一张表,schema 与之前表不同:

CREATE TABLE mysql_test_2 (name VARCHAR(100) PRIMARY KEY, phone_no VARCHAR(20));

在 LakeSoul 可以看到新表已经自动创建,可以查看表结构:

往 MySQL 新表中插入一条数据:

INSERT INTO mysql_test_2 VALUES ('Bob', '10010');

LakeSoul 中也成功同步并读取到新表的数据:

四、结束语

LakeSoul Flink CDC 支持从 MySQL 数据源整库同步到 LakeSoul;支持 Schema 变更(DDL)自动同步到 LakeSoul;支持运行过程中上游数据库中新建表自动感知,在 LakeSoul 中自动建表;支持严格一次(Exactly Once)语义,即使 Flink 作业发生 Failover,能够保证数据不丢不重。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有