码迷,mamicode.com
首页 > 其他好文 > 详细

让我们聊聊Erlang的节点互联(二)

时间:2015-03-21 17:16:41      阅读:281      评论:0      收藏:0      [点我收藏+]

标签:

由于一篇Blog写太长无法发表,这里面我们将继续分析下dist.c中的setnode_3这个函数的作用和net_kernel得到连接成功之后又进行了什么操作。

BIF_RETTYPE setnode_3(BIF_ALIST_3)
{
    BIF_RETTYPE ret;
    Uint flags;
    unsigned long version;
    Eterm ic, oc;
    Eterm *tp;
    DistEntry *dep = NULL;
    Port *pp = NULL;

    /* Prepare for success */
    ERTS_BIF_PREP_RET(ret, am_true);

    /*
     * Check and pick out arguments
     */

    if (!is_node_name_atom(BIF_ARG_1) ||
		is_not_internal_port(BIF_ARG_2) ||
		(erts_this_node->sysname == am_Noname)) {
		 goto badarg;
    }

    if (!is_tuple(BIF_ARG_3))
		 goto badarg;
    tp = tuple_val(BIF_ARG_3);
    if (*tp++ != make_arityval(4))
		 goto badarg;
    if (!is_small(*tp))
		 goto badarg;
    flags = unsigned_val(*tp++);
    if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
		 goto badarg;
    ic = *(++tp);
    oc = *(++tp);
    if (!is_atom(ic) || !is_atom(oc))
		 goto badarg;

    /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
    if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
		 erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
		 erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
		 if (BIF_P->common.u.alive.reg)
			  erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
		 erts_dsprintf(dsbufp,
					   " attempted to enable connection to node %T "
					   "which is not able to handle extended references.\n",
					   BIF_ARG_1);
		 erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
		 goto badarg;
    }

    /*
     * Arguments seem to be in order.
     */

    /* get dist_entry */
    dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
    if (dep == erts_this_dist_entry)
		 goto badarg;
    else if (!dep)
		 goto system_limit; /* Should never happen!!! */
//通过Port的ID获取Port的结构
    pp = erts_id2port_sflgs(BIF_ARG_2,
			    BIF_P,
			    ERTS_PROC_LOCK_MAIN,
			    ERTS_PORT_SFLGS_INVALID_LOOKUP);
    erts_smp_de_rwlock(dep);

    if (!pp || (erts_atomic32_read_nob(&pp->state)
		& ERTS_PORT_SFLG_EXITING))
		 goto badarg;

    if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
		 goto badarg;
//如果当前cid和传入的Port的ID相同,且port的sist_entry和找到的dep相同
//那么直接进入结束阶段
    if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
		 goto done; /* Already set */

    if (dep->status & ERTS_DE_SFLG_EXITING) {
		 /* Suspend on dist entry waiting for the exit to finish */
		 ErtsProcList *plp = erts_proclist_create(BIF_P);
		 plp->next = NULL;
		 erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
		 erts_smp_mtx_lock(&dep->qlock);
		 erts_proclist_store_last(&dep->suspended, plp);
		 erts_smp_mtx_unlock(&dep->qlock);
		 goto yield;
    }

    ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));

    if (pp->dist_entry || is_not_nil(dep->cid))
		 goto badarg;

    erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);

    /*
     * Dist-ports do not use the "busy port message queue" functionality, but
     * instead use "busy dist entry" functionality.
     */
    {
		 ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
		 erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
    }
//更新Port所关联的dist
    pp->dist_entry = dep;

    dep->version = version;
    dep->creation = 0;

    ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);

#if 1
    dep->send = (pp->drv_ptr->outputv
		 ? dist_port_commandv
		 : dist_port_command);
#else
    dep->send = dist_port_command;
#endif
    ASSERT(dep->send);

#ifdef DEBUG
    erts_smp_mtx_lock(&dep->qlock);
    ASSERT(dep->qsize == 0);
    erts_smp_mtx_unlock(&dep->qlock);
#endif
//更新dist_entry的cid
    erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);

    if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
		 create_cache(dep);

    erts_smp_de_rwunlock(dep);
    dep = NULL; /* inc of refc transferred to port (dist_entry field) */
