mysql多线程问题

单线程

  • 一般情况下,单线程连接mysql代码如下:
    /*
        single_thread_mysql_client.cpp
    */
    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <mysql/mysql.h>
    #include <pthread.h>
    #include <unistd.h>

    #define DBHOST      "localhost"
    #define DBUSER      "pca"
    #define DBPASS      "pca"
    #define DBPORT      3306
    #define DBNAME      "dxponline"
    #define DBSOCK      NULL //"/var/lib/mysql/mysql.sock"
    #define DBPCNT      0

    int main()
    {
        MYSQL_RES *result;
        MYSQL_ROW row;
        MYSQL_FIELD *field;
        unsigned int num_fields;
        unsigned int i;
        const char *pStatement = "SHOW TABLES";
        mysql_library_init(0, NULL, NULL);
        MYSQL *mysql = mysql_init(NULL);
        unsigned int timeout = 3000;
        mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);

        if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
        {
            printf("connect failed: %s\n", mysql_error(mysql));
            mysql_close(mysql);
            mysql_library_end();
            return 0;
        }

        printf("connect succssfully\n");

        if (0 != mysql_real_query(mysql, pStatement, strlen(pStatement)))
        {
            printf("query failed: %s\n", mysql_error(mysql));
            mysql_close(mysql);
            mysql_library_end();
            return 0;
        }

        result = mysql_store_result(mysql);

        if (result == NULL)
        {
            printf("fetch result failed: %s\n", mysql_error(mysql));
            mysql_close(mysql);
            mysql_library_end();
            return 0;
        }

        num_fields = mysql_num_fields(result);
        printf("numbers of result: %d\n", num_fields);

        while (NULL != (field = mysql_fetch_field(result)))
        {
            printf("field name: %s\n", field->name);
        }

        while (NULL != (row = mysql_fetch_row(result)))
        {
            unsigned long *lengths;
            lengths = mysql_fetch_lengths(result);

            for (i = 0; i < num_fields; i++)
            {
                printf("{%.*s} ", (int) lengths[i], row[i] ? row[i] : "NULL");
            }

            printf("\n");
        }

        mysql_free_result(result);
        mysql_close(mysql);
        mysql_library_end();
        return 0;
    }
  • 执行

    make single_thread_mysql_client LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"

    即可获得对应单线程二进制。

