
Nacos Source Code Analysis (3): Service Registration Handling in the Registry



Overview

The registry server's main responsibilities are handling client service registration, service discovery, and service deregistration. Beyond these client-facing features, the server also does something more fundamental: the AP and CP consistency modes we often hear about in distributed systems. As a cluster, Nacos implements both AP and CP. The AP mode uses Nacos's own Distro protocol, while the CP mode is implemented with the Raft protocol. Both involve heartbeats, leader election, and so on, so the machinery is fairly complex.

This article focuses on how the registry server handles client service registration; the other features are left for later articles.

 

The server-side endpoint that receives client registrations is shown below:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);

    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

This method lives in the com.alibaba.nacos.naming.controllers.InstanceController class.
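As a quick usage example: this endpoint is what the Nacos Java client ultimately hits when you register an instance. A minimal sketch, assuming a Nacos server at 127.0.0.1:8848 and a made-up service name:

import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;

public class RegisterDemo {
    public static void main(String[] args) throws Exception {
        // Server address is an assumption for this demo
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        // Registers an ephemeral instance by default; under the hood this is a
        // POST to /nacos/v1/ns/instance, i.e. the register() method above
        naming.registerInstance("demo-service", "192.168.1.10", 8080);
    }
}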

 

Service registration flow chart

[Flow chart: service registration, split into (1) updating the local cache and (2) syncing to the other cluster nodes]

Based on the flow chart, I split the process into two parts. The first is updating the local cache: a registration request is not sent to every node in the cluster; the client registers against a single node chosen from the server list. The second is synchronizing the registered service information to the other nodes in the cluster. Let's analyze these two parts in detail.

 

Part 1: Updating the local cache

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Check whether the namespace exists in the local cache and create it if not;
    // then check whether the service exists under that namespace and, if not,
    // create an empty service. Note that the instance list is NOT updated here.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Fetch the service from the local cache
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
            "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Register the instance; only this step binds the instance to the service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
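Before moving on, it is worth seeing how addInstance hands off to the consistency layer, since that is the bridge into Part 2. Roughly (simplified from ServiceManager; addIpAddresses merges the new instances into the current list):

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // The key encodes namespace, service name and whether the instances are ephemeral
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // Merge the new instances into the existing instance list
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Hand the full instance list to the consistency service (Distro or Raft)
        consistencyService.put(key, instances);
    }
}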

Enter com.alibaba.nacos.naming.core.ServiceManager#createEmptyService, which delegates to createServiceIfAbsent:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // The service does not exist yet, so create an empty one
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // Put the empty service into the cache and initialize it
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

Enter com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit:

private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into the cache
    putService(service);
    // Start the scheduled health-check task for the service
    service.init();
    // Listen for changes to both the ephemeral and the persistent instance lists
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}

1. putService is straightforward, so I won't walk through it here.
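For completeness, putService is just a double-checked insertion into a two-level map (namespace -> service name -> Service). A sketch of what it does, assuming serviceMap is the ConcurrentHashMap field of ServiceManager:

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            // Re-check inside the lock so the namespace map is created only once
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}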

2. Enter com.alibaba.nacos.naming.core.Service#init.

I won't go deeper here. I plan to write a dedicated article on health checking, because the Nacos authors implemented the checks over TCP connections driven by NIO, and the machinery is fairly involved.

3. The listen calls also involve quite a lot of code; they are implemented with the observer pattern, as sketched below.
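Concretely, the Service itself implements the RecordListener interface from the consistency package, so when the consistency layer applies a new instance list under the service's key, the service is called back and refreshes its in-memory cluster data. The interface looks roughly like this:

public interface RecordListener<T extends Record> {

    // Whether this listener is interested in the given key
    boolean interests(String key);

    // Whether this listener should stop listening on the given key
    boolean matchUnlistenKey(String key);

    // Called when the record under the key is added or updated;
    // for a Service, value is the new instance list
    void onChange(String key, T value) throws Exception;

    // Called when the record under the key is deleted
    void onDelete(String key) throws Exception;
}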

Summary: that wraps up the code for the first part. There are still many places in it worth a more careful look.

 

Part 2: Persisting the service information and synchronizing it to the other nodes

Enter the com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put() method:

@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}
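mapConsistencyService routes the key either to the ephemeral (Distro) implementation or to the persistent (Raft) one, based on a marker that KeyBuilder puts into ephemeral keys. Roughly:

private ConsistencyService mapConsistencyService(String key) {
    // Ephemeral instance-list keys carry an "ephemeral." marker built by KeyBuilder
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}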

The instance we register here is ephemeral. To see why, look at the parameter parsing: com.alibaba.nacos.naming.controllers.InstanceController#parseInstance() internally calls the getIPAddress() method. Step into com.alibaba.nacos.naming.controllers.InstanceController#getIPAddress() and you will find this code:

boolean ephemeral = BooleanUtils.toBoolean(WebUtils.optional(request, "ephemeral",
        String.valueOf(switchDomain.isDefaultInstanceEphemeral())));

Instance instance = new Instance();
instance.setPort(Integer.parseInt(port));
instance.setIp(ip);
instance.setWeight(Double.parseDouble(weight));
instance.setClusterName(cluster);
instance.setHealthy(healthy);
instance.setEnabled(enabled);
instance.setEphemeral(ephemeral);

So the ephemeral flag is set by the client itself when it registers the service; if the client omits it, the server falls back to switchDomain.isDefaultInstanceEphemeral(), which defaults to true.

 

So the put method above resolves to com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put(). Let's step in:

@Override
public void put(String key, Record value) throws NacosException {
    // Store the service data locally
    onPut(key, value);
    // Queue a task to sync the data to the other nodes
    taskDispatcher.addTask(key);
}

1. The onPut method is fairly simple, so I'll only summarize it below.
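In short, onPut wraps the instance list in a versioned Datum, stores it in the in-memory DataStore (nothing is written to disk for ephemeral data here), and queues a change notification so listeners such as the Service get called back. A rough sketch:

public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // Wrap the instance list in a versioned Datum and cache it in memory
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }
    // Asynchronously notify listeners (e.g. the Service) that the key changed
    notifier.addTask(key, ApplyAction.CHANGE);
}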

2. Enter taskDispatcher.addTask(key). The class is long, so its full source follows.

/*
 * Copyright 1999-2018 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.naming.misc.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Data sync task dispatcher
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    // Runs once when this bean has been constructed
    @PostConstruct
    public void init() {
        // Create one TaskScheduler per CPU core and submit each for execution
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        // Deterministically hash the key to pick the scheduler that will handle it
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    // Worker that batches keys from its queue and dispatches sync tasks
    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {

                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Member member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getAddress())) {
                                continue;
                            }
                            // Create a sync task for this batch of keys
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getAddress());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // Submit the sync task for execution
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
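A note on UtilsAndCommons.shakeUp used in addTask above: it maps a key to a scheduler index deterministically, so the same key always lands on the same queue. I believe it is essentially a non-negative hash modulo the scheduler count, along these lines:

// A sketch, assuming it matches the Nacos 1.x helper: fold the hash to a
// non-negative value and take it modulo the number of schedulers
public static int shakeUp(String key, int upperLimit) {
    return (key.hashCode() & Integer.MAX_VALUE) % upperLimit;
}

TaskScheduler hands its batches to DataSyncer, whose full source follows.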
/*
 * Copyright 1999-2018 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Data replicator
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
@DependsOn("ProtocolManager")
public class DataSyncer {

    @Autowired
    private DataStore dataStore;

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private Serializer serializer;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerMemberManager memberManager;

    private Map<String, String> taskMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        startTimedSync();
    }

    public void submit(SyncTask task, long delay) {

        // If it's a new task:
        if (task.getRetryCount() == 0) {
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    iterator.remove();
                }
            }
        }

        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }

        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }

            List<String> keys = task.getKeys();

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }
            // 2. get the datums by keys and check the datum is empty or not
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }

            byte[] data = serializer.serialize(datumMap);

            long timestamp = System.currentTimeMillis();
            // Sync the data to the target node
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    public void retrySync(SyncTask syncTask) {
        Member member = new Member();
        member.setIp(syncTask.getTargetServer().split(":")[0]);
        member.setPort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(member)) {
            // if server is no longer in healthy server list, ignore this task:
            //fix #1665 remove existing tasks
            if (syncTask.getKeys() != null) {
                for (String key : syncTask.getKeys()) {
                    taskMap.remove(buildKey(key, syncTask.getTargetServer()));
                }
            }
            return;
        }

        // TODO may choose other retry policy.
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    public class TimedSync implements Runnable {

        @Override
        public void run() {

            try {

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", getServers());
                }

                // send local timestamps to other servers:
                Map<String, String> keyChecksums = new HashMap<>(64);
                for (String key : dataStore.keys()) {
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }

                    Datum datum = dataStore.get(key);
                    if (datum == null) {
                        continue;
                    }
                    keyChecksums.put(key, datum.value.getChecksum());
                }

                if (keyChecksums.isEmpty()) {
                    return;
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }

                for (Member member : getServers()) {
                    if (NetUtils.localServer().equals(member.getAddress())) {
                        continue;
                    }
                    NamingProxy.syncCheckSums(keyChecksums, member.getAddress());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }

    }

    public Collection<Member> getServers() {
        return memberManager.allMembers();
    }

    public String buildKey(String key, String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}

The two classes above drive the synchronization. Beyond the task-driven sync, com.alibaba.nacos.naming.consistency.ephemeral.distro.DataSyncer also runs a timed sync (the TimedSync task), which acts as a complement to the first mechanism.

This timed sync collects every key this node is responsible for, together with the checksum of that key's data, into a map, and asynchronously sends it to each of the other nodes.
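On the receiving side (not covered in this article), the peer compares the received checksums against its local data: keys that are missing locally or whose checksum differs are pulled back from the sender, while keys the sender owns but no longer reports are removed. A simplified sketch of that reconciliation; isResponsible() is a hypothetical stand-in for the real DistroMapper ownership check:

public void onReceiveChecksums(Map<String, String> checksums, String server) {
    List<String> toUpdate = new ArrayList<>();
    List<String> toRemove = new ArrayList<>();
    for (Map.Entry<String, String> entry : checksums.entrySet()) {
        Datum local = dataStore.get(entry.getKey());
        // Pull keys that are missing locally or whose checksum differs
        if (local == null || !entry.getValue().equals(local.value.getChecksum())) {
            toUpdate.add(entry.getKey());
        }
    }
    for (String key : dataStore.keys()) {
        // Keys owned by the sender but absent from its report are stale here
        if (isResponsible(server, key) && !checksums.containsKey(key)) {
            toRemove.add(key);
        }
    }
    // toRemove entries are deleted locally; toUpdate entries are fetched
    // from the sender in bulk and applied through onPut
}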

 

Summary

There are still quite a few things here I haven't fully figured out; I've only worked out the overall flow, and many details deserve a closer look. If anything is wrong, I'd welcome corrections.

 


Original article: https://www.cnblogs.com/gunduzi/p/13219894.html
