@

1、MySQL和ES的主要区别?

1.1 功能性

MySQL作为最常用的DB之一,在DB-Ranking排名常年保持前三,仅次于Oracle。为什么还需要把数据同步至排名只有第八的Elasticsearch?相信不了解这两个数据库的区别的同学很容易有这样的疑问。下面我来解答这个问题。
在这里插入图片描述

首先两款数据库在功能上的区别:

MySQL Elasticsearch
关系型 搜索引擎
单体 分布式
OLTP OLAP
支持事务 不支持事务
SQL DSL

1.2 性能指标

再来看一下两种数据库的性能对比

MySQL Elasticsearch
检索性能 ★★★ ★★★★★
扩展能力 ★★ ★★★★★
写入实时性 ★★★★ ★★(可配置)
准确性 ★★★★ ★★
支持的数据类型 ★★★ ★★★★
灵活性 ★★★ ★★★★
事务支持 ★★★ ★★★★
可以看到,Elasticsearch在检索和扩展能力方面碾压MySQL。实际上MySQL在数据量达到 2-3千万的时候查询性能开始显著下降,而ES的检索性能,实测在数据量1亿左右,单节点1G内存,无任何优化的前提下仍能达到10ms以内的检索性能。并且ES具有MySQL无法相比的横向扩展能力,具备极高的可用性和扩展性。

1.3 在搜索业务上的区别

同样是检索结果,ES中的“检索”和关系型数据库的“查询”是两个不同的概念。搜索引擎的概念里,“检索”和相关度紧密耦合。

1.3.1 查询

关系数据库中“查询”的概念具有“完全相关性”,即搜索结果要么完全满足要求,要么不满足,而不存在“部分满足”的概念。

1.3.2 检索

而以ES为代表的搜索引擎中的“检索”(泛指全文检索),具备相关度的概念,即和搜索词预期搜索结果相符的程度。影响相关度的因素有很多,搜索量,数据本身等都可能影响最终结果。ES以相关度评分来衡量相关度的结果,这里不什么是相关度评分,如果感兴趣,可以异步“相关度评分”一文。

举个简单的例子:当用户搜索词为“小米手机”的时候,“查询”的执行逻辑为field_name='小米手机'或者field_name like '%小米手机%',即搜索结果的匹配逻辑为完全匹配或包含关系。而如果是检索,假设分词结果为“小米”和“手机”两个词项,搜索结果可能只包含“小米”或者只“手机”,如果设置了同义词等逻辑,匹配出手表、华为也是符合结果的。即搜索结果可能完全不包含搜索词

2、为什么要做数据同步

MySQL是关系型数据库,而Elasticsearch是搜索引擎的核心组件之一,属于非关系型数据库。有一句经典名言叫:存在即合理,两款数据库既然同时存在,那么他们必然有对方不可取代的地方。

这里只讨论两者最主要的不可替代特性,至于扩展能力、数据类型等问题本文暂不讨论

2.1 检索性能

ES擅长海量数据的检索,支持PB级数据的秒级查询,以亿为单位的数据在ES看来只是起步而已,并且搜索结果具有非完全相关性,而是全文检索。其相关度有专门的算法来计算,主要是TF-IDFBM25,两种算法这里不赘述,我的文章里有详细介绍。而MySQL虽然也有“全文索引”,但是非常积累,不管存存储效率还是检索效率都远不如ES,这也是其底层的数据结构和算法导致。普通索引目前底层数据结构为B+Trees,这种数据结构不适合存储长文本类型的索引,会导致树的深度过大,检索效率极低。其底层原理我有单独的文章详细介绍(待更新)。

2.2 写入性能

ES是OLAP系统,侧重于海量数据的检索,而写入实时性并不是很高,默认1秒,也就是ES缓冲区Buffer的刷新间隔时间,不了解Elasticsearch写入原理的同学可以暂时忽略。ES并非忽略了对写入性能的优化,而是“有意为之”,其原因就在于基于ES的写入机制,其写入实时性大数据检索性能是一个二选一的行为。实际上生产环境中我们经常通过“牺牲写入实时性”的操作来换取更高更快的“数据检索”性能

2.3 事务支持

正因为ES的写入实时性并不高,如果我们需要快速响应用户请求,我们常采取的手段就是使用缓存,但是在很多高并发的场景下,我们需要数据保持强一致性(如银行系统),因此需要使用具有ACID特性的数据库来支持,而MySQL就是一个比较好的选择。

