There are many data synchronization tools out there; here we look at DataX, Alibaba's open-source tool. DataX is already in use inside Alibaba Cloud's DataWorks and supports transfers between most mainstream storage services. The experiments below cover two sync paths, MongoDB to Elasticsearch and MongoDB to MySQL, along with some problems I hit that are worth knowing about: when I searched online I couldn't find a single comprehensive write-up, nor clear answers to the errors, so I stepped into quite a few pitfalls. None of that is the point, though; the point is to understand the tool and be able to use it. Below is a fairly detailed record of my experiments.
Supported services
See the official repository: https://github.com/alibaba/DataX
Installation
Environment requirements:
1. JDK 1.6+
2. Maven 3.x+
3. Python 2.6+
In my setup, MongoDB is a 4.0 sharded cluster, Elasticsearch is 6.7, and MySQL is 5.6.
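Before building, it is worth a quick check that the toolchain matches the requirements (a minimal sketch; the exact output varies by platform):

java -version    # JDK 1.8 recommended, see the note below
mvn -v           # Maven 3.x
python -V        # Python 2.x; the stock datax.py script targets Python 2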
Download the source:
git clone https://github.com/alibaba/DataX.git
Enter the source directory and build the package:
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
Note: JDK 1.8 is recommended. Compilation failed under 1.7 but went through cleanly under 1.8.
By default this builds every plugin. You can instead edit the modules section of pom.xml and keep only the plugins you need, deleting the rest:
[root@wulaoer DataX]# pwd
/root/DataX
[root@wulaoer DataX]# vim pom.xml
......................
<!-- reader -->
<module>mongodbreader</module>
<!-- writer -->
<module>elasticsearchwriter</module>
......................
Here we only build the MongoDB reader plugin and the Elasticsearch writer plugin, so only those two modules are kept.
Building DataX
Now run the build:
[root@wulaoer DataX]# mvn -U clean package assembly:assembly -Dmaven.test.skip=true
A successful build ends with output like this:
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO]
[INFO] datax-all .......................................... SUCCESS [ 11.335 s]
[INFO] datax-common ....................................... SUCCESS [04:04 min]
[INFO] datax-transformer .................................. SUCCESS [01:02 min]
[INFO] datax-core ......................................... SUCCESS [ 14.754 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [ 46.987 s]
[INFO] mongodbreader ...................................... SUCCESS [ 4.547 s]
[INFO] elasticsearchwriter ................................ SUCCESS [ 8.606 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [ 3.649 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [ 9.678 s]
[INFO] hbase20xsqlwriter .................................. SUCCESS [ 1.053 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 07:47 min
[INFO] Finished at: 2019-10-22T14:55:47+08:00
[INFO] ------------------------------------------------------------------------
By default the build does not generate a job template JSON for the Elasticsearch writer plugin, so we have to create one by hand in the following format:
{ "name": "elasticsearchwriter", "parameter": { "endpoint": "http://xxx:9999", "accessId": "xxxx", "accessKey": "xxxx", "index": "test-1", "indextype": "default", "cleanup": true, "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}}, "discovery": false, "batchSize": 1000, "splitter": ",", "column": [] }, "speed": { "concurrent": 4, "throttle": true, "mbps": "2" } }
Write the content above into this file, then configure the job:
[root@wulaoer DataX]# vim target/datax/datax/plugin/writer/elasticsearchwriter/plugin_job_template.json
With the template in place, generate a job configuration skeleton:
[root@wulaoer bin]# pwd
/root/DataX/target/datax/datax/bin
[root@ks-allinone bin]# python datax.py -r mongodbreader -w elasticsearchwriter    # order: python datax.py -r <source reader> -w <target writer>

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mongodbreader document:
    https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md

Please refer to the elasticsearchwriter document:
    https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": [],
                        "collectionName": "",
                        "column": [],
                        "dbName": "",
                        "userName": "",
                        "userPassword": ""
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "accessId": "xxxx",
                        "accessKey": "xxxx",
                        "batchSize": 1000,
                        "cleanup": true,
                        "column": [],
                        "discovery": false,
                        "endpoint": "http://xxx:9999",
                        "index": "test-1",
                        "indextype": "default",
                        "settings": {
                            "index": {
                                "number_of_replicas": 0,
                                "number_of_shards": 1
                            }
                        },
                        "splitter": ","
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
In other words: consult the mongodbreader and elasticsearchwriter documents and fill each parameter with the values from your own environment, keeping the JSON format intact. I use this for two experiments here, MongoDB to MySQL and MongoDB to Elasticsearch.
MongoDB to MySQL
Let's start with syncing MongoDB to MySQL. This is my job file:
{ "job": { "content": [{ "reader": { "name": "mongodbreader", "parameter": { "address": ["127.0.0.1:3717"], "collectionName": "集合名称", "column": [{ "name": "_class", "type": "string" }, { "name": "_id", "type": "string" }, { "name": "clientType", "type": "string" }, { "name": "commentNeold", "type": "string" }, { "name": "count", "type": "long" }, { "name": "createTime", "type": "long" }, { "name": "episodeNo", "type": "string" }, { "name": "extra_status", "type": "long" }, { "name": "modifyTime", "type": "long" }, { "name": "neolds", "type": "string" }, { "name": "status", "type": "long" }, { "name": "titleNo", "type": "string" }, { "name": "type", "type": "long" } ], "dbName": "mongodb库名", "userName": "用户名", "userPassword": "密码" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [ "_class", "_id", "clientType", "commentNeold", "count", "createTime", "episodeNo", "extra_status", "modifyTime", "neolds", "status", "titleNo", "type" ], "connection": [{ "jdbcUrl": "jdbc:mysql://10.211.55.36:3306/库名", "table": ["表名"] }], "password": "密码", "preSql": ["delete from 表名"], "session": ["set session sql_mode='ANSI'"], "username": "用户名", "writeMode": "insert" } } }], "setting": { "speed": { "channel": "10" } } } }
Note that before syncing you must create the fields in MySQL in the layout above (the DDL sketch is one way to do it). The sync command:
[root@wulaoer bin]# python datax.py mongodb_mysql.json
...............
2019-10-23 11:58:12.829 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-10-23 11:58:12.830 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1 records, 148 bytes | Speed 14B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 3.306s | Percentage 100.00%
2019-10-23 11:58:12.832 [job-0] INFO  JobContainer -
任务启动时刻 : 2019-10-23 11:57:56
任务结束时刻 : 2019-10-23 11:58:12
任务总计耗时 : 16s
任务平均流量 : 14B/s
记录写入速度 : 0rec/s
读出记录总数 : 1
读写失败总数 : 0
mongodb_mysql.json is my MongoDB-to-MySQL job file. When the job runs it prints the contents of mongodb_mysql.json with the passwords masked. The Chinese summary lines at the end report, in order: job start time, job end time, total elapsed time, average traffic, record write speed, total records read, and total read/write failures.
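As a quick sanity check, the row count in the target table should match the 读出记录总数 (total records read) figure from the summary; a trivial sketch, with the table name still a placeholder:

SELECT COUNT(*) FROM `<table name>`;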
MongoDB to Elasticsearch
Syncing to Elasticsearch is run the same way. Here is my mongodb_elasticsearch.json:
{ "job": { "content": [{ "reader": { "name": "mongodbreader", "parameter": { "address": ["127.0.0.1:3717"], "collectionName": "集合名称", "column": [{ "name": "_id", "type": "string" }, { "name": "_id", "type": "string" }, { "name": "secret", "type": "boolean" } ], "dbName": "库名", "userName": "用户名", "userPassword": "密码" } }, "writer": { "job": { "content": [{ "writer": { "name": "elasticsearchwriter", "parameter": { "accessId": "用户名", "accessKey": "密码", "batchSize": 1000, "cleanup": true, "writeMode": "insert", "column": [{ "name": "pk", "type": "id" }, { "name": "id", "type": "keywork" }, { "name": "secret", "type": "boolean" } ], "discovery": false, "endpoint": "http://127.0.0.1:9200", "index": "索引", "settings": { "index": { "number_of_replicas": 0, "number_of_shards": 1 } }, "splitter": ",", "type": "类型" } } }], "setting": { "speed": { "channel": 1 } } } } }], "setting": { "speed": { "throttle": false, "concurrent": 4 } } } }
Here I sync MongoDB's _id twice: once into Elasticsearch's _id (the writer column with type id) and once into the id field, so _id and id hold the same data in Elasticsearch. On the writer side, string types must be declared as keyword. Also note the writeMode field: there are two write styles, insert (append) and overwrite (delete the original data, then insert). When I tested insert mode, the sync still deleted the existing data before inserting; the fix is to set cleanup to false. How fast a sync runs depends on the concurrency, the network, and the number of processes.
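For a purely additive sync that leaves existing documents in place, the relevant writer parameters would therefore look like this (a minimal fragment under the assumptions above, not a complete job file):

"writer": {
    "name": "elasticsearchwriter",
    "parameter": {
        "writeMode": "insert",
        "cleanup": false
    }
}

After a run, the document count can be sanity-checked with Elasticsearch's standard _count API, e.g. curl 'http://127.0.0.1:9200/<index>/_count'.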
Errors during packaging
[ERROR] Failed to execute goal on project otsstreamreader: Could not resolve dependencies for project com.alibaba.datax:otsstreamreader:jar:0.0.1-SNAPSHOT: Could not find artifact com.aliyun.openservices:tablestore-streamclient:jar:1.0.0-SNAPSHOT -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :otsstreamreader
Solution:
[root@wulaoer DataX]# vim otsstreamreader/pom.xml

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore-streamclient</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

Replace with:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore-streamclient</artifactId>
    <version>1.0.0</version>
</dependency>
Repackage:
[INFO] Reactor Summary:
[INFO]
[INFO] datax-all 0.0.1-SNAPSHOT ........................... SUCCESS [02:07 min]
[INFO] datax-common ....................................... SUCCESS [ 2.273 s]
[INFO] datax-transformer .................................. SUCCESS [ 1.429 s]
[INFO] datax-core ......................................... SUCCESS [ 3.002 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [ 1.135 s]
[INFO] mysqlreader ........................................ SUCCESS [ 1.040 s]
[INFO] drdsreader ......................................... SUCCESS [ 1.034 s]
[INFO] sqlserverreader .................................... SUCCESS [ 1.002 s]
[INFO] postgresqlreader ................................... SUCCESS [ 1.217 s]
[INFO] oraclereader ....................................... SUCCESS [ 1.099 s]
[INFO] odpsreader ......................................... SUCCESS [ 1.920 s]
[INFO] otsreader .......................................... SUCCESS [ 2.478 s]
[INFO] otsstreamreader .................................... SUCCESS [ 2.150 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [ 1.103 s]
[INFO] txtfilereader ...................................... SUCCESS [ 3.586 s]
[INFO] hdfsreader ......................................... SUCCESS [ 10.375 s]
[INFO] streamreader ....................................... SUCCESS [ 0.991 s]
[INFO] ossreader .......................................... SUCCESS [ 3.418 s]
[INFO] ftpreader .......................................... SUCCESS [ 2.967 s]
[INFO] mongodbreader ...................................... SUCCESS [ 3.235 s]
[INFO] rdbmsreader ........................................ SUCCESS [ 1.007 s]
[INFO] hbase11xreader ..................................... SUCCESS [ 4.525 s]
[INFO] hbase094xreader .................................... SUCCESS [ 3.232 s]
[INFO] opentsdbreader ..................................... SUCCESS [ 2.277 s]
[INFO] mysqlwriter ........................................ SUCCESS [ 0.776 s]
[INFO] drdswriter ......................................... SUCCESS [ 0.774 s]
[INFO] odpswriter ......................................... SUCCESS [ 1.597 s]
[INFO] txtfilewriter ...................................... SUCCESS [ 2.691 s]
[INFO] ftpwriter .......................................... SUCCESS [ 2.783 s]
[INFO] hdfswriter ......................................... SUCCESS [ 8.503 s]
[INFO] streamwriter ....................................... SUCCESS [ 0.925 s]
[INFO] otswriter .......................................... SUCCESS [ 1.739 s]
[INFO] oraclewriter ....................................... SUCCESS [ 0.996 s]
[INFO] sqlserverwriter .................................... SUCCESS [ 0.969 s]
[INFO] postgresqlwriter ................................... SUCCESS [ 2.233 s]
[INFO] osswriter .......................................... SUCCESS [ 8.041 s]
[INFO] mongodbwriter ...................................... SUCCESS [ 10.075 s]
[INFO] adswriter .......................................... SUCCESS [ 7.073 s]
[INFO] ocswriter .......................................... SUCCESS [ 5.983 s]
[INFO] rdbmswriter ........................................ SUCCESS [ 2.679 s]
[INFO] hbase11xwriter ..................................... SUCCESS [ 7.409 s]
[INFO] hbase094xwriter .................................... SUCCESS [ 3.524 s]
[INFO] hbase11xsqlwriter .................................. SUCCESS [ 17.167 s]
[INFO] hbase11xsqlreader .................................. SUCCESS [ 28.462 s]
[INFO] elasticsearchwriter ................................ SUCCESS [ 3.873 s]
[INFO] tsdbwriter ......................................... SUCCESS [ 2.598 s]
[INFO] adbpgwriter ........................................ SUCCESS [ 4.083 s]
[INFO] gdbwriter .......................................... SUCCESS [ 5.756 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [ 1.134 s]
[INFO] hbase20xsqlwriter 0.0.1-SNAPSHOT ................... SUCCESS [ 1.056 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:17 min
[INFO] Finished at: 2019-09-18T01:09:39+08:00
[INFO] ------------------------------------------------------------------------
This build packages the plugins for every service DataX supports. We won't use most of them, so remove the applications you don't need; that prevents errors and saves unnecessary trouble.
I only use the MongoDB reader plugin and, for syncing to Elasticsearch, the Elasticsearch writer plugin, so those are the only two worth building.
Problem:
The following error appeared when running the sync:
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2019-10-22 18:40:25.828 [main] WARN  ConfigParser - 插件[mongodbreader,null]加载失败,1s后重试... Exception:Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .]. - 插件加载失败,未完成指定插件加载:[null, mongodbreader]
2019-10-22 18:40:26.835 [main] ERROR Engine - 经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .]. - 插件加载失败,未完成指定插件加载:[null, mongodbreader]
	at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
	at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:142)
	at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
	at com.alibaba.datax.core.Engine.entry(Engine.java:137)
	at com.alibaba.datax.core.Engine.main(Engine.java:204)
Solution:
It was a configuration-file problem. I had used the job.json content from https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md as the content of elasticsearchwriter's plugin_job_template.json. The message above complains about mongodbreader, but the MongoDB-to-MySQL sync ran without this error, so I suspected the plugin_job_template.json under elasticsearchwriter was misconfigured. I changed it to:
{ "name": "elasticsearchwriter", "parameter": { "endpoint": "http://xxx:9999", "accessId": "xxxx", "accessKey": "xxxx", "index": "test-1", "indextype": "default", "cleanup": true, "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}}, "discovery": false, "batchSize": 1000, "splitter": ",", "column": [] } }
After regenerating the job file, the error was gone. If you run into this kind of problem, I'd suggest syncing first to a system you are familiar with, and only then to the other targets.

Problem:
2019-10-24 14:20:30.561 [job-53135295] ERROR ESClient - {"root_cause":[{"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}],"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}
2019-10-24 14:20:30.566 [job-53135295] ERROR JobContainer - Exception when job run
com.alibaba.datax.common.exception.DataXException: Code:[ESWriter-03], Description:[mappings错误.]. - java.io.IOException: create index or mapping failed
	at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:34) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.plugin.writer.elasticsearchwriter.ESWriter$Job.prepare(ESWriter.java:94) ~[elasticsearchwriter-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.core.job.JobContainer.prepareJobWriter(JobContainer.java:1068) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.core.job.JobContainer.prepare(JobContainer.java:457) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:213) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.core.Engine.start(Engine.java:96) [datax-core-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.core.Engine.entry(Engine.java:246) [datax-core-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.core.Engine.main(Engine.java:279) [datax-core-0.0.1-SNAPSHOT.jar:na]
2019-10-24 14:20:30.677 [job-53135295] INFO  MetricReportUtil - reportJobMetric is turn off
2019-10-24 14:20:30.678 [job-53135295] INFO  LocalJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00%
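Solution:
This one is an Elasticsearch mapping issue rather than a DataX one: Elasticsearch 6.x removed the legacy string mapping type, so index creation fails whenever a writer column is declared as string. Declaring the column as keyword (or text) fixes it, e.g. for the field named in the error:

"column": [
    { "name": "categoryImage", "type": "keyword" }
]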