码迷,mamicode.com
首页 > 编程语言 > 详细

zookeeper 实现一个简单的服务注册与发现(C++) 一:与zk保持连接

时间:2020-07-15 13:05:04      阅读:113      评论:0      收藏:0      [点我收藏+]

标签:logs   com   check   seq   std   ror   max   base   connect   

git:git@github.com:ccx19930930/services_register_and_discovery.git

参考链接:https://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html

 

禁止拷贝基类:

base_class.h:

1 class CUnCopyable 
2 {
3 protected:
4     CUnCopyable() {}
5     ~CUnCopyable() {}
6 private:
7     CUnCopyable(const CUnCopyable&);
8     CUnCopyable& operator=(const CUnCopyable&);
9 };

 

auto_lock.h

 1 #ifndef _AUTO_LOCK_H_
 2 #define _AUTO_LOCK_H_
 3 
 4 #include <pthread.h>
 5 
 6 
 7 class CAutoMutexLock
 8 {
 9 public:
10     CAutoMutexLock(pthread_mutex_t& mutex)
11         : m_mutex(mutex)
12     {
13         pthread_mutex_lock(&m_mutex);
14     }
15 
16     ~CAutoMutexLock()
17     {
18         pthread_mutex_unlock(&m_mutex);
19     }
20 
21 private:
22     pthread_mutex_t& m_mutex;
23 };
24 
25 #endif

 

zk_handle.h:

 1 #ifndef _ZK_HANDLE_H_
 2 #define _ZK_HANDLE_H_
 3 
 4 #include "base_class.h"
 5 
 6 #include <set>
 7 #include <string>
 8 
 9 #include <pthread.h>
10 
11 #include <zookeeper.h>
12 #include <zookeeper_log.h>
13 #include <zookeeper.jute.h>
14 
15 using namespace std;
16 
17 typedef void (*OnResetHandle_Fn)();
18 class CZkHandle : public CUnCopyable
19 {
20 private:
21     static pthread_mutex_t m_mutex;
22     static CZkHandle* m_pins;
23     CZkHandle();
24 public:
25     static CZkHandle* GetInstance();
26 
27 public:
28     int ZkInit(const string& host_list, const int time_out);
29     int ZkClose();
30     int ZkExists(const string& path, struct Stat & stat);
31 
32 public:
33     int ZkCreateNode(const string& path, const string& value, bool is_sequential, string& raw_node_name);
34     int ZkDeleteNode(const string& path, const int version = -1);
35 
36 public:
37     int ZkGetChildren(const string& path, set<string>& node_list);
38     int ZkGetNodeInfo(const string& path, string & info, struct Stat& stat);
39 
40     int ZkWgetChildren(const string& path, watcher_fn watcher, set<string>& node_list);
41     int ZkWGetNodeInfo(const string& path, watcher_fn watcher, string& info, struct Stat& stat);
42 
43 public:
44     int ZkSetNodeInfo(const string& path, const string& value);
45 
46 public:
47     int AddResetHandleFn(string type, OnResetHandle_Fn func);
48     int DelResetHandleFn(string type);
49 
50 private:
51     static void ZkInitWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
52     int ResetZkHandle();
53 
54 private:
55     static void* ZkHandleCheckThread(void* param);
56     int ZkHandleCheck();
57     bool IsRunning();
58 
59 private:
60     pthread_t m_handle_check_thread_id;
61     bool m_is_running;
62     zhandle_t* m_zk_handle;
63     string m_host_list;
64     int m_time_out;
65 
66     map<string, OnResetHandle_Fn> m_on_reset_handle_fn_list;
67 };
68 
69 #endif

 

zk_handle.cpp

