ref: rc/2.0.0 utos/utos/src/utos_net_websocket_handler.erl -rw-r--r-- 6.9 KiB View raw
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
-module(utos_net_websocket_handler).
-behaviour(gen_server).

-export([start_link/2]).

-export(
   [
    actor_send/3,
    code_change/3,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    init/1,
    terminate/2
   ]
  ).

-export(
   [
    io_send/2,
    uname/1
   ]
  ).

-define(SECKEY, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").

%% @doc Forwards `Msg' to the actor registered under `Name'.
%%
%% The handler process `Pid' is asked for the actor with an
%% `{actor, get, Name}' call; `Msg' is then cast to whatever it returns.
%% Crashes with `badmatch' when the lookup does not answer `{ok, Actor}'.
-spec actor_send(pid(), binary(), term()) -> ok.
actor_send(Pid, Name, Msg) when is_pid(Pid) andalso is_binary(Name) ->
    {ok, ActorRef} = call(Pid, {actor, get, Name}),
    cast(ActorRef, Msg).

%% Synchronous request to a gen_server. Thin wrapper over gen_server:call/2,
%% so the default call timeout of that function applies.
call(Server, Request) ->
    gen_server:call(Server, Request).

%% Fire-and-forget request to a gen_server. Thin wrapper over
%% gen_server:cast/2; always returns `ok'.
cast(Server, Request) ->
    gen_server:cast(Server, Request).

%% gen_server callback: the state needs no migration on a code upgrade,
%% so it is handed back unchanged.
code_change(_FromVersion, HandlerState, _Extra) ->
    {ok, HandlerState}.

%% gen_server callback: synchronous requests.
%%
%% Supported requests (all state work is delegated to
%% utos_net_websocket_handler_state):
%%   {actor, get, Name}      -> whatever actor_get/2 returns
%%   {actor, set, Name, Pid} -> {ok, set}
%%   {db, get, Key}          -> whatever db_get/2 returns
%%   {db, set, Key, Value}   -> {ok, set}
%%   {db, unset, Key}        -> {ok, unset}
%%   state                   -> {ok, State} (the whole handler state)
%% Any other request is answered with a bare `ok' and leaves the state alone.
handle_call({actor, get, Name}, _From, State) ->
    {reply, utos_net_websocket_handler_state:actor_get(Name, State), State};
handle_call({actor, set, Name, ActorPid}, _From, State) ->
    {ok, NewState} = utos_net_websocket_handler_state:actor_set(Name, ActorPid, State),
    {reply, {ok, set}, NewState};
handle_call({db, get, Key}, _From, State) ->
    {reply, utos_net_websocket_handler_state:db_get(Key, State), State};
handle_call({db, set, Key, Value}, _From, State) ->
    {ok, NewState} = utos_net_websocket_handler_state:db_set(Key, Value, State),
    {reply, {ok, set}, NewState};
handle_call({db, unset, Key}, _From, State) ->
    {ok, NewState} = utos_net_websocket_handler_state:db_unset(Key, State),
    {reply, {ok, unset}, NewState};
handle_call(state, _From, State) ->
    {reply, {ok, State}, State};
handle_call(_Unknown, _From, State) ->
    {reply, ok, State}.

%% gen_server callback: asynchronous messages.
%%
%% Message families handled below:
%%   {actor, set, ...} / {db, ...}  state updates delegated to
%%                                  utos_net_websocket_handler_state
%%   {net, input, websocket, Bin}   incoming data: an HTTP "GET /" request is
%%                                  split into header lines, anything else is
%%                                  added to the iq (presumably "input queue")
%%   {{http, header}, Line}         only the Sec-WebSocket-Key header is acted on
%%   {iq, consume}                  hands the next iq entry to the buffer
%%   {buffer, ...}                  frame-decoding loop over the buffer
%%   {net, output, ...}             packs data and writes it to the socket
%%   {net, closed, websocket}       closes the socket and stops this server
%% Everything else is silently ignored by the last clause.
handle_cast({actor, set, Name, ActorPid}, State0) ->
    {ok, State1} = utos_net_websocket_handler_state:actor_set(Name, ActorPid, State0),
    {noreply, State1};
%% `empty' (presumably what iq_next/1 yields for a drained queue): there is
%% nothing to decode, so only the iq flag is cleared.
handle_cast({buffer, append, empty}, State0) ->
    {ok, State1} = utos_net_websocket_handler_state:iq_flag(false, State0),
    {noreply, State1};
handle_cast({buffer, append, Chunk}, State0) ->
    {ok, State1} = utos_net_websocket_handler_state:buffer_append(Chunk, State0),
    cast(self(), {buffer, consume}),
    {noreply, State1};
%% Buffer fully consumed: go back to the iq for more data.
handle_cast({buffer, consume}, #{<<"buffer">> := <<>>} = State0) ->
    iq_release(State0);
%% Try to unpack one frame; the outcome is processed in a separate message.
handle_cast({buffer, consume}, #{<<"buffer">> := Pending} = State0) ->
    {ok, Unpacked} = utos_net_websocket_handler_frame:unpack(Pending),
    cast(self(), {buffer, {consume, result}, Unpacked}),
    {noreply, State0};
handle_cast({buffer, {consume, result}, closed}, State0) ->
    cast(self(), {net, closed, websocket}),
    {noreply, State0};
%% Not enough bytes for a whole frame yet: keep the buffer as it is and wait
%% for the next iq entry. These clauses must stay ahead of the generic
%% five-tuple clause below.
handle_cast({buffer, {consume, result}, incomplete}, State0) ->
    iq_release(State0);
handle_cast({buffer, {consume, result}, {_F, _Op, _L, incomplete, _Rest}}, State0) ->
    iq_release(State0);
%% A frame was decoded: publish it on the bus, keep the remaining bytes as
%% the new buffer and loop.
handle_cast(
    {buffer, {consume, result}, {_F, _Op, _L, Decoded, Remainder}},
    #{<<"bus">> := Bus} = State0
) ->
    cast(Bus, {net, input, {websocket, self()}, Decoded}),
    {ok, State1} = utos_net_websocket_handler_state:buffer_set(Remainder, State0),
    cast(self(), {buffer, consume}),
    {noreply, State1};
handle_cast({db, set, Key, Value}, State0) ->
    {ok, State1} = utos_net_websocket_handler_state:db_set(Key, Value, State0),
    {noreply, State1};
handle_cast({db, unset, Key}, State0) ->
    {ok, State1} = utos_net_websocket_handler_state:db_unset(Key, State0),
    {noreply, State1};
%% WebSocket opening handshake: announce the connection on the bus, then
%% answer with base64(sha1(ClientKey ++ ?SECKEY)) as Sec-WebSocket-Accept.
handle_cast(
    {{http, header}, <<"Sec-WebSocket-Key: ", ClientKey/binary>>},
    #{<<"bus">> := Bus, <<"socket">> := Socket} = State0
) ->
    cast(Bus, {net, init, {websocket, self()}}),
    Accept = base64:encode(crypto:hash(sha, <<ClientKey/binary, ?SECKEY>>)),
    Response = [
        "HTTP/1.1 101 Switching Protocols\r\n",
        "Upgrade: websocket\r\n",
        "Connection: upgrade\r\n",
        "Sec-WebSocket-Accept: ",
        Accept,
        "\r\n\r\n"
    ],
    gen_tcp:send(Socket, Response),
    {noreply, State0};
%% Only runs while the iq flag is false; a request arriving while the flag is
%% true falls through to the ignore-all clause at the bottom.
handle_cast({iq, consume}, #{<<"iq_state">> := false} = State0) ->
    {ok, Flagged} = utos_net_websocket_handler_state:iq_flag(true, State0),
    {ok, {NextEntry, State1}} = utos_net_websocket_handler_state:iq_next(Flagged),
    cast(self(), {buffer, append, NextEntry}),
    {noreply, State1};
%% The shutdown runs in a helper process: gen_server:stop/1 is a synchronous
%% call and cannot be issued from inside the server it is stopping.
handle_cast({net, closed, websocket}, #{<<"socket">> := Socket} = State0) ->
    Self = self(),
    spawn(fun() ->
        gen_tcp:close(Socket),
        gen_server:stop(Self)
    end),
    {noreply, State0};
%% HTTP upgrade request: every remaining line is re-sent to this process as
%% an `{{http, header}, Line}' message.
handle_cast({net, input, websocket, <<"GET / HTTP/1.1\r\n", HeaderBlock/binary>>}, State0) ->
    Self = self(),
    Lines = binary:split(HeaderBlock, <<"\r\n">>, [global]),
    lists:foreach(fun(Line) -> cast(Self, {{http, header}, Line}) end, Lines),
    {noreply, State0};
handle_cast({net, input, websocket, Data}, State0) ->
    {ok, State1} = utos_net_websocket_handler_state:iq_add(Data, State0),
    cast(self(), {iq, consume}),
    {noreply, State1};
%% `raid' messages are packed first and then re-enter as `{net, output, _}'.
handle_cast({net, output, {raid, {Code, Etag}}}, State0) ->
    {ok, Packed} = utos_net_raid:pack(Code, Etag),
    cast(self(), {net, output, Packed}),
    {noreply, State0};
handle_cast({net, output, {raid, {Code, Etag, Body}}}, State0) ->
    {ok, Packed} = utos_net_raid:pack(Code, Etag, Body),
    cast(self(), {net, output, Packed}),
    {noreply, State0};
handle_cast({net, output, Data}, #{<<"socket">> := Socket} = State0) when is_binary(Data) ->
    {ok, Frame} = utos_net_websocket_handler_frame:pack(Data),
    gen_tcp:send(Socket, Frame),
    {noreply, State0};
handle_cast(_Ignored, State0) ->
    {noreply, State0}.

%% Clears the iq flag and schedules another `{iq, consume}' pass; returns the
%% `{noreply, State}' tuple expected from handle_cast/2.
iq_release(State0) ->
    {ok, State1} = utos_net_websocket_handler_state:iq_flag(false, State0),
    cast(self(), {iq, consume}),
    {noreply, State1}.

%% gen_server callback: plain (non-call, non-cast) messages are dropped
%% without touching the state.
handle_info(_Message, HandlerState) ->
    {noreply, HandlerState}.

%% gen_server callback: builds the initial handler state.
%%
%% The process traps exits. The state is a map with binary keys: empty
%% `actors' and `db' maps, `iq_state' set to false, plus the name, socket
%% and bus handed over by start_link/2.
init({Name, Socket, Bus}) ->
    _ = process_flag(trap_exit, true),
    Defaults = #{
        <<"actors">> => #{},
        <<"db">> => #{},
        <<"iq_state">> => false
    },
    {ok, Defaults#{
        <<"bus">> => Bus,
        <<"name">> => Name,
        <<"socket">> => Socket
    }}.

