xcode源码引入
在分析之前,我们先将 mysql 8.0 源码引入到 xcode 工程中,方便后续的分析,步骤很简单,先将 mysql 源码下载到指定目录中,执行以下 shell 命令即可:
1 2 3 4 5 6 7 8
| cd your_mysql_path
mkdir bld cd bld
cmake .. -G "Xcode" -DWITH_DEBUG=1 -DCMAKE_INSTALL_PREFIX=your_mysql_path/mysql-server/bld/install -DDOWNLOAD_BOOST=1 -DWITH_BOOST=your_mysql_path/mysql-server/bld/install/boost
|
打开该目录下生成的 xcode 工程文件即可,默认会生成所有的 target 这里推荐使用 mysqld ,当然你也可以在 windows 下使用 source insight ,只不过后续无法编译调试而已。
言归正传,下面开始分析一条 sql 的请求流程
初始化
很大多数流行的项目一样,mysql 也是 C/S 架构,mysql 本身只负责接收 sql 请求,经过一些列复杂的步骤再将结果回传给客户端。
mysql 的初始化入口是 mysqld_main 函数,具体一些关键代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| int mysqld_main(int argc, char **argv) { ... if (load_defaults(MYSQL_CONFIG_NAME, load_default_groups, &argc, &argv, &argv_alloc)) { flush_error_log_messages(); return 1; } ... mysqld_socket_acceptor->connection_event_loop(); ... mysqld_exit(signal_hand_thr_exit_code); ... }
|
多任务模型
上面的重点就是调用 「connection_event_loop」 函数启动了事件处理函数,这里我们先简单的理解成 mysql 使用了 IO 复用的方式来处理多个连接,每个连接在这个 loop 进行调度, 即相关资源的分配和执行时机的确定。接下来,然我们看看这个 loop 里具体做了什么事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| void connection_event_loop() {
Connection_handler_manager *mgr = Connection_handler_manager::get_instance(); while (!connection_events_loop_aborted()) { Channel_info *channel_info = m_listener->listen_for_connection_event(); if (channel_info != nullptr) mgr->process_new_connection(channel_info); } }
inline MY_ATTRIBUTE( (warn_unused_result)) bool connection_events_loop_aborted() { return connection_events_loop_aborted_flag.load(); }
Channel_info *Mysqld_socket_listener::listen_for_connection_event() { #ifdef HAVE_POLL int retval = poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1); #else m_select_info.m_read_fds = m_select_info.m_client_fds; int retval = select((int)m_select_info.m_max_used_connection, &m_select_info.m_read_fds, 0, 0, 0); #endif ... }
|
从上面可以看出来,mysql 的连接选择上并没有什么特别的地方,纯粹的使用 select/poll 来选择而已,重头戏还是在怎么处理单个连接上,我们来具体看一下「process_new_connection」这个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| void Connection_handler_manager::process_new_connection( Channel_info *channel_info) { if (connection_events_loop_aborted() || !check_and_incr_conn_count(channel_info->is_admin_connection())) { channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true); delete channel_info; return; }
if (m_connection_handler->add_connection(channel_info)) { inc_aborted_connects(); delete channel_info; } }
bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) { ...
if (!check_idle_thread_and_enqueue_connection(channel_info)) return false; ...
error = mysql_thread_create(key_thread_one_connection, &id, &connection_attrib, handle_connection, (void *)channel_info); }
static void *handle_connection(void *arg) { Global_THD_manager *thd_manager = Global_THD_manager::get_instance(); Connection_handler_manager *handler_manager = Connection_handler_manager::get_instance(); Channel_info *channel_info = static_cast<Channel_info *>(arg);
if (my_thread_init()) { ... }
for (;;) { THD *thd = init_new_thd(channel_info); if (thd == nullptr) { ... } ... thd_manager->add_thd(thd);
if (thd_prepare_connection(thd)) handler_manager->inc_aborted_connects(); else { while (thd_connection_alive(thd)) { if (do_command(thd)) break; } end_connection(thd); } ... channel_info = Per_thread_connection_handler::block_until_new_connection(); ... my_thread_end(); my_thread_exit(nullptr); return nullptr; }
|
可以看出 mysql 使用 IO 复用来处理网络连接,用线程池去做每个连接具体的任务,这样使得「连接调度」和「实际处理」分开,这也是很多开源项目里经典的做法,只是没有引入 epoll 还是挺惊讶的,可能是可移植性的考虑吧,毕竟类似 windows 很难支持这种特性。
任务处理
上述的「do_command」就是具体执行 sql 命令的入口,下面我们看这个入口做的事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| bool do_command(THD *thd) { ... enum enum_server_command command; ... rc = thd->get_protocol()->get_command(&com_data, &command); ... return_value = dispatch_command(thd, &com_data, command); ... }
bool dispatch_command(THD *thd, const COM_DATA *com_data, enum enum_server_command command) { ... switch (command) { ... case COM_QUERY: { ... Parser_state parser_state; if (parser_state.init(thd, thd->query().str, thd->query().length)) break; ... mysql_parse(thd, &parser_state); ... } } ... }
|
上述的两个函数其实就是做了命令解析与分发,以查询命令为例,下面要做的就是我们熟知的 sql 解析过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| void mysql_parse(THD *thd, Parser_state *parser_state) { ... mysql_reset_thd_for_next_command(thd); lex_start(thd);
thd->m_parser_state = parser_state; invoke_pre_parse_rewrite_plugins(thd); thd->m_parser_state = nullptr;
enable_digest_if_any_plugin_needs_it(thd, parser_state);
LEX *lex = thd->lex; const char *found_semicolon = nullptr;
bool err = thd->get_stmt_da()->is_error();
if (!err) { err = parse_sql(thd, parser_state, nullptr); if (!err) err = invoke_post_parse_rewrite_plugins(thd, false);
found_semicolon = parser_state->m_lip.found_semicolon; }
if (!err) { ... if (thd->rewritten_query().length() == 0) mysql_rewrite_query(thd); ...
if (!(opt_general_log_raw || thd->slave_thread)) { if (thd->rewritten_query().length()) query_logger.general_log_write(thd, COM_QUERY, thd->rewritten_query().ptr(), thd->rewritten_query().length()); else { ... } } }
if (!err) { thd->m_statement_psi = MYSQL_REFINE_STATEMENT( thd->m_statement_psi, sql_statement_info[thd->lex->sql_command].m_key);
if (mqh_used && thd->get_user_connect() && check_mqh(thd, lex->sql_command)) { ... } else { if (!thd->is_error()) { ... lex->set_trg_event_type_for_tables();
int error MY_ATTRIBUTE((unused)); if (unlikely(thd->security_context()->password_expired() && lex->sql_command != SQLCOM_SET_PASSWORD && lex->sql_command != SQLCOM_SET_OPTION && lex->sql_command != SQLCOM_ALTER_USER)) { my_error(ER_MUST_CHANGE_PASSWORD, MYF(0)); error = 1; } else { ... error = mysql_execute_command(thd, true); ... } } } } else { ... } ... }
|
当连接请求的 sql 命令被解析后,经过词法解析->语法解析->查询优化后,最后交由执行器执行 sql 命令,执行过程比较复杂,本篇就先不讲了,以后讲存储的时候会细说。
释放连接
最后,在「handle_connection」函数里,还有个释放连接的过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| Channel_info *Per_thread_connection_handler::block_until_new_connection() { Channel_info *new_conn = nullptr; mysql_mutex_lock(&LOCK_thread_cache); if (blocked_pthread_count < max_blocked_pthreads && !shrink_cache) { ... blocked_pthread_count++; while (!connection_events_loop_aborted() && !wake_pthread && !shrink_cache) mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache); blocked_pthread_count--;
if (shrink_cache && blocked_pthread_count <= max_blocked_pthreads) { mysql_cond_signal(&COND_flush_thread_cache); }
if (wake_pthread) { wake_pthread--; if (!waiting_channel_info_list->empty()) { new_conn = waiting_channel_info_list->front(); waiting_channel_info_list->pop_front(); DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn)); } else { DBUG_ASSERT(0); } } } mysql_mutex_unlock(&LOCK_thread_cache); return new_conn; }
|
回收的逻辑很简单,大于阻塞线程配置数,回收,反之,阻塞等待新的连接。
结语
本篇除了执行器,大概简单的演示了一下一条 sql 语句的生命周期,描述还是以代码为主,比较简陋,忘大佬们海涵,具体不明白的地方,可以通过留言区或者邮箱联系我。
我一直觉得分析源码的最好途径就是追踪一遍最简单的命令的生命周期,再根据这条命令经过的模块足迹来各个击破。
下篇会讲锁或者MVCC数据存储,敬请期待。
题图
电影《HELLO WORLD》的女主「一行瑠璃」,挺不错的科幻电影,安利给大家。