- mysql多线程处理不好,经常会发生coredump,见使用Mysql出core一文。
单线程
- 一般情况下,单线程连接mysql代码如下:
/*
single_thread_mysql_client.cpp
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mysql/mysql.h>
#include <pthread.h>
#include <unistd.h>
#define DBHOST "localhost"
#define DBUSER "pca"
#define DBPASS "pca"
#define DBPORT 3306
#define DBNAME "dxponline"
#define DBSOCK NULL //"/var/lib/mysql/mysql.sock"
#define DBPCNT 0
int main()
{
MYSQL_RES *result;
MYSQL_ROW row;
MYSQL_FIELD *field;
unsigned int num_fields;
unsigned int i;
const char *pStatement = "SHOW TABLES";
mysql_library_init(0, NULL, NULL);
MYSQL *mysql = mysql_init(NULL);
unsigned int timeout = 3000;
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
{
printf("connect failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_library_end();
return 0;
}
printf("connect succssfully\n");
if (0 != mysql_real_query(mysql, pStatement, strlen(pStatement)))
{
printf("query failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_library_end();
return 0;
}
result = mysql_store_result(mysql);
if (result == NULL)
{
printf("fetch result failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_library_end();
return 0;
}
num_fields = mysql_num_fields(result);
printf("numbers of result: %d\n", num_fields);
while (NULL != (field = mysql_fetch_field(result)))
{
printf("field name: %s\n", field->name);
}
while (NULL != (row = mysql_fetch_row(result)))
{
unsigned long *lengths;
lengths = mysql_fetch_lengths(result);
for (i = 0; i < num_fields; i++)
{
printf("{%.*s} ", (int) lengths[i], row[i] ? row[i] : "NULL");
}
printf("\n");
}
mysql_free_result(result);
mysql_close(mysql);
mysql_library_end();
return 0;
}
执行
make single_thread_mysql_client LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"即可获得对应单线程二进制。
多线程
- 多线程主要需要注意以下几点
mysql_library_init和mysql_library_end需要放入主线程;- 连接句柄需要多个才能加快并发,而连接句柄由
mysql_init生成,而mysql_init跟随机函数rand有点相似,第一次需要初始化后才能线程安全,所以需要使用mysql_thread_init和mysql_thread_end两个函数来保证线程安全;
- 一般多线程连接mysql代码如下
/*
muti_thread_mysql_client.cpp
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mysql/mysql.h>
#include <pthread.h>
#include <unistd.h>
#define THREAD_NUM 4
#define DBHOST "localhost"
#define DBUSER "pca"
#define DBPASS "pca"
#define DBPORT 3306
#define DBNAME "dxponline"
#define DBSOCK NULL //"/var/lib/mysql/mysql.sock"
#define DBPCNT 0
typedef struct ThreadArgsST
{
int id;
pthread_t *thread_id;
} ThreadArgs;
void *func(void *arg)
{
ThreadArgs *args = (ThreadArgs *)arg;
MYSQL_RES *result;
MYSQL_ROW row;
MYSQL_FIELD *field;
unsigned int num_fields;
unsigned int i;
unsigned int timeout = 3000;
const char *pStatement = "SHOW TABLES";
mysql_thread_init();
MYSQL *mysql = mysql_init(NULL);
if (mysql == NULL)
{
printf("[%ld][%d]mysql init failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
return (void *)0;
}
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
{
printf("[%ld][%d]connect failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}
printf("[%ld][%d]connect succssfully\n", *args->thread_id, args->id);
if (0 != mysql_real_query(mysql, pStatement, strlen(pStatement)))
{
printf("[%ld][%d]query failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}
result = mysql_store_result(mysql);
if (result == NULL)
{
printf("[%ld][%d]fetch result failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}
num_fields = mysql_num_fields(result);
printf("[%ld][%d]numbers of result: %d\n", *args->thread_id, args->id, num_fields);
while (NULL != (field = mysql_fetch_field(result)))
{
printf("[%ld][%d]field name: %s\n", *args->thread_id, args->id, field->name);
}
while (NULL != (row = mysql_fetch_row(result)))
{
unsigned long *lengths;
lengths = mysql_fetch_lengths(result);
for (i = 0; i < num_fields; i++)
{
printf("[%ld][%d]{%.*s} ", *args->thread_id, args->id, (int) lengths[i], row[i] ? row[i] : "NULL");
}
printf("\n");
}
mysql_free_result(result);
mysql_close(mysql);
mysql_thread_end();
return (void *)0;
}
int main(int argc, char *argv[])
{
int thread_num;
if (argc == 2)
{
thread_num = atoi(argv[1]);
}
else
{
thread_num = THREAD_NUM;
}
mysql_library_init(0, NULL, NULL);
printf("argc: %d and thread_num: %d\n", argc, thread_num);
do
{
pthread_t *pTh = new pthread_t[thread_num];
ThreadArgs *pArgs = new ThreadArgs[thread_num];
int i;
for (i = 0; i < thread_num; i ++)
{
pArgs[i].id = i;
pArgs[i].thread_id = &pTh[i];
if (0 != pthread_create(&pTh[i], NULL, func, &pArgs[i]))
{
printf("pthread_create failed\n");
continue;
}
}
for (i = 0; i < thread_num; i ++)
{
pthread_join(pTh[i], NULL);
}
delete[] pTh;
delete[] pArgs;
}
while (0);
mysql_library_end();
return 0;
}
执行
make muti_thread_mysql_client LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"即可获得对应单线程二进制。
连接数与连接句柄是一一对应关系,故一般使用长连接,所以需要连接池,所以上面的代码可以有优化的空间,代码见:
/*
muti_thread_mysql_client_pool.cpp
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mysql/mysql.h>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include <string>
#define THREAD_NUM 4
#define DBHOST "localhost"
#define DBUSER "pca"
#define DBPASS "pca"
#define DBPORT 3306
#define DBNAME "dxponline"
#define DBSOCK NULL //"/var/lib/mysql/mysql.sock"
#define DBPCNT 0
using namespace std;
class CBlockQueue;
typedef struct ThreadArgsST
{
int id;
pthread_t *thread_id;
CBlockQueue *pQueue;
} ThreadArgs;
class CMutex
{
public:
CMutex()
{
pthread_mutex_init(&_mutex, NULL);
}
~CMutex()
{
pthread_mutex_destroy(&_mutex);
}
int32_t lock()
{
return pthread_mutex_lock(&_mutex);
}
int32_t unlock()
{
return pthread_mutex_unlock(&_mutex);
}
int32_t trylock()
{
return pthread_mutex_trylock(&_mutex);
}
private:
pthread_mutex_t _mutex;
};
class CGlobalFunction
{
public:
static MYSQL *connect()
{
unsigned int timeout = 3000;
mysql_thread_init();
MYSQL *mysql = mysql_init(NULL);
if (mysql == NULL)
{
printf("mysql init failed: %s\n", mysql_error(mysql));
return NULL;
}
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
{
printf("connect failed: %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_thread_end();
return NULL;
}
printf("connect succssfully\n");
return mysql;
}
};
class CBlockQueue : public CMutex
{
public:
CBlockQueue() : _size(512)
{
}
~CBlockQueue()
{
}
void set_size(int size)
{
_size = size;
}
int size()
{
this->lock();
int size = q.size();
this->unlock();
return size;
}
bool push(void *m)
{
this->lock();
// TODO
/*
if (q.size() > _size)
{
this->unlock();
fprintf(stderr, "[QUEUE_IS_FULL]queue size over limit from push: %d\n", _size);
return false;
}
*/
q.push(m);
this->unlock();
return true;
}
void *pop()
{
this->lock();
if (q.empty())
{
this->unlock();
fprintf(stderr, "[QUEUE_IS_EMPTY]queue is no item from pop");
return NULL;
}
void *m = q.front();
q.pop();
this->unlock();
return m;
}
private:
queue q;
int _size;
};
void *func(void *arg)
{
ThreadArgs *args = (ThreadArgs *)arg;
MYSQL_RES *result;
MYSQL_ROW row;
MYSQL_FIELD *field;
bool pushed = true;
unsigned int num_fields;
unsigned int i;
const char *pStatement = "SHOW TABLES";
MYSQL *db = (MYSQL *)args->pQueue->pop();
if (db == NULL)
{
db = CGlobalFunction::connect();
if (db == NULL)
{
printf("[%ld][%d]mysql connect failed\n", *args->thread_id, args->id);
return (void *)0;
}
}
if (0 != mysql_real_query(db, pStatement, strlen(pStatement)))
{
printf("[%ld][%d]query failed: %s\n", *args->thread_id, args->id, mysql_error(db));
args->pQueue->push(db);
return (void *)0;
}
result = mysql_store_result(db);
if (result == NULL)
{
printf("[%ld][%d]fetch result failed: %s\n", *args->thread_id, args->id, mysql_error(db));
args->pQueue->push(db);
return (void *)0;
}
num_fields = mysql_num_fields(result);
printf("[%ld][%d]numbers of result: %d\n", *args->thread_id, args->id, num_fields);
while (NULL != (field = mysql_fetch_field(result)))
{
printf("[%ld][%d]field name: %s\n", *args->thread_id, args->id, field->name);
}
while (NULL != (row = mysql_fetch_row(result)))
{
unsigned long *lengths;
lengths = mysql_fetch_lengths(result);
for (i = 0; i < num_fields; i++)
{
printf("[%ld][%d]{%.*s} ", *args->thread_id, args->id, (int) lengths[i], row[i] ? row[i] : "NULL");
}
printf("\n");
}
mysql_free_result(result);
args->pQueue->push(db);
return (void *)0;
}
int main(int argc, char *argv[])
{
CBlockQueue queue;
int thread_num;
if (argc == 2)
{
thread_num = atoi(argv[1]);
}
else
{
thread_num = THREAD_NUM;
}
mysql_library_init(0, NULL, NULL);
printf("argc: %d and thread_num: %d\n", argc, thread_num);
do
{
int i;
pthread_t *pTh = new pthread_t[thread_num];
ThreadArgs *pArgs = new ThreadArgs[thread_num];
for (i = 0; i < thread_num; i ++)
{
pArgs[i].id = i;
pArgs[i].thread_id = &pTh[i];
pArgs[i].pQueue = &queue;
if (0 != pthread_create(&pTh[i], NULL, func, &pArgs[i]))
{
printf("pthread_create failed\n");
continue;
}
}
for (i = 0; i < thread_num; i ++)
{
pthread_join(pTh[i], NULL);
}
delete[] pTh;
delete[] pArgs;
int qsize = queue.size();
for (i = 0; i < qsize; i ++)
{
MYSQL *db = (MYSQL *)queue.pop();
if (NULL != db)
{
mysql_close(db);
mysql_thread_end();
}
}
}
while (0);
mysql_library_end();
return 0;
}
执行
make muti_thread_mysql_client_pool LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"即可获得对应单线程二进制。
上述代码就是利用队列来保持mysql连接,达到优化连接数。
总结
mysql连接与多线程处理不好,可能会造成很多问题,如
*MySQL Connection failed (#2058): This handle is already connected. Use a separate handle for each connection.*Error in my_thread_global_end(): 1 threads didn't exit- 甚至出现coredump
关于多线程连接mysql优化的思想,其实可以扩展到其他连接,如HTTP、Socket等连接中;