//增加远程节点的数量
    inc_no_nodes();
//发送监控信息到调用的进程
    send_nodes_mon_msgs(BIF_P,
			am_nodeup,
			BIF_ARG_1,
			flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
			NIL);
 done:

    if (dep && dep != erts_this_dist_entry) {
		 erts_smp_de_rwunlock(dep);
		 erts_deref_dist_entry(dep);
    }

    if (pp)
		 erts_port_release(pp);

    return ret;

 yield:
    ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
			 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
    goto done;

 badarg:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
    goto done;

 system_limit:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
    goto done;
}

setnode_3首先是将,得到的远程节点的名字放入dist的hash表中,并且将这个表项和连接到远程节点的Port进行了关联。

接着将和远程节点进行连接的Port标记为ERTS_PORT_SFLG_DISTRIBUTION,这样在一个Port出现Busy的时候我们能区分出是普通的Port还是远程连接的Port,在一个Port被销毁的时候,是否要调用dist.c中的erts_do_net_exits来告诉Erts远程节点已经掉线了。当这些都顺利的完成了之后,会在这个Erts内部广播nodeup这个消息,那么nodeup的接收者又是谁呢?nodeup的接收者是那些通过process_flag函数设置了monitor_nodes标记的进程,当然monitor_nodes这选项文档中是没有的。如果我们想监听nodeup事件,只能通过net_kernel:monitors函数来完成。

我们上次说到负责连接远程节点的进程会通知net_kernel进程,让我们接着看下net_kernel收到消息做了什么。

handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}},
	    State) ->
    case {Immediate, ets:lookup(sys_dist, Node)} of
	{true, [Conn]} when Conn#connection.state =:= pending,
			    Conn#connection.owner =:= SetupPid ->
	    ets:insert(sys_dist, Conn#connection{state = up,
						 address = Address,
						 waiting = [],
						 type = Type}),
	    SetupPid ! {self(), inserted},
	    reply_waiting(Node,Conn#connection.waiting, true),
	    {noreply, State};
	_ ->
	    SetupPid ! {self(), bad_request},
	    {noreply, State}
    end;

更新ets中的状态,同时发送消息给所有等待的进程,告诉他们远程连接已经成功了,你们可以继续进行后续操作了。

这个时候你会惊奇的发现,心跳在什么地方呢?不急,我们再回头看下net_kernel的init函数

init({Name, LongOrShortNames, TickT}) ->
    process_flag(trap_exit,true),
    case init_node(Name, LongOrShortNames) of
	{ok, Node, Listeners} ->
	    process_flag(priority, max),
	    Ticktime = to_integer(TickT),
	    Ticker = spawn_link(net_kernel, ticker, [self(), Ticktime]),
	    {ok, #state{name = Name,
			node = Node,
			type = LongOrShortNames,
			tick = #tick{ticker = Ticker, time = Ticktime},
			connecttime = connecttime(),
			connections =
			ets:new(sys_dist,[named_table,
					  protected,
					  {keypos, 2}]),
			listen = Listeners,
			allowed = [],
			verbose = 0
		       }};
	Error ->
	    {stop, Error}
    end.

net_kernel首先创建了一个ticker进程,它专门负责发心跳给net_kernel进程,然后net_kernel进程会遍历所有远程连接的进程,让他们进行一次心跳。当我们改变了一个节点的心跳时间的时候,我们会开启一个aux_ticker进程帮助我们进行过度,直到所有节点都知道了我们改变了心跳周期为止,当所有节点都知道我们改变了心跳周期,这个aux_ticker进程也就结束了它的历史性任务,安静的退出了。

那么是如何发现远程节点退出的,当然是TCP数据传输发生了故障Port被清理掉了,这个可参见dist.c中的erts_do_net_exits。

当这些都完成了,我们将继续回到global模块和global_group模块中去分析下nodeup的时候,两个节点是如何同步他们的全局名字的。

让我们聊聊Erlang的节点互联(二)

标签:

原文地址:http://my.oschina.net/u/236698/blog/389751

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!