源码才是王道。
真正的高手从来不是临场发挥,随机应变是外人看来的错觉。
1. 主函数位于 sql/mysqld.cc 中,精简后的代码如下:
int main(int argc, char **argv)
{
MY_INIT(argv[0]);
logger.init_base();
init_common_variables(MYSQL_CONFIG_NAME,argc, argv, load_default_groups)
user_info = check_user(mysqld_user);
set_user(mysqld_user, user_info);
init_server_components();
network_init();
start_signal_handler();
mysql_rm_tmp_tables() || acl_init(opt_noacl)
init_status_vars();
start_handle_manager();
handle_connections_sockets();
}
2.监听连接: sql/mysqld.cc - handle_connections_sockets:
/*
 * Abridged accept loop: until abort_loop is set, wait for activity with
 * select(), accept a socket, wrap it in a new THD and pass that THD to
 * create_new_thread().
 *
 * arg is unused.
 */
pthread_handler_t handle_connections_sockets(void *arg __attribute__((unused))) {
  FD_SET(unix_sock, &clientFDs);
  while (!abort_loop) {
    /* select() modifies the set it is given, so it works on a copy. */
    readFDs = clientFDs;
    select((int) max_used_connection, &readFDs, 0, 0, 0);
    new_sock = accept(sock, my_reinterpret_cast(struct sockaddr *) (&cAddr), &length);
    thd = new THD;
    vio_tmp = vio_new(new_sock, VIO_TYPE_SOCKET, VIO_LOCALHOST);
    my_net_init(&thd->net, vio_tmp);
    create_new_thread(thd);
  }
}
3. 创建连接 sql/mysqld.cc create_new_thread/create_thread_to_handle_connection:
/*
 * Admit a new connection or reject it.
 *
 * When connection_count has already reached max_connections + 1, or the
 * server is shutting down (abort_loop), the connection is closed with
 * ER_CON_COUNT_ERROR and its THD is destroyed. Otherwise the connection is
 * counted and handed to thread_scheduler.
 */
static void create_new_thread(THD *thd) {
  if (connection_count >= max_connections + 1 || abort_loop) {
    close_connection(thd, ER_CON_COUNT_ERROR, 1);
    delete thd;
    /* thd is gone: it must not be counted or scheduled below. */
    return;
  }
  ++connection_count;
  thread_scheduler.add_connection(thd);
}
4. 线程调度器thread_scheduler - create_thread_to_handle_connection
/*
 * Give the connection described by thd a thread to run on.
 *
 * If cached_thread_count exceeds wake_thread, thd is queued on thread_cache
 * and COND_thread_cache is signalled; otherwise thd is appended to threads
 * and a new pthread is started in handle_one_connection() with thd as its
 * argument.
 */
void create_thread_to_handle_connection(THD *thd) {
  if (cached_thread_count > wake_thread) {
    thread_cache.append(thd);
    pthread_cond_signal(&COND_thread_cache);
  } else {
    threads.append(thd);
    pthread_create(&thd->real_id, &connection_attrib, handle_one_connection, (void*) thd);
  }
}
5.handle_one_connection
/*
 * Thread entry point for one client connection (abridged).
 *
 * arg is the THD that create_thread_to_handle_connection() passed to
 * pthread_create().
 *
 * NOTE(review): the loop body is heavily shortened; the full source
 * presumably repeats do_command() until the connection ends - confirm
 * against the real function before relying on this control flow.
 */
pthread_handler_t handle_one_connection(void *arg) {
  /* Recover the connection descriptor from the pthread argument. */
  THD *thd = (THD*) arg;

  thread_scheduler.init_new_connection_thread();
  setup_connection_thread_globals(thd);
  for (;;) {
    lex_start(thd);
    login_connection(thd);
    prepare_new_connection_state(thd);
    do_command(thd);
    end_connection(thd);
  }
}
6.执行语句 sql/sql_parse.cc - do_command函数
/*
 * Read one packet from the client connection and dispatch it (abridged).
 *
 * The first byte of the packet is the command code; the remaining
 * packet_length - 1 bytes are passed to dispatch_command() as its payload.
 */
