`
wqtn22
  • 浏览: 99587 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

R15B01版本controlling_process一个port到self的问题

阅读更多


在R15B01上,遇到一个gen_tcp/gen_udp:controlling_process(Port, self())导致的port泄露问题,下列链接详细的说明了产生问题的步骤:

https://github.com/erlang/otp/commit/944a57a11a79c5a9bb2f554c921e2e00e7d56c91

该问题在R15B03得到了修复,此处分析这个问题如下:


1> {ok,Port} = gen_udp:open(9000, [binary]).

{ok,#Port<0.581>}

2> i(0,31,0).                              

[{current_function,{c,pinfo,1}},

 {initial_call,{erlang,apply,2}},

 {status,running},

 {message_queue_len,0},

 {messages,[]},

 {links,[<0.25.0>,#Port<0.581>]},

 {dictionary,[]},

 {trap_exit,false},

 {error_handler,error_handler},

 {priority,normal},

 {group_leader,<0.24.0>},

 {total_heap_size,5168},

 {heap_size,2584},

 {stack_size,27},

 {reductions,3814},

 {garbage_collection,[{min_bin_vheap_size,46368},

                      {min_heap_size,233},

                      {fullsweep_after,65535},

                      {minor_gcs,1}]},

 {suspending,[]}]

此时,这个新建的port已经关联到shell进程<0,31,0>上。

3> gen_udp:controlling_process(Port, self()).

ok

4> i(0,31,0).                               

[{current_function,{c,pinfo,1}},

 {initial_call,{erlang,apply,2}},

 {status,running},

 {message_queue_len,0},

 {messages,[]},

 {links,[<0.25.0>]},

 {dictionary,[]},

 {trap_exit,false},

 {error_handler,error_handler},

 {priority,normal},

 {group_leader,<0.24.0>},

 {total_heap_size,5168},

 {heap_size,2584},

 {stack_size,27},

 {reductions,8889},

 {garbage_collection,[{min_bin_vheap_size,46368},

                      {min_heap_size,233},

                      {fullsweep_after,65535},

                      {minor_gcs,7}]},


 {suspending,[]}]



controlling_process到自身后,到portlink消失了。

分析这个过程的代码,发现如下问题:




gen_udp.erl

controlling_process(S, NewOwner) ->

inet:udp_controlling_process(S, NewOwner).

inet.erl

udp_controlling_process(S, NewOwner) when is_port(S), is_pid(NewOwner) ->

    case erlang:port_info(S, connected) of

         {connected, Pid} when Pid =/= self() ->

             {error, not_owner};

         _ ->

             {ok, A0} = prim_inet:getopt(S, active),

             prim_inet:setopt(S, active, false),

             udp_sync_input(S, NewOwner),

             try erlang:port_connect(S, NewOwner) of

                   true ->

                       unlink(S),

                       prim_inet:setopt(S, active, A0),

                       ok

             catch

                   error:Reason ->

                       {error, Reason}

             end


end.


一次controlling_process包括两个动作:首先在新进程上port_connect这个port,接着在原进程上unlink这个portport_connectunlink均为bif函数。



BIF_RETTYPE port_connect_2(BIF_ALIST_2)

{

    Port* prt;

    Process* rp;

    Eterm pid = BIF_ARG_2;

 

    if (is_not_internal_pid(pid)) {

    error:

         BIF_ERROR(BIF_P, BADARG);

    }

    prt = id_or_name2port(BIF_P, BIF_ARG_1);

    if (!prt) {

         goto error;

    }

 

    rp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN,

                          pid, ERTS_PROC_LOCK_LINK);

    if (!rp) {

         erts_smp_port_unlock(prt);

         ERTS_SMP_ASSERT_IS_NOT_EXITING(BIF_P);

         goto error;

    }

 

    erts_add_link(&(rp->nlinks), LINK_PID, prt->id);

    erts_add_link(&(prt->nlinks), LINK_PID, pid);

 

    erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);

 

    prt->connected = pid; /* internal pid */

    erts_smp_port_unlock(prt);

#ifdef USE_VM_PROBES

    if (DTRACE_ENABLED(port_connect)) {

        DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);

        DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);

        DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE);

 

        dtrace_pid_str(prt->connected, process_str);

        erts_snprintf(port_str, sizeof(port_str), "%T", prt->id);

        dtrace_proc_str(rp, newprocess_str);

        DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str);

    }

#endif

    BIF_RET(am_true);


}


port_connect将在port与新进程上均添加一个link进行关联,但是存在一个问题。



int erts_add_link(ErtsLink **root, Uint type, Eterm pid)

{

    void *tstack[STACK_NEED];

    int tpos = 0;

    int dstack[STACK_NEED+1];

    int dpos = 1;

    int state = 0;

    ErtsLink **this = root;

    Sint c;

 

    dstack[0] = DIR_END;

    for (;;) {

         if (!*this) { /* Found our place */

             state = 1;

             *this = create_link(type,pid);

             break;

         } else if ((c = CMP(pid,(*this)->pid)) < 0) {

             /* go left */

             dstack[dpos++] = DIR_LEFT;

             tstack[tpos++] = this;

             this = &((*this)->left);

         } else if (c > 0) { /* go right */

             dstack[dpos++] = DIR_RIGHT;

             tstack[tpos++] = this;

             this = &((*this)->right);

         } else { /* Equal key is an error for monitors */

             return -1;

         }

    }

    insertion_rotation(dstack, dpos, tstack, tpos, state);

    return 0;


}



erts_add_link中,会遍历进程/portlink表,如果发现link节点已经在自身的link表中,则返回-1,但是在上层调用者port_connect_2处并未检查这个错误,而是直接返回。对于controlling_process到自身的场景中,此处返回后,立即调用unlink。



BIF_RETTYPE unlink_1(BIF_ALIST_1)

{

    Process *rp;

    DistEntry *dep;

    ErtsLink *l = NULL, *rl = NULL;

 

    /*

     * SMP specific note concerning incoming exit signals:

     *   We have to have at least the status lock during removal of

     *   the link half on current process, and check for and handle

     *   a present pending exit while the status lock is held. This

     *   in order to ensure that we wont be exited by a link after

     *   it has been removed.

     *

     *   (We also have to have the link lock, of course, in order to

     *    be allowed to remove the link...)

     */

 

    if (IS_TRACED_FL(BIF_P, F_TRACE_PROCS)) {

         trace_proc(BIF_P, BIF_P, am_unlink, BIF_ARG_1);

    }

 

    if (is_internal_port(BIF_ARG_1)) {

         Port *pt = erts_id2port_sflgs(BIF_ARG_1,

                                           BIF_P,

                                           ERTS_PROC_LOCK_MAIN,

                                           ERTS_PORT_SFLGS_DEAD);

 

         erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCK_STATUS);

#ifdef ERTS_SMP

         if (ERTS_PROC_PENDING_EXIT(BIF_P)) {

             if (pt)

                   erts_smp_port_unlock(pt);

             goto handle_pending_exit;

         }

#endif

 

         l = erts_remove_link(&BIF_P->nlinks, BIF_ARG_1);

 

         ASSERT(pt || !l);

 

         if (pt) {

             rl = erts_remove_link(&pt->nlinks, BIF_P->id);

             erts_smp_port_unlock(pt);

             if (rl)

                   erts_destroy_link(rl);

         }

 

         erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCK_STATUS);

 

         if (l)

             erts_destroy_link(l);

 

         BIF_RET(am_true);

    }

    else if (is_external_port(BIF_ARG_1)

              && external_port_dist_entry(BIF_ARG_1) == erts_this_dist_entry) {

         BIF_RET(am_true);

    }

 

    if (is_not_pid(BIF_ARG_1))

         BIF_ERROR(BIF_P, BADARG);

    …

}



代码篇幅较大,这里仅列出port相关的部分,其中对erts_remove_link的调用是关键部分,它将移除原先通过erts_add_link加入的link节点。



ErtsLink *erts_remove_link(ErtsLink **root, Eterm pid)

{

    ErtsLink **tstack[STACK_NEED];

    int tpos = 0;

    int dstack[STACK_NEED+1];

    int dpos = 1;

    int state = 0;

    ErtsLink **this = root;

    Sint c;

    int dir;

    ErtsLink *q = NULL;

 

    dstack[0] = DIR_END;

    for (;;) {

         if (!*this) { /* Failure */

             return NULL;

         } else if ((c = CMP(pid,(*this)->pid)) < 0) {

             dstack[dpos++] = DIR_LEFT;

             tstack[tpos++] = this;

             this = &((*this)->left);

         } else if (c > 0) { /* go right */

             dstack[dpos++] = DIR_RIGHT;

             tstack[tpos++] = this;

             this = &((*this)->right);

         } else { /* Equal key, found the one to delete */

             q = (*this);

             if (q->right == NULL) {

                   (*this) = q->left;

                   state = 1;

             } else if (q->left == NULL) {

                   (*this) = q->right;

                   state = 1;

             } else {

                   dstack[dpos++] = DIR_LEFT;

                   tstack[tpos++] = this;

                   state = delsub((ErtsMonitorOrLink **) this);

             }

             break;

         }

    }

    while (state && ( dir = dstack[--dpos] ) != DIR_END) {

         this = tstack[--tpos];

         if (dir == DIR_LEFT) {

             state = balance_left((ErtsMonitorOrLink **) this);

         } else {

             state = balance_right((ErtsMonitorOrLink **) this);

         }

    }

    return q;


}



可以看到,在这个场景中,由于原先进程与portlink在一起的,此处必然能将它们unlink


回到udp_controlling_process



udp_controlling_process(S, NewOwner) when is_port(S), is_pid(NewOwner) ->

    case erlang:port_info(S, connected) of

         {connected, Pid} when Pid =/= self() ->

             {error, not_owner};

         _ ->

             {ok, A0} = prim_inet:getopt(S, active),

             prim_inet:setopt(S, active, false),

             udp_sync_input(S, NewOwner),

             try erlang:port_connect(S, NewOwner) of

                   true ->

                       unlink(S),

                       prim_inet:setopt(S, active, A0),

                       ok

             catch

                   error:Reason ->

                       {error, Reason}

             end


end.



综上所述,在controlling_process到自身的场景中,erlang:port_connect不能将port再次加入原进程的link列表,而unlink却会正常地将port与原进程断开,此后,port变为无主port


此时,若需要关闭这个port,可以通过erlang:port_close关闭。



5> lists:last(erlang:ports()).

#Port<0.581>

6> P = lists:last(erlang:ports()).

#Port<0.581>

7> erlang:port_close(P).

true

8> inet:i().


ok



修复这个bug的代码,作了一项额外检查,若发现新进程与原进程相同,则直接返回ok

udp_controlling_process(S, NewOwner) when is_port(S), is_pid(NewOwner) ->

    case erlang:port_info(S, connected) of

         {connected, NewOwner} ->

             ok;

         {connected, Pid} when Pid =/= self() ->

             {error, not_owner};

         _ ->

             {ok, A0} = prim_inet:getopt(S, active),

             prim_inet:setopt(S, active, false),

             udp_sync_input(S, NewOwner),

             try erlang:port_connect(S, NewOwner) of

                   true ->

                       unlink(S),

                       prim_inet:setopt(S, active, A0),

                       ok

             catch

                   error:Reason ->

                       {error, Reason}

             end

    end.


 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics