扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章主要介绍oracle数据如何通过goldengate实时同步到kafka消息队列中,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
创新互联公司成立于2013年,先为昭通等服务建站,昭通等地企业,进行企业商务咨询服务。为昭通企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
组件版本
组件 | 版本 | 描述 |
源端oracle | oracle 11.2.0.4 for linux x64 | 源端oracle |
源端ogg | oracle ogg 11.2.0.1.20 for oracle linux x64 | 源端ogg,用于抽取源端oracle的数据变更,并将变更日志发送目标端 |
目标端kafka | kafka_2.11-0.11.0.2 for linux x64 | 消息队列,接收目标端ogg推送过来的数据 |
目标端ogg | 目标端ogg,接收源端发送的oracle事物变更日志,并将变更推送到kafka消息队列中 |
1.OGG Manager
OGG Manager用于配置和管理其它OGG组件,配置数据抽取、数据推送、数据复制,启动和停止相关组件,查看相关组件的运行情况。
2.数据抽取(Extract)
抽取源端数据库的变更(DML, DDL)。数据抽取主要分如下几种类型:本地抽取从本地数据库捕获增量变更数据,写入到本地Trail文件数据推送(Data Pump)从本地Trail文件读取数据,推送到目标端。初始数据抽取从数据库表中导出全量数据,用于初次数据加载
3.数据推送(Data Pump)
Data Pump是一种特殊的数据抽取(Extract)类型,从本地Trail文件中读取数据,并通过网络将数据发送到目标端OGG
4.Trail文件
数据抽取从源端数据库抓取到的事物变更信息会写入到Trail文件。
5.数据接收(Collector)
数据接收程序运行在目标端机器,用于接收Data Pump发送过来的Trail日志,并将数据写入到本地Trail文件。
6.数据复制(Replicat)
数据复制运行在目标端机器,从Trail文件读取数据变更,并将变更数据应用到目标端数据存储系统。本案例中,数据复制将数据推送到kafka消息队列。
7.检查点(Checkpoint)
检查点用于记录数据库事物变更。
源端Oracle数据库配置
开启源端归档
SQL> archive log list
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/product/11.2.3/db_1/dbs/arch
Oldest online log sequence 12
Next log sequence to archive 17
Current log sequence 17
若为打开归档解决如下:
conn / as sysdba (以DBA身份连接数据库)
shutdown immediate (立即关闭数据库)
startup mount (启动实例并加载数据库,但不打开)
alter database archivelog; (更改数据库为归档模式)
alter database open; (打开数据库)
alter system archive log start; (启用自动归档)
2)OGG基于辅助日志等进行实时传输,故需要打开相关日志确保可获取事务内容,通过下面的命令查看该状态
SQL> select force_logging, supplemental_log_data_min,supplemental_log_data_all from v$database;
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES YES
如果没有开启辅助日志,需要开启
SQL> alter database force logging;
SQL> alter database add supplemental log data;
SQL>alter database add supplemental log data(all) columns;
3.开启goldengate复制参数
SQL> alter system set enable_goldengate_replication = true;
4.创建源端Oracle账号
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;
5.创建测试表 (生产略)
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
436
部署ogg
源端 (oracle源端)
1、解压
先建立ogg目录
mkdir -p /ogg
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /ogg
chown -R oracle:oinstall /ogg (使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功)
2配置ogg环境变量
为了简单方便起见,建议在生产中配置oracle的环境变量文件/home/oracle/.bash_profile里配置
export JAVA_HOME=/usr/local/java1.8
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
export JAVA=$JAVA_HOME/bin/java
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so
生效环境变量
source /home/oracle/.bash_profile
3、OGG初始化
ggsci
create subdirs
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created
4、配置源端Manager
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals
--添加
oggschema ggsadmin
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
----添加
PORT 7810 --默认监听端口
DYNAMICPORTLIST 7811-7820 --动态端口列表
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --进程有问题,每3分钟重启一次,一共重启五次
PURGEOLDEXTRACTS ./dirdat/, USECHECKPOINTS, MINKEEPDAYS 7 --/
LAGREPORTHOURS 1 --每隔一小时检查一次传输延迟情况
LAGINFOMINUTES 30 --传输延时超过30分钟将写入错误日志
LAGCRITICALMINUTES 45 --传输延时超过45分钟将写入警告日志
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件
--ACCESSRULE, PROG , IPADDR 172..., ALLOW --设定172网段可连接
5、添加同步表级别日志
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
目标端 (kafka目标端)
1、解压
mkdir -p /ogg
unzip V839824-01.zip
tar xf ggs_Adapters_Linux_x64.tar -C /ogg/
2配置ogg环境变量
为了简单方便起见,建议在生产中配置oracle的环境变量文件/home/oracle/.bash_profile里配置
export JAVA_HOME=/usr/local/java1.8/jre
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so
生效环境变量
source /home/oracle/.bash_profile
OGG初始化
ggsci
create subdirs
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created
配置源端Manager
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
----添加
PORT 7810 --默认监听端口
DYNAMICPORTLIST 7811-7820 --动态端口列表
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --进程有问题,每3分钟重启一次,一共重启五次
PURGEOLDEXTRACTS ./dirdat/, USECHECKPOINTS, MINKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件
--ACCESSRULE, PROG , IPADDR 172..., ALLOW --设定172网段可连接
GGSCI (172-16-101-242) 4> edit param ./GLOBALS
--添加
CHECKPOINTTABLE ggsadmin.checkpoint
全量数据同步(oracle to kafka)
1. 配置源端数据初始化
1) 配置源端初始化进程
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
2) 配置源端初始化参数
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin,PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ek,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;
3)源端生成表结构define文件
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
-- 添加
defsfile /ogg/dirdef/define_kfk.txt
userid ggsadmin,password oracle
table baiyang.ora_to_kfk;
4)获取oracle全量数据
$cd /ogg
$./defgen paramfile dirprm/define_kfk.prm
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
5) 将获取全量数据记录传送到目标端
- 将此文件传输到目标段dirdef文件夹
scp /ogg/dirdef/define_kfk.txt 172.16.101.242:/ogg/dirdef/define_kfk.txt
2、配置目标端数据初始化进程
1)配置目标端初始化进程
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
2)配置目标端初始化参数
GGSCI (172-16-101-242) 6> edit params initkfk
-- 添加
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
REPLACEBADCHAR SKIP
SOURCECHARSET OVERRIDE ISO-8859-1
EXTFILE ./dirdat/ek
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
3)配置ogg 针对kafka相关参数
vi ./dirprm/kafka.props
--添加
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicName=test_ogg --旧版参数,本次使用旧版参数
#gg.handler.kafkahandler.topicMappingTemplate=test_ogg –-新版本参数
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/kafka/libs/*:/ogg/:/ogg/lib/*
kafka 安装的位置 ogg安装的位置
将./dirprm/kafka.props 文件复制到/ogg/AdapterExamples/big-data/kafka 目录下
vi ./dirprm/custom_kafka_producer.properties
bootstrap.servers=172.16.101.242:9092 ---kafka地址
acks=-1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400 --数据有堆积
linger.ms=10000 --数据传输kafka有延时
将./dirprm/custom_kafka_producer.properties 文件复制到/ogg/AdapterExamples/big-data/kafka
3、开启抽取全量任务
源端:
GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk
目标端全量数据应用
GGSCI (172-16-101-242) 13> start mgr
cd /ogg
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
--查看应用日志是否有错误
cd /opt/ogg/dirrpt
more init01.rpt
4、验证kafka全量数据
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg --from-beginning
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
全量数据已经同步到目标kafka topic test_ogg
增量数据同步(oracle to kafka)
源端配置
1. 源端抽取进程配置
GGSCI (dtproxy) 9> edit param extkfk
-- 添加
extract extkfk
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin,password oracle
FETCHOPTIONS NOUSESNAPSHOT
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
exttrail ./dirdat/to
table baiyang.ora_to_kfk;
2、添加extract进程
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk
3、配置源端推送进程
GGSCI (dtproxy) 12> edit param pupkfk
-- 添加
extract pupkfk
passthru
dynamicresolution
userid ggsadmin,password oracle
rmthost 172.16.101.242 mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;
4、添加投递进程
GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource ./dirdat/to
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk
目标端配置
1、 配置目标端恢复进程
edit param repkfk
-- 添加
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
2、 添加trail文件到replicate进程
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint
开启增量实时数据抓取
源端:
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER ...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> status all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:10
EXTRACT RUNNING PUPKFK 00:00:00 00:00:00
目标端:
/ggsci
GGSCI (172-16-101-242) 7> start replicat repkfk
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (172-16-101-242) 8> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:00
测试增量数据抓取
源端:
Oracle插入增量数据
SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
905
目标端:
查看Kafka消息队列消费数据
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
DDL操作
如果ogg 源端,也就是oracle 端的表增加字段或者删除字段,或者修改字段等等,只要是修改表结构定义的,就算是DDL操作,在ogg for bigdata 12.2 稳定版本中,目前是不支持同步ddl语句的,在12,3版本以后会进行ddl支持。
在12.2 ogg for bigdata 中,源端如果做ddl,需要在源端的定义表结构文件中重新生成define_kfk.txt文件的定义,并将define_kfk.txt文件传输到目标端中。
举例说明:
源端:(oracle端)
1) 源表添加id字段
alter table ORA_TO_KFK add id number;
2) ogg 源端需要重新生成表定义文件
mv /ogg/dirdef/define_kfk.txt /ogg/dirdef/define_kfk.txt.bak1
cd /ogg
/defgen paramfile dirprm/define_kfk.prm
3) 将生成的表定义文件scp 到目标端
cd /ogg
scp ./dirdef/define_kfk.txt root@192.168.56.57:/ogg/dirdef/
4) 源端抽取进程需要重启
GGSCI (edsir1p9) 2> stop EXTKFK
Sending STOP request to EXTRACT EXTKFK ...
Request processed.
GGSCI (edsir1p9) 3> start EXTKFK
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (edsir1p9) 4> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:08
EXTRACT RUNNING PUPKFK 00:00:00 00:00:07
目标端:(kafka端)
1)查看目标端的应用进程发生了abend
GGSCI (node) 38> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT ABENDED REPKFK 00:10:27 00:05:29
2)启动复制进程
GGSCI (node) 40> start REPKFK
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (node) 9> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:04
测试:
源端插入一条数据
SQL> insert into ORA_TO_KFK(OWNER,OBJECT_NAME,OBJECT_ID,ID) values ('gg','gg',876,9);
1 row created.
SQL> commit;
目标端:
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.57:9092 --topic ogg_test
数据已经从源端oracle同步到目标端kafka中。至此oracle新添加一列,可以正常同步到kafka中。
以上是“oracle数据如何通过goldengate实时同步到kafka消息队列中”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注创新互联行业资讯频道!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流