但是如果单单只用MySQL,又无法解决海量数据的检索问题。其实在实际工作中常见的做法就是两者结合起来,通过数据同步的操作将MySQL的实时数据同步至Elasticsearch,两者各司其职互不干扰。

3、Canal

3.1 Canal是啥

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

3.2 应用场景

Elasticsearch 不支持事务。 ES通常在分布式系统架构中承担“搜索引擎”的角色,一般来说解决词类问题,可以把ES和支持ACID特性的关系型数据库结合起来使用。首先把对数据的更(增删改)操作在RDB中执行,然后把这些动作同步到Elasticsearch。 通过这种方式,你将受益于数据库 ACID 事务支持,并且在 Elasticsearch 中以正确的顺序产生变更。 并发在关系数据库中得到了处理。

MySQL为例,如果要把数据从同步至ES,canal + binary log就是常用的一种增量解决方案。

3.3 原理机制

img

3.3.1 MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

3.3.2 canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
    image.png

3.3 优势

  • 准实时性
  • 性能好
  • 一劳永逸

4 基于canal的MySQL=>ES数据同步方案

4.1 环境:

Java和ES兼容性:https://www.elastic.co/cn/support/matrix#matrix_jvm

  • JDK:1.8
  • Elasticsearch:7.x
  • MySQL: 5.7
  • Canal: 1.1.4

4.2 下载地址:

4.3 步骤

4.3.1 保证Elasticsearch服务可用

使用cancal向ES同步数据之前需要保证ES的服务是可以正常访问的

4.3.2 保证MySQL服务可用

不再赘述

4.3.3 开启MySQL的binary log(主备模式)

配置MySQL:

server_id = 1 #开启主从模式后每个MySQL节点的id
log-bin = mysql-bin #bin-log的存储位置
binlog-format = ROW #选择存储binlog日志方式为ROW模式

重启MySQL服务
验证是否开启成功

SHOW VARIABLES LIKE 'log_bin';
log_bin	ON #开启

4.4.4 canal-deployer

配置:conf/example/instance.properties

#canal示例的slaveId
canal.instance.mysql.slaveId=1234
#mysql地址
canal.instance.master.address= 127.0.0.1:3306
#用户名
canal.instance.dbUsername = root
#密码
canal.instance.dbPassword = 123456
#指定需要同步的数据库
canal.instance.defaultDatabaseName = msb_order
#指定编码方式
canal.instance.connectionCharset = UTF-8
#监控的是所有数据库,所有的表改动都会监控到,这样可能会浪费不少性能,可能我只想监控的是某一个数据库下的表。
#  .*\\..*表示监控所有数据库,canal\\..*表示监控canal数据库
canal.instance.filter.regex = .\*\\\\..\*

启动: ./startup.sh(Linux)
验证: demo

4.4.5 canal-admin

配置:conf/application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: admin

启动管理服务
访问服务:server_ip:8089

4.4.6 canal-adapter

配置:conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/msb_order?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        key: exampleKey
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch

配置:conf/es7/my_order.yml
这里要注意,在写SQL语句的时候,要保证a.id as _id,即把id属性改为_id,以保证能正常写入ES的_id字段。

dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: exampleKey     # 对应application.yml中es配置的key 
destination: example            # cannal的instance或者MQ的topic
groupId: g1                     # 对应MQ模式下的groupId, 只会同步对应groupId的数据_search
esMapping:
  _index: msb_order             # es 的索引名称
  _type: _doc					# es7 固定'_doc',es8删除
  _id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  upsert: true
  sql: "SELECT a.id as _id,	a.customer_phone, a.customer_name, a.customer_region, a.customer_city, a.customer_district, a.customer_addr, a.failure_phenomenon, a.failure_phenomenon_text, a.book_date, a.service_required, a.buyshop_detail, a.buy_shop, a.buy_way, a.product_type, a.model_no, a.process_type, a.ex_order_no, a.customer_country, a.source, a.matnr, a.buyDate, a.serviceReq, a.create_time, a.result, a.rvstatus, a.ip_address   msb_order_1 a"
  commitBatch: 3000  

服务启动
注意:

  • 索引msb_order的mapping必须提前创建好
  • 索引中的id字段是_id,因此需要查询的时候需要id as _id