bool do_command(THD *thd) {
  /* Must run first: the packet is read from net.read_pos afterwards. */
  packet_length = my_net_read(&thd->net);
  packet = (char*) thd->net.read_pos;

  command = (enum enum_server_command) (uchar) *packet;
  dispatch_command(command,
                   thd,
                   packet + 1,
                   (uint) (packet_length - 1));
}
7.指令分发 sql/sql_parse.cc定义dispatch_command
/*
 * Route one client command to its handler (abridged; "..." marks omitted code).
 *
 * command        command code taken from the packet
 * thd            connection descriptor; thd->command is set to command
 * packet         command payload
 * packet_length  length of the payload
 */
bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length) {
NET *net = &thd->net;
thd->command = command;
switch (command) { // branch on the command type
case COM_INIT_DB: ...;
case COM_TABLE_DUMP: ...;
case COM_CHANGE_USER: ...;
...
case COM_QUERY: // the command is a query
alloc_query(thd, packet, packet_length); // read the query from the network packet and store it in thd->query
mysql_parse(thd, thd->query, thd->query_length, &end_of_stmt); // hand it to the parser
}
}
8. 解析SQL: sql/sql_parse.cc 中的 mysql_parse 函数负责解析SQL语句
/*
 * Parse and execute one SQL statement (abridged).
 *
 * thd     connection descriptor
 * inBuf   statement text
 * length  length of inBuf
 *
 * The statement is parsed and executed only when
 * query_cache_send_result_to_client() returns <= 0 (presumably meaning the
 * query cache did not answer the client - confirm).
 */
void mysql_parse(THD *thd, const char *inBuf, uint length, const char ** found_semicolon) {
  lex_start(thd);

  if (query_cache_send_result_to_client(thd, (char*) inBuf, length) > 0)
    return;

  Parser_state parser_state(thd, inBuf, length);
  parse_sql(thd, &parser_state, NULL);
  mysql_execute_command(thd);
}
9.执行命令 mysql_execute_command
/*
 * Execute the statement already parsed into thd->lex (abridged; "..." marks
 * omitted cases). Only the INSERT and SELECT branches are shown.
 */
int mysql_execute_command(THD *thd) {
LEX *lex= thd->lex; // syntactic structure of the parsed SQL statement
TABLE_LIST *all_tables = lex->query_tables; // list of tables this statement accesses
switch (lex->sql_command) {
...
case SQLCOM_INSERT:
insert_precheck(thd, all_tables);
mysql_insert(thd, all_tables, lex->field_list, lex->many_values, lex->update_list, lex->value_list, lex->duplicates, lex->ignore);
break; ...
case SQLCOM_SELECT:
check_table_access(thd, lex->exchange ? SELECT_ACL | FILE_ACL : SELECT_ACL, all_tables, UINT_MAX, FALSE); // check the user's access privileges on the tables
execute_sqlcom_select(thd, all_tables); // execute the SELECT statement
break;
}
}
10. 接下来看 sql/sql_insert.cc 中的 mysql_insert 函数:
/*
 * INSERT driver (pseudocode outline): open and lock the tables, prepare the
 * insert, then call write_record() once per value in values_list.
 */
bool mysql_insert(THD *thd,
TABLE_LIST *table_list, // tables used by this INSERT
List<Item> &fields, // the items used
....) {
open_and_lock_tables(thd, table_list); // the lock taken here only guards against table-structure changes
mysql_prepare_insert(...);
foreach value in values_list {
write_record(...);
}
} // the real function also deals with triggers, errors, views and other odds and ends; all of that is ignored here
11.接着看真正写数据的函数write_record (在sql/sql_insert.cc),精简代码如下:
/*
 * Write one data record through the table's handler (abridged).
 *
 * NOTE(review): in this shortened form the REPLACE/UPDATE branch always
 * performs both calls; the full source presumably updates only after the
 * write reports a duplicate key - confirm against sql/sql_insert.cc.
 */
int write_record(THD *thd, TABLE *table, COPY_INFO *info) {
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE) {
    /* REPLACE or UPDATE: replace the existing data. */
    table->file->ha_write_row(table->record[0]);
    table->file->ha_update_row(table->record[1], table->record[0]);
  } else {
    table->file->ha_write_row(table->record[0]);
  }
}
/*
 * What is this? The Handler API!
 * Writes the row through the concrete write_row() implementation, then logs
 * it to the binlog via binlog_log_row() (abridged).
 */
int handler::ha_write_row(uchar *buf) {
  write_row(buf);                            // call the concrete implementation
  binlog_log_row(table, 0, buf, log_func);   // write the binlog
}
请求数据流
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)