mysql源码解析--一条sql的生命周期

死线 发布于

xcode源码引入

在分析之前,我们先将 mysql 8.0 源码引入到 xcode 工程中,方便后续的分析,步骤很简单,先将 mysql 源码下载到指定目录中,执行以下 shell 命令即可:

1
2
3
4
5
6
7
8
# 进入源码目录
cd your_mysql_path
# 新建 xcode 工程生成目录
mkdir bld
cd bld
# 生成 xcode 工程,替换自己的源码目录即可
cmake .. -G "Xcode" -DWITH_DEBUG=1 -DCMAKE_INSTALL_PREFIX=your_mysql_path/mysql-server/bld/install -DDOWNLOAD_BOOST=1 -DWITH_BOOST=your_mysql_path/mysql-server/bld/install/boost

打开该目录下生成的 xcode 工程文件即可,默认会生成所有的 target 这里推荐使用 mysqld ,当然你也可以在 windows 下使用 source insight ,只不过后续无法编译调试而已。

言归正传,下面开始分析一条 sql 的请求流程

初始化

很大多数流行的项目一样,mysql 也是 C/S 架构,mysql 本身只负责接收 sql 请求,经过一些列复杂的步骤再将结果回传给客户端。

mysql 的初始化入口是 mysqld_main 函数,具体一些关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// sql/mysqld.cc
int mysqld_main(int argc, char **argv)
{
...
// 加载配置
if (load_defaults(MYSQL_CONFIG_NAME, load_default_groups, &argc, &argv,
&argv_alloc)) {
flush_error_log_messages();
return 1;
}
...
// 启动接收请求的工作loop
mysqld_socket_acceptor->connection_event_loop();
...
// 退出初始化
mysqld_exit(signal_hand_thr_exit_code);
...
}

多任务模型

上面的重点就是调用 「connection_event_loop」 函数启动了事件处理函数,这里我们先简单的理解成 mysql 使用了 IO 复用的方式来处理多个连接,每个连接在这个 loop 进行调度, 即相关资源的分配和执行时机的确定。接下来,然我们看看这个 loop 里具体做了什么事情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void connection_event_loop() {
// 获取连接管理实例(单例模式)
Connection_handler_manager *mgr =
Connection_handler_manager::get_instance();
// 检测 loop 是否已退出
while (!connection_events_loop_aborted()) {
// 从复用 IO fd 链中取出一个连接做处理,其实本质就是我们熟悉的 poll 和 select
Channel_info *channel_info = m_listener->listen_for_connection_event();
// 实际的链接处理入口
if (channel_info != nullptr) mgr->process_new_connection(channel_info);
}
}

// 默认内存序列是 memory_order_seq_cst 使用了无锁并发访问
inline MY_ATTRIBUTE(
(warn_unused_result)) bool connection_events_loop_aborted() {
return connection_events_loop_aborted_flag.load();
}

// 优先选择 poll ,否则使用 select ,这里似乎没有判断使用 epoll
Channel_info *Mysqld_socket_listener::listen_for_connection_event() {
#ifdef HAVE_POLL
int retval = poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);
#else
m_select_info.m_read_fds = m_select_info.m_client_fds;
int retval = select((int)m_select_info.m_max_used_connection,
&m_select_info.m_read_fds, 0, 0, 0);
#endif
...
}

从上面可以看出来,mysql 的连接选择上并没有什么特别的地方,纯粹的使用 select/poll 来选择而已,重头戏还是在怎么处理单个连接上,我们来具体看一下「process_new_connection」这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
void Connection_handler_manager::process_new_connection(
Channel_info *channel_info) {
// 再次检测 loop 是否退出,并且数量没有超过阈值
if (connection_events_loop_aborted() ||
!check_and_incr_conn_count(channel_info->is_admin_connection())) {
channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
delete channel_info;
return;
}

// 增加一个连接,并传入本次连接的信息
if (m_connection_handler->add_connection(channel_info)) {
inc_aborted_connects();
delete channel_info;
}
}

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
...
// 检测当前空闲的线程,如果没有空闲的线程,放入等待队列里并退出
if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;
...
// 创建 mysql 线程,这里其实是假的创建,只是从线程池里捞一个出来使用
error =
mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
handle_connection, (void *)channel_info);
}

