Kafka消息 Specification

Kafka消息 specification, encoded using GSON library.

名字 类型 对应Java类型 意义
binlog 字符串 String MYSQL的binlog文件和offset,格式为offset@binlog-file
time 64位整型 long 以毫秒为单位的Unix时间戳,代表此binlog条目产生的时间
canalTime 64位整型 long 以毫秒为单位的Unix时间戳,代表binlog经过到达KafkaRiverCanal程序的时间,也可视为消息进入kafka的时间
db 字符串 String MYSQL一个数据库的schema名(一般情况下与数据库名一样,某些情况也可能不一样,请咨询相关数据库的DBA)
table 字符串 String 此条binlog影响的MYSQL的一个表的表名
event 字符型 char 代表binlog事件类型,目前只解析出了insert, update & delete(对应的值为'i', 'u'和'd')
columns JSON数组 ArrayList<Object> MYSQL表一个row的各个column,包含这次事件影响的数据,详见表二
keys JSON数组 ArrayList<String> MYSQL keys

其中,columns数组的每个元素specification见表二。

名字 类型 对应Java类型 意义
v 字符串 String value的缩写,即该列的值。对于insert,即插入的新值;对于update,即update之后的值;而对于delete,则为删除前的值
updated 布尔 boolean 本次事件该字段是否被更新了。仅对update事件有意义,对insert和delete一概为true,因此略去
t 字符串 String type的缩写,即这个列的MYSQL数据类型。例如decimal(10,4)
origin_val 字符串 String origin_value的缩写,该列更新之前的旧值。仅对update事件,且updated为true的字段有效。
null 布尔 boolean 该字段的值是否为null
n 字符串 String name的缩写,即MYSQL表的一个列的列名

INSERT消息样例:

{ 
"binlog": "6816@mysql-bin.000070", 
"time": 1450235092000,
"canalTime": 1450235093370,
"db": "TestCanal", 
"table": "g_order_010",
"event": "i",
"columns": [ 
{ "n": "order_id", "t": "bigint(20)", "v": "126", "null": false }, 
{ "n": "x_id", "t": "bigint(20)", "v": "123456", "null": false }, 
{ "n": "phone", "t": "varchar(15)", "v": "13264494028", "null": false }, 
{ "n": "time", "t": "timestamp", "v": "2015-08-10 13:08:13", "null": false }
],
"keys": [ "order_id" ] }

UPDATE消息样例:

{
"binlog": "25521@mysql-bin.000070",
"time": 1450236307000,
"canalTime": 1450236308279,
"db": "TestCanal",
"table": "g_order_010",
"event": "u",
"columns": [
{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false},
{"n": "x_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false},
{ "n": "name", "t": "varchar(100)", "v": "小春", "origin_val": "小明", "null": false, "updated": true}
],
"keys": ["order_id"]
}

DELETE消息样例:

{
"binlog": "58851@mysql-bin.000070",
"time": 1450237034000,
"canalTime": 1450237034492,
"db": "TestCanal",
"table": "g_order_010",
"event": "d",
"columns": [
{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false},
{"n": "x_id", "t": "bigint(20)", "v": "123456", "null": false},
{"n": "phone", "t": "varchar(15)", "v": "13264494028", "null": false}
],
"keys": ["order_id"]
}