MeteorCat / Erlang进阶(五)

Created Wed, 17 Jul 2024 20:19:07 +0800 Modified Wed, 29 Oct 2025 23:24:53 +0800
3067 Words

Erlang进阶(五)

这里来到了关键 Erlang 概念部分, OTP 最重要的模板 Supervisor 说明.

Erlang 之前讲过天然带有 Actor 支持, 那么在启动将模块(module) 声明以下的 behaviour 就能达成注册到 Erlang 进程:

%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%% @end
%%%-------------------------------------------------------------------
-module(sys_gateway_supervisor). % 声明模块
-author("MeteorCat").
-behaviour(supervisor). % 声明为 Actor

% 默认 supervisor 的必须实现的回调
-export([start_link/0, init/1]).

这里就是编写 sys_gateway_supervisor.erl 文件, 内部已经实现 Actor 模式, 当然内部还有些需要说明.

supervisor 是重点中的重点, 基本可以视作为 Erlang 的核心.

如果点击进源码就会看到内部的另外实现:

-behaviour(gen_server).

%% External exports
-export([start_link/2, start_link/3,
  start_child/2, restart_child/2,
  delete_child/2, terminate_child/2,
  which_children/1, count_children/1,
  check_childspecs/1, check_childspecs/2,
  get_childspec/2]).

%% Internal exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  terminate/2, code_change/3]).

%% logger callback
-export([format_log/1, format_log/2]).

%% For release_handler only
-export([get_callback_module/1]).


-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
-type modules() :: [module()] | 'dynamic'.
-type restart() :: 'permanent' | 'transient' | 'temporary'.
-type significant() :: boolean().
-type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.

内部声明 -behaviour(gen_server)., 还有内部这么多原子配置项是用来干什么 ?

gen_server 也是 Erlang 当中关键的组件, 用于声明挂起 Actor 服务启动, 内部提供功能:

  • 并发处理请求, 支持异步处理多个并发
  • 支持动态添加|退出进程, 用于按照负载扩展
  • 提供监控和重启服务功能, 保证了容错问题
  • 基于信箱消息投递, 保证消息投递可靠性

服务函数启动都为 start_link, 这个回调是最重要的实例化返回本体 Actor 函数

默认 supervisor 关键回调:

%% 最为关键的启动回调函数
start_link() ->
  % supervisor:start_link 是 supervisor 模块内置的启动方法, 和 application:start 差不多
  % 实际上就是转发到 gen_server:start_link 的内部函数
  % 第一个参数: {local, ?MODULE} 是服务启动名, 用于标识服务名称
  % 第二个参数: ?MODULE 就是所在模块名
  % 第三个参数: [] 就是下发给 `init` 回调的参数, 注意这里有多少参数, init[xx,yy] 就要这样声明多少个接收参数
  % --------------------------------------------------------------------
  % 但是正式情况当中我们一般会自己高度定制自己 supervisor, 所以会用 gen_server:start_link 去实例化而不用 supervisor:start_link
  % gen_server:start_link 内部可以定制的更多, 比如可以自己设置 `spawn` 参数让其在指定线程运行(自己实现线程池调度)
  % 这也是后期重点, 用来定制自己的 acceptor 和 workers 进行任务调度
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

上面说到的线程池和定制自己的 supervisor 也是必须要学会的, 游戏业务后续会大量使用到执行池技术来分摊功能负载.

最明显的就是 TCP 服务启动的时候会构建 worker_核心数acceptor_核心数 大量服务来监听推送网络转发池.

supervisor 可以看做 Erlang 进程管理器, 其可以监控所属于它子进程从而实现 启动|关闭|重启 方法.

需要区分 Erlang 的进程管理是有两个对象:

  • supervisor: 进程管理器, 声明为 -behaviour(supervisor).
  • worker: 进程管理器分离子进程, 被 supervisor 监控, 声明为 -behaviour(gen_server).