技术图片
  1 #include "zk_handle.h"
  2 #include "auto_lock.h"
  3 
  4 #include <stdio.h>
  5 
  6 #include <sys/prctl.h>
  7 #include <unistd.h>
  8 #include <pthread.h>
  9 
 10 CZkHandle* CZkHandle::m_pins = nullptr;
 11 pthread_mutex_t CZkHandle::m_mutex;
 12 
 13 CZkHandle::CZkHandle()
 14 : m_handle_check_thread_id(0)
 15 , m_zk_handle(nullptr)
 16 , m_is_running(false)
 17 , m_host_list("")
 18 , m_time_out(0)
 19 {
 20 }
 21 
 22 CZkHandle* CZkHandle::GetInstance()
 23 {
 24     if (m_pins == nullptr)
 25     {
 26         CAutoMutexLock auto_lock(m_mutex);
 27         if (m_pins == nullptr)
 28         {
 29             m_pins = new CZkHandle;
 30         }
 31     }
 32     return m_pins;
 33 }
 34 
 35 int CZkHandle::ZkInit(const string& host_list, const int time_out)
 36 {
 37     zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
 38     m_host_list = host_list;
 39     m_time_out = time_out;
 40 
 41     if (0 == m_handle_check_thread_id)
 42     {
 43         m_is_running = true;
 44         if (0 != pthread_create(&m_handle_check_thread_id, nullptr, CZkHandle::ZkHandleCheckThread, nullptr))
 45         {
 46             printf("CZkHandle::ZkInit create register check thread fail.\n");
 47             return -1;
 48         }
 49         printf("CZkHandle::ZkInit create register check thread succ.\n");
 50     }
 51     return 0;
 52 }
 53 
 54 int CZkHandle::ZkClose()
 55 {
 56     if (m_zk_handle)
 57     {
 58         zookeeper_close(m_zk_handle);
 59         m_zk_handle = nullptr;
 60     }
 61     printf("CZkHandle::ZkClose: zk close.\n");
 62 }
 63 
 64 int CZkHandle::ZkExists(const string& path, struct Stat & stat)
 65 {
 66     int ret_code = zoo_exists(m_zk_handle, path.c_str(), 0, &stat);
 67     printf("CZkHandle::ZkExists: [ret=%d]\n", ret_code);
 68     if(ZOK == ret_code)
 69     {
 70         printf("CZkHandle::ZkExists: [path=%s] [czxid=%ld] [mzxid=%ld] [version=%d] [cversion=%d] [child_num=%d]\n",
 71             path.c_str(), stat.czxid, stat.mzxid, stat.version, stat.cversion, stat.numChildren);
 72     }
 73     return ret_code;
 74 }
 75 
 76 int CZkHandle::ZkCreateNode(const string& path, const string& value, bool is_sequential, string& raw_node_name)
 77 {
 78     int ret_code = 0;
 79     char tmp_name[kMaxBufferLen];
 80     int flag = ZOO_EPHEMERAL;
 81     if (is_sequential)
 82     {
 83         flag |= ZOO_SEQUENCE;
 84     }
 85     printf("CZkHandle::ZkCreateNode create node [path=%s] [value=%s]\n", path.c_str(), value.c_str());
 86     
 87     ret_code = zoo_create(m_zk_handle, path.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, flag, tmp_name, kMaxBufferLen);
 88     if (ZOK != ret_code)
 89     {
 90         printf("CZkHandle::ZkCreateNode create node fail. ret=%d\n", ret_code);
 91     }
 92     else
 93     {
 94         printf("CZkHandle::ZkCreateNode create node succ! path=%s\n", tmp_name);
 95     }
 96 
 97     raw_node_name = tmp_name;
 98     return ret_code;
 99 }
