创新的盐城网站建设,网站权限分配代码,做内贸的什么网站效果好,徐州网站建设技术托管Flink CDC系列之#xff1a;Oracle CDC 导入 Elasticsearch 一、深入理解Flink Oracle CDC Connector二、创建docker-compose.yml文件三、启动容器四、下载Flink Oracle CDC的jar包五、启动 Flink 集群#xff0c;再启动 SQL CLI六、检查 ElasticSearch 中的结果七、在 Oracl… Flink CDC系列之Oracle CDC 导入 Elasticsearch 一、深入理解Flink Oracle CDC Connector二、创建docker-compose.yml文件三、启动容器四、下载Flink Oracle CDC的jar包五、启动 Flink 集群再启动 SQL CLI六、检查 ElasticSearch 中的结果七、在 Oracle 制造一些变更观察 ElasticSearch 中的结果 一、深入理解Flink Oracle CDC Connector
Flink CDC系列之Oracle CDC Connector
二、创建docker-compose.yml文件
version: 2.1
services:oracle:image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0ports:- 1521:1521elasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.namedocker-cluster- bootstrap.memory_locktrue- ES_JAVA_OPTS-Xms512m -Xmx512m- discovery.typesingle-nodeports:- 9200:9200- 9300:9300ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- 5601:5601volumes:- /var/run/docker.sock:/var/run/docker.sock该 Docker Compose 中包含的容器有:
Oracle: Oracle 11g, 已经预先创建了 products 和 orders表并插入了一些数据Elasticsearch: orders 表将和 products 表进行joinjoin的结果写入Elasticsearch中Kibana: 可视化 Elasticsearch 中的数据
三、启动容器
在 docker-compose.yml 所在目录下运行如下命令以启动所有容器
docker-compose up -d该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。 另外可以通过如下命令停止所有的容器
docker-compose down四、下载Flink Oracle CDC的jar包
下载以下 jar 包到 FLINK_HOME/lib/:
flink-sql-connector-elasticsearch7-3.0.1-1.17.jarflink-sql-connector-oracle-cdc-2.4.1.jar
五、启动 Flink 集群再启动 SQL CLI
-- Flink SQL
-- checkpoint every 3000 milliseconds
Flink SQL SET execution.checkpointing.interval 3s;Flink SQL CREATE TABLE products (ID INT,NAME STRING,DESCRIPTION STRING,PRIMARY KEY (ID) NOT ENFORCED) WITH (connector oracle-cdc,hostname localhost,port 1521,username flinkuser,password flinkpw,database-name XE,schema-name flinkuser, table-name products);Flink SQL CREATE TABLE orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN) WITH (connector oracle-cdc,hostname localhost,port 1521,username flinkuser,password flinkpw,database-name XE,schema-name flinkuser, table-name orders);创建elasticsearch
Flink SQL CREATE TABLE enriched_orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN,PRODUCT_NAME STRING,PRODUCT_DESCRIPTION STRING,PRIMARY KEY (ORDER_ID) NOT ENFORCED) WITH (connector elasticsearch-7,hosts http://localhost:9200,index enriched_orders_1关联处理后插入数据
Flink SQL INSERT INTO enriched_ordersSELECT o.*, p.NAME, p.DESCRIPTIONFROM orders AS oLEFT JOIN products AS p ON o.PRODUCT_ID p.ID;六、检查 ElasticSearch 中的结果
检查最终的结果是否写入ElasticSearch中, 可以在Kibana看到ElasticSearch中的数据
七、在 Oracle 制造一些变更观察 ElasticSearch 中的结果
进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句Elasticsearch中的数据都会实时更新。
docker-compose exec sqlplus flinkuser/flinkpw插入更新数据
INSERT INTO flinkuser.orders VALUES (10004, to_date(2020-07-30 15:22:00, yyyy-mm-dd hh24:mi:ss), Jark, 29.71, 104, 0);UPDATE flinkuser.orders SET ORDER_STATUS 1 WHERE ORDER_ID 10004;DELETE FROM flinkuser.orders WHERE ORDER_ID 10004;