主要任务都是由 worker 来做功能业务处理, 而 supervisor 仅仅作为监督者.

进程管理器构建

首先就是构建构建进程管理器, 这里构建个 global_sup 全局管理器备用:

# 这里全局管理器作为系统功能全局功能会放入 `sys` 目录
vim src/sys/global/sys_global_sup.erl

文件内容最初版本如下:

%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17. 7月 2024 上午9:37
%%%-------------------------------------------------------------------
-module(sys_global_sup).
-author("MeteorCat").
-include("../include/common.hrl").
-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).

%%%===================================================================
%%% API functions
%%%===================================================================
%% 初始启动的方法入口
%% @doc Starts the supervisor
-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
  supervisor:start_link({?local, ?MODULE}, ?MODULE, []).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
%% 最初版本的 supervisor 初始化方法, 需要返回监督的 worker 工作组
%% @private
%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
-spec(init(Args :: term()) ->
  {?ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
    MaxR :: non_neg_integer(), MaxT :: non_neg_integer()},
    [ChildSpec :: supervisor:child_spec()]}}
  | ?ignore | {?error, Reason :: term()}).
init([]) ->
  MaxRestarts = 1000,
  MaxSecondsBetweenRestarts = 3600,
  SupFlags = #{strategy => one_for_one,
    intensity => MaxRestarts,
    period => MaxSecondsBetweenRestarts},

  AChild = #{id => 'AName',
    start => {'AModule', start_link, []},
    restart => permanent,
    shutdown => 2000,
    type => worker,
    modules => ['AModule']},

  {?ok, {SupFlags, [AChild]}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

注: 如果看到 ? 开头的小写变量定义, 基本上都是定义在 common 头文件的原子量.

这里主要关注两个函数:

  • start_link: 启动方法, 被外部应用所调用启动
  • init: 启动后的初始化方法, 用于初始化 worker 工作进程开始监控

实际上 start_link 方法是默认反向进行注册:

-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
  %% 这里就是将当前模块的 supervisor 注册到 Erlang 虚拟机进行进程管理
  % supervisor:start_link({?local, ?MODULE}, ?MODULE, []).
  %% 实际上内部是采用
  % gen_server:start_link(SupName, supervisor, {SupName, Mod, Args}, []).
  %% 多了第二个  supervisor|worker 类型可以变动, 而且第三个列表对象就是 `Spawn` 分离出来进程配置
  %% 这代表了可以进行高级 Erlang 进程管理, 不过目前主要将 supervisor, 其他高阶操作以后扩展
  SupName = {?local, ?MODULE},
  Args = [],
  Mod = ?MODULE,
  gen_server:start_link(SupName, supervisor, {SupName, Mod, Args}, []).

最后关联的就是初始化注册服务 init:

  • 如果你采用 supervisor:start_link, 在第三个参数(最后的参数)就是传递给 init 的参数
  • 如果你采用 gen_server:start_link, 在第三个参数的 Args 就是传递给 init 的参数

这里示范传入字符串给 init:


%%% 测试启动并且传入 `Hello.World`
-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
  supervisor:start_link({?local, ?MODULE}, ?MODULE, [
    "Hello.World"
  ]).


%%% 初始化接收到参数
-spec(init(Args :: term()) ->
  {?ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
    MaxR :: non_neg_integer(), MaxT :: non_neg_integer()},
    [ChildSpec :: supervisor:child_spec()]}}
  | ?ignore | {?error, Reason :: term()}).
init([
  _Msg
]) -> done.

目前还没办法启动监督器, 因为缺少 worker 服务作为子进程被监控, 所以要开始编写个定时进程托管给 sys_global_sup 管理.

Worker 服务编写

之前编写好初始化的全局管理器, 现在就要开始编写管理器应该处理的进程任务:

# 这里命名习惯性将工作进程 xxx_srv , 有的喜欢称为 xxx_worker, 本质上按个人需求处理   
vim src/sys/global/sys_global_srv.erl