%% @doc Queues outbound data on the handler process `Pid'.
%%
%% Accepted payloads:
%%   - a binary, cast to `Pid' as `{net, output, Binary}';
%%   - `{raid, {Code, Etag}}' or `{raid, {Code, Etag, Body}}', packed with
%%     utos_net_raid:pack/2,3 and then sent like any other payload;
%%   - a list of any of the above, sent element by element in order.
%% Returns `{ok, send}'; any other payload crashes with `function_clause'.
io_send(Pid, Payload) when is_binary(Payload) ->
    cast(Pid, {net, output, Payload}),
    {ok, send};
io_send(Pid, {raid, {Code, Etag}}) ->
    {ok, Packed} = utos_net_raid:pack(Code, Etag),
    io_send(Pid, Packed);
io_send(Pid, {raid, {Code, Etag, Body}}) ->
    {ok, Packed} = utos_net_raid:pack(Code, Etag, Body),
    io_send(Pid, Packed);
io_send(_Pid, []) ->
    {ok, send};
io_send(Pid, [Head | Tail]) ->
    io_send(Pid, Head),
    io_send(Pid, Tail).

%% @doc Starts a handler linked to the caller for `Socket'.
%%
%% The process is registered globally under the name uname/1 derives from
%% the socket; `Bus' is stored in the state as the destination for bus
%% messages. Returns whatever gen_server:start_link/4 returns.
start_link(Socket, Bus) ->
    {ok, RegName} = uname(Socket),
    InitArg = {RegName, Socket, Bus},
    gen_server:start_link({global, RegName}, ?MODULE, InitArg, []).

