码迷,mamicode.com
首页 > 其他好文 > 详细

Apollo服务端设计原理剖析

时间:2020-08-02 10:09:56      阅读:61      评论:0      收藏:0      [点我收藏+]

标签:thread   没有   local   发布   tca   port   一个   mysql   测试的   

本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书。

1 配置发布后的实时推送设计

配置中心最重要的一个特性就是实时推送了,正因为有这个特性,我们可以依赖配置中心做很多事情。在我自己开发的Smconf这个配置中心,Smconf是依赖于Zookeeper的Watch机制来实现实时推送。
技术图片

上图简要描述了配置发布的大致过程:

  • 用户在Portal中进行配置的编辑和发布
  • Portal会调用Admin Service提供的接口进行发布操作
  • Admin Service收到请求后,发送ReleaseMessage给各个Config Service,通知Config Service配置发生变化
  • Config Service收到ReleaseMessage后,通知对应的客户端,基于Http长连接实现
    2 发送ReleaseMessage的实现方式

ReleaseMessage消息是通过Mysql实现了一个简单的消息队列。之所有没有采用消息中间件,是为了让Apollo在部署的时候尽量简单,尽可能减少外部依赖。
技术图片

上图简要描述了发送ReleaseMessage的大致过程:

  • Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录
  • Config Service会启动一个线程定时扫描ReleaseMessage表,去查看是否有新的消息记录
  • Config Service发现有新的消息记录,那么就会通知到所有的消息监听器
  • 消息监听器得到配置发布的信息后,则会通知对应的客户端
    3 Config Service通知客户端的实现方式

通知是采用基于Http长连接实现,主要分为下面几个步骤:

  • 客户端会发起一个Http请求到Config Service的notifications/v2接口
  • v2接口通过Spring DeferredResult把请求挂起,不会立即返回
  • 如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端
  • 如果发现配置有修改,则会调用DeferredResult的setResult方法,传入有配置变化的namespace信息,同时该请求会立即返回
  • 客户端从返回的结果中获取到配置变化的namespace后,会立即请求Config Service获取该namespace的最新配置
    4 源码解析实时推送设计

Apollo推送这块代码比较多,就不在本书中详细分析了,我把推送这块的代码稍微简化了下,给大家进行讲解,这样理解起来会更容易。当然我这边会比较简单,很多细节就不做考虑了,只是为了能够让大家明白Apollo推送的核心原理。

发送ReleaseMessage的逻辑我们就写一个简单的接口,用队列存储,测试的时候就调用这个接口模拟配置有更新,发送ReleaseMessage消息。

@RestController
public

class

NotificationControllerV2

implements

ReleaseMessageListener
 {

// 模拟配置更新,往里插入数据表示有更新

public

static

Queue
<
String
> queue = 
new

LinkedBlockingDeque
<>();

@GetMapping
(
"/addMsg"
)

public

String
 addMsg() {
        queue.add(
"xxx"
);

return

"success"
;
    }

}

消息发送之后,前面我们有讲过Config Service会启动一个线程定时扫描ReleaseMessage表,去查看是否有新的消息记录,然后取通知客户端,这边我们也启动一个线程去扫描:

@Component
public

class

ReleaseMessageScanner

implements

InitializingBean
 {

@Autowired

private

NotificationControllerV2
 configController;

@Override

public

void
 afterPropertiesSet() 
throws

Exception
 {

// 定时任务从数据库扫描有没有新的配置发布

new

Thread
(() -> {

for
 (;;) {

String
 result = 
NotificationControllerV2
.queue.poll();

if
 (result != 
null
) {

ReleaseMessage
 message = 
new

ReleaseMessage
();
                    message.setMessage(result);
                    configController.handleMessage(message);
                }
            }
        }).start();;
    }

}

循环去读取NotificationControllerV2中的队列,如果有消息的话就构造一个ReleaseMessage的对象,然后调用NotificationControllerV2中的handleMessage()方法进行消息的处理。

ReleaseMessage就一个字段,模拟消息内容:

public

class

ReleaseMessage
 {

private

String
 message;

public

void
 setMessage(
String
 message) {

this
.message = message;
    }

public

String
 getMessage() {

return
 message;
    }
}

接下来,我们看handleMessage做了什么样的工作

NotificationControllerV2实现了ReleaseMessageListener接口,ReleaseMessageListener中定义了handleMessage()方法。