static void *handle_connection(void *arg) {
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Connection_handler_manager *handler_manager =
Connection_handler_manager::get_instance();
Channel_info *channel_info = static_cast<Channel_info *>(arg);

if (my_thread_init()) {
// 出错处理
...
}

for (;;) {
THD *thd = init_new_thd(channel_info);
if (thd == nullptr) {
// 出错处理
...
}
...
// 加入线程池
thd_manager->add_thd(thd);

// 预处理
if (thd_prepare_connection(thd))
handler_manager->inc_aborted_connects();
else {
while (thd_connection_alive(thd)) {
// 一直处理当前的任务
if (do_command(thd)) break;
}
end_connection(thd);
}
...
// 释放连接
channel_info = Per_thread_connection_handler::block_until_new_connection();
...
my_thread_end();
my_thread_exit(nullptr);
return nullptr;
}

可以看出 mysql 使用 IO 复用来处理网络连接,用线程池去做每个连接具体的任务,这样使得「连接调度」和「实际处理」分开,这也是很多开源项目里经典的做法,只是没有引入 epoll 还是挺惊讶的,可能是可移植性的考虑吧,毕竟类似 windows 很难支持这种特性。

任务处理

上述的「do_command」就是具体执行 sql 命令的入口,下面我们看这个入口做的事情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
bool do_command(THD *thd) {
...
enum enum_server_command command;
...
// 根据不同的协议解包命令
rc = thd->get_protocol()->get_command(&com_data, &command);
...
// 实际处理入口,根据不同的 command 分发
return_value = dispatch_command(thd, &com_data, command);
...
}

bool dispatch_command(THD *thd, const COM_DATA *com_data,
enum enum_server_command command) {
...
// 根据不同的命令分发处理
switch (command) {
...
case COM_QUERY: {
...
// 词法分析初始化
Parser_state parser_state;
if (parser_state.init(thd, thd->query().str, thd->query().length)) break;
...
// 这里只分析查询命令
mysql_parse(thd, &parser_state);
...
}
}
...
}

上述的两个函数其实就是做了命令解析与分发,以查询命令为例,下面要做的就是我们熟知的 sql 解析过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
void mysql_parse(THD *thd, Parser_state *parser_state) {
...
mysql_reset_thd_for_next_command(thd);
// 词法解析
lex_start(thd);

thd->m_parser_state = parser_state;
invoke_pre_parse_rewrite_plugins(thd);
thd->m_parser_state = nullptr;

enable_digest_if_any_plugin_needs_it(thd, parser_state);

LEX *lex = thd->lex;
const char *found_semicolon = nullptr;

bool err = thd->get_stmt_da()->is_error();

if (!err) {
// 语法解析
err = parse_sql(thd, parser_state, nullptr);
if (!err) err = invoke_post_parse_rewrite_plugins(thd, false);

found_semicolon = parser_state->m_lip.found_semicolon;
}

if (!err) {
...
// 查询优化
// MySQL8.0取消了query cache
if (thd->rewritten_query().length() == 0) mysql_rewrite_query(thd);
...

if (!(opt_general_log_raw || thd->slave_thread)) {
if (thd->rewritten_query().length())
// 固化查询日志
query_logger.general_log_write(thd, COM_QUERY,
thd->rewritten_query().ptr(),
thd->rewritten_query().length());
else {
...
}
}
}

if (!err) {
thd->m_statement_psi = MYSQL_REFINE_STATEMENT(
thd->m_statement_psi, sql_statement_info[thd->lex->sql_command].m_key);

if (mqh_used && thd->get_user_connect() &&
check_mqh(thd, lex->sql_command)) {
...
} else {
if (!thd->is_error()) {
...
lex->set_trg_event_type_for_tables();

int error MY_ATTRIBUTE((unused));
if (unlikely(thd->security_context()->password_expired() &&
lex->sql_command != SQLCOM_SET_PASSWORD &&
lex->sql_command != SQLCOM_SET_OPTION &&
lex->sql_command != SQLCOM_ALTER_USER)) {
my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
error = 1;
} else {
...
// 执行sql命令
error = mysql_execute_command(thd, true);
...
}
}
}
} else {
...
}
...
}

当连接请求的 sql 命令被解析后,经过词法解析->语法解析->查询优化后,最后交由执行器执行 sql 命令,执行过程比较复杂,本篇就先不讲了,以后讲存储的时候会细说。

释放连接

最后,在「handle_connection」函数里,还有个释放连接的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Channel_info *Per_thread_connection_handler::block_until_new_connection() {
Channel_info *new_conn = nullptr;
mysql_mutex_lock(&LOCK_thread_cache);
if (blocked_pthread_count < max_blocked_pthreads && !shrink_cache) {
...
// Block pthread
blocked_pthread_count++;
while (!connection_events_loop_aborted() && !wake_pthread && !shrink_cache)
mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);
blocked_pthread_count--;

if (shrink_cache && blocked_pthread_count <= max_blocked_pthreads) {
mysql_cond_signal(&COND_flush_thread_cache);
}

if (wake_pthread) {
wake_pthread--;
if (!waiting_channel_info_list->empty()) {
new_conn = waiting_channel_info_list->front();
waiting_channel_info_list->pop_front();
DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
} else {
DBUG_ASSERT(0); // We should not get here.
}
}
}
mysql_mutex_unlock(&LOCK_thread_cache);
return new_conn;
}

回收的逻辑很简单,大于阻塞线程配置数,回收,反之,阻塞等待新的连接。

结语

本篇除了执行器,大概简单的演示了一下一条 sql 语句的生命周期,描述还是以代码为主,比较简陋,忘大佬们海涵,具体不明白的地方,可以通过留言区或者邮箱联系我。

我一直觉得分析源码的最好途径就是追踪一遍最简单的命令的生命周期,再根据这条命令经过的模块足迹来各个击破。

下篇会讲锁或者MVCC数据存储,敬请期待。

题图

电影《HELLO WORLD》的女主「一行瑠璃」,挺不错的科幻电影,安利给大家。