全网唯一标准王
(19)国家知识产权局 (12)发明 专利申请 (10)申请公布号 (43)申请公布日 (21)申请 号 202210746292.8 (22)申请日 2022.06.28 (71)申请人 武汉众邦银行股份有限公司 地址 432200 湖北省武汉市黄陂区盘龙城 经济开发区汉口北大道88号汉口北国 际商品交易中心D2区1-2层、 2 2-23层 (72)发明人 严东 程峰 李耀 彭磊 刘珏  (74)专利代理 机构 成都正煜知识产权代理事务 所(普通合伙) 51312 专利代理师 袁宇霞 (51)Int.Cl. G06F 16/22(2019.01) G06F 16/23(2019.01) G06F 16/242(2019.01) G06F 16/27(2019.01) (54)发明名称 一种基于kafka和log的数据抽取和同步方 法 (57)摘要 本发明涉及信息技术领域, 提供了一种基于 kafka和log的数据抽取和同步方法, 通过使用各 个系统的增量log作为数据基础, 数据使用方通 过订阅kafka来消费log解决上述方案存在的问 题。 在此方案中, 提供搜索服务的使用方将日志 保存到HBase中, 提供缓存服务的使用方可以将 缓存数据保存到Redis中, 数据同步的使用方可 以将数据保存到自 己的数据库中, 由于kafka中 的日志数据是可以重复消费的, 并且会缓存一段 时间, 各个使用方可以通过消费kafka中保存的 日志来达到既能保持与数据库的一致性, 也能保 证数据的实时性。 权利要求书3页 说明书8页 附图1页 CN 115185942 A 2022.10.14 CN 115185942 A 1.一种基于kafka和l og的数据抽取和同步方法, 其特 征在于: 包括以下步骤: 步骤1、 将日志从源系统实时抽出, 并以增量或者全量的方式进行抽取, 获得抽取的日 志, 写入kafka指定的日志抽取同步topic中; 步骤2、 将日志抽取同步topic中的日志, 进行解析脱敏, 规定统一消息格式, 放入到 kafka对应的脱敏日志消息topic中; 步骤3、 将kafka 中的数据, 通过消费端, 重放入HDFS中; 步骤4、 HDFS中的数据经 过清洗, 放入到 HBase中进行保存, 供其 他系统来 查询调用。 2.根据权利 要求1所述的一种基于kafka和log的数据抽取和同步方法, 其特征在于: 步 骤1包括以下步骤: 步骤1.1: 从MySQ L集群中进行日志解析, 采用Ro w模式进行读取bi narylog日志; 步骤1.2: 使用canal读取bi narylog日志, 包括以下步骤: 步骤1.2.1: canal模拟MySQL  Slave的交互协议, 伪装自己为MySQL  Slave, 向MySQL   Slave发送dump协议; 步骤1.2.2: MySQ L master收到dump请求, 开始推送bi narylog给canal; 步骤1.2.3: Canal 解析binary log对象; 步骤1.3: 通过订阅Canal  Server的方式, 抽取步骤2获得的binarylog日志中对应的 MySQL的增量日志, 包括以下步骤: 步骤1.3.1: 按照Canal的输出, 日志是protobuf格式, 通过增量Storm模块, 将数据实时 转换为约定的统一消息格式, 并保存到kafka 中; 步骤1.3.2: 增量Storm模块还负责捕获数据库对象集合schema 的变化, 以控制在kafka 中保存的日志消息的版本号; 步骤1.3.3: 增量Storm模块的配置信息保存在Zo okeeper中, 以满足高可用需求; 步骤1.3.4: Kafka既作为输出结果也作为处理过程中的缓冲器和消息解构区, 最终以 消息版本号的递增来获得增量日志; 步骤1.4: 对于增量无法满足的日志抽取场景, 需要 进行全量抽取, 包括以下步骤: 步骤1.4.1: 对拉取请求进行流 量控制, 避免影响生产数据库; 步骤1.4.2: 消费端对数据进行分片, 控制最大拉取字节数和拉取记录条 数; 步骤1.4.3: 每次拉取一个分片完成, 升一次kafka 中保存的日志消息的版本号; 步骤1.4.4: 通知zookeeper, 记录一个全量的拉取过程启动, 并向zookeeper中写入心 跳信息; 步骤1.4.5: 检查数据库的元 数据meta的兼容 性, 保证拉取请求有效; 步骤1.4.6: 确定拆片使用的列, 设定优先级规则由大到小: 按照用户指定的列, 主键索 引PK, 唯一索引UK, 普通索引的优先级来确定拆片列, 多列时, 取其中第一列; 步骤1.4.7: 根据拆片列拆分数据, 将拆片结果写入kafka指定的日志抽取同步topic 中。 3.根据权利 要求1所述的一种基于kafka和log的数据抽取和同步方法, 其特征在于: 步 骤2: 将抽取的日志, 进行解析脱敏, 规定统一消 息格式, 放入到kafka对应的脱敏日志消 息 topic中, 包括以下步骤: 步骤2.1: 定义统一的消息格式, 并保证数据的唯一 性, 包括以下步骤:权 利 要 求 书 1/3 页 2 CN 115185942 A 2步骤2.1.1: 定义消息的命名空间, 由类型+数据源名+schema名+表名+版本号+分库号+ 分表号组成, 能够描述所有表, 通过命名空间可以唯一定位; 步骤2.1.2: 定义_ums_op_字段, 表明数据的类型是I, U, D, 其中I代表insert, 新增, U代 表update, 修改, D代 表delete, 删除; 步骤2.1.3: 定义_ums_ts_字段, 记录发生增删改的事件的时间戳, 新的数据发生的时 间戳会更新; 步骤2.1.4: 定义_ums_id_字段, 表明消息的唯一id, 保证消息的唯一性, 且保证了消息 的先后顺序; 步骤2.1.5: 对于全量支取, _ums_id_是唯一的, 从zookeeper中每个并发度分别取不同 的id片区, 保证了唯一性和性能, 填写负数, 不会与增量数据冲突, 也保证他们是早于增量 消息的; 步骤2.1.6: 对于增量支取, 使用MySQL的日志文件号+日志偏移量作为唯一id, id作为 64位的long整数, 高7位用于日志文件号, 低12位作为日志偏移量; 步骤2.2: 针对抽取流 程, 进行心跳监控和预警, 包括以下步骤: 步骤2.2.1: 每分钟对每 个被抽取的表插 入一条心 态数据并保存发送时间; 步骤2.2.2: 心跳表也被抽取, 当收到心跳包时, 即便没有任何增删改的数据, 也能证 明 链路的通畅; 步骤2.2.3: 将心跳数据发送kafka上公共的topic, 再消费保存到开源的时序性数据库 influxdb中, 使用开源的数据可视化应用grafana进行展示; 步骤2.2.4: 如果出现心跳延时, 通过监控发送邮件报警或短信报警; 步骤2.3: 考虑到数据的安全性, 需要对数据进行实时脱敏, 包括以下步骤: 步骤2.3.1: 默认脱敏, 适用于简单的场景, 配置为默认缺省值, 并替换规定好的字段 值; 步骤2.3.2: 替换脱敏, 适用于简单的场景, 配置为指定的值, 并替换规定好的字段值; 步骤2.3.3: Hash脱敏, 适用于需要映射的场景, 使用md5和murmur算法, 提供加盐功能, 保证hash后的数据一 致。 4.根据权利 要求1所述的一种基于kafka和log的数据抽取和同步方法, 其特征在于: 步 骤3: 将kafka 中的数据, 通过消费端, 重放入HDFS中, 包括以下步骤: 步骤3.1: 根据kafka中统一消息格式的消息进行消费, 使用spark将kafaka中的消息生 成消费流, 对各个不同命名空间的消息进行消费; 步骤3.2: 消费kafka中全量的统一消息, 并在HDFS中保存所有历史信息, 包括以下步 骤: 步骤3.2.1: 重放HDFS中的日志, 还原任意时间的历史快照; 步骤3.2.2: 通过链 表, 还原每一条记录的历史信息, 分析信息日志; 步骤3.2.3: 当程序出现错 误时, 通过回灌, 重新消费消息, 重新形成新的快照; 步骤3.3: 将消息保存到 HDFS中, 并进行分布存 储, 包括以下步骤: 步骤3.3.1: 使用列式存储格式parquet, 将统一消息格式日志保存落地到HDFS中, parquet的内容是所有日志的增删改信息以及_ums_id、 _ums_ts_, 由于Spark原生对 parquet的良好支持, 可以使用SparkSQL对parquet进行日期和时间区间的查下, 用于后续权 利 要 求 书 2/3 页 3 CN 115185942 A 3

PDF文档 专利 一种基于kafka和log的数据抽取和同步方法

文档预览
中文文档 13 页 50 下载 1000 浏览 0 评论 0 收藏 3.0分
温馨提示:本文档共13页,可预览 3 页,如浏览全部内容或当前文档出现乱码,可开通会员下载原始文档
专利 一种基于kafka和log的数据抽取和同步方法 第 1 页 专利 一种基于kafka和log的数据抽取和同步方法 第 2 页 专利 一种基于kafka和log的数据抽取和同步方法 第 3 页
下载文档到电脑,方便使用
本文档由 SC 于 2024-02-24 00:49:23上传分享
友情链接
站内资源均来自网友分享或网络收集整理,若无意中侵犯到您的权利,敬请联系我们微信(点击查看客服),我们将及时删除相关资源。