flink教程-详解flink 1.11 中的CDC (Change Data Capture)_大数据技术与应用实战的博客-CSDN博客_flink cdc同步数据的时候目标表有数据会覆盖吗


本站和网页 https://blog.csdn.net/zhangjun5965/article/details/107605396 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

flink教程-详解flink 1.11 中的CDC (Change Data Capture)_大数据技术与应用实战的博客-CSDN博客_flink cdc同步数据的时候目标表有数据会覆盖吗
flink教程-详解flink 1.11 中的CDC (Change Data Capture)
大数据技术与应用实战
于 2020-07-27 10:01:03 发布
29802
收藏
41
分类专栏:
flink
文章标签:
flink
1.11
canal
CDC
Debezium
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zhangjun5965/article/details/107605396
版权
flink
专栏收录该内容
58 篇文章
28 订阅
订阅专栏
这篇文章是开始的时候写了篇随笔,更深入的cdc的使用和源码分析请参考:深入解读flink sql cdc的使用以及源码分析
文章目录
CDC简介CanalCanalJson反序列化源码解析
CDC简介
CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等,
用户可以在以下的场景下使用CDC:
使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。可以在源数据库上实时的物化一个聚合视图因为只是增量同步,所以可以实时的低延迟的同步数据使用EventTime join 一个temporal表以便可以获取准确的结果
flink 1.11 将这些changelog提取并转化为table apa和sql,目前支持两种格式:Debezium和Canal,这就意味着源表不仅仅是append操作,而且还有upsert、delete操作。
Canal
接下来我们使用canal为例简单介绍下CDC的使用
canal 格式:
"data": [
"id": "13",
"username": "13",
"password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
"name": "Canal Manager V2"
],
"old": [
"id": "13",
"username": "13",
"password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
"name": "Canal Manager"
],
"database": "canal_manager",
"es": 1568972368000,
"id": 11,
"isDdl": false,
"mysqlType": {...},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {...},
"table": "canal_user",
"ts": 1568972369005,
"type": "UPDATE"
简单讲下几个核心的字段:
type : 描述操作的类型,包括‘UPDATE’, ‘INSERT’, ‘DELETE’。data : 代表操作的数据。如果为’INSERT’,则表示行的内容;如果为’UPDATE’,则表示行的更新后的状态;如果为’DELETE’,则表示删除前的状态。old :可选字段,如果存在,则表示更新之前的内容,如果不是update操作,则为 null。
完整的语义如下;
private String destination; // 对应canal的实例或者MQ的topic
private String groupId; // 对应mq的group id
private String database; // 数据库或schema
private String table; // 表名
private List<String> pkNames;
private Boolean isDdl;
private String type; // 类型: INSERT UPDATE DELETE
// binlog executeTime
private Long es; // 执行耗时
// dml build timeStamp
private Long ts; // 同步时间
private String sql; // 执行的sql, dml sql为空
private List<Map<String, Object>> data; // 数据列表
private List<Map<String, Object>> old; // 旧数据列表, 用于update, size和data的size一一对应
-- 定义的字段和data 里面的数据想匹配
CREATE TABLE my_table (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'canal-json.ignore-parse-errors'='true' -- 忽略解析错误,缺省值false
);
CanalJson反序列化源码解析
canal 格式也是作为一种flink的格式,而且是source,所以也就是涉及到读取数据的时候进行反序列化,我们接下来就简单看看CanalJson的反序列化的实现。具体的实现类是CanalJsonDeserializationSchema。
我们看下这个最核心的反序列化方法:
@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
try {
//使用json反序列化器将message反序列化成RowData
RowData row = jsonDeserializer.deserialize(message);
//获取type字段,用于下面的判断
String type = row.getString(2).toString();
if (OP_INSERT.equals(type)) {
// 如果操作类型是insert,则data数组表示的是要插入的数据,则循环遍历data,然后添加一个标识INSERT,构造RowData对象,发送下游。
ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
} else if (OP_UPDATE.equals(type)) {
// 如果是update操作,从data字段里获取更新后的数据、
ArrayData data = row.getArray(0);
// old字段获取更新之前的数据
ArrayData old = row.getArray(1);
for (int i = 0; i < data.size(); i++) {
// the underlying JSON deserialization schema always produce GenericRowData.
GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
for (int f = 0; f < fieldCount; f++) {
if (before.isNullAt(f)) {
//如果old字段非空,则说明进行了数据的更新,如果old字段是null,则说明更新前后数据一样,这个时候把before的数据也设置成after的,也就是发送给下游的before和after数据一样。
before.setField(f, after.getField(f));
before.setRowKind(RowKind.UPDATE_BEFORE);
after.setRowKind(RowKind.UPDATE_AFTER);
//把更新前后的数据都发送下游
out.collect(before);
out.collect(after);
} else if (OP_DELETE.equals(type)) {
// 如果是删除操作,data字段里包含将要被删除的数据,把这些数据组织起来发送给下游
ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.DELETE);
out.collect(insert);
} else {
if (!ignoreParseErrors) {
throw new IOException(format(
"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message)));
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Canal JSON message '%s'.", new String(message)), t);
参考资料: [1].https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289 [2].https://flink.apache.org/news/2020/07/06/release-1.11.0.html#table-apisql-support-for-change-data-capture-cdc
更多内容,欢迎关注我的公众号【大数据技术与应用实战】
大数据技术与应用实战
关注
关注
点赞
41
收藏
打赏
评论
flink教程-详解flink 1.11 中的CDC (Change Data Capture)
文章目录CDC简介CanalCanalJson反序列化源码解析CDC简介CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等,用户可以在以下的场景下使用CDC:使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。可以在源数据库上实时的物化一个聚合视图因为只是增量同步,所以可以
复制链接
扫一扫
专栏目录
Flink CDC数据同步
song_quan_的博客
10-11
733
一、什么是FLinkApache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。接下来,我们来介绍一下 Flink 架构中的重要方面。
Flink实践——CDC(二)
无语梦醒的博客
11-03
449
FLINK-窗口统计简单示例
评论 1
您还未登录,请先
登录
后发表或查看评论
Flink CDC介绍及原理
qq_42456324的博客
11-22
396
CDC (Change Data Capture) 是一种用于的技术,Flink 从 1.11 版本开始原生支持 CDC 数据(changelog)的处理,目前已经是非常成熟的变更数据处理方案。Flink CDC Connectors 是 Flink 的一组 Source 连接器,是 Flink CDC 的核心组件,这些连接器负责从等数据库读取存量历史数据和增量变更数据。
大数据流处理框架之Flink-CDC
wenqi
01-23
5091
一篇文章从入门到熟练使用FlinkCDC
Flink-----Flink CDC 实现数据实时同步
GTmustang的博客
07-16
4498
initial()模式即获取创建表有史以来的日志,但是遇见布置CDC后的日志就报错·latest()模式即获取最新的日志,但运行就报错以上两个错误都是以下显示错误说明提示没有为该表设置日志归档错误原因cdc底层自动将配置的表名转为小写,而oracle日志的表名是大写,导致cdc无法找到配置表的日志,所以就报没有为该表配置日志归档,但这一步确实已经做过了解决办法1.加配置文件a)Stream模式b)SQL模式httpshttpshttps。...
Flink 1.11.1:table sql支持cdc debezium数据源下的Interval Join
高矮
12-09
807
Flink:1.11.1
目的
Flink SQL CDC 模式输出的Json类型数据不支持 Interval Join,由于Interval Join只支持 append-only 的表,所以这里需要修改CDC模式debezium组件的输出格式,适配支持Table Interval Join
实现
这里我们通过新增一个format的形式来适配Interval Join,取名为’insert-debezium-json’,这里需要新建两个class文件如下:
DebeziumJsonDeserizat.
【Flink基础】-- Flink CDC介绍
余额不足
03-25
1万+
一、Flink CDC 是什么?
2020年 Flink cdc 首次在 Flink forward 大会上官宣,由Jark Wu & Qingsheng Ren 两位大佬介绍,原始 blog 点击链接。
Flink CDC connector 可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。Flink CDC connector 可以直接在Flink中以非约束模式(流)使用,而不需要使用类似 kafka 之类的中间件中转数据。
...
flink cdc笔记(一):flink cdc简介
YiRan_Zhao的博客
09-20
3108
flink cdc
【Flink】Flink CDC介绍和原理概述
热门推荐
一个写湿的程序猿
09-18
2万+
Flink CDC概述和原理什么是CDC?基于查询的CDC 和 基于日志的CDCFlink CDCFlink CDC原理简述基于 Flink SQL CDC 的数据同步方案实践
什么是CDC?
CDC是(Change Data Capture 变更数据获取)的简称。
核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
基于查询的CDC 和 基于日志的CDC
CD
Flink CDC 同步mysql数据
congge_study的博客
03-24
1万+
Flink CDC 同步mysql数据
FlinkCDC简介
m0_61607827的博客
02-26
1850
目录
一. Flink CDC介绍
二.Flink CDC 实操
2.1 MySQL配置
2.2 pom文件
2.3 Java代码
2.4 测试结果
一. Flink CDC介绍
CDC主要分为基于查询和基于Binlog两种方式,我们主要了解一下这两种之间的区别:
FlinkCDC其实和canal差不多,只不过就是flink社区开发的组件,用起来更方便一些。
  Flink在1.11版本中新增了CDC的特性,简称 改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看CDC的内容
flink cdc整库同步
最新发布
qq_36062467的博客
11-30
580
flinkcdc整库(多表异构)同步方案
Flink 爬坑【2】cdc数据同步及分区表异常
weixin_45893488的博客
06-14
640
Flink 爬坑【2】cdc数据同步及处理中分区表异常
flink-cdc 同步 Postgre SQL 基本配置【1】
weixin_43838328的博客
04-28
2833
版本要求
PostgreSQL: 9.6, 10, 11, 12 +
连接器:flink-cdc-connectors
操作步骤
更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical
阿里云中修改参数设置会实例重启,谨慎操作
修改域信息,保证本机可以连接
新建用户
CREATE USER user WITH PASSWORD 'pwd';
给用户复制流权限
ALTER ROLE use...
Flink 实现 MySQL CDC 动态同步表结构
cloudbigdata的博客
01-25
5117
作者:陈少龙,腾讯 CSIG 高级工程师使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema...
Flink cdc技术在数据库同步方向的应用
qq_35709745的博客
08-10
1171
基于flink cdc实现数据库同步
Flink_CDC搭建及简单使用
weixin_43914798的博客
11-16
1万+
Flink_CDC搭建及简单使用
1.CDC简介:
​ CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC 。但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术。
​ 目前市面上的CDC技术非常多,常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezi
Flink CDC介绍和简单实用
MyNameIsWangYi的博客
08-09
1089
CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。...............
Flink CDC 系列(1)—— 什么是 Flink CDC
白月蓝山
02-21
3301
本文介绍了什么是 Flink CDC, 支持的数据源,Flink CDC 的特性,以及 Flink CDC 与 Flink 的版本对应关系
Flink CDC详细教程(介绍、原理、代码样例)
少说,多做
06-24
7658
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门需要将每分钟的实时数据简单聚合处理后保存到 Redis 中以供查询,B 部门需要将当天的数据暂存到 Elasticsearch 一份来做报表展示,C 部门也需要一份数据到 ClickHous
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:大白
设计师:CSDN官方博客
返回首页
大数据技术与应用实战
CSDN认证博客专家
CSDN认证企业博客
码龄15年
暂无认证
63
原创
3万+
周排名
32万+
总排名
18万+
访问
等级
2294
积分
207
粉丝
142
获赞
100
评论
443
收藏
私信
关注
热门文章
flink教程-详解flink 1.11 中的CDC (Change Data Capture)
29802
Flink教程--flink 1.11 使用sql将流式数据写入hive
10838
flink小例子之side output 、table、sql、多sink的使用
9299
Kafka 0.11.0.0 是如何实现 Exactly-once 语义的
7688
Flink教程-flink 1.11使用sql将流式数据写入文件系统
5596
分类专栏
k8s
1篇
flink
58篇
iceberg
3篇
随笔
2篇
linux
2篇
hadoop
13篇
数据采集
1篇
java
4篇
kafka
3篇
azkaban
1篇
impala
1篇
最新评论
flink实战教程-使用set实时计算当天网站uv
夜幕.思年华:
我加了trigger聚合函数不触发,去掉了就触发,为啥啊
flink小例子之side output 、table、sql、多sink的使用
杨小明:
有测试过代码可以成功执行吗?
我看里面混用了table api 的insert和datastream的addsink
聊聊flink 1.11 中的随机数据生成器- DataGen connector
cts618:
大佬,阿里中台Dataphin用用过么
Flink教程-keyby 窗口数据倾斜的优化
yannbai08:
大佬,p99类的计算二次聚合好像解决不了
Flink教程-flink 1.11使用sql将流式数据写入文件系统
qwmwysr:
Flink SQL大数据项目实战(基于Flink1.14.3版本)
下载地址:https://download.csdn.net/download/hnmwykka/86512775
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
基于streamx闲聊flink在k8s上的部署实战
flink 写入hudi和iceberg数据湖选型对比
Flink集成iceberg在生产环境中的实践
2022年1篇
2021年1篇
2020年43篇
2019年18篇
2018年4篇
2017年18篇
目录
目录
分类专栏
k8s
1篇
flink
58篇
iceberg
3篇
随笔
2篇
linux
2篇
hadoop
13篇
数据采集
1篇
java
4篇
kafka
3篇
azkaban
1篇
impala
1篇
目录
评论 1
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
大数据技术与应用实战
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值