100 
101 int CZkHandle::ZkDeleteNode(const string& path, const int version /*= -1*/)
102 {
103     int ret_code = zoo_delete(m_zk_handle, path.c_str(), version);
104     printf("CZkHandle::ZkDeleteNode delete node path=%s version=%d ret=%d\n", path.c_str(), version, ret_code);
105     return ret_code;
106 }
107 
108 int CZkHandle::ZkGetChildren(const string& path, set<string>& node_list)
109 {
110     int ret_code = 0;
111     struct String_vector children_list;
112 
113     printf("CZkHandle::ZkGetChildren get children for path=%s\n", path.c_str());
114 
115     ret_code = zoo_get_children(m_zk_handle, path.c_str(), 0, &children_list);
116 
117     if (ZOK != ret_code)
118     {
119         printf("CZkHandle::ZkGetChildren get children fail. ret=%d\n", ret_code);
120         return ret_code;
121     }
122     
123     printf("CZkHandle::ZkGetChildren get children succ. children_num=%d\n", children_list.count);
124 
125     for (unsigned int children_idx = 0; children_idx < children_list.count; ++children_idx)
126     {
127         printf("CZkHandle::ZkGetChildren children_idx=%u, children_name=%s\n", children_idx, children_list.data[children_idx]);
128         node_list.insert(children_list.data[children_idx]);
129     }
130 
131     deallocate_String_vector(&children_list);
132     return ret_code;
133 }
134 
135 int CZkHandle::ZkGetNodeInfo(const string& path, string& info, struct Stat& stat)
136 {
137     int ret_code = 0;
138 
139     char buffer[kMaxBufferLen];
140     int buffer_len = kMaxBufferLen;
141 
142     printf("CZkHandle::ZkGetNodeInfo get node info for path=%s\n", path.c_str());
143     ret_code = zoo_get(m_zk_handle, path.c_str(), 0, buffer, &buffer_len, &stat);
144 
145     if (ZOK != ret_code)
146     {
147         printf("CZkHandle::ZkGetNodeInfo get node info for path=%s fail. ret=%d\n", path.c_str(), ret_code);
148         return ret_code;
149     }
150 
151     buffer[buffer_len] = 0;
152     printf("CZkHandle::ZkGetNodeInfo get node info for path=%s succ. buffer=%s\n", path.c_str(), buffer);
153     printf("CZkHandle::ZkGetNodeInfo: [path=%s] [czxid=%ld] [mzxid=%ld] [version=%d] [cversion=%d] [child_num=%d]\n",
154         path.c_str(), stat.czxid, stat.mzxid, stat.version, stat.cversion, stat.numChildren);
155 
156     info = buffer;
157 
158     return ret_code;
159 }
160 
161 int CZkHandle::ZkWgetChildren(const string& path, watcher_fn watcher, set<string>& node_list)
162 {
163     int ret_code = 0;
164     struct String_vector children_list;
165 
166     printf("CZkHandle::ZkWgetChildren get children for path=%s\n", path.c_str());
167 
168     ret_code = zoo_wget_children(m_zk_handle, path.c_str(), watcher, NULL, &children_list);
169 
170     if (ZOK != ret_code)
171     {
172         printf("CZkHandle::ZkWgetChildren get children fail. ret=%d\n", ret_code);
173         return ret_code;
174     }
175 
176     printf("CZkHandle::ZkWgetChildren get children succ. children_num=%d\n", children_list.count);
177 
178     for (unsigned int children_idx = 0; children_idx < children_list.count; ++children_idx)
179     {
180         printf("CZkHandle::ZkWgetChildren children_idx=%u, children_name=%s\n", children_idx, children_list.data[children_idx]);
181         node_list.insert(children_list.data[children_idx]);
182     }
183 
184     deallocate_String_vector(&children_list);
185     return ret_code;
186 }
187 
188 int CZkHandle::ZkWGetNodeInfo(const string& path, watcher_fn watcher, string& info, struct Stat& stat)
189 {
190     int ret_code = 0;
191 
192     char buffer[kMaxBufferLen];
193     int buffer_len = kMaxBufferLen;
194 
195     printf("CZkHandle::ZkWGetNodeInfo get node info for path=%s\n", path.c_str());
196     ret_code = zoo_wget(m_zk_handle, path.c_str(), watcher, NULL, buffer, &buffer_len, &stat);
197 
198     if (ZOK != ret_code)
199     {
200         printf("CZkHandle::ZkWGetNodeInfo get node info for path=%s fail. ret=%d\n", path.c_str(), ret_code);
201         return ret_code;
202     }
203 
204     buffer[buffer_len] = 0;
205     printf("CZkHandle::ZkWGetNodeInfo get node info for path=%s succ. buffer=%s\n", path.c_str(), buffer);
206     printf("CZkHandle::ZkWGetNodeInfo: [path=%s] [czxid=%ld] [mzxid=%ld] [version=%d] [cversion=%d] [child_num=%d]\n",
207         path.c_str(), stat.czxid, stat.mzxid, stat.version, stat.cversion, stat.numChildren);
208 
209     info = buffer;
210 
211     return ret_code;
212 }
213 
214 int CZkHandle::ZkSetNodeInfo(const string& path, const string& value)
215 {
216     printf("CZkHandle::ZkSetNodeInfo set node info. path=%s value=%s\n", path.c_str(), value.c_str());
217     int ret_code = zoo_set(m_zk_handle, path.c_str(), value.c_str(), value.size(), -1);
218     if (ZOK != ret_code)
219     {
220         printf("CZkHandle::ZkSetNodeInfo set node fail.");
221     }
222     return ret_code;
223 }
224 
225 int CZkHandle::AddResetHandleFn(string type, OnResetHandle_Fn func)
226 {
227     CAutoMutexLock auto_lock(m_mutex);
228     m_on_reset_handle_fn_list[type] = func;
229     return 0;
230 }
231 
232 int CZkHandle::DelResetHandleFn(string type)
233 {
234     CAutoMutexLock auto_lock(m_mutex);
235     if (m_on_reset_handle_fn_list.count(type))
236     {
237         m_on_reset_handle_fn_list.erase(type);
238     }
239     return 0;
240 }
241 
242 void CZkHandle::ZkInitWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)
243 {
244     printf("CZkHandle::ZkInitWatchar: [type=%d] [state=%d] [path=%s] [watcher_ctx=%p]\n", type, state, path, watcherCtx);
245 }
246 
247 int CZkHandle::ResetZkHandle()
248 {
249     CAutoMutexLock auto_lock(m_mutex);
250     zhandle_t* new_zk_handle = zookeeper_init(m_host_list.c_str(), &ZkInitWatcher, m_time_out, 0, nullptr, 0);
251     if (nullptr == new_zk_handle)
252     {
253         printf("CZkHandle::ResetZkHandle: connect to zk fail.\n");
254         return -1;
255     }
256     printf("CZkHandle::ResetZkHandle: connect to zk succ.\n");
257 
258     zhandle_t* old_zk_handle = m_zk_handle;
259     m_zk_handle = new_zk_handle;
260 
261     for (const auto & func : m_on_reset_handle_fn_list)
262     {
263         func.second();
264     }
265 
266     if (old_zk_handle)
267     {
268         zookeeper_close(old_zk_handle);
269     }
270     return 0;
271 }
272 
273 void* CZkHandle::ZkHandleCheckThread(void* param)
274 {
275     prctl(PR_SET_NAME, "zk_handle_check");
276     while (true == CZkHandle::GetInstance()->IsRunning())
277     {
278         CZkHandle::GetInstance()->ZkHandleCheck();
279         usleep(kZkHandleIntervalTime);
280     }
281     return nullptr;
282 }
283 
284 int CZkHandle::ZkHandleCheck()
285 {
286     struct Stat stat;
287     if (m_zk_handle == nullptr)
288     {
289         ResetZkHandle();
290     }
291     else if(ZOK != ZkExists("/", stat))
292     {
293         ResetZkHandle();
294     }
295 }
296 
297 bool CZkHandle::IsRunning()
298 {
299     return m_is_running;
300 }
View Code

 

