码迷,mamicode.com
首页 > 其他好文 > 详细

boost库asio详解——io_service作为work pool

时间:2015-01-20 13:29:41      阅读:389      评论:0      收藏:0      [点我收藏+]

标签:

      无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。
使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.dispatch的接口也和io_service.post一样,但不同的是它是直接调用而不是经过push到队列然后在io_services.run中执行,而在这个示例当中,显然我们需要把工作交到另一个线程去完成,这样才不会影响网络接收线程池的工作以达到高效率的接收数据,这种设计与前面的netsever其实相同,这就是典型的Half Sync/Half Async。二者的区别就是netsever自己实现了工作队列,而不是直接使用io_service,这种设计实际上在win下是使用了iocp作为工作队列。
不过我更倾向于前一种设计,因为那样做,代码一切都在自己的掌握中,而io_service则是经过许多封装代码,并且本身设计只是用于处理网络完成事件的。
无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。

    

  1 #include <stdio.h> 
  2 #include <cstdlib> 
  3 #include <iostream> 
  4 #include <boost/thread.hpp> 
  5 #include <boost/aligned_storage.hpp> 
  6 #include <boost/array.hpp> 
  7 #include <boost/bind.hpp> 
  8 #include <boost/enable_shared_from_this.hpp> 
  9 #include <boost/noncopyable.hpp> 
 10 #include <boost/shared_ptr.hpp> 
 11 #include <boost/asio.hpp> 
 12 
 13 using boost::asio::ip::tcp; 
 14 
 15 class handler_allocator 
 16     : private boost::noncopyable 
 17 { 
 18 public: 
 19     handler_allocator() 
 20         : in_use_(false) 
 21     { 
 22     } 
 23 
 24     void* allocate(std::size_t size) 
 25     { 
 26         if (!in_use_ && size < storage_.size) 
 27         { 
 28             in_use_ = true; 
 29             return storage_.address(); 
 30         } 
 31         else 
 32         { 
 33             return ::operator new(size); 
 34         } 
 35     } 
 36 
 37     void deallocate(void* pointer) 
 38     { 
 39         if (pointer == storage_.address()) 
 40         { 
 41             in_use_ = false; 
 42         } 
 43         else 
 44         { 
 45             ::operator delete(pointer); 
 46         } 
 47     } 
 48 
 49 private: 
 50     // Storage space used for handler-based custom memory allocation. 
 51     boost::aligned_storage<1024> storage_; 
 52 
 53     // Whether the handler-based custom allocation storage has been used. 
 54     bool in_use_; 
 55 }; 
 56 
 57 template <typename Handler> 
 58 class custom_alloc_handler 
 59 { 
 60 public: 
 61     custom_alloc_handler(handler_allocator& a, Handler h) 
 62         : allocator_(a), 
 63         handler_(h) 
 64     { 
 65     } 
 66 
 67     template <typename Arg1> 
 68     void operator()(Arg1 arg1) 
 69     { 
 70         handler_(arg1); 
 71     } 
 72 
 73     template <typename Arg1, typename Arg2> 
 74     void operator()(Arg1 arg1, Arg2 arg2) 
 75     { 
 76         handler_(arg1, arg2); 
 77     } 
 78 
 79     friend void* asio_handler_allocate(std::size_t size, 
 80         custom_alloc_handler<Handler>* this_handler) 
 81     { 
 82         return this_handler->allocator_.allocate(size); 
 83     } 
 84 
 85     friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/, 
 86         custom_alloc_handler<Handler>* this_handler) 
 87     { 
 88         this_handler->allocator_.deallocate(pointer); 
 89     } 
 90 
 91 private: 
 92     handler_allocator& allocator_; 
 93     Handler handler_; 
 94 }; 
 95 
 96 // Helper function to wrap a handler object to add custom allocation. 
 97 template <typename Handler> 
 98 inline custom_alloc_handler<Handler> make_custom_alloc_handler( 
 99     handler_allocator& a, Handler h) 
