Preface

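You may have heard of Canal, Alibaba's component for incremental Binlog subscription. As its introduction shows, Canal Server impersonates a MySQL slave node and sends a dump request to the master, which then pushes Binlog events to it. After receiving those events, Canal Server filters and processes the data and pushes the result to downstream clients (such as ES, HBase, or Kafka), supporting use cases like refreshing business caches and processing incremental business data.
[Figure: canal-introduction]
This article focuses on how Canal Server impersonates a slave node and gets the master to push Binlog events. Canal itself certainly contains performance optimizations, for example around I/O, that are not covered here; for those, I recommend studying Canal's overall architecture.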

Master-Slave Replication

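Before looking at how to impersonate a slave node, it helps to have a basic picture of how MySQL master-slave replication works. The diagram below shows the principle:
[Figure: master-slave]

  1. The slave creates two threads: an I/O thread and a SQL thread.
  2. The slave's I/O thread requests Binlog events from the master.
  3. The master creates a dump thread for the slave's I/O thread and uses it to push Binlog events.
  4. The slave's I/O thread writes the received data to the relay log.
  5. The slave's SQL thread reads the relay log, parses the events, and executes them.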

How to Impersonate a Slave

[Figure: canal-structure]
The figure above gives a rough idea of Canal's workflow. Now to the main topic: how Canal Server impersonates a slave node.

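MySQL Protocol Packet Definition

As you might guess, impersonating a slave requires a TCP connection to MySQL, so we first need to understand the structure of a MySQL protocol packet, shown below:
[Figure: packet-definition]
A MySQL protocol packet starts with a 4-byte header: payload_length takes 3 bytes and sequence_id takes 1 byte. The sequence_id starts at 00 and restarts once an exchange finishes, that is, after MySQL sends an OK_Packet or ERR_Packet. Because payload_length has only 3 bytes, a single payload holds at most 2^24 - 1 bytes (0xffffff), which leads to the three cases in the figure above: when the payload is shorter than 0xffffff, one packet is enough; when it is exactly 0xffffff, it must be followed by an empty packet; when it is longer than 0xffffff, it must be split across several packets.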
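The three framing cases can be sketched in a few lines of Python. This is only an illustration of the rule described above, not code from Canal; the names `frame_payload` and `read_payload` are made up for this example.

```python
import io

MAX_PAYLOAD = 0xFFFFFF  # 2^24 - 1, the largest payload one packet can carry


def frame_payload(payload: bytes, sequence_id: int = 0) -> bytes:
    """Split a payload into MySQL packets: 3-byte length, 1-byte sequence id, body."""
    out = bytearray()
    offset = 0
    while True:
        chunk = payload[offset:offset + MAX_PAYLOAD]
        out += len(chunk).to_bytes(3, "little")
        out += (sequence_id & 0xFF).to_bytes(1, "little")
        out += chunk
        offset += len(chunk)
        sequence_id += 1
        # A chunk shorter than MAX_PAYLOAD (possibly empty) ends the payload.
        if len(chunk) < MAX_PAYLOAD:
            return bytes(out)


def read_payload(stream) -> bytes:
    """Read packets from a file-like object until one is shorter than MAX_PAYLOAD."""
    payload = bytearray()
    while True:
        header = stream.read(4)
        if len(header) < 4:
            raise EOFError("incomplete packet header")
        length = int.from_bytes(header[:3], "little")
        body = stream.read(length)
        if len(body) < length:
            raise EOFError("incomplete packet body")
        payload += body
        if length < MAX_PAYLOAD:
            return bytes(payload)


if __name__ == "__main__":
    framed = frame_payload(b"\x03SELECT 1")
    print(framed.hex())  # 3-byte length, sequence id, then the payload
    print(read_payload(io.BytesIO(framed)))
```

With a real connection, `s.makefile('rb')` on the socket gives a file-like object that `read_payload` can consume.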

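MySQL Connection Lifecycle

Connection Phase

First we need to establish a connection with MySQL, which means performing a handshake. The connection phase looks like this:
[Figure: mysql-connection-step]

  1. The MySQL server sends an Initial Handshake Packet to the client.
  2. The client sends a Handshake Response Packet to the MySQL server.
  3. If authentication succeeds, the MySQL server sends an OK_Packet to the client.
    The whole process boils down to the client and server exchanging capabilities, followed by the server authenticating the client. (An SSL handshake adds one more step, an SSL exchange that sets up the SSL connection, but that is outside the scope of this article; the official MySQL documentation covers it.)
Additional notes on the connection phase
  1. The client can initiate an SSL handshake: after receiving the Initial Handshake Packet it replies with Protocol::SSLRequest, the SSL exchange establishes the SSL connection, and only then does the client send the Handshake Response Packet.
  2. To find out whether the server supports a given capability flag, bitwise AND the CapabilityFlags returned by the server with that flag; for example, the server supports CLIENT_LONG_PASSWORD when CapabilityFlags & CLIENT_LONG_PASSWORD is nonzero. Likewise, to tell the server which capabilities the client wants, bitwise OR the flags together, for example CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG.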
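The flag arithmetic from the second note can be written with Python's `IntFlag`. This is a small sketch: the class name `Capability`, the helper `server_capabilities`, and the sample hex values are assumptions for illustration, and only a handful of the protocol's flags are listed.

```python
from enum import IntFlag


class Capability(IntFlag):
    """A few of the protocol's capability flags (not the full list)."""
    CLIENT_LONG_PASSWORD = 1
    CLIENT_LONG_FLAG = 1 << 2
    CLIENT_CONNECT_WITH_DB = 1 << 3
    CLIENT_PROTOCOL_41 = 1 << 9
    CLIENT_SSL = 1 << 11
    CLIENT_SECURE_CONNECTION = 1 << 15
    CLIENT_PLUGIN_AUTH = 1 << 19


def server_capabilities(lower_hex: str, upper_hex: str) -> Capability:
    """Combine the two little-endian 2-byte halves sent in the handshake."""
    lower = int.from_bytes(bytes.fromhex(lower_hex), "little")
    upper = int.from_bytes(bytes.fromhex(upper_hex), "little")
    return Capability(lower | (upper << 16))


if __name__ == "__main__":
    # Hypothetical wire bytes for the lower and upper capability halves.
    caps = server_capabilities("fff7", "ff81")
    # Bitwise AND (here via "in") tests what the server supports.
    print(Capability.CLIENT_PROTOCOL_41 in caps)
    print(Capability.CLIENT_SSL in caps)
    # Bitwise OR builds the set of capabilities the client asks for.
    wanted = Capability.CLIENT_LONG_PASSWORD | Capability.CLIENT_LONG_FLAG
    print(int(wanted).to_bytes(4, "little").hex())
```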
Implementation

Initial Handshake Packet

# Protocol::HandshakeV10: every server since MySQL 3.21.0 sends HandshakeV10; older servers send HandshakeV9.

def parse_initial_handshake_packet(s):
    """
    Parse the Initial Handshake Packet sent by the MySQL server.
    :param s: socket connected to the MySQL server
    :return: dict of packet fields, each kept as a hex string

    1              [0a] protocol version
    string[NUL]    server version
    4              connection id
    string[8]      auth-plugin-data-part-1
    1              [00] filler
    2              capability flags (lower 2 bytes)
      if more data in the packet:
    1              character set
    2              status flags
    2              capability flags (upper 2 bytes)
      if capabilities & CLIENT_PLUGIN_AUTH {
    1              length of auth-plugin-data
      } else {
    1              [00]
      }
    string[10]     reserved (all [00])
      if capabilities & CLIENT_SECURE_CONNECTION {
    string[$len]   auth-plugin-data-part-2 ($len=MAX(13, length of auth-plugin-data - 8))
      if capabilities & CLIENT_PLUGIN_AUTH {
    string[NUL]    auth-plugin name
      }
    """
    # A single recv() is assumed to return the whole handshake packet.
    # The bytes are turned into a hex string, so each byte occupies two characters.
    packet = s.recv(16384).hex()
    index = 0
    # int<3> payload_length
    payload_length = packet[index:index + 6]
    index += 6
    # int<1> sequence_id
    sequence_id = packet[index:index + 2]
    index += 2
    # int<1> protocol version
    protocol_version = packet[index:index + 2]
    index += 2
    # string<NUL> NUL-terminated MySQL server version (the terminator is not stored)
    server_version = ''
    while index < len(packet):
        server_version_part = packet[index:index + 2]
        index += 2
        if server_version_part == '00':
            break
        server_version += server_version_part
    # int<4> connection (thread) id
    thread_id = packet[index:index + 8]
    index += 8
    # string<8> first 8 bytes of the scramble used to encrypt the password
    auth_plugin_data_part_1 = packet[index:index + 16]
    index += 16
    # int<1> filler
    filler = packet[index:index + 2]
    index += 2
    # int<2> capability flags, lower 2 bytes
    capability_flags_1 = packet[index:index + 4]
    index += 4
    # int<1> character set
    character_set = packet[index:index + 2]
    index += 2
    # int<2> server status flags
    status_flags = packet[index:index + 4]
    index += 4
    # int<2> capability flags, upper 2 bytes
    capability_flags_2 = packet[index:index + 4]
    index += 4
    # int<1> length of the scramble; 00 if CLIENT_PLUGIN_AUTH is not supported
    auth_plugin_data_len = packet[index:index + 2]
    index += 2
    # string<10> reserved, all 00
    reserved = packet[index:index + 20]
    index += 20
    # string<13> remaining scramble bytes (assumes the common 13-byte length, NUL included)
    auth_plugin_data_part_2 = packet[index:index + 26]
    index += 26
    # string<NUL> NUL-terminated name of the authentication plugin
    auth_plugin_name = packet[index:]

    return {
        "payload_length": payload_length,
        "sequence_id": sequence_id,
        "protocol_version": protocol_version,
        "server_version": server_version,
        "thread_id": thread_id,
        "auth_plugin_data_part_1": auth_plugin_data_part_1,
        "filler": filler,
        "capability_flags_1": capability_flags_1,
        "character_set": character_set,
        "status_flags": status_flags,
        "capability_flags_2": capability_flags_2,
        "auth_plugin_data_len": auth_plugin_data_len,
        "reserved": reserved,
        "auth_plugin_data_part_2": auth_plugin_data_part_2,
        "auth_plugin_name": auth_plugin_name
    }

Handshake Response Packet

# Protocol::HandshakeResponse41: if the server's capability flags do not include CLIENT_PROTOCOL_41,
# the client sends HandshakeResponse320 instead.
def send_handshake_resp_packet(s, initial_resp):
    """
    Send the client's Handshake Response Packet.
    :param s: socket connected to the MySQL server
    :param initial_resp: dict returned by parse_initial_handshake_packet
    :return:

    4              capability flags, CLIENT_PROTOCOL_41 always set
    4              max-packet size
    1              character set
    string[23]     reserved (all [0])
    string[NUL]    username
      if capabilities & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA {
    lenenc-int     length of auth-response
    string[n]      auth-response
      } else if capabilities & CLIENT_SECURE_CONNECTION {
    1              length of auth-response
    string[n]      auth-response
      } else {
    string[NUL]    auth-response
      }
      if capabilities & CLIENT_CONNECT_WITH_DB {
    string[NUL]    database
      }
      if capabilities & CLIENT_PLUGIN_AUTH {
    string[NUL]    auth plugin name
      }
      if capabilities & CLIENT_CONNECT_ATTRS {
    lenenc-int     length of all key-values
    lenenc-str     key
    lenenc-str     value
       if-more data in 'length of all key-values', more keys and value pairs
      }
    """
    # Capability flags this client asks for
    client_long_password = 1
    client_long_flag = 4
    client_connect_with_db = 8
    client_protocol_41 = 512
    client_interactive = 1024
    client_transactions = 8192
    client_secure_connection = 32768
    client_multi_statements = 1 << 16
    client_plugin_auth = 1 << 19

    # Bitwise OR the flags together and encode them as a 4-byte little-endian integer
    client_flag = (client_long_password | client_long_flag | client_connect_with_db | client_protocol_41 |
                   client_interactive | client_transactions | client_secure_connection |
                   client_multi_statements | client_plugin_auth).to_bytes(length=4, byteorder='little')
    max_packet_size = b'\x00\xff\xff\xff'
    character_set = b'\x21'
    # string[23] reserved, all zero bytes
    reserved = b'\x00' * 23
    user_name = 'canal'.encode('utf-8') + b'\x00'
    password = 'canal'.encode('utf-8')
    # encrypt_password is not shown in this article; it applies the
    # mysql_native_password formula given in the note after this listing.
    auth_response = encrypt_password(
        password,
        initial_resp.get('auth_plugin_data_part_1'),
        initial_resp.get('auth_plugin_data_part_2')
    )
    auth_response_length = len(auth_response).to_bytes(length=1, byteorder='little')
    database = 'learning2'.encode('utf-8') + b'\x00'
    auth_plugin_name = 'mysql_native_password'.encode('utf-8') + b'\x00'
    payload = client_flag + max_packet_size + character_set + reserved + user_name \
        + auth_response_length + auth_response + database + auth_plugin_name
    header_length = len(payload).to_bytes(length=3, byteorder='little')
    # The server's handshake used sequence id 0, so the response uses 1
    sequence_id = b'\x01'
    packet = header_length + sequence_id + payload
    # sendall() keeps writing until the whole packet has been sent
    s.sendall(packet)

Note how the password is encrypted: when auth_plugin_name is mysql_native_password, the auth response is computed as SHA1(pwd) XOR SHA1(20-byte scramble concat SHA1(SHA1(pwd))).
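The listing above calls `encrypt_password` without showing it. Below is one possible version that follows the formula in the note; it is my own sketch rather than the article's original helper. It assumes the two scramble parts arrive as hex strings, as `parse_initial_handshake_packet` returns them, and that the 13th byte of part 2 is the trailing NUL.

```python
import hashlib


def encrypt_password(password: bytes, part1_hex: str, part2_hex: str) -> bytes:
    """Compute SHA1(pwd) XOR SHA1(scramble + SHA1(SHA1(pwd))) for mysql_native_password."""
    # The 20-byte scramble is part 1 (8 bytes) plus the first 12 bytes of part 2;
    # the 13th byte of part 2 is assumed to be the NUL terminator.
    scramble = (bytes.fromhex(part1_hex) + bytes.fromhex(part2_hex))[:20]
    if not password:
        # An empty password is sent as an empty auth response.
        return b""
    stage1 = hashlib.sha1(password).digest()
    stage2 = hashlib.sha1(stage1).digest()
    mask = hashlib.sha1(scramble + stage2).digest()
    return bytes(a ^ b for a, b in zip(stage1, mask))


if __name__ == "__main__":
    # Made-up scramble values, only to show the call shape.
    token = encrypt_password(b"canal", "0102030405060708", "090a0b0c0d0e0f101112131400")
    print(len(token), token.hex())
```

The server can verify the token without ever seeing the password: it XORs the token with the same mask to recover SHA1(pwd), hashes that once more, and compares the result with the SHA1(SHA1(pwd)) it has stored.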

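Command Phase

Once we receive the OK_Packet from the MySQL server in the connection phase, authentication has succeeded and the connection enters the command phase. From here we can send commands to MySQL, such as COM_REGISTER_SLAVE and COM_BINLOG_DUMP, to register as a slave node and ask the MySQL server to dump the Binlog.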
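Before sending any command it is worth checking that the server's reply really is an OK_Packet. The sketch below tells OK_Packet and ERR_Packet apart by the first payload byte. The function name `parse_response` is hypothetical, the input is the payload with the 4-byte header already stripped, and the ERR layout assumes CLIENT_PROTOCOL_41 was negotiated, as in the handshake response above.

```python
def parse_response(payload: bytes) -> dict:
    """Classify a server reply by its first payload byte."""
    if not payload:
        raise ValueError("empty payload")
    header = payload[0]
    if header == 0x00:
        return {"type": "OK"}
    if header == 0xFF:
        # ERR_Packet: 2-byte error code, then '#' + 5-byte SQL state, then the message.
        code = int.from_bytes(payload[1:3], "little")
        rest = payload[3:]
        sql_state = ""
        if rest[:1] == b"#":
            sql_state = rest[1:6].decode("ascii")
            rest = rest[6:]
        return {
            "type": "ERR",
            "code": code,
            "sql_state": sql_state,
            "message": rest.decode("utf-8", "replace"),
        }
    # Anything else (for example an auth switch request) is left to the caller.
    return {"type": "OTHER", "header": header}


if __name__ == "__main__":
    err = b"\xff" + (1045).to_bytes(2, "little") + b"#28000" + b"Access denied"
    print(parse_response(err))
    print(parse_response(b"\x00\x00\x00\x02\x00\x00\x00"))
```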

Implementation

Sending the Register Slave Command

def send_com_register_slave(s):
    """
    Send the command that registers this client as a slave node.
    :param s: socket with an authenticated MySQL connection
    :return:

    1              [15] COM_REGISTER_SLAVE
    4              server-id
    1              slaves hostname length
    string[$len]   slaves hostname
    1              slaves user len
    string[$len]   slaves user
    1              slaves password len
    string[$len]   slaves password
    2              slaves mysql-port
    4              replication rank
    4              master-id
    """
    # Report the local end of the socket as the slave's host and port
    (localhost, local_port) = s.getsockname()
    command = b'\x15'
    # server-id of the fake slave
    server_id = (3).to_bytes(length=4, byteorder='little')
    hostname = localhost.encode('utf-8')
    hostname_length = len(hostname).to_bytes(length=1, byteorder='little')
    slave_user = 'canal'.encode('utf-8')
    slave_user_length = len(slave_user).to_bytes(length=1, byteorder='little')
    slave_password = 'canal'.encode('utf-8')
    slave_password_length = len(slave_password).to_bytes(length=1, byteorder='little')
    slave_mysql_port = local_port.to_bytes(length=2, byteorder='little')
    replication_rank = (0).to_bytes(length=4, byteorder='little')
    master_id = (0).to_bytes(length=4, byteorder='little')
    payload = command + server_id + hostname_length + hostname + slave_user_length + slave_user + \
        slave_password_length + slave_password + slave_mysql_port + replication_rank + master_id
    header = len(payload).to_bytes(length=3, byteorder='little')
    # Each new command restarts the sequence id at 0
    sequence_id = b'\x00'
    packet = header + sequence_id + payload
    s.sendall(packet)

Sending the Binlog Dump Command

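def send_com_binlog_dump(s):
    """
    Send the command that asks the server to dump the Binlog.
    :param s: socket with an authenticated MySQL connection
    :return:

    1              [12] COM_BINLOG_DUMP
    4              binlog-pos
    2              flags
    4              server-id
    string[EOF]    binlog-filename
    """
    command = b'\x12'
    # Position to start from; it has to be the start of an event in the file named below
    binlog_pos = (100).to_bytes(length=4, byteorder='little')
    flags = b'\x00\x00'
    # Same server-id as the one used when registering the slave
    server_id = (3).to_bytes(length=4, byteorder='little')
    # string[EOF]: the file name runs to the end of the packet, with no terminator
    binlog_filename = 'binlog.000502'.encode('utf-8')
    payload = command + binlog_pos + flags + server_id + binlog_filename
    header = len(payload).to_bytes(length=3, byteorder='little')
    sequence_id = b'\x00'
    packet = header + sequence_id + payload
    s.sendall(packet)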
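After COM_BINLOG_DUMP the server starts streaming packets whose payload is a status byte 00 followed by one Binlog event. As a rough sketch of the next step, the function below decodes the 19-byte common event header used by Binlog version 4. The name `parse_binlog_event_header` and the sample values are made up for illustration, and the input is the packet payload with the 4-byte packet header already removed.

```python
import struct

# timestamp(4) event_type(1) server_id(4) event_size(4) log_pos(4) flags(2) = 19 bytes
EVENT_HEADER = struct.Struct("<IBIIIH")


def parse_binlog_event_header(payload: bytes) -> dict:
    """Decode the common header of the event carried in one stream packet."""
    if payload[:1] != b"\x00":
        # 0xff marks an ERR_Packet, 0xfe the end of the stream.
        raise ValueError("not a binlog event packet")
    timestamp, event_type, server_id, event_size, log_pos, flags = \
        EVENT_HEADER.unpack_from(payload, 1)
    return {
        "timestamp": timestamp,
        "event_type": event_type,
        "server_id": server_id,
        "event_size": event_size,
        "log_pos": log_pos,
        "flags": flags,
    }


if __name__ == "__main__":
    # A fabricated header for illustration: event type 4 is ROTATE_EVENT.
    sample = b"\x00" + EVENT_HEADER.pack(1700000000, 4, 1, 44, 0, 0x20)
    print(parse_binlog_event_header(sample))
```

The event body that follows the header depends on event_type, and decoding it is where most of the real parsing work lies.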

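Summary

From the MySQL connection lifecycle we learned that a successful connection moves into what the official documentation calls the Command Phase, where we can send commands to the MySQL server to impersonate a slave. The code above only demonstrates registering a slave node and sending the Binlog Dump command. If you are interested in Canal's full workflow, I recommend reading its source code; a well-designed component also has to consider I/O performance (zero-copy, NIO, and so on), code extensibility, and more.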