public

interface

ReleaseMessageListener
 {

void
 handleMessage(
ReleaseMessage
 message);
}

handleMessage就是当配置发生变化的时候,通知的消息监听器,消息监听器得到配置发布的信息后,则会通知对应的客户端:

@RestController
public

class

NotificationControllerV2

implements

ReleaseMessageListener
 {

private

final

Multimap
<
String
, 
DeferredResultWrapper
> deferredResults = 
Multimaps
            .synchronizedSetMultimap(
HashMultimap
.create());

@Override

public

void
 handleMessage(
ReleaseMessage
 message) {

System
.err.println(
"handleMessage:"
+ message);

List
<
DeferredResultWrapper
> results = 
Lists
.newArrayList(deferredResults.
get
(
"xxxx"
));

for
 (
DeferredResultWrapper
 deferredResultWrapper : results) {

List
<
ApolloConfigNotification
> list = 
new

ArrayList
<>();
            list.add(
new

ApolloConfigNotification
(
"application"
, 
1
));
            deferredResultWrapper.setResult(list);
        }
    }

}

Apollo的实时推送是基于Spring DeferredResult实现的,在handleMessage()方法中可以看到是通过deferredResults获取DeferredResult,deferredResults就是第一行的Multimap,Key其实就是消息内容,Value就是DeferredResult的业务包装类DeferredResultWrapper,我们来看下DeferredResultWrapper的代码:

public

class

DeferredResultWrapper
 {

private

static

final

long
 TIMEOUT = 
60
 * 
1000
;
// 60 seconds

private

static

final

ResponseEntity
<
List
<
ApolloConfigNotification
>> NOT_MODIFIED_RESPONSE_LIST = 

new

ResponseEntity
<>(
HttpStatus
.NOT_MODIFIED);

private

DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> result;

public

DeferredResultWrapper
() {
        result = 
new

DeferredResult
<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
    }

public

void
 onTimeout(
Runnable
 timeoutCallback) {
        result.onTimeout(timeoutCallback);
    }

public

void
 onCompletion(
Runnable
 completionCallback) {
        result.onCompletion(completionCallback);
    }

public

void
 setResult(
ApolloConfigNotification
 notification) {
        setResult(
Lists
.newArrayList(notification));
    }

public

void
 setResult(
List
<
ApolloConfigNotification
> notifications) {
        result.setResult(
new

ResponseEntity
<>(notifications, 
HttpStatus
.OK));
    }

public

DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> getResult() {

return
 result;
    }
}

通过setResult()方法设置返回结果给客户端,以上就是当配置发生变化,然后通过消息监听器通知客户端的原理,那么客户端是在什么时候接入的呢?

@RestController
public

class

NotificationControllerV2

implements

ReleaseMessageListener
 {

// 模拟配置更新,往里插入数据表示有更新

public

static

Queue
<
String
> queue = 
new

LinkedBlockingDeque
<>();

private

final

Multimap
<
String
, 
DeferredResultWrapper
> deferredResults = 
Multimaps
            .synchronizedSetMultimap(
HashMultimap
.create());

@GetMapping
(
"/getConfig"
)

public

DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> getConfig() {

DeferredResultWrapper
 deferredResultWrapper = 
new

DeferredResultWrapper
();

List
<
ApolloConfigNotification
> newNotifications = getApolloConfigNotifications();

if
 (!
CollectionUtils
.isEmpty(newNotifications)) {
            deferredResultWrapper.setResult(newNotifications);
        } 
else
 {
            deferredResultWrapper.onTimeout(() -> {

System
.err.println(
"onTimeout"
);
            });

            deferredResultWrapper.onCompletion(() -> {

System
.err.println(
"onCompletion"
);
            });
            deferredResults.put(
"xxxx"
, deferredResultWrapper);
        }

return
 deferredResultWrapper.getResult();
    }

private

List
<
ApolloConfigNotification
> getApolloConfigNotifications() {

List
<
ApolloConfigNotification
> list = 
new

ArrayList
<>();

String
 result = queue.poll();

if
 (result != 
null
) {
            list.add(
new

ApolloConfigNotification
(
"application"
, 
1
));
        }

return
 list;
    }
}