zk_handle_test main.cpp

 1 #include "zk_handle.h"
 2 
 3 #include <unistd.h>
 4 
 5 //伪分布式部署 host list最好以配置文件形式,此处为测试程序,暂时写死
 6 const char* host_list = "xx.xx.xx.xx:port,xx.xx.xx.xx:port,xx.xx.xx.xx:port";
 7 const int time_out = 50000;
 8 int main()
 9 {
10     CZkHandle* zk_handle = CZkHandle::GetInstance();
11     struct Stat stat;
12     set<string> node_list;
13     string node_info;
14     string raw_name;
15 
16     zk_handle->ZkInit(host_list, time_out);
17     sleep(1);
18     zk_handle->ZkExists("/", stat);
19 
20     zk_handle->ZkCreateNode("/test_1", "1", true, raw_name);
21     zk_handle->ZkCreateNode("/test_1", "1", true, raw_name);
22     zk_handle->ZkCreateNode("/test_2", "2", false, raw_name);
23     zk_handle->ZkCreateNode("/test_2", "2", false, raw_name);
24 
25     zk_handle->ZkGetNodeInfo("/test_2", node_info, stat);
26     zk_handle->ZkSetNodeInfo("/test_2", "3");
27     zk_handle->ZkGetNodeInfo("/test_2", node_info, stat);
28 
29 
30     node_list.clear();
31     zk_handle->ZkGetChildren("/", node_list);
32 
33     zk_handle->ZkDeleteNode("/test_2");
34     node_list.clear();
35     zk_handle->ZkGetChildren("/", node_list);
36 
37     sleep(60);
38 
39     zk_handle->ZkClose();
40     return 0;
41 }

 

Makefile

