1 // Copyright (C) 2018-2019 HuntLabs. All rights reserved. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 module hunt.gossip.net.udp.UDPMsgService; 16 17 import hunt.text.StringUtils; 18 import hunt.gossip.Buffer; 19 import std.json; 20 import hunt.gossip.GossipManager; 21 import hunt.gossip.GossipMessageFactory; 22 import hunt.gossip.handler.Ack2MessageHandler; 23 import hunt.gossip.handler.AckMessageHandler; 24 import hunt.gossip.handler.MessageHandler; 25 import hunt.gossip.handler.ShutdownMessageHandler; 26 import hunt.gossip.handler.SyncMessageHandler; 27 import hunt.gossip.model.MessageType; 28 import hunt.gossip.net.MsgService; 29 import hunt.gossip.JsonObject; 30 import hunt.gossip.Common; 31 import hunt.Integer; 32 import hunt.logging; 33 import hunt.event; 34 import hunt.io.UdpSocket : UdpSocket; 35 import std.socket; 36 import std.functional; 37 import std.exception; 38 import core.thread; 39 import core.time; 40 import std.stdio; 41 42 public class UDPMsgService : MsgService 43 { 44 EventLoop _loop; 45 UdpSocket _udpSocket; 46 47 this() 48 { 49 _loop = new EventLoop(); 50 _udpSocket = new UdpSocket(_loop); 51 } 52 53 override public void listen(string ipAddress, int port) 54 { 55 _udpSocket.bind(ipAddress, cast(ushort)port).setReadData((in ubyte[] data, Address addr) { 56 logInfof("Server => client: %s, received: %s", addr, cast(string) data); 57 handleMsg(Buffer.buffer().appendString(cast(string) data)); 58 }).start(); 59 60 } 61 62 public void start() 63 { 64 _loop.run(); 65 } 66 67 public void stop() 68 { 69 _loop.stop(); 70 unListen(); 71 } 72 73 override public void handleMsg(Buffer data) 74 { 75 JsonObject j = data.toJsonObject(); 76 string msgType = j.getString(GossipMessageFactory.KEY_MSG_TYPE); 77 string _data = j.getString(GossipMessageFactory.KEY_DATA); 78 string cluster = j.getString(GossipMessageFactory.KEY_CLUSTER); 79 string from = j.getString(GossipMessageFactory.KEY_FROM); 80 if (isNullOrEmpty(cluster) || !(GossipManager.getInstance().getCluster() == cluster)) 81 { 82 logError("This message shouldn't exist my world!" ~ data.toString()); 83 return; 84 } 85 MessageHandler handler = null; 86 MessageType type = MessageType(msgType); 87 if (type.type() == MessageType.SYNC_MESSAGE.type()) 88 { 89 handler = new SyncMessageHandler(); 90 } 91 else if (type.type() == MessageType.ACK_MESSAGE.type()) 92 { 93 handler = new AckMessageHandler(); 94 } 95 else if (type.type() == MessageType.ACK2_MESSAGE.type()) 96 { 97 handler = new Ack2MessageHandler(); 98 } 99 else if (type.type() == MessageType.SHUTDOWN.type()) 100 { 101 handler = new ShutdownMessageHandler(); 102 } 103 else 104 { 105 logError("Not supported message type"); 106 } 107 if (handler !is null) 108 { 109 handler.handle(cluster, _data, from); 110 } 111 } 112 113 override public void sendMsg(string targetIp, Integer targetPort, Buffer data) 114 { 115 if (targetIp !is null && targetPort !is null && data !is null && _udpSocket !is null) 116 { 117 _udpSocket.sendTo(data.data(), new InternetAddress(targetIp,cast(ushort)(targetPort.intValue))); 118 } 119 } 120 121 override public void unListen() 122 { 123 if (_udpSocket !is null) 124 { 125 _udpSocket.close(); 126 } 127 } 128 }