139 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
#ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_
 | 
						|
#define HV_EVENT_LOOP_THREAD_POOL_HPP_
 | 
						|
 | 
						|
#include "EventLoopThread.h"
 | 
						|
#include "hbase.h"
 | 
						|
 | 
						|
namespace hv {
 | 
						|
 | 
						|
class EventLoopThreadPool : public Status {
 | 
						|
public:
 | 
						|
    EventLoopThreadPool(int thread_num = std::thread::hardware_concurrency()) {
 | 
						|
        setStatus(kInitializing);
 | 
						|
        thread_num_ = thread_num;
 | 
						|
        next_loop_idx_ = 0;
 | 
						|
        setStatus(kInitialized);
 | 
						|
    }
 | 
						|
 | 
						|
    ~EventLoopThreadPool() {
 | 
						|
        stop();
 | 
						|
        join();
 | 
						|
    }
 | 
						|
 | 
						|
    int threadNum() {
 | 
						|
        return thread_num_;
 | 
						|
    }
 | 
						|
 | 
						|
    void setThreadNum(int num) {
 | 
						|
        thread_num_ = num;
 | 
						|
    }
 | 
						|
 | 
						|
    EventLoopPtr nextLoop(load_balance_e lb = LB_RoundRobin) {
 | 
						|
        size_t numLoops = loop_threads_.size();
 | 
						|
        if (numLoops == 0) return NULL;
 | 
						|
        size_t idx = 0;
 | 
						|
        if (lb == LB_RoundRobin) {
 | 
						|
            if (++next_loop_idx_ >= numLoops) next_loop_idx_ = 0;
 | 
						|
            idx = next_loop_idx_ % numLoops;
 | 
						|
        } else if (lb == LB_Random) {
 | 
						|
            idx = hv_rand(0, numLoops - 1);
 | 
						|
        } else if (lb == LB_LeastConnections) {
 | 
						|
            for (size_t i = 1; i < numLoops; ++i) {
 | 
						|
                if (loop_threads_[i]->loop()->connectionNum < loop_threads_[idx]->loop()->connectionNum) {
 | 
						|
                    idx = i;
 | 
						|
                }
 | 
						|
            }
 | 
						|
        } else {
 | 
						|
            // Not Implemented
 | 
						|
        }
 | 
						|
        return loop_threads_[idx]->loop();
 | 
						|
    }
 | 
						|
 | 
						|
    EventLoopPtr loop(int idx = -1) {
 | 
						|
        if (idx >= 0 && idx < (int)loop_threads_.size()) {
 | 
						|
            return loop_threads_[idx]->loop();
 | 
						|
        }
 | 
						|
        return nextLoop();
 | 
						|
    }
 | 
						|
 | 
						|
    hloop_t* hloop(int idx = -1) {
 | 
						|
        EventLoopPtr ptr = loop(idx);
 | 
						|
        return ptr ? ptr->loop() : NULL;
 | 
						|
    }
 | 
						|
 | 
						|
    // @param wait_threads_started: if ture this method will block until all loop_threads started.
 | 
						|
    // @param pre: This functor will be executed when loop_thread started.
 | 
						|
    // @param post:This Functor will be executed when loop_thread stopped.
 | 
						|
    void start(bool wait_threads_started = false,
 | 
						|
               std::function<void(const EventLoopPtr&)> pre = NULL,
 | 
						|
               std::function<void(const EventLoopPtr&)> post = NULL) {
 | 
						|
        if (thread_num_ == 0) return;
 | 
						|
        if (status() >= kStarting && status() < kStopped) return;
 | 
						|
        setStatus(kStarting);
 | 
						|
 | 
						|
        auto started_cnt = std::make_shared<std::atomic<int>>(0);
 | 
						|
        auto exited_cnt  = std::make_shared<std::atomic<int>>(0);
 | 
						|
 | 
						|
        loop_threads_.clear();
 | 
						|
        for (int i = 0; i < thread_num_; ++i) {
 | 
						|
            auto loop_thread = std::make_shared<EventLoopThread>();
 | 
						|
            const EventLoopPtr& loop = loop_thread->loop();
 | 
						|
            loop_thread->start(false,
 | 
						|
                [this, started_cnt, pre, &loop]() {
 | 
						|
                    if (++(*started_cnt) == thread_num_) {
 | 
						|
                        setStatus(kRunning);
 | 
						|
                    }
 | 
						|
                    if (pre) pre(loop);
 | 
						|
                    return 0;
 | 
						|
                },
 | 
						|
                [this, exited_cnt, post, &loop]() {
 | 
						|
                    if (post) post(loop);
 | 
						|
                    if (++(*exited_cnt) == thread_num_) {
 | 
						|
                        setStatus(kStopped);
 | 
						|
                    }
 | 
						|
                    return 0;
 | 
						|
                }
 | 
						|
            );
 | 
						|
            loop_threads_.push_back(loop_thread);
 | 
						|
        }
 | 
						|
 | 
						|
        if (wait_threads_started) {
 | 
						|
            while (status() < kRunning) {
 | 
						|
                hv_delay(1);
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    // @param wait_threads_started: if ture this method will block until all loop_threads stopped.
 | 
						|
    // stop thread-safe
 | 
						|
    void stop(bool wait_threads_stopped = false) {
 | 
						|
        if (status() < kStarting || status() >= kStopping) return;
 | 
						|
        setStatus(kStopping);
 | 
						|
 | 
						|
        for (auto& loop_thread : loop_threads_) {
 | 
						|
            loop_thread->stop(false);
 | 
						|
        }
 | 
						|
 | 
						|
        if (wait_threads_stopped) {
 | 
						|
            join();
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    // @brief join all loop_threads
 | 
						|
    // @note  destructor will join loop_threads if you forget to call this method.
 | 
						|
    void join() {
 | 
						|
        for (auto& loop_thread : loop_threads_) {
 | 
						|
            loop_thread->join();
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
private:
 | 
						|
    int                                         thread_num_;
 | 
						|
    std::vector<EventLoopThreadPtr>             loop_threads_;
 | 
						|
    std::atomic<unsigned int>                   next_loop_idx_;
 | 
						|
};
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
#endif // HV_EVENT_LOOP_THREAD_POOL_HPP_
 |