INC_DIR:= ./ ../zk_util/ /usr/local/include/zookeeper/ /usr/local/include/json/
SRCS:= $(wildcard ./*cpp ../zk_util/*cpp)
OBJS:= $(patsubst %.cpp, %.o, $(SRCS))
LIBS:= -lpthread -lzookeeper_mt -ljsoncpp

CXX:= g++

CXXFLAGS:= -w -g -std=c++11 $(addprefix -I, $(INC_DIR)) $(LIBS) -Wl,-rpath="/usr/local/lib"

EXE:= ../../bin/zk_handle_test

$(EXE):$(OBJS)
    $(CXX) -o $(EXE) $(OBJS) $(CXXFLAGS)

clean:
    rm -rf $(EXE)
    rm -rf $(OBJS)

 

执行结果:

ccx@ccx:~/self_test/zookeeper/services_register_and_discovery/bin$ ll
total 1068
drwxrwxrwx 1 ccx ccx     512 May 30 18:24 ./
drwxrwxrwx 1 ccx ccx     512 May 30 18:20 ../
-rwxrwxrwx 1 ccx ccx 1092352 May 30 18:24 zk_handle_test*
ccx@ccx:~/self_test/zookeeper/services_register_and_discovery/bin$ ./zk_handle_test 
CZkHandle::ZkInit create register check thread succ.
CZkHandle::ResetZkHandle: connect to zk succ.
CZkHandle::ZkInitWatchar: [type=-1] [state=3] [path=] [watcher_ctx=(nil)]
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=285] [child_num=5]
CZkHandle::ZkCreateNode create node [path=/test_1] [value=1]
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=285] [child_num=5]
CZkHandle::ZkCreateNode create node succ! path=/test_10000000145
CZkHandle::ZkCreateNode create node [path=/test_1] [value=1]
CZkHandle::ZkCreateNode create node succ! path=/test_10000000146
CZkHandle::ZkCreateNode create node [path=/test_2] [value=2]
CZkHandle::ZkCreateNode create node succ! path=/test_2
CZkHandle::ZkCreateNode create node [path=/test_2] [value=2]
CZkHandle::ZkCreateNode create node fail. ret=-110
CZkHandle::ZkGetNodeInfo get node info for path=/test_2
CZkHandle::ZkGetNodeInfo get node info for path=/test_2 succ. buffer=2
CZkHandle::ZkGetNodeInfo: [path=/test_2] [czxid=17179873741] [mzxid=17179873741] [version=0] [cversion=0] [child_num=0]
CZkHandle::ZkSeeNodeInfo set node info. path=/test_2 value=3
CZkHandle::ZkGetNodeInfo get node info for path=/test_2
CZkHandle::ZkGetNodeInfo get node info for path=/test_2 succ. buffer=3
CZkHandle::ZkGetNodeInfo: [path=/test_2] [czxid=17179873741] [mzxid=17179873743] [version=1] [cversion=0] [child_num=0]
CZkHandle::ZkGetChildren get children for path=/
CZkHandle::ZkGetChildren get children succ. children_num=8
CZkHandle::ZkGetChildren children_idx=0, children_name=marrs
CZkHandle::ZkGetChildren children_idx=1, children_name=test_2
CZkHandle::ZkGetChildren children_idx=2, children_name=zookeeper
CZkHandle::ZkGetChildren children_idx=3, children_name=zk_test1
CZkHandle::ZkGetChildren children_idx=4, children_name=zk_test2
CZkHandle::ZkGetChildren children_idx=5, children_name=test_10000000146
CZkHandle::ZkGetChildren children_idx=6, children_name=test_10000000145
CZkHandle::ZkGetChildren children_idx=7, children_name=zk_test
CZkHandle::ZkDeleteNode delete node path=/test_2 version=-1 ret=0
CZkHandle::ZkGetChildren get children for path=/
CZkHandle::ZkGetChildren get children succ. children_num=7
CZkHandle::ZkGetChildren children_idx=0, children_name=marrs
CZkHandle::ZkGetChildren children_idx=1, children_name=zookeeper
CZkHandle::ZkGetChildren children_idx=2, children_name=zk_test1
CZkHandle::ZkGetChildren children_idx=3, children_name=zk_test2
CZkHandle::ZkGetChildren children_idx=4, children_name=test_10000000146
CZkHandle::ZkGetChildren children_idx=5, children_name=test_10000000145
CZkHandle::ZkGetChildren children_idx=6, children_name=zk_test
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=289] [child_num=7]
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=289] [child_num=7]

 

zookeeper 实现一个简单的服务注册与发现(C++) 一:与zk保持连接

标签:logs   com   check   seq   std   ror   max   base   connect   

原文地址:https://www.cnblogs.com/chinxi/p/12994151.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!