100 { 
101     return custom_alloc_handler<Handler>(a, h); 
102 } 
103 
104 /// A pool of io_service objects. 
105 class io_service_pool 
106     : private boost::noncopyable 
107 { 
108 public: 
109     /// Construct the io_service pool. 
110     explicit io_service_pool(std::size_t pool_size) : next_io_service_(0) 
111     { 
112         if (pool_size == 0) 
113             throw std::runtime_error("io_service_pool size is 0"); 
114 
115         // Give all the io_services work to do so that their run() functions will not 
116         // exit until they are explicitly stopped. 
117         for (std::size_t i = 0; i < pool_size; ++i) 
118         { 
119             io_service_ptr io_service(new boost::asio::io_service); 
120             work_ptr work(new boost::asio::io_service::work(*io_service)); 
121             io_services_.push_back(io_service); 
122             work_.push_back(work); 
123         } 
124     } 
125 
126     // Run all io_service objects in the pool. 
127     void run() 
128     { 
129         // Create a pool of threads to run all of the io_services. 
130         std::vector<boost::shared_ptr<boost::thread> > threads; 
131         for (std::size_t i = 0; i < io_services_.size(); ++i) 
132         { 
133             boost::shared_ptr<boost::thread> thread(new boost::thread( 
134                 boost::bind(&boost::asio::io_service::run, io_services_[i]))); 
135             threads.push_back(thread); 
136         } 
137 
138         // Wait for all threads in the pool to exit. 
139         for (std::size_t i = 0; i < threads.size(); ++i) 
140             threads[i]->join(); 
141     } 
142 
143     // Stop all io_service objects in the pool. 
144     void stop() 
145     { 
146         // Explicitly stop all io_services. 
147         for (std::size_t i = 0; i < io_services_.size(); ++i) 
148             io_services_[i]->stop(); 
149     } 
150 
151     // Get an io_service to use. 
152     boost::asio::io_service& get_io_service() 
153     { 
154         // Use a round-robin scheme to choose the next io_service to use. 
155         boost::asio::io_service& io_service = *io_services_[next_io_service_]; 
156         ++next_io_service_; 
157         if (next_io_service_ == io_services_.size()) 
158             next_io_service_ = 0; 
159         return io_service; 
160     } 
161 
162 private: 
163     typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr; 
164     typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr; 
165 
166     /// The pool of io_services. 
167     std::vector<io_service_ptr> io_services_; 
168 
169     /// The work that keeps the io_services running. 
170     std::vector<work_ptr> work_; 
171 
172     /// The next io_service to use for a connection. 
173     std::size_t next_io_service_; 
174 }; 
175 
176 class session 
177     : public boost::enable_shared_from_this<session> 
178 { 
179 public: 
180     session(boost::asio::io_service& work_service
181         , boost::asio::io_service& io_service) 
182         : socket_(io_service) 
183         , io_work_service(work_service) 
184     { 
185     } 
186 
187     tcp::socket& socket() 
188     { 
189         return socket_; 
190     } 
191 
192     void start() 
193     { 
194         socket_.async_read_some(boost::asio::buffer(data_), 
195             make_custom_alloc_handler(allocator_, 
196             boost::bind(&session::handle_read, 
197             shared_from_this(), 
198             boost::asio::placeholders::error, 
199             boost::asio::placeholders::bytes_transferred))); 
200     } 
201 
202     void handle_read(const boost::system::error_code& error, 
203         size_t bytes_transferred) 
204     { 
205         if (!error) 
206         { 
207             boost::shared_ptr<std::vector<char> > buf(new std::vector<char>); 
208 
209             buf->resize(bytes_transferred); 
210             std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin()); 
211             io_work_service.post(boost::bind(&session::on_receive
212                 , shared_from_this(), buf, bytes_transferred)); 
213 
214             socket_.async_read_some(boost::asio::buffer(data_), 
215                 make_custom_alloc_handler(allocator_, 
216                 boost::bind(&session::handle_read, 
217                 shared_from_this(), 
218                 boost::asio::placeholders::error, 
219                 boost::asio::placeholders::bytes_transferred))); 
220         } 
221     } 
222 
223     void handle_write(const boost::system::error_code& error) 
224     { 
225         if (!error) 
226         { 
227         } 
228     } 
229 
230     void on_receive(boost::shared_ptr<std::vector<char> > buffers
231         , size_t bytes_transferred) 
232     { 
233         char* data_stream = &(*buffers->begin()); 
234         // in here finish the work. 
235         std::cout << "receive :" << bytes_transferred << " bytes." << 
236             "message :" << data_stream << std::endl; 
237     } 
238 
239 private: 
240     // The io_service used to finish the work. 
241     boost::asio::io_service& io_work_service; 
242 
243     // The socket used to communicate with the client. 
244     tcp::socket socket_; 
245 
246     // Buffer used to store data received from the client. 
247     boost::array<char, 1024> data_; 
248 
249     // The allocator to use for handler-based custom memory allocation. 
250     handler_allocator allocator_; 
251 }; 
252 
253 typedef boost::shared_ptr<session> session_ptr; 
254 
255 class server 
256 { 
257 public: 
258     server(short port, std::size_t io_service_pool_size) 
259         : io_service_pool_(io_service_pool_size) 
260         , io_service_work_pool_(io_service_pool_size) 
261         , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port)) 
262     { 
263         session_ptr new_session(new session(io_service_work_pool_.get_io_service()
264             , io_service_pool_.get_io_service())); 
265         acceptor_.async_accept(new_session->socket(), 
266             boost::bind(&server::handle_accept, this, new_session, 
267             boost::asio::placeholders::error)); 
268     } 
269 
270     void handle_accept(session_ptr new_session, 
271         const boost::system::error_code& error) 
272     { 
273         if (!error) 
274         { 
275             new_session->start(); 
276             new_session.reset(new session(io_service_work_pool_.get_io_service()
277                 , io_service_pool_.get_io_service())); 
278             acceptor_.async_accept(new_session->socket(), 
279                 boost::bind(&server::handle_accept, this, new_session, 
280                 boost::asio::placeholders::error)); 
281         } 
282     } 
283 
284     void run() 
285     { 
286         io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
287             , &io_service_pool_))); 
288         work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
289             , &io_service_work_pool_))); 
290     } 
291 
292     void stop() 
293     { 
294         io_service_pool_.stop(); 
295         io_service_work_pool_.stop(); 
296 
297         io_thread_->join(); 
298         work_thread_->join(); 
299     } 
300 
301 private: 
302     boost::shared_ptr<boost::thread> io_thread_; 
303     boost::shared_ptr<boost::thread> work_thread_; 
304     io_service_pool io_service_pool_; 
305     io_service_pool io_service_work_pool_; 
306     tcp::acceptor acceptor_; 
307 }; 
308 
309 int main(int argc, char* argv[]) 
310 { 
311     try 
312     { 
313         if (argc != 2) 
314         { 
315             std::cerr << "Usage: server <port>/n"; 
316             return 1; 
317         } 
318 
319         using namespace std; // For atoi. 
320         server s(atoi(argv[1]), 10); 
321 
322         s.run(); 
323 
324         getchar(); 
325 
326         s.stop(); 
327     } 
328     catch (std::exception& e) 
329     { 
330         std::cerr << "Exception: " << e.what() << "/n"; 
331     } 
332 
333     return 0; 
334 } 

 

boost库asio详解——io_service作为work pool

标签:

原文地址:http://www.cnblogs.com/jingliming/p/4235644.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!