1 // Copyright (C) 2018-2019 HuntLabs. All rights reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 module hunt.gossip.net.udp.UDPMsgService;
16 
17 import hunt.text.StringUtils;
18 import hunt.gossip.Buffer;
19 import std.json;
20 import hunt.gossip.GossipManager;
21 import hunt.gossip.GossipMessageFactory;
22 import hunt.gossip.handler.Ack2MessageHandler;
23 import hunt.gossip.handler.AckMessageHandler;
24 import hunt.gossip.handler.MessageHandler;
25 import hunt.gossip.handler.ShutdownMessageHandler;
26 import hunt.gossip.handler.SyncMessageHandler;
27 import hunt.gossip.model.MessageType;
28 import hunt.gossip.net.MsgService;
29 import hunt.gossip.JsonObject;
30 import hunt.gossip.Common;
31 import hunt.Integer;
32 import hunt.logging;
33 import hunt.event;
34 import hunt.io.UdpSocket : UdpSocket;
35 import std.socket;
36 import std.functional;
37 import std.exception;
38 import core.thread;
39 import core.time;
40 import std.stdio;
41 
42 public class UDPMsgService : MsgService
43 {
44     EventLoop _loop;
45     UdpSocket _udpSocket;
46 
47     this()
48     {
49         _loop = new EventLoop();
50         _udpSocket = new UdpSocket(_loop);
51     }
52 
53     override public void listen(string ipAddress, int port)
54     {
55         _udpSocket.bind(ipAddress, cast(ushort)port).setReadData((in ubyte[] data, Address addr) {
56             logInfof("Server => client: %s, received: %s", addr, cast(string) data);
57             handleMsg(Buffer.buffer().appendString(cast(string) data));
58         }).start();
59 
60     }
61 
62     public void start()
63     {
64         _loop.run();
65     }
66 
67     public void stop()
68     {
69         _loop.stop();
70         unListen();
71     }
72 
73     override public void handleMsg(Buffer data)
74     {
75         JsonObject j = data.toJsonObject();
76         string msgType = j.getString(GossipMessageFactory.KEY_MSG_TYPE);
77         string _data = j.getString(GossipMessageFactory.KEY_DATA);
78         string cluster = j.getString(GossipMessageFactory.KEY_CLUSTER);
79         string from = j.getString(GossipMessageFactory.KEY_FROM);
80         if (isNullOrEmpty(cluster) || !(GossipManager.getInstance().getCluster() == cluster))
81         {
82             logError("This message shouldn't exist my world!" ~ data.toString());
83             return;
84         }
85         MessageHandler handler = null;
86         MessageType type = MessageType(msgType);
87         if (type.type() == MessageType.SYNC_MESSAGE.type())
88         {
89             handler = new SyncMessageHandler();
90         }
91         else if (type.type() == MessageType.ACK_MESSAGE.type())
92         {
93             handler = new AckMessageHandler();
94         }
95         else if (type.type() == MessageType.ACK2_MESSAGE.type())
96         {
97             handler = new Ack2MessageHandler();
98         }
99         else if (type.type() == MessageType.SHUTDOWN.type())
100         {
101             handler = new ShutdownMessageHandler();
102         }
103         else
104         {
105             logError("Not supported message type");
106         }
107         if (handler !is null)
108         {
109             handler.handle(cluster, _data, from);
110         }
111     }
112 
113     override public void sendMsg(string targetIp, Integer targetPort, Buffer data)
114     {
115         if (targetIp !is null && targetPort !is null && data !is null && _udpSocket !is null)
116         {
117             _udpSocket.sendTo(data.data(), new InternetAddress(targetIp,cast(ushort)(targetPort.intValue)));
118         }
119     }
120 
121     override public void unListen()
122     {
123         if (_udpSocket !is null)
124         {
125             _udpSocket.close();
126         }
127     }
128 }