Erlang进阶(五)
这里来到了关键 Erlang 概念部分, OTP 最重要的模板 Supervisor 说明.
Erlang 之前讲过天然带有 Actor 支持, 那么在启动将模块(module) 声明以下的 behaviour 就能达成注册到 Erlang 进程:
%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%% @end
%%%-------------------------------------------------------------------
-module(sys_gateway_supervisor). % 声明模块
-author("MeteorCat").
-behaviour(supervisor). % 声明为 Actor
% 默认 supervisor 的必须实现的回调
-export([start_link/0, init/1]).
这里就是编写 sys_gateway_supervisor.erl 文件, 内部已经实现 Actor 模式, 当然内部还有些需要说明.
supervisor是重点中的重点, 基本可以视作为Erlang的核心.
如果点击进源码就会看到内部的另外实现:
-behaviour(gen_server).
%% External exports
-export([start_link/2, start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
which_children/1, count_children/1,
check_childspecs/1, check_childspecs/2,
get_childspec/2]).
%% Internal exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% logger callback
-export([format_log/1, format_log/2]).
%% For release_handler only
-export([get_callback_module/1]).
-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
-type modules() :: [module()] | 'dynamic'.
-type restart() :: 'permanent' | 'transient' | 'temporary'.
-type significant() :: boolean().
-type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
内部声明 -behaviour(gen_server)., 还有内部这么多原子配置项是用来干什么 ?
gen_server 也是 Erlang 当中关键的组件, 用于声明挂起 Actor 服务启动, 内部提供功能:
- 并发处理请求, 支持异步处理多个并发
- 支持动态添加|退出进程, 用于按照负载扩展
- 提供监控和重启服务功能, 保证了容错问题
- 基于信箱消息投递, 保证消息投递可靠性
服务函数启动都为
start_link, 这个回调是最重要的实例化返回本体 Actor 函数
默认 supervisor 关键回调:
%% 最为关键的启动回调函数
start_link() ->
% supervisor:start_link 是 supervisor 模块内置的启动方法, 和 application:start 差不多
% 实际上就是转发到 gen_server:start_link 的内部函数
% 第一个参数: {local, ?MODULE} 是服务启动名, 用于标识服务名称
% 第二个参数: ?MODULE 就是所在模块名
% 第三个参数: [] 就是下发给 `init` 回调的参数, 注意这里有多少参数, init[xx,yy] 就要这样声明多少个接收参数
% --------------------------------------------------------------------
% 但是正式情况当中我们一般会自己高度定制自己 supervisor, 所以会用 gen_server:start_link 去实例化而不用 supervisor:start_link
% gen_server:start_link 内部可以定制的更多, 比如可以自己设置 `spawn` 参数让其在指定线程运行(自己实现线程池调度)
% 这也是后期重点, 用来定制自己的 acceptor 和 workers 进行任务调度
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
上面说到的线程池和定制自己的 supervisor 也是必须要学会的, 游戏业务后续会大量使用到执行池技术来分摊功能负载.
最明显的就是 TCP 服务启动的时候会构建
worker_核心数和acceptor_核心数大量服务来监听推送网络转发池.
supervisor 可以看做 Erlang 进程管理器, 其可以监控所属于它子进程从而实现 启动|关闭|重启 方法.
需要区分 Erlang 的进程管理是有两个对象:
supervisor: 进程管理器, 声明为-behaviour(supervisor).worker: 进程管理器分离子进程, 被supervisor监控, 声明为-behaviour(gen_server).
主要任务都是由
worker来做功能业务处理, 而supervisor仅仅作为监督者.
进程管理器构建
首先就是构建构建进程管理器, 这里构建个 global_sup 全局管理器备用:
# 这里全局管理器作为系统功能全局功能会放入 `sys` 目录
vim src/sys/global/sys_global_sup.erl
文件内容最初版本如下:
%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17. 7月 2024 上午9:37
%%%-------------------------------------------------------------------
-module(sys_global_sup).
-author("MeteorCat").
-include("../include/common.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%%%===================================================================
%%% API functions
%%%===================================================================
%% 初始启动的方法入口
%% @doc Starts the supervisor
-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
supervisor:start_link({?local, ?MODULE}, ?MODULE, []).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
%% 最初版本的 supervisor 初始化方法, 需要返回监督的 worker 工作组
%% @private
%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
-spec(init(Args :: term()) ->
{?ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
MaxR :: non_neg_integer(), MaxT :: non_neg_integer()},
[ChildSpec :: supervisor:child_spec()]}}
| ?ignore | {?error, Reason :: term()}).
init([]) ->
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = #{strategy => one_for_one,
intensity => MaxRestarts,
period => MaxSecondsBetweenRestarts},
AChild = #{id => 'AName',
start => {'AModule', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['AModule']},
{?ok, {SupFlags, [AChild]}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
注: 如果看到
?开头的小写变量定义, 基本上都是定义在common头文件的原子量.
这里主要关注两个函数:
start_link: 启动方法, 被外部应用所调用启动init: 启动后的初始化方法, 用于初始化 worker 工作进程开始监控
实际上 start_link 方法是默认反向进行注册:
-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
%% 这里就是将当前模块的 supervisor 注册到 Erlang 虚拟机进行进程管理
% supervisor:start_link({?local, ?MODULE}, ?MODULE, []).
%% 实际上内部是采用
% gen_server:start_link(SupName, supervisor, {SupName, Mod, Args}, []).
%% 多了第二个 supervisor|worker 类型可以变动, 而且第三个列表对象就是 `Spawn` 分离出来进程配置
%% 这代表了可以进行高级 Erlang 进程管理, 不过目前主要将 supervisor, 其他高阶操作以后扩展
SupName = {?local, ?MODULE},
Args = [],
Mod = ?MODULE,
gen_server:start_link(SupName, supervisor, {SupName, Mod, Args}, []).
最后关联的就是初始化注册服务 init:
- 如果你采用
supervisor:start_link, 在第三个参数(最后的参数)就是传递给init的参数 - 如果你采用
gen_server:start_link, 在第三个参数的Args就是传递给init的参数
这里示范传入字符串给 init:
%%% 测试启动并且传入 `Hello.World`
-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
supervisor:start_link({?local, ?MODULE}, ?MODULE, [
"Hello.World"
]).
%%% 初始化接收到参数
-spec(init(Args :: term()) ->
{?ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
MaxR :: non_neg_integer(), MaxT :: non_neg_integer()},
[ChildSpec :: supervisor:child_spec()]}}
| ?ignore | {?error, Reason :: term()}).
init([
_Msg
]) -> done.
目前还没办法启动监督器, 因为缺少 worker 服务作为子进程被监控, 所以要开始编写个定时进程托管给 sys_global_sup 管理.
Worker 服务编写
之前编写好初始化的全局管理器, 现在就要开始编写管理器应该处理的进程任务:
# 这里命名习惯性将工作进程 xxx_srv , 有的喜欢称为 xxx_worker, 本质上按个人需求处理
vim src/sys/global/sys_global_srv.erl
sys_global_srv 服务最初内容如下:
%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17. 7月 2024 上午10:17
%%%-------------------------------------------------------------------
-module(sys_global_srv).
-author("MeteorCat").
-include("../include/common.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% 定义接收到推送通知数据
%% 一般 record 数据实体都是保存在头文件之中被全局访问到
-record(sys_global_srv_state, {}).
%%%===================================================================
%%% API
%%%===================================================================
%% 启动入口
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
io:format("GlobalServer Started!~n"),
gen_server:start_link({?local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% 初始方法
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{?ok, State :: #sys_global_srv_state{}} | {?ok, State :: #sys_global_srv_state{}, timeout() | ?hibernate} |
{?stop, Reason :: term()} | ?ignore).
init([]) ->
% 屏蔽进程之中错误信号干扰, 否则无法触发 Terminate 回调
process_flag(?trap_exit, ?true),
io:format("[GlobalServer:Init] By Pid ~p~n", [erlang:self()]),
{?ok, #sys_global_srv_state{}}.
%% 外部调用方法回调 - call 为带有返回值
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #sys_global_srv_state{}) ->
{?reply, Reply :: term(), NewState :: #sys_global_srv_state{}} |
{?reply, Reply :: term(), NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
{?noreply, NewState :: #sys_global_srv_state{}} |
{?noreply, NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
{?stop, Reason :: term(), Reply :: term(), NewState :: #sys_global_srv_state{}} |
{?stop, Reason :: term(), NewState :: #sys_global_srv_state{}}).
handle_call(_Request, _From, State = #sys_global_srv_state{}) ->
io:format("[GlobalServer:Call] By Pid ~p~n", [erlang:self()]),
{?reply, ?ok, State}.
%% 外部调用方法回调 - cast 为不带有返回值
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #sys_global_srv_state{}) ->
{?noreply, NewState :: #sys_global_srv_state{}} |
{?noreply, NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
{?stop, Reason :: term(), NewState :: #sys_global_srv_state{}}).
handle_cast(_Request, State = #sys_global_srv_state{}) ->
io:format("[GlobalServer:Cast] By Pid ~p~n", [erlang:self()]),
{?noreply, State}.
%% 其他无法识别的调用回调 - 当 call|cast 都获取不到时候将会回调至此
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #sys_global_srv_state{}) ->
{?noreply, NewState :: #sys_global_srv_state{}} |
{?noreply, NewState :: #sys_global_srv_state{}, timeout() | ?hibernate} |
{?stop, Reason :: term(), NewState :: #sys_global_srv_state{}}).
handle_info(_Info, State = #sys_global_srv_state{}) ->
io:format("[GlobalServer:Info] By Pid ~p~n", [erlang:self()]),
{?noreply, State}.
%% 进程中断回调
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (?normal | ?shutdown | {?shutdown, term()} | term()),
State :: #sys_global_srv_state{}) -> term()).
terminate(_Reason, _State = #sys_global_srv_state{}) ->
io:format("[GlobalServer:Terminate] By Pid ~p~n", [erlang:self()]),
?ok.
%% 代码变动回调 - 热更新处理
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {?down, term()}, State :: #sys_global_srv_state{},
Extra :: term()) ->
{?ok, NewState :: #sys_global_srv_state{}} | {?error, Reason :: term()}).
code_change(_OldVsn, State = #sys_global_srv_state{}, _Extra) ->
io:format("[GlobalServer:CodeChange] By Pid ~p~n", [erlang:self()]),
{?ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
现在试着把这个服务过度给 supervisor 处理:
%%%-------------------------------------------------------------------
%%% @author MeteorCat
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%% 全局进程管理器
%%% @end
%%% Created : 17. 7月 2024 上午9:37
%%%-------------------------------------------------------------------
-module(sys_global_sup).
-author("MeteorCat").
-include("../include/common.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%%%===================================================================
%%% API functions
%%%===================================================================
%% @doc Starts the supervisor
-spec(start_link() -> {?ok, Pid :: pid()} | ?ignore | {?error, Reason :: term()}).
start_link() ->
supervisor:start_link({?local, ?MODULE}, ?MODULE, [
"Hello.World"
]).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
%% @private
%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
-spec(init(Args :: term()) ->
{?ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
MaxR :: non_neg_integer(), MaxT :: non_neg_integer()},
[ChildSpec :: supervisor:child_spec()]}}
| ?ignore | {?error, Reason :: term()}).
init([Msg]) ->
io:format("Msg: ~ts By Pid ~p~n", [Msg, erlang:self()]),
process_flag(?trap_exit, ?true),% 屏蔽进程之中错误信号干扰, 否则无法触发 Terminate 回调
% 进程是可以批量创建
Workers = [
%% 装载 刚刚自定义构建的服务
#{
id => sys_global_srv, % 进程标识id, 注意需要保持全局唯一
type => worker, % gen_server 启动类型, 子进程为 worker
restart => permanent, % 重启配置, permanent|transient|temporary
shutdown => 1000, % 休眠时间
start => {sys_global_srv, start_link, []}, % 调用的启动参数 { 模块, 启动函数, 传入参数 }
modules => [sys_global_srv] % 对应模块
}
],
%% Supervisor 启动配置
MaxRestarts = 100,
MaxSecondsBetweenRestarts = 5,
SupFlags = #{
strategy => one_for_one, % 启动类型
intensity => MaxRestarts, % 最大重启次数
period => MaxSecondsBetweenRestarts % 重启的最大秒数
},
{?ok, {SupFlags, Workers}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
现在入口设定启动等待初始化:
% 应用启动入口
main() ->
%% 启动进程管理器和进程
io:format("Main Started By ~p~n", [erlang:self()]),
{?ok, _} = sys_global_sup:start_link(),
% 打印内容如下:
% Main Started By <0.10.0>
% Msg: Hello.World By Pid <0.83.0>
% GlobalServer Started!
% [GlobalServer:Init] By Pid <0.84.0>
%
% 可以看到执行的话是处于三个进程在运行: 主进程 - <0.10.0>, 管理器进程 - <0.83.0>, Worker进程 - <0.84.0>
% 这里就是 Erlang 最出名的进程模型, 这时候就需要采用进程推送通信
% 千万注意不要用原生 Pid ! Msg 做消息推送, 而是要用 gen_server 自带的消息推送机制来推送
%% Erlang 的 GenServer 带了两个推送方法:
%% gen_server:call | gen_server:cast
%% 分别对应了服务当中的 handle_call 和 handle_cast
gen_server:cast(sys_global_srv, hello), % 调用服务 sys_global_srv:do_cast(hello,From,State) 信号, 这里不需要返回值
Msg = gen_server:call(sys_global_srv, {echo, "hello.world"}), %调用服务 sys_global_srv:do_call({echo,Msg},From,State) 信号, 需要返回值
io:format("GlobalServer Echo: ~ts~n", [Msg]), % 这里返回 ok. 因为内部 handle_cast 和 handle_call 都没有拦截信号从而处理
%% 需要注意: 两个服务器进程不能互相 call|cast, 不然会出现两者依赖进入死锁
% 最后退出的时候回调
%% [GlobalServer:Terminate] By Pid <0.84.0>
%% 需要注意如果进程没有设定 process_flag(?trap_exit, ?true) 是无法触发 Terminate 回调
?ok.
至此就已经跑完整体的 Erlang 进程模型的架构, 剩下的业务代码将在 sys_global_srv 当中定义,
比如通过网络请求的时候按照指定协议号推送给指定服务.