sys_global_srv 服务最初内容如下:

%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17. 7月 2024 上午10:17
%%%-------------------------------------------------------------------
-module(sys_global_srv).
-author("MeteorCat").
-include("../include/common.hrl").
-behaviour(gen_server).

%% API
-export([start_link/0]).

%% gen_server callbacks
-export([
  init/1,
  handle_call/3,
  handle_cast/2,
  handle_info/2,
  terminate/2,
  code_change/3
]).


%% 定义接收到推送通知数据
%% 一般 record 数据实体都是保存在头文件之中被全局访问到
-record(sys_global_srv_state, {}).

%%%===================================================================
%%% API
%%%===================================================================
%% 启动入口
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
  {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
  io:format("GlobalServer Started!~n"),
  gen_server:start_link({?local, ?MODULE}, ?MODULE, [], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% 初始方法
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
  {?ok, State :: #sys_global_srv_state{}} | {?ok, State :: #sys_global_srv_state{}, timeout() | ?hibernate} |
  {?stop, Reason :: term()} | ?ignore).
init([]) ->
  % 屏蔽进程之中错误信号干扰, 否则无法触发 Terminate 回调
  process_flag(?trap_exit, ?true),
  io:format("[GlobalServer:Init] By Pid ~p~n", [erlang:self()]),
  {?ok, #sys_global_srv_state{}}.

%% 外部调用方法回调 - call 为带有返回值
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #sys_global_srv_state{}) ->
  {?reply, Reply :: term(), NewState :: #sys_global_srv_state{}} |
  {?reply, Reply :: term(), NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
  {?noreply, NewState :: #sys_global_srv_state{}} |
  {?noreply, NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
  {?stop, Reason :: term(), Reply :: term(), NewState :: #sys_global_srv_state{}} |
  {?stop, Reason :: term(), NewState :: #sys_global_srv_state{}}).
handle_call(_Request, _From, State = #sys_global_srv_state{}) ->
  io:format("[GlobalServer:Call] By Pid ~p~n", [erlang:self()]),
  {?reply, ?ok, State}.

%% 外部调用方法回调 - cast 为不带有返回值
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #sys_global_srv_state{}) ->
  {?noreply, NewState :: #sys_global_srv_state{}} |
  {?noreply, NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
  {?stop, Reason :: term(), NewState :: #sys_global_srv_state{}}).
handle_cast(_Request, State = #sys_global_srv_state{}) ->
  io:format("[GlobalServer:Cast] By Pid ~p~n", [erlang:self()]),
  {?noreply, State}.

%% 其他无法识别的调用回调 - 当 call|cast 都获取不到时候将会回调至此
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #sys_global_srv_state{}) ->
  {?noreply, NewState :: #sys_global_srv_state{}} |
  {?noreply, NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
  {?stop, Reason :: term(), NewState :: #sys_global_srv_state{}}).
handle_info(_Info, State = #sys_global_srv_state{}) ->
  io:format("[GlobalServer:Info] By Pid ~p~n", [erlang:self()]),
  {?noreply, State}.


%% 进程中断回调
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (?normal | ?shutdown | {?shutdown, term()} | term()),
    State :: #sys_global_srv_state{}) -> term()).
terminate(_Reason, _State = #sys_global_srv_state{}) ->
  io:format("[GlobalServer:Terminate] By Pid ~p~n", [erlang:self()]),
  ?ok.

%% 代码变动回调 - 热更新处理
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {?down, term()}, State :: #sys_global_srv_state{},
    Extra :: term()) ->
  {?ok, NewState :: #sys_global_srv_state{}} | {?error, Reason :: term()}).
code_change(_OldVsn, State = #sys_global_srv_state{}, _Extra) ->
  io:format("[GlobalServer:CodeChange] By Pid ~p~n", [erlang:self()]),
  {?ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

现在试着把这个服务过度给 supervisor 处理:

%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%% 全局进程管理器
%%% @end
%%% Created : 17. 7月 2024 上午9:37
%%%-------------------------------------------------------------------
-module(sys_global_sup).
-author("MeteorCat").
-include("../include/common.hrl").
-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).

%%%===================================================================
%%% API functions
%%%===================================================================

%% @doc Starts the supervisor
-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
  supervisor:start_link({?local, ?MODULE}, ?MODULE, [
    "Hello.World"
  ]).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

%% @private
%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
-spec(init(Args :: term()) ->
  {?ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
    MaxR :: non_neg_integer(), MaxT :: non_neg_integer()},
    [ChildSpec :: supervisor:child_spec()]}}
  | ?ignore | {?error, Reason :: term()}).
init([Msg]) ->
  io:format("Msg: ~ts By Pid ~p~n", [Msg, erlang:self()]),
  process_flag(?trap_exit, ?true),% 屏蔽进程之中错误信号干扰, 否则无法触发 Terminate 回调

  % 进程是可以批量创建
  Workers = [

    %% 装载 刚刚自定义构建的服务
    #{
      id => sys_global_srv, % 进程标识id, 注意需要保持全局唯一
      type => worker, % gen_server 启动类型, 子进程为 worker
      restart => permanent, % 重启配置, permanent|transient|temporary
      shutdown => 1000, % 休眠时间
      start => {sys_global_srv, start_link, []}, % 调用的启动参数 { 模块, 启动函数, 传入参数 }
      modules => [sys_global_srv] % 对应模块
    }
  ],


  %% Supervisor 启动配置
  MaxRestarts = 100,
  MaxSecondsBetweenRestarts = 5,
  SupFlags = #{
    strategy => one_for_one, % 启动类型
    intensity => MaxRestarts, % 最大重启次数
    period => MaxSecondsBetweenRestarts % 重启的最大秒数
  },

  {?ok, {SupFlags, Workers}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

现在入口设定启动等待初始化:

% 应用启动入口
main() ->
  %% 启动进程管理器和进程
  io:format("Main Started By ~p~n", [erlang:self()]),
  {?ok, _} = sys_global_sup:start_link(),
  % 打印内容如下:
  % Main Started By <0.10.0>
  % Msg: Hello.World By Pid <0.83.0>
  % GlobalServer Started!
  % [GlobalServer:Init] By Pid <0.84.0>
  %
  % 可以看到执行的话是处于三个进程在运行: 主进程 - <0.10.0>, 管理器进程 - <0.83.0>, Worker进程 - <0.84.0>
  % 这里就是 Erlang 最出名的进程模型, 这时候就需要采用进程推送通信
  % 千万注意不要用原生 Pid ! Msg 做消息推送, 而是要用 gen_server 自带的消息推送机制来推送

  %% Erlang 的 GenServer 带了两个推送方法:
  %% gen_server:call | gen_server:cast
  %% 分别对应了服务当中的 handle_call 和 handle_cast
  gen_server:cast(sys_global_srv, hello), % 调用服务 sys_global_srv:do_cast(hello,From,State) 信号, 这里不需要返回值
  Msg = gen_server:call(sys_global_srv, {echo, "hello.world"}), %调用服务 sys_global_srv:do_call({echo,Msg},From,State) 信号, 需要返回值
  io:format("GlobalServer Echo: ~ts~n", [Msg]), % 这里返回 ok. 因为内部 handle_cast 和 handle_call 都没有拦截信号从而处理
  %% 需要注意: 两个服务器进程不能互相 call|cast, 不然会出现两者依赖进入死锁

  % 最后退出的时候回调
  %% [GlobalServer:Terminate] By Pid <0.84.0>
  %% 需要注意如果进程没有设定 process_flag(?trap_exit, ?true) 是无法触发 Terminate 回调
  ?ok.

至此就已经跑完整体的 Erlang 进程模型的架构, 剩下的业务代码将在 sys_global_srv 当中定义, 比如通过网络请求的时候按照指定协议号推送给指定服务.