多线程

  • 多线程主要需要注意以下几点
    • mysql_library_initmysql_library_end 需要放入主线程;
    • 连接句柄需要多个才能加快并发,而连接句柄由 mysql_init 生成,而 mysql_init 跟随机函数 rand 有点相似,第一次需要初始化后才能线程安全,所以需要使用 mysql_thread_initmysql_thread_end 两个函数来保证线程安全;
  • 一般多线程连接mysql代码如下
    /*
        muti_thread_mysql_client.cpp
    */
    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <mysql/mysql.h>
    #include <pthread.h>
    #include <unistd.h>

    #define THREAD_NUM  4
    #define DBHOST      "localhost"
    #define DBUSER      "pca"
    #define DBPASS      "pca"
    #define DBPORT      3306
    #define DBNAME      "dxponline"
    #define DBSOCK      NULL //"/var/lib/mysql/mysql.sock"
    #define DBPCNT      0

    typedef struct ThreadArgsST
    {
        int id;
        pthread_t *thread_id;
    } ThreadArgs;

    void *func(void *arg)
    {
        ThreadArgs *args = (ThreadArgs *)arg;
        MYSQL_RES *result;
        MYSQL_ROW row;
        MYSQL_FIELD *field;
        unsigned int num_fields;
        unsigned int i;
        unsigned int timeout = 3000;
        const char *pStatement = "SHOW TABLES";
        mysql_thread_init();
        MYSQL *mysql = mysql_init(NULL);

        if (mysql == NULL)
        {
            printf("[%ld][%d]mysql init failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
            return (void *)0;
        }

        mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);

        if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
        {
            printf("[%ld][%d]connect failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
            mysql_close(mysql);
            mysql_thread_end();
            return (void *)0;
        }

        printf("[%ld][%d]connect succssfully\n", *args->thread_id, args->id);

        if (0 != mysql_real_query(mysql, pStatement, strlen(pStatement)))
        {
            printf("[%ld][%d]query failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
            mysql_close(mysql);
            mysql_thread_end();
            return (void *)0;
        }

        result = mysql_store_result(mysql);

        if (result == NULL)
        {
            printf("[%ld][%d]fetch result failed: %s\n", *args->thread_id, args->id, mysql_error(mysql));
            mysql_close(mysql);
            mysql_thread_end();
            return (void *)0;
        }

        num_fields = mysql_num_fields(result);
        printf("[%ld][%d]numbers of result: %d\n", *args->thread_id, args->id, num_fields);

        while (NULL != (field = mysql_fetch_field(result)))
        {
            printf("[%ld][%d]field name: %s\n", *args->thread_id, args->id, field->name);
        }

        while (NULL != (row = mysql_fetch_row(result)))
        {
            unsigned long *lengths;
            lengths = mysql_fetch_lengths(result);

            for (i = 0; i < num_fields; i++)
            {
                printf("[%ld][%d]{%.*s} ", *args->thread_id, args->id, (int) lengths[i], row[i] ? row[i] : "NULL");
            }

            printf("\n");
        }

        mysql_free_result(result);
        mysql_close(mysql);
        mysql_thread_end();
        return (void *)0;
    }

    int main(int argc, char *argv[])
    {
        int thread_num;

        if (argc == 2)
        {
            thread_num = atoi(argv[1]);
        }
        else
        {
            thread_num = THREAD_NUM;
        }

        mysql_library_init(0, NULL, NULL);
        printf("argc: %d and thread_num: %d\n", argc, thread_num);

        do
        {
            pthread_t *pTh = new pthread_t[thread_num];
            ThreadArgs *pArgs = new ThreadArgs[thread_num];
            int i;

            for (i = 0; i < thread_num; i ++)
            {
                pArgs[i].id = i;
                pArgs[i].thread_id = &pTh[i];

                if (0 != pthread_create(&pTh[i], NULL, func, &pArgs[i]))
                {
                    printf("pthread_create failed\n");
                    continue;
                }
            }

            for (i = 0; i < thread_num; i ++)
            {
                pthread_join(pTh[i], NULL);
            }

            delete[] pTh;
            delete[] pArgs;
        }
        while (0);

        mysql_library_end();
        return 0;
    }
  • 执行

    make muti_thread_mysql_client LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"

    即可获得对应单线程二进制。

  • 连接数与连接句柄是一一对应关系,故一般使用长连接,所以需要连接池,所以上面的代码可以有优化的空间,代码见:

    /*
        muti_thread_mysql_client_pool.cpp
    */
    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <mysql/mysql.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <queue>
    #include <string>

    #define THREAD_NUM  4
    #define DBHOST      "localhost"
    #define DBUSER      "pca"
    #define DBPASS      "pca"
    #define DBPORT      3306
    #define DBNAME      "dxponline"
    #define DBSOCK      NULL //"/var/lib/mysql/mysql.sock"
    #define DBPCNT      0

    using namespace std;

    class CBlockQueue;
    typedef struct ThreadArgsST
    {
        int id;
        pthread_t *thread_id;
        CBlockQueue *pQueue;
    } ThreadArgs;

    class CMutex
    {
    public:
        CMutex()
        {
            pthread_mutex_init(&_mutex, NULL);
        }
        ~CMutex()
        {
            pthread_mutex_destroy(&_mutex);
        }

        int32_t lock()
        {
            return pthread_mutex_lock(&_mutex);
        }

        int32_t unlock()
        {
            return pthread_mutex_unlock(&_mutex);
        }

        int32_t trylock()
        {
            return pthread_mutex_trylock(&_mutex);
        }

    private:
        pthread_mutex_t _mutex;
    };

    class CGlobalFunction
    {
    public:
        static MYSQL *connect()
        {
            unsigned int timeout = 3000;
            mysql_thread_init();
            MYSQL *mysql = mysql_init(NULL);

            if (mysql == NULL)
            {
                printf("mysql init failed: %s\n", mysql_error(mysql));
                return NULL;
            }

            mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);

            if (mysql_real_connect(mysql, DBHOST, DBUSER, DBPASS, DBNAME, DBPORT, DBSOCK, DBPCNT) == NULL)
            {
                printf("connect failed: %s\n", mysql_error(mysql));
                mysql_close(mysql);
                mysql_thread_end();
                return NULL;
            }

            printf("connect succssfully\n");
            return mysql;
        }
    };

    class CBlockQueue : public CMutex
    {
    public:
        CBlockQueue() : _size(512)
        {
        }
        ~CBlockQueue()
        {
        }
        void set_size(int size)
        {
            _size = size;
        }
        int size()
        {
            this->lock();
            int size = q.size();
            this->unlock();
            return size;
        }
        bool push(void *m)
        {
            this->lock();
            // TODO
            /*
            if (q.size() > _size)
            {
                this->unlock();
                fprintf(stderr, "[QUEUE_IS_FULL]queue size over limit from push: %d\n", _size);
                return false;
            }
            */
            q.push(m);
            this->unlock();
            return true;
        }

        void *pop()
        {
            this->lock();

            if (q.empty())
            {
                this->unlock();
                fprintf(stderr, "[QUEUE_IS_EMPTY]queue is no item from pop");
                return NULL;
            }

            void *m = q.front();
            q.pop();
            this->unlock();
            return m;
        }

    private:
        queue q;
        int _size;
    };

    void *func(void *arg)
    {
        ThreadArgs *args = (ThreadArgs *)arg;
        MYSQL_RES *result;
        MYSQL_ROW row;
        MYSQL_FIELD *field;
        bool pushed = true;
        unsigned int num_fields;
        unsigned int i;
        const char *pStatement = "SHOW TABLES";
        MYSQL *db = (MYSQL *)args->pQueue->pop();

        if (db == NULL)
        {
            db = CGlobalFunction::connect();

            if (db == NULL)
            {
                printf("[%ld][%d]mysql connect failed\n", *args->thread_id, args->id);
                return (void *)0;
            }
        }

        if (0 != mysql_real_query(db, pStatement, strlen(pStatement)))
        {
            printf("[%ld][%d]query failed: %s\n", *args->thread_id, args->id, mysql_error(db));
            args->pQueue->push(db);
            return (void *)0;
        }

        result = mysql_store_result(db);

        if (result == NULL)
        {
            printf("[%ld][%d]fetch result failed: %s\n", *args->thread_id, args->id, mysql_error(db));
            args->pQueue->push(db);
            return (void *)0;
        }

        num_fields = mysql_num_fields(result);
        printf("[%ld][%d]numbers of result: %d\n", *args->thread_id, args->id, num_fields);

        while (NULL != (field = mysql_fetch_field(result)))
        {
            printf("[%ld][%d]field name: %s\n", *args->thread_id, args->id, field->name);
        }

        while (NULL != (row = mysql_fetch_row(result)))
        {
            unsigned long *lengths;
            lengths = mysql_fetch_lengths(result);

            for (i = 0; i < num_fields; i++)
            {
                printf("[%ld][%d]{%.*s} ", *args->thread_id, args->id, (int) lengths[i], row[i] ? row[i] : "NULL");
            }

            printf("\n");
        }

        mysql_free_result(result);
        args->pQueue->push(db);
        return (void *)0;
    }

    int main(int argc, char *argv[])
    {
        CBlockQueue queue;
        int thread_num;

        if (argc == 2)
        {
            thread_num = atoi(argv[1]);
        }
        else
        {
            thread_num = THREAD_NUM;
        }

        mysql_library_init(0, NULL, NULL);
        printf("argc: %d and thread_num: %d\n", argc, thread_num);

        do
        {
            int i;  
            pthread_t *pTh = new pthread_t[thread_num];
            ThreadArgs *pArgs = new ThreadArgs[thread_num];

            for (i = 0; i < thread_num; i ++)
            {
                pArgs[i].id = i;
                pArgs[i].thread_id = &pTh[i];
                pArgs[i].pQueue = &queue;

                if (0 != pthread_create(&pTh[i], NULL, func, &pArgs[i]))
                {
                    printf("pthread_create failed\n");
                    continue;
                }
            }

            for (i = 0; i < thread_num; i ++)
            {
                pthread_join(pTh[i], NULL);
            }

            delete[] pTh;
            delete[] pArgs;
            int qsize = queue.size();

            for (i = 0; i < qsize; i ++)
            {
                MYSQL *db = (MYSQL *)queue.pop();

                if (NULL != db)
                {
                    mysql_close(db);
                    mysql_thread_end();
                }
            }
        }
        while (0);

        mysql_library_end();
        return 0;
    }
  • 执行

    make muti_thread_mysql_client_pool LDFLAGS="-g -O2 -L/usr/lib64/mysql -lmysqlclient -lpthread -lz -lm -lssl -lcrypto -ldl"

    即可获得对应单线程二进制。

  • 上述代码就是利用队列来保持mysql连接,达到优化连接数。

总结

  • mysql连接与多线程处理不好,可能会造成很多问题,如

    • *MySQL Connection failed (#2058): This handle is already connected. Use a separate handle for each connection.*
    • Error in my_thread_global_end(): 1 threads didn't exit
    • 甚至出现coredump
  • 关于多线程连接mysql优化的思想,其实可以扩展到其他连接,如HTTP、Socket等连接中;

Monthly Archives

Pages

Powered by Movable Type 7.7.2

About this Entry

This page contains a single entry by Cnangel published on June 30, 2015 2:11 PM.

fedora22上ssh 的问题 was the previous entry in this blog.

CPAN 二十岁了! is the next entry in this blog.

Find recent content on the main index or look in the archives to find all content.