`
wqtn22
  • 浏览: 99736 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

erlang:ports与erlang:processes引发的问题

    博客分类:
  • erts
阅读更多

最近4399的同学遇到一个问题,以下是他的描述:

 

“用erlang:ports得出来的port列表里,很多port的port_info都是undefined,实际上这些ports应该都已经被关闭了,手动调用close去关闭这些port的话会抛出异常。”

首先重现他的场景:

在时刻t1调用erlang:ports()时可以得到erlang虚拟机在t1的所有port,在t1时刻之后的t2时刻,再次调用erlang:port_info()得到每个port的信息,两次调用间有一个时间差tv。

根据这个场景,有两种假设:

1.在时间差tv内,可能有一部分端口被关闭,导致t1时刻有效的port不一定在t2时刻有效;

2.在时间差tv内,没有任何端口被关闭。

很显然,因为端口的关闭随时可能发生,第一种假设是绝对有可能存在的,我们仅仅需要分析第二种假设即可。

在第二种假设下,我们开始分析erlang虚拟机的部分代码:

bif.c

 

BIF_RETTYPE ports_0(BIF_ALIST_0)

{

    Eterm res = NIL;

    Eterm* port_buf = erts_alloc(ERTS_ALC_T_TMP, sizeof(Eterm)*erts_max_ports);

    /* 分配一个记录port的缓冲区,最大尺寸为erts_max_ports,该数值可由虚拟机环境变量ERL_MAX_PORTS控制,指定虚拟机最大可拥有的port数目 */

 

    Eterm* pp = port_buf;

    Eterm* dead_ports;

    int alive, dead;

    Uint32 next_ss;

    int i;

 

    /* To get a consistent snapshot... 

     * We add alive ports from start of the buffer

     * while dying ports are added from the other end by the killing threads.

     */

    /* 这段话看起来挺奇怪,为了取得一个一致的快照,我们从buffer的头部开始添加活port,从尾部添加退出port */

 

    erts_smp_mtx_lock(&ports_snapshot_mtx); /* One snapshot at a time */

 

    erts_smp_atomic_set(&erts_dead_ports_ptr, (erts_aint_t) (port_buf + erts_max_ports));

 

    /* 又是一段奇怪的代码,将一个全局指针设为此次分配的port记录缓冲区的尾部 */

 

 

    next_ss = erts_smp_atomic32_inctest(&erts_ports_snapshot);

 

    for (i = erts_max_ports-1; i >= 0; i--) {

Port* prt = &erts_port[i];

erts_smp_port_state_lock(prt);

if (!(prt->status & ERTS_PORT_SFLGS_DEAD)

   && prt->snapshot != next_ss) {

   ASSERT(prt->snapshot == next_ss - 1);

   *pp++ = prt->id;

   prt->snapshot = next_ss; /* Consumed by this snapshot */

}

erts_smp_port_state_unlock(prt);

    }

    /*

    这里遍历所有的port,将那些不是ERTS_PORT_SFLGS_DEAD的port记录到port记录缓冲区内,ERTS_PORT_SFLGS_DEAD是一个宏,其定义如下:

    #define ERTS_PORT_SFLGS_DEAD \

      (ERTS_PORT_SFLG_FREE \

       | ERTS_PORT_SFLG_FREE_SCHEDULED \

       | ERTS_PORT_SFLG_INITIALIZING)

     也即将那些活的port记录到这个缓冲区内,为了保持该函数的可重入性,使用了一个简单的区分标识next_ss和port的成员snapshot共同作用,读者可细细品味

    */

 

    dead_ports = (Eterm*)erts_smp_atomic_xchg(&erts_dead_ports_ptr,  (erts_aint_t) NULL);

    /* 这段代码的含义是,原子的进行一次读+写操作,该操作将之前设为port记录缓冲区尾部的全局指针erts_dead_ports_ptr,设置为NULL,同时取出其原值,这段操作和之前设置erts_dead_ports_ptr的操作遥相呼应,等价于为全局指针erts_dead_ports_ptr分配了一个临时缓冲区,共给其它部分使用,然后又在此处收回,稍侯来观察谁会使用这个由erts_dead_ports_ptr指向的临时缓冲区 */

 

 

 

    ASSERT(pp <= dead_ports);

 

    alive = pp - port_buf;

    dead = port_buf + erts_max_ports - dead_ports;

 

    /* 此处终于显现了一些端倪,alive比较好理解,即port记录缓冲区头部包含的port,而dead则是port记录缓冲区尾部包含的port */

 

    ASSERT((alive+dead) <= erts_max_ports);

 

 

    if (alive+dead > 0) {

erts_aint_t i;

Eterm *hp = HAlloc(BIF_P, (alive+dead)*2);

 

for (i = 0; i < alive; i++) {

   res = CONS(hp, port_buf[i], res);    

   hp += 2;

}

for (i = 0; i < dead; i++) {

   res = CONS(hp, dead_ports[i], res);

   hp += 2;

}

    }

    /* 这里构造返回结果,将记录缓冲区头尾两端的port统统加入返回结果中去 */

 

 

 

 

    erts_free(ERTS_ALC_T_TMP, port_buf);

    /* 释放临时分配的port记录缓冲区 */

 

    BIF_RET(res);

}

 

让我们再来看看是谁使用了erts_dead_ports_ptr记录的临时port记录缓冲区:

global.h

 

ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt)

{

    ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck));

    if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) {

/* Dead ports are added from the end of the snapshot buffer */

Eterm* tombstone = (Eterm*) erts_smp_atomic_addtest(&erts_dead_ports_ptr,

   -(erts_aint_t)sizeof(Eterm));

ASSERT(tombstone+1 != NULL);

ASSERT(prt->snapshot == erts_smp_atomic32_read(&erts_ports_snapshot) - 1);

*tombstone = prt->id;

        /* 注意此处,仅当发现port的snapshot标识变化时,才将port加入到erts_ports_snapshot所指的临时缓冲区,加入的顺序是从缓冲区的尾部向前,而erts_may_save_closed_port函数应该有可能在ports_0调用期间被别处调用 */

    }

    /*else no ongoing snapshot or port was already included or created after snapshot */

}

再来看看erts_may_save_closed_port的调用经历:

io.c terminate_port

io.c kill_port

erl_port_task.c erts_may_save_closed_port

terminate_port主要在port退出时调用,调用时,会产生一个额外的效果,即调用erts_may_save_closed_port,将退出的port记录到全局指针erts_ports_snapshot指向的临时缓冲区内,如果这个缓冲区存在,则一定有某个进程调用了ports_0,也即erlang:ports(),此时erts_ports_snapshot将这些已经退出的port也加入到ports_0的返回结果内。

为了验证这个过程,我们需要做一个实验,即调用ports_0时,也调用一些能够导致port关闭的函数,若已经关闭的port仍然出现在ports_0的返回结果内,就表明我们的猜测是正确的。

这需要对对ports_0做一点小小的hack:

在ports_0更改每个port的snapshot标识之后,加入一些延迟,保证关闭端口的函数可以在ports_0释放erts_ports_snapshot指向的临时缓冲区之前能够关闭port,以触发terminate_port最终调用erts_may_save_closed_port记录这些已经关闭的port。

改动如下:

bif.c

BIF_RETTYPE ports_0(BIF_ALIST_0)

{

    ....

 

    for (i = erts_max_ports-1; i >= 0; i--) {

Port* prt = &erts_port[i];

erts_smp_port_state_lock(prt);

if (!(prt->status & ERTS_PORT_SFLGS_DEAD)

   && prt->snapshot != next_ss) {

   ASSERT(prt->snapshot == next_ss - 1);

   *pp++ = prt->id;

   prt->snapshot = next_ss; /* Consumed by this snapshot */

}

erts_smp_port_state_unlock(prt);

    }

 

    sleep(10);

 

    /* 加入一个延迟,能够保证ports_0在返回前,某些关闭端口的函数得到执行 */

    dead_ports = (Eterm*)erts_smp_atomic_xchg(&erts_dead_ports_ptr, (erts_aint_t) NULL);

    ....

}

 

以简单的打开文件为例(都是port,套接字也同理),测试过程如下:

1.打开10个文件描述符:

FDList = [begin {ok, FD} = file:open(OCFile, [raw, append]), FD end||_I <- lists:seq(1, 10)].

2.启动一个独立的进程,不要与控制台进程在同一个调度器上,否则看不到并发执行的效果:

spawn(fun() -> process_flag(scheduler, 10), io:format("all ports: ~p~n", [erlang:ports()]) end).

3.我们有10秒的时间可以去关闭这些打开的文件句柄:

[file:close(FD)||FD <-  FDList ].

静候10秒,可以发现erlang:ports()返回的结果中包含了已经关闭的文件描述符,再次运行erlang:ports(),发现返回的结果中不包含已经关闭的文件描述符,而实际上两次erlang:ports()时间间隔内没有关闭任何的port,由于第一次的erlang:ports()返回的结果中包含了已经关闭的port,对其调用erlang:port_info自然返回undefined。

由于贴图神马的还没有用过,就直接告诉大家结果了,读者也可以自行验证一下。

对于erlang:processes(),文档中就直接说明了这种情况,即即使进程在调用erlang:processes()期间退出,仍然会包含在最终的返回结果集里面。

这样的场景在进程/端口数少的时候,体现的不太明显,但若进程/端口数很多时(尤其按照霸爷所经历过的场景,并发百万级进程),erlang:processes()/erlang:ports()将执行的很慢(erlang:processes()还对这样的情况做了特殊处理,有兴趣的读者可以看看它的代码,就是erl_process.c的processes_0函数),期间如果有任何进程退出,都将包含在最终的返回结果内,有时可能会引起误解,但erlang官方可能是想给用户一个更为一致的瞬时快照结果吧。

对于这些已经退出却仍然包含在返回结果内的进程/端口,其本身是不会产生资源泄露的,这里简单分析下port的释放过程:

 

static void terminate_port(Port *prt)

{

    Eterm send_closed_port_id;

    Eterm connected_id = NIL /* Initialize to silence compiler */;

    erts_driver_t *drv;

 

    ERTS_SMP_CHK_NO_PROC_LOCKS;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

 

    ASSERT(!prt->nlinks);

    ASSERT(!prt->monitors);

 

    if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) {

erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED);

send_closed_port_id = prt->id;

connected_id = prt->connected;

    }

    else {

send_closed_port_id = NIL;

    }

 

#ifdef ERTS_SMP

    erts_cancel_smp_ptimer(prt->ptimer);

#else

    erts_cancel_timer(&prt->tm);

#endif

 

    drv = prt->drv_ptr;

    if ((drv != NULL) && (drv->stop != NULL)) {

int fpe_was_unmasked = erts_block_fpe();

(*drv->stop)((ErlDrvData)prt->drv_data);

        /* 若有则调用port的driver的stop函数 */

erts_unblock_fpe(fpe_was_unmasked);

#ifdef ERTS_SMP

if (prt->xports)

   erts_smp_xports_unlock(prt);

ASSERT(!prt->xports);

#endif

    }

    if(drv->handle != NULL) {

erts_smp_mtx_lock(&erts_driver_list_lock);

erts_ddll_decrement_port_count(drv->handle); 

        /* 若driver使用了动态链接库或共享库,则减少其引用计数 */

erts_smp_mtx_unlock(&erts_driver_list_lock);

    }

    stopq(prt);        /* clear queue memory */

    if(prt->linebuf != NULL){

erts_free(ERTS_ALC_T_LINEBUF, (void *) prt->linebuf);

        /* 释放用于保存未集齐的数据的线性缓冲区 */

prt->linebuf = NULL;

    }

    if (prt->bp != NULL) {

free_message_buffer(prt->bp);

        /* 释放堆分片 */

prt->bp = NULL;

prt->data = am_undefined;

    }

 

    if (prt->psd)

erts_free(ERTS_ALC_T_PRTSD, prt->psd);

        /* 释放port特定数据结构占用的内存 */

 

    kill_port(prt);

 

    /*

     * We don't want to send the closed message until after the

     * port has been removed from the port table (in kill_port()).

     */

    if (is_internal_port(send_closed_port_id))

deliver_result(send_closed_port_id, connected_id, am_closed);

 

    ASSERT(prt->dist_entry == NULL);

}

 

static ERTS_INLINE void kill_port(Port *pp)

{

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_free_port(pp);

    ASSERT(pp->status & ERTS_PORT_SFLGS_DEAD);

}

 

void erts_port_task_free_port(Port *pp)

{

    ErtsRunQueue *runq;

    int port_is_dequeued = 0;

 

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));

    runq = erts_port_runq(pp);

    ASSERT(runq);

    ERTS_PT_CHK_PRES_PORTQ(runq, pp);

    if (pp->sched.exe_taskq) {

/* I (this thread) am currently executing this port, free it

  when scheduled out... */

ErtsPortTask *ptp = port_task_alloc();

erts_smp_port_state_lock(pp);

pp->status &= ~ERTS_PORT_SFLG_CLOSING;

pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;

erts_may_save_closed_port(pp);

erts_smp_port_state_unlock(pp);

ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&pp->refc) > 1);

ptp->type = ERTS_PORT_TASK_FREE;

ptp->event = (ErlDrvEvent) -1;

ptp->event_data = NULL;

set_handle(ptp, NULL);

push_task(pp->sched.exe_taskq, ptp);

ERTS_PT_CHK_PRES_PORTQ(runq, pp);

erts_smp_runq_unlock(runq);

    }

    else {

        /* 仅仅分析这个简单的场景以说明问题,另外一个场景类似 */

ErtsPortTaskQueue *ptqp = pp->sched.taskq;

if (ptqp) {

   dequeue_port(runq, pp);

   ERTS_PORT_NOT_IN_RUNQ(pp);

   port_is_dequeued = 1;

}

erts_smp_port_state_lock(pp);

pp->status &= ~ERTS_PORT_SFLG_CLOSING;

pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;

        /* port的状态被更改为了ERTS_PORT_SFLG_FREE_SCHEDULED,它也是ERTS_PORT_SFLGS_DEAD的一种 */

erts_may_save_closed_port(pp);

        /* 能够让erts_dead_ports_ptr保存已经退出的port,则port在退出时一定走到了这里,我们其实仅需要关注在这里之后是否有port资源泄露即可 */

erts_smp_port_state_unlock(pp);

#ifdef ERTS_SMP

erts_smp_atomic_dec(&pp->refc); /* Not alive */

#endif

ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&pp->refc) > 0); /* Lock */

handle_remaining_tasks(runq, pp); /* May release runq lock */

        /*这个函数将释放挂在port上的所有ErtsPortTask,port能够执行的各项任务,也被像消息一样发给port,由port异步执行,这里将释放port的任务队列上的所有ErtsPortTask任务的数据结构*/

ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));

pp->sched.taskq = NULL;

ERTS_PT_CHK_PRES_PORTQ(runq, pp);

#ifndef ERTS_SMP

ASSERT(pp->status & ERTS_PORT_SFLG_PORT_DEBUG);

erts_port_status_set(pp, ERTS_PORT_SFLG_FREE);

        /* port的状态又被改为了ERTS_PORT_SFLG_FREE,它也是ERTS_PORT_SFLGS_DEAD的一种,但设置为这个状态后,表名port原先的描述符Port数据结构可以被重新分配给一个新建立的port了,因为之前已经触发了erts_may_save_closed_port,因此按照顺序执行流的执行,除非发生异常,否则必然会到此处 */

#endif

erts_smp_runq_unlock(runq);

 

if (erts_system_profile_flags.runnable_ports && port_is_dequeued) {

       profile_runnable_port(pp, am_inactive);

    }

 

if (ptqp)

   port_taskq_free(ptqp);

        /*释放port的任务队列*/

    }

}

由此可见port的释放其实没有那么复杂,虚拟机本身就有port数量限制,每次的port释放都仅仅将port的描述符设置为ERTS_PORT_SFLG_FREE以进行复用,而不会真正释放数据结构。

再来看看用于获取空闲port描述符的get_free_port:

io.c

 

static int get_free_port(void)

{

    Uint num;

    Uint tries = erts_max_ports;

    Port* port;    

 

    erts_smp_spin_lock(&get_free_port_lck);

    num = last_port_num + 1;

    for (;; ++num) {

port = &erts_port[num & erts_port_tab_index_mask];

 

erts_smp_port_state_lock(port);

if (port->status & ERTS_PORT_SFLG_FREE) {

   last_port_num = num;

   erts_smp_spin_unlock(&get_free_port_lck);

   break;

}

erts_smp_port_state_unlock(port);

 

if (--tries == 0) {

   erts_smp_spin_unlock(&get_free_port_lck);

   return -1;

}

    }

    port->status = ERTS_PORT_SFLG_INITIALIZING;

#ifdef ERTS_SMP

    ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&port->refc) == 0);

    erts_smp_atomic_set(&port->refc, 2); /* Port alive + lock */

#endif

    erts_smp_port_state_unlock(port);

    return num & port_num_mask;

}

get_free_port用于取得一个空闲port描述符,它将遍历erts_port记录的所有port描述符,然后从中取得一个状态为ERTS_PORT_SFLG_FREE的描述符。

由此可见port的分配与释放都不会引发port描述符的内存分配与释放,仅仅会复用一个而已。

至此,问题原因已经基本清楚了,erlang:ports()和erlang:processes()将返回在某个时刻的端口和进程的快照,这样的结果更加一致,因为时刻的快照比时间间隔的快照更加精准。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics