Recording Database Change History with Canal + Kafka + ElasticSearch


We recently had a requirement to log every insert, update, and delete on certain database tables.
My first idea was to implement this with annotations in the application code, but on closer inspection that turned out to be cumbersome: batch updates in particular are hard to log that way.
So instead we use Canal to listen to the MySQL binlog, publish the change events to Kafka, and then index them into ElasticSearch.

DEMO地址:https://gitee.com/acgist/muses/tree/master/service-parent/service-log-parent
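Before a change event is indexed, the message Canal publishes to Kafka has to be reshaped into an audit document. A minimal sketch in Python: the input field names (`data`, `old`, `database`, `table`, `type`, `ts`) follow Canal's flat-message JSON (`flatMessage = true`), while the audit-document layout and the sample payload are my own assumptions, not taken from the demo project.

```python
import json

def to_audit_docs(message: str):
    """Turn one Canal flat message into a list of per-row audit documents."""
    msg = json.loads(message)
    old_rows = msg.get("old") or []  # only present for UPDATE events
    docs = []
    for i, row in enumerate(msg.get("data") or []):
        docs.append({
            "database": msg["database"],
            "table": msg["table"],
            "type": msg["type"],  # INSERT / UPDATE / DELETE
            "ts": msg["ts"],      # when Canal processed the event (ms)
            "row": row,           # full row after the change
            # changed columns' values before an UPDATE, else None
            "old": old_rows[i] if i < len(old_rows) else None,
        })
    return docs

# Hypothetical UPDATE event on a t_user table
sample = json.dumps({
    "data": [{"id": "1", "name": "acgist"}],
    "database": "acgist", "table": "t_user",
    "type": "UPDATE", "ts": 1666857600000,
    "old": [{"name": "muses"}],
})
for doc in to_audit_docs(sample):
    print(doc["type"], doc["table"], doc["old"])
```

Each document produced this way can then be sent to ElasticSearch with a bulk index request; keeping `old` alongside `row` makes it possible to show exactly which columns an UPDATE touched.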

Exceptions

2022-10-27 16:08:58.068 [destination = acgist , address = /192.168.8.187:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
        at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.6.jar:na]
        at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:237) ~[canal.parse-1.1.6.jar:na]
        at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:262) ~[canal.parse-1.1.6.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Canal synced once and then sat idle for a long time, so its recorded binlog position went stale (errno 1236 here means the server could no longer find the referenced binlog file, typically because it had been purged). Stopping Canal, deleting the meta.dat file, and restarting fixed it.

Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:acgist.t_res_equipment,42 vs 40
2022-11-01 07:19:24.986 [destination = acgist , address = /192.168.8.187:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:acgist[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.

This one is caused by the table schema being altered while Canal was stopped, so Canal's cached table metadata no longer matches the actual column count. Deleting the h2.mv.db file (the embedded H2 database Canal uses for its table-meta TSDB) and restarting resolves it.