mongoshake概述
MongoShake是采用GO语言编写的MongoDB数据同步订阅工具,其通过读取oplog操作日志来捕获数据,业务通过消费对应的日志数据来实现不同的场景,例如灾备或多活。项目地址:alibaba/MongoShake
MongoShake将捕获到的日志数据发送给不同的tunnel通道,其类型包含:
- Direct:直接写入目标MongoDB
 - RPC:通过net/rpc方式连接
 - TCP:通过TCP方式连接
 - File:通过文件的方式关联
 - Kafka:利用kafka作为消息中转
 - Mock:测试试用,不写入tunnel
 
MongoShake在源端上支持Sharding、replset、mongod三种架构,目标端则支持mongod和mongos。对于replset环境,建议优先从secondary节点读取,减轻primary的压力。
除此之外,MongoShake还具备以下特性:
- 并行复制:支持并行复制,粒度可为id、collection或auto。
 - HA:定期对上下文进行保存,当服务切换或重启后能够继续提供服务
 - 黑白名单:可以进行db或collection的筛选
 - 压缩:发送日志数据前进行日志压缩,格式支持gzip、zlib或deflate
 - Checkpoint:checkpoint记录了同步位点信息,根据位点信息能够知道当前同步到哪个位置
 - 同步模式:支持全量同步、增量同步和全量+增量
 
安装配置mongoshake
下载mongoshake二进制包,当前版本为v2.8.4
 | 
 | 
解压mongoshake安装包
 | 
 | 
修改配置文件(collector.conf)
| 参数 | 说明 | 
|---|---|
| full_sync.http_port | 全量监控端口 | 
| incr_sync.http_port | 增量监控端口 | 
| sync_mode | 同步模式:all(全量+增量),full(全量),incr(增量) | 
| mongo_urls | 源端mongodb连接串,如果源端为sharding,则配置为后端所有shard副本集连接地址,分号分割 | 
| mongo_cs_url | 如果源端为sharding,则配置为config节点连接地址 | 
| mongo_s_url | 如果源端为sharding,则配置为mongos的地址,多个mongos以逗号分割 | 
| tunnel | 通道模式,可选direct、rpc、tcp、kafka等。direct用于直接写入目标mongodb | 
| tunnel.address | 目标端mongodb地址,sharding配置mongos地址,格式与mongo_urls对齐 | 
| mongo_connect_mode | 连接模式。secondaryPreferred表示优先从secondary拉取 | 
| filter.namespace.black | 黑名单过滤。可以是db,也可以是db.collection,分号分割 | 
| filter.namespace.white | 白名单过滤。可以是db,也可以是db.collection,分号分割 | 
| filter.ddl_enable | 是否开启DDL同步,源端sharding架构暂不支持开启 | 
| checkpoint.storage.url | checkpoint集合保存地址,默认写入源端 | 
| checkpoint.storage.db | checkpoint集合的库名 | 
| checkpoint.storage.collection | checkpoint集合名 | 
| checkpoint.start_position | 如果checkpoint不存在,则根据指定时间进行增量同步,仅适用于incr_sync.mongo_fetch_method=oplog的场景 | 
| transform.namespace | namespace对象重命名,例如db1.tab1:db2:tab2 | 
| skip.nsshardkey.verify | 分片键一致性检查,sharding->sharding场景下,如果分片键不一致检查失败会退出同步 | 
| full_sync.reader.collection_parallel | 全量同步并发拉取的表数量 | 
| full_sync.reader.write_document_parallel | 全量同步时同一张表并发写进程数量 | 
| full_sync.reader.document_batch_size | 全量同步时目标端写入时每个线程的文档数量 | 
| full_sync.reader.fetch_batch_size | 全量同步时源端每次读取文档数量 | 
| full_sync.collection_exist_drop | 目标库存在时是否在同步前先删除 | 
| full_sync.create_index | 全量同步完成后索引的创建方式,none不创建,background后台创建,foreground前台创建 | 
| incr_sync.mongo_fetch_method | 增量同步模式,可选oplog或change_stream(需要>4.0) | 
| incr_sync.worker | 增量同步并行写入线程数 | 
| incr_sync.target_delay | 配置复制延迟 | 
| incr_sync.executor.upsert | update语句是否采用upsert选项 | 
| incr_sync.executor.insert_on_dup_update | 插入数据存在时,是否转换为update | 
启动mongoshake开启同步
 | 
 | 