NotificationControllerV2中提供了一个/getConfig的接口,客户端在启动的时候会调用这个接口,这个时候会执行getApolloConfigNotifications()方法去获取有没有配置的变更信息,如果有的话证明配置修改过,直接就通过deferredResultWrapper.setResult(newNotifications);返回结果给客户端了,客户端收到结果后重新拉取配置的信息进行覆盖本地的配置。

如果getApolloConfigNotifications()方法没有返回配置修改的信息,证明配置没有发生修改,就将DeferredResultWrapper对象添加到deferredResults中,等待后续配置发生变化时消息监听器进行通知。

同时这个请求就会挂起,不会立即返回,挂起是通过DeferredResultWrapper中的下面的代码实现的:

private

static

final

long
 TIMEOUT = 
60
 * 
1000
;
// 60 seconds

private

static

final

ResponseEntity
<
List
<
ApolloConfigNotification
>> NOT_MODIFIED_RESPONSE_LIST = 

new

ResponseEntity
<>(
HttpStatus
.NOT_MODIFIED);

private

DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> result;

public

DeferredResultWrapper
() {
    result = 
new

DeferredResult
<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}

在创建DeferredResult对象的时候指定了超时的时间和超时后返回的响应码,如果60秒内没有消息监听器进行通知,那么这个请求就会超时,超时后客户端就收到的响应码就是304。

整个Config Service的流程就走完了,接下来我们看客户端是怎么实现的,我们简单的写个测试类模拟客户端注册:

public

class

ClientTest
 {

public

static

void
 main(
String
[] args) {
        reg();
    }

private

static

void
 reg() {

System
.err.println(
"注册"
);

String
 result = request(
"http://localhost:8081/getConfig"
);

if
 (result != 
null
) {

// 配置有更新,重新拉取配置

// ......
        }

// 重新注册
        reg();
    }

private

static

String
 request(
String
 url) {

HttpURLConnection
 connection = 
null
;

BufferedReader
 reader = 
null
;

try
 {
            URL getUrl = 
new
 URL(url);
            connection = (
HttpURLConnection
) getUrl.openConnection();
            connection.setReadTimeout(
90000
);
            connection.setConnectTimeout(
3000
);
            connection.setRequestMethod(
"GET"
);
            connection.setRequestProperty(
"Accept-Charset"
, 
"utf-8"
);
            connection.setRequestProperty(
"Content-Type"
, 
"application/json"
);
            connection.setRequestProperty(
"Charset"
, 
"UTF-8"
);

System
.
out
.println(connection.getResponseCode());

if
 (
200
 == connection.getResponseCode()) {
                reader = 
new

BufferedReader
(
new

InputStreamReader
(connection.getInputStream(), 
"UTF-8"
));

StringBuilder
 result = 
new

StringBuilder
();

String
 line = 
null
;

while
 ((line = reader.readLine()) != 
null
) {
                    result.append(line);
                }

System
.
out
.println(
"结果 "
 + result);

return
 result.toString();
            }
        } 
catch
 (
IOException
 e) {
            e.printStackTrace();
        } 
finally
 {

if
 (connection != 
null
) {
                connection.disconnect();
            }
        }

return

null
;
    }
}

首先启动/getConfig接口所在的服务,然后启动客户端,客户端就会发起注册请求,如果有修改直接获取到结果,进行配置的更新操作。如果无修改,请求会挂起,这边客户端设置的读取超时时间是90秒,大于服务端的60秒超时时间。

每次收到结果后,无论是有修改还是没修改,都必须重新进行注册,通过这样的方式就可以达到配置实时推送的效果。

我们可以调用之前写的/addMsg接口来模拟配置发生变化,调用之后客户端就能马上得到返回结果。

本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书。

技术图片

去年出版的《Spring Cloud微服务:全栈技术与案例解析》一书,得到了大家的支持以及反馈,基于大家的反馈,重新进行了更正和改进。

基于比较稳定的 Spring Cloud Finchley.SR2 版本和 Spring Boot 2.0.6.RELEASE 版本编写。

同时将示列代码进行标准的归档,之前的都在一起,不方便读者参考和运行。

技术图片

同时还增加了像Apollo,Spring Cloud Gateway,生产实践经验等新的内容。

尹吉欢
我不差钱啊
喜欢作者

Apollo服务端设计原理剖析

标签:thread   没有   local   发布   tca   port   一个   mysql   测试的   

原文地址:https://blog.51cto.com/14888386/2515767

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有
迷上了代码!