There are many data synchronization tools; here we look at DataX, Alibaba's open-source tool. DataX is already used inside Alibaba Cloud's DataWorks and supports transfers between many mainstream storage services. The experiments below cover two sync paths: MongoDB to Elasticsearch, and MongoDB to MySQL. Some of the problems I hit during the experiments are worth recording, because when I searched online I could not find a single comprehensive write-up, nor clear answers to the errors I ran into, so I stepped into quite a few pitfalls. Those pitfalls are not the point, though; the point is to understand the tool and be able to use it. Below is a fairly detailed record of my experiments.
Supported services
See the official repository: https://github.com/alibaba/DataX
Installation
Requirements:
1. JDK 1.6+
2. Maven 3.x+
3. Python 2.6+
In my environment, MongoDB is a 4.0 sharded cluster, Elasticsearch is 6.7, and MySQL is 5.6.
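Before building, it is worth confirming the toolchain (a quick check; the versions in the comments are what this walkthrough assumes):
java -version    # expect 1.8.x -- see the JDK note below
mvn -v           # expect Maven 3.x
python -V        # expect Python 2.6/2.7; datax.py targets Python 2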
Download the source
git clone https://github.com/alibaba/DataX.git
Enter the source directory and build the package:
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
Note: JDK 1.8 is recommended; compilation failed under 1.7, while 1.8 built without errors.
This packages every plugin. You can instead edit the modules section of pom.xml to keep only the plugins you need and delete the rest:
[root@wulaoer DataX]# pwd
/root/DataX
[root@wulaoer DataX]# vim pom.xml
......................
<!-- reader -->
<module>mongodbreader</module>
<!-- writer -->
<module>elasticsearchwriter</module>
.....................
Here we only build the MongoDB reader plugin and the Elasticsearch writer plugin, so only these two modules are kept.
Building DataX
Now run the build:
[root@wulaoer DataX]# mvn -U clean package assembly:assembly -Dmaven.test.skip=true
A successful build ends with output like this:
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO]
[INFO] datax-all .......................................... SUCCESS [ 11.335 s]
[INFO] datax-common ....................................... SUCCESS [04:04 min]
[INFO] datax-transformer .................................. SUCCESS [01:02 min]
[INFO] datax-core ......................................... SUCCESS [ 14.754 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [ 46.987 s]
[INFO] mongodbreader ...................................... SUCCESS [ 4.547 s]
[INFO] elasticsearchwriter ................................ SUCCESS [ 8.606 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [ 3.649 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [ 9.678 s]
[INFO] hbase20xsqlwriter .................................. SUCCESS [ 1.053 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 07:47 min
[INFO] Finished at: 2019-10-22T14:55:47+08:00
[INFO] ------------------------------------------------------------------------
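The packaged distribution lands under target/datax/datax; the bin and plugin directories used throughout the rest of this walkthrough live there. A quick look, assuming the default build layout:
ls /root/DataX/target/datax/datax
# expect, among others: bin (contains datax.py) and plugin (reader and writer directories)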
By default, the build does not generate a job template JSON for the Elasticsearch writer plugin, so we have to create one by hand in the following format:
{
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9999",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-1",
"indextype": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": []
},
"speed": {
"concurrent": 4,
"throttle": true,
"mbps": "2"
}
}
Write the content above into the template file below, then move on to the job configuration:
[root@wulaoer DataX]# vim target/datax/datax/plugin/writer/elasticsearchwriter/plugin_job_template.json
With the plugin template in place, generate a job configuration skeleton:
[root@wulaoer bin]# pwd
/root/DataX/target/datax/datax/bin
[root@ks-allinone bin]# python datax.py -r mongodbreader -w elasticsearchwriter
# argument order: python datax.py -r <source reader> -w <target writer>
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the mongodbreader document:
https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md
Please refer to the elasticsearchwriter document:
https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [],
"collectionName": "",
"column": [],
"dbName": "",
"userName": "",
"userPassword": ""
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"accessId": "xxxx",
"accessKey": "xxxx",
"batchSize": 1000,
"cleanup": true,
"column": [],
"discovery": false,
"endpoint": "http://xxx:9999",
"index": "test-1",
"indextype": "default",
"settings": {
"index": {
"number_of_replicas": 0,
"number_of_shards": 1
}
},
"splitter": ","
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
The output above says: consult the mongodbreader and elasticsearchwriter documents it links, and fill in each parameter with values from your own environment, keeping the JSON structure intact. I used this setup for two experiments: syncing MongoDB to MySQL, and syncing MongoDB to Elasticsearch.
MongoDB to MySQL
Let's start with syncing MongoDB to MySQL. This is my job file:
{
"job": {
"content": [{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:3717"],
"collectionName": "集合名称",
"column": [{
"name": "_class",
"type": "string"
},
{
"name": "_id",
"type": "string"
},
{
"name": "clientType",
"type": "string"
},
{
"name": "commentNeold",
"type": "string"
},
{
"name": "count",
"type": "long"
},
{
"name": "createTime",
"type": "long"
},
{
"name": "episodeNo",
"type": "string"
},
{
"name": "extra_status",
"type": "long"
},
{
"name": "modifyTime",
"type": "long"
},
{
"name": "neolds",
"type": "string"
},
{
"name": "status",
"type": "long"
},
{
"name": "titleNo",
"type": "string"
},
{
"name": "type",
"type": "long"
}
],
"dbName": "mongodb库名",
"userName": "用户名",
"userPassword": "密码"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"_class", "_id", "clientType", "commentNeold", "count", "createTime", "episodeNo", "extra_status", "modifyTime", "neolds", "status", "titleNo", "type"
],
"connection": [{
"jdbcUrl": "jdbc:mysql://10.211.55.36:3306/库名",
"table": ["表名"]
}],
"password": "密码",
"preSql": ["delete from 表名"],
"session": ["set session sql_mode='ANSI'"],
"username": "用户名",
"writeMode": "insert"
}
}
}],
"setting": {
"speed": {
"channel": "10"
}
}
}
}
Note: before syncing, you must create the target MySQL table with columns matching the reader layout above; a minimal DDL sketch follows.
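The table name, column types, and lengths below are my assumptions (map string to VARCHAR and long to BIGINT; adjust to your data):
mysql -u <username> -p <database> <<'SQL'
CREATE TABLE `<table>` (
  `_class`       VARCHAR(255),
  `_id`          VARCHAR(64),
  `clientType`   VARCHAR(64),
  `commentNeold` VARCHAR(255),
  `count`        BIGINT,
  `createTime`   BIGINT,
  `episodeNo`    VARCHAR(64),
  `extra_status` BIGINT,
  `modifyTime`   BIGINT,
  `neolds`       VARCHAR(255),
  `status`       BIGINT,
  `titleNo`      VARCHAR(64),
  `type`         BIGINT
);
SQL
With the table in place, run the sync: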
[root@wulaoer bin]# python datax.py mongodb_mysql.json
...............
2019-10-23 11:58:12.829 [job-0] INFO JobContainer - PerfTrace not enable!
2019-10-23 11:58:12.830 [job-0] INFO StandAloneJobContainerCommunicator - Total 1 records, 148 bytes | Speed 14B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 3.306s | Percentage 100.00%
2019-10-23 11:58:12.832 [job-0] INFO JobContainer -
任务启动时刻 : 2019-10-23 11:57:56
任务结束时刻 : 2019-10-23 11:58:12
任务总计耗时 : 16s
任务平均流量 : 14B/s
记录写入速度 : 0rec/s
读出记录总数 : 1
读写失败总数 : 0
mongodb_mysql.json is my MongoDB-to-MySQL job file. When the job runs, DataX prints the content of mongodb_mysql.json with the passwords masked.
MongoDB to Elasticsearch
Syncing to Elasticsearch is run the same way. Here is the content of my mongodb_elasticsearch.json:
{
"job": {
"content": [{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:3717"],
"collectionName": "集合名称",
"column": [{
"name": "_id",
"type": "string"
},
{
"name": "_id",
"type": "string"
},
{
"name": "secret",
"type": "boolean"
}
],
"dbName": "库名",
"userName": "用户名",
"userPassword": "密码"
}
},
"writer": {
"job": {
"content": [{
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"accessId": "用户名",
"accessKey": "密码",
"batchSize": 1000,
"cleanup": true,
"writeMode": "insert",
"column": [{
"name": "pk",
"type": "id"
},
{
"name": "id",
"type": "keywork"
},
{
"name": "secret",
"type": "boolean"
}
],
"discovery": false,
"endpoint": "http://127.0.0.1:9200",
"index": "索引",
"settings": {
"index": {
"number_of_replicas": 0,
"number_of_shards": 1
}
},
"splitter": ",",
"type": "类型"
}
}
}],
"setting": {
"speed": {
"channel": 1
}
}
}
}
}],
"setting": {
"speed": {
"throttle": false,
"concurrent": 4
}
}
}
}
Here I synced MongoDB's _id field twice: once into Elasticsearch's _id (the pk column with type id) and once into a regular id field, so _id and id end up holding the same value. Note that the string type has to be changed to keyword. Also note the writeMode field: there are two write modes, insert into and overwrite into; the former appends data, the latter deletes the original data and rewrites it. When I tested insert mode, the sync still deleted the existing data before inserting; to prevent that, change cleanup to false. How fast each sync runs depends on the concurrency, the network, and the number of processes.
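To spot-check the result on the Elasticsearch side, a couple of queries help (a sketch; substitute the endpoint and index from your own job file):
curl -s 'http://127.0.0.1:9200/<index>/_count?pretty'
curl -s 'http://127.0.0.1:9200/<index>/_search?size=1&pretty'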
Errors during the build
[ERROR] Failed to execute goal on project otsstreamreader: Could not resolve dependencies for project com.alibaba.datax:otsstreamreader:jar:0.0.1-SNAPSHOT: Could not find artifact com.aliyun.openservices:tablestore-streamclient:jar:1.0.0-SNAPSHOT -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :otsstreamreader
Fix:
[root@wulaoer DataX]# vim otsstreamreader/pom.xml
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-streamclient</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
Replace it with:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-streamclient</artifactId>
<version>1.0.0</version>
</dependency>
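Maven's hint in the error output also offers a shortcut: resume from the failed module instead of rebuilding everything. A sketch with the same goals as before (I rebuilt from scratch instead, so this path is untested here):
mvn -U clean package assembly:assembly -Dmaven.test.skip=true -rf :otsstreamreader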
Then I re-ran the full build:
[INFO] Reactor Summary:
[INFO]
[INFO] datax-all 0.0.1-SNAPSHOT ........................... SUCCESS [02:07 min]
[INFO] datax-common ....................................... SUCCESS [ 2.273 s]
[INFO] datax-transformer .................................. SUCCESS [ 1.429 s]
[INFO] datax-core ......................................... SUCCESS [ 3.002 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [ 1.135 s]
[INFO] mysqlreader ........................................ SUCCESS [ 1.040 s]
[INFO] drdsreader ......................................... SUCCESS [ 1.034 s]
[INFO] sqlserverreader .................................... SUCCESS [ 1.002 s]
[INFO] postgresqlreader ................................... SUCCESS [ 1.217 s]
[INFO] oraclereader ....................................... SUCCESS [ 1.099 s]
[INFO] odpsreader ......................................... SUCCESS [ 1.920 s]
[INFO] otsreader .......................................... SUCCESS [ 2.478 s]
[INFO] otsstreamreader .................................... SUCCESS [ 2.150 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [ 1.103 s]
[INFO] txtfilereader ...................................... SUCCESS [ 3.586 s]
[INFO] hdfsreader ......................................... SUCCESS [ 10.375 s]
[INFO] streamreader ....................................... SUCCESS [ 0.991 s]
[INFO] ossreader .......................................... SUCCESS [ 3.418 s]
[INFO] ftpreader .......................................... SUCCESS [ 2.967 s]
[INFO] mongodbreader ...................................... SUCCESS [ 3.235 s]
[INFO] rdbmsreader ........................................ SUCCESS [ 1.007 s]
[INFO] hbase11xreader ..................................... SUCCESS [ 4.525 s]
[INFO] hbase094xreader .................................... SUCCESS [ 3.232 s]
[INFO] opentsdbreader ..................................... SUCCESS [ 2.277 s]
[INFO] mysqlwriter ........................................ SUCCESS [ 0.776 s]
[INFO] drdswriter ......................................... SUCCESS [ 0.774 s]
[INFO] odpswriter ......................................... SUCCESS [ 1.597 s]
[INFO] txtfilewriter ...................................... SUCCESS [ 2.691 s]
[INFO] ftpwriter .......................................... SUCCESS [ 2.783 s]
[INFO] hdfswriter ......................................... SUCCESS [ 8.503 s]
[INFO] streamwriter ....................................... SUCCESS [ 0.925 s]
[INFO] otswriter .......................................... SUCCESS [ 1.739 s]
[INFO] oraclewriter ....................................... SUCCESS [ 0.996 s]
[INFO] sqlserverwriter .................................... SUCCESS [ 0.969 s]
[INFO] postgresqlwriter ................................... SUCCESS [ 2.233 s]
[INFO] osswriter .......................................... SUCCESS [ 8.041 s]
[INFO] mongodbwriter ...................................... SUCCESS [ 10.075 s]
[INFO] adswriter .......................................... SUCCESS [ 7.073 s]
[INFO] ocswriter .......................................... SUCCESS [ 5.983 s]
[INFO] rdbmswriter ........................................ SUCCESS [ 2.679 s]
[INFO] hbase11xwriter ..................................... SUCCESS [ 7.409 s]
[INFO] hbase094xwriter .................................... SUCCESS [ 3.524 s]
[INFO] hbase11xsqlwriter .................................. SUCCESS [ 17.167 s]
[INFO] hbase11xsqlreader .................................. SUCCESS [ 28.462 s]
[INFO] elasticsearchwriter ................................ SUCCESS [ 3.873 s]
[INFO] tsdbwriter ......................................... SUCCESS [ 2.598 s]
[INFO] adbpgwriter ........................................ SUCCESS [ 4.083 s]
[INFO] gdbwriter .......................................... SUCCESS [ 5.756 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [ 1.134 s]
[INFO] hbase20xsqlwriter 0.0.1-SNAPSHOT ................... SUCCESS [ 1.056 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:17 min
[INFO] Finished at: 2019-09-18T01:09:39+08:00
[INFO] ------------------------------------------------------------------------
The full build above packages plugins for every service DataX supports. We don't need that many, so remove the modules you don't need; that prevents avoidable build errors and saves trouble. I only needed the MongoDB reader plugin and, to sync into Elasticsearch, the Elasticsearch writer plugin, which is exactly the trimmed module list used in the build at the top.
Problem:
The following error appeared when running a sync:
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2019-10-22 18:40:25.828 [main] WARN ConfigParser - 插件[mongodbreader,null]加载失败,1s后重试... Exception:Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .]. - 插件加载失败,未完成指定插件加载:[null, mongodbreader]
2019-10-22 18:40:26.835 [main] ERROR Engine -
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .]. - 插件加载失败,未完成指定插件加载:[null, mongodbreader]
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:142)
at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
at com.alibaba.datax.core.Engine.entry(Engine.java:137)
at com.alibaba.datax.core.Engine.main(Engine.java:204)
Fix:
The cause was the plugin template file: I had used the job.json content from https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md as the content of elasticsearchwriter's plugin_job_template.json. Although the message above complains about mongodbreader, the same reader worked fine when syncing MongoDB to MySQL, so I suspected that the plugin_job_template.json under elasticsearchwriter was misconfigured. I changed it to:
{
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9999",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-1",
"indextype": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": []
}
}
After regenerating and re-running the job file, the error above was gone. If you hit this kind of problem, I suggest first syncing into a system you know well, and only then moving on to other targets.
Problem:
2019-10-24 14:20:30.561 [job-53135295] ERROR ESClient - {"root_cause":[{"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}],"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}
2019-10-24 14:20:30.566 [job-53135295] ERROR JobContainer - Exception when job run
com.alibaba.datax.common.exception.DataXException: Code:[ESWriter-03], Description:[mappings错误.]. - java.io.IOException: create index or mapping failed
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:34) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.writer.elasticsearchwriter.ESWriter$Job.prepare(ESWriter.java:94) ~[elasticsearchwriter-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.job.JobContainer.prepareJobWriter(JobContainer.java:1068) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.job.JobContainer.prepare(JobContainer.java:457) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:213) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.Engine.start(Engine.java:96) [datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.Engine.entry(Engine.java:246) [datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.Engine.main(Engine.java:279) [datax-core-0.0.1-SNAPSHOT.jar:na]
2019-10-24 14:20:30.677 [job-53135295] INFO MetricReportUtil - reportJobMetric is turn off
2019-10-24 14:20:30.678 [job-53135295] INFO LocalJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00%
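This last error comes from Elasticsearch itself: the string mapping type was split into text and keyword in 5.x and is rejected when creating new indices on 6.x, so the writer fails while creating the index mapping. The fix is the one mentioned earlier: declare such fields as keyword (or text, if they need full-text analysis) in the writer's column list. A sketch for the categoryImage field named in the error:
{
"name": "categoryImage",
"type": "keyword"
}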