如需停止同步可执行stop.sh
 | 
 | 
监控同步进程
全量监控
用户可以通过curl对full_sync.http_port的值来进行全量监控,提供了下列接口
- conf:查看配置文件
 - Progress:查看全量同步进度
 - Index:查看索引同步情况
 - Sentinel:控制全量运行,目前仅支持限速
 
 | 
 | 
如数据库资源负载过高,可进行限速
 | 
 | 
查看同步速率
 | 
 | 
增量监控
用户可以通过curl对full_sync.http_port的值来进行全量监控,提供了下列接口
- Conf:查看配置文件信息
 - Repl:复制的整体信息
 - Queue:syncer内部的队列情况
 - Worker: worker内部的情况
 - Executer:写入端的统计信息
 - Persist:内部角色persist的内部情况
 - Sentinel:内部控制情况
 
 | 
 | 
除了上诉方式,也可以通过mongoshake-stat --port=9100的方式查看同步状态
| 参数 | 说明 | 
|---|---|
| logs_get/sec | 每秒获取的oplog数量 | 
| logs_repl/sec | 每秒执行重放的oplog数量 | 
| logs_success/sec | 每秒重放成功的oplog数量 | 
| lsn.time | 最后发送oplog的时间 | 
| lsn_ack.time | 目标端确认写入的时间 | 
| lsn_ckpt.time | Checkpoint持久化的时间 | 
| now.time | 当前时间 | 
| replset | 源数据库副本集名称 | 
| tps/sec | 每秒TPS数量 | 
存在的问题
源端只暴露mongos地址
当源端为sharding架构,且只能暴露mongos地址时,按文档最佳实践——常见场景配置 · alibaba/MongoShake Wiki · GitHub 配置,同步存在下列问题:
1. checkpoint集合数据不更新
实际使用中发现在全量同步结束后,checkpoint集合会生成一个怪异的TS,根据日志信息该checkpoint转换为时间戳后为2038-01-19 11:14:07,且无论任何环境任何时间生成的checkpoint都相同
并且checkpoint数据不会更新,这样就会导致任务重启后增量同步异常,无法同步数据
猜想时间:源端只配置mongos的情况下,进行ALL模式同步,全量同步结束后内存和checkpoint集合的数据不一致,不重启就没问题,重启后就会读取checkpoint集合里面的数据来恢复,但默认的时间是2038-01-19 11:14:07,导致增量同步获取不到数据
github issues早已有人反馈这个bug,但暂未修复该问题,最新的2.4.8版本依旧存在该问题https://github.com/alibaba/MongoShake/issues/834
根据文档显示,mongoshake对于change stream的实现是通过解析ts来构建resumeToken,我们可以指定一个略早一点的LSN_ACK对应的ts。目前想到的解决方案就是任务重新启动前获取监控或日志信息中的LSN_ACK值来手动更新到checkpoint表中。
 | 
 | 
手动调整后检查增量同步恢复正常,并且能够自动更新维护checkpoint表了.
2. mongos启动报错
如果mongo_s_url配置了1个以上的mongos地址时,mongoshake启动会报下列错误,只填一个地址则正常
 | 
 | 
3. checkpoint.start_position参数无法转换
incr+change_stream配置下,checkpoint.start_position无法正确转换,mongoshake会将时间转换为oplog模式下的时间戳,与change_stream格式下的checkpoint格式不同,导致checkpoint.start_position无效,无法同步启动前的差异数据

如果了解如何转换为change stream的ts,可以手动创建一个checkpoint来完成增量同步。据了解,目前有一个aliyun改写版本优化了,但未公开