%% gen_server callback: asks terminate_actor/1 to stop the actors held in
%% the state. Its result is deliberately ignored; shutdown always reports `ok'.
terminate(_Reason, State) ->
    _ = terminate_actor(State),
    ok.

%% Stops every actor process referenced by the handler state.
%%
%% Accepts either the handler state map or a list of `{Name, Pid}' pairs.
%% Each actor is stopped from its own helper process, because
%% gen_server:stop/1 is synchronous and would otherwise block the caller.
%%
%% Returns `{ok, terminate}' once all stop requests have been issued, or
%% `{error, request}' for an argument that is neither a state map carrying
%% an actor table nor a list of pairs.
%%
%% The actor table lives under `<<"actors">>', the key init/1 creates.
%% Previously only `<<"actor">>' was matched, so the state built by init/1
%% fell through to the error clause and no actor was ever stopped. The old
%% spelling is still accepted as a fallback.
-spec terminate_actor(term()) -> {ok, terminate} | {error, request}.
terminate_actor(#{<<"actors">> := Actors}) ->
    terminate_actor(maps:to_list(Actors));
terminate_actor(#{<<"actor">> := Actors}) ->
    terminate_actor(maps:to_list(Actors));
terminate_actor([]) ->
    {ok, terminate};
terminate_actor([{_Name, Pid} | Rest]) ->
    spawn(fun() -> gen_server:stop(Pid) end),
    terminate_actor(Rest),
    {ok, terminate};
terminate_actor(_) ->
    {error, request}.

%% @doc Builds the name a handler is registered under.
%%
%% A binary is prefixed with `websocket_handler:'. A pid or port is first
%% serialised with term_to_binary/1 and then prefixed the same way. Any
%% other term yields `{error, request}'.
-spec uname(term()) -> {ok, binary()} | {error, request}.
uname(Suffix) when is_binary(Suffix) ->
    {ok, <<"websocket_handler:", Suffix/binary>>};
uname(Handle) when is_pid(Handle) orelse is_port(Handle) ->
    uname(erlang:term_to_binary(Handle));
uname(_Other) ->
    {error, request}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

%% uname/1 rejects anything that is not a binary, pid or port.
uname_returning_error_request_test() ->
    Outcome = uname(error),
    ?assertEqual({error, request}, Outcome).

%% uname/1 prefixes a binary with "websocket_handler:".
uname_bin_test() ->
    Expected = {ok, <<"websocket_handler:test">>},
    ?assertEqual(Expected, uname(<<"test">>)).

-endif.