1 // Copyright (C) 2018-2019 HuntLabs. All rights reserved. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 module hunt.gossip.handler.SyncMessageHandler; 16 17 import hunt.gossip.Buffer; 18 import hunt.gossip.GossipManager; 19 import hunt.gossip.Serializer; 20 import hunt.gossip.model.AckMessage; 21 import hunt.gossip.model.GossipDigest; 22 import hunt.gossip.model.GossipMember; 23 import hunt.gossip.model.HeartbeatState; 24 import hunt.gossip.handler.MessageHandler; 25 import hunt.collection.ArrayList; 26 import hunt.collection.HashMap; 27 import hunt.collection.List; 28 import hunt.collection.Map; 29 import hunt.collection.Set; 30 import std.json; 31 import hunt.Integer; 32 import std.array; 33 import hunt.logging; 34 35 public class SyncMessageHandler : MessageHandler { 36 override 37 public void handle(string cluster, string data, string from) { 38 if (data !is null) { 39 try { 40 JSONValue array = parseJSON(data); 41 List!(GossipDigest) olders = new ArrayList!(GossipDigest)(); 42 Map!(GossipMember, HeartbeatState) newers = new HashMap!(GossipMember, HeartbeatState)(); 43 List!(GossipMember) gMemberList = new ArrayList!(GossipMember)(); 44 foreach(JSONValue e ; array.array) { 45 GossipDigest g = GossipDigest.decode(e)/* Serializer.getInstance().decode!(GossipDigest)(Buffer.buffer().appendString(e.toString())) */; 46 if(g is null) 47 logError("GossipDigest decode error : ",e); 48 GossipMember member = new GossipMember(); 49 member.setCluster(cluster); 50 member.setIpAddress(g.getEndpoint().getIp()); 51 member.setPort(new Integer(g.getEndpoint().getPort())); 52 member.setId(g.getId()); 53 gMemberList.add(member); 54 55 compareDigest(g, member, cluster, olders, newers); 56 } 57 // I have, you don't have 58 Map!(GossipMember, HeartbeatState) endpoints = GossipManager.getInstance().getEndpointMembers(); 59 GossipMember[] epKeys; 60 foreach(GossipMember k, HeartbeatState v; endpoints) { 61 epKeys ~= k; 62 } 63 // Set!(GossipMember) epKeys = endpoints.keySet(); 64 foreach(GossipMember m ; epKeys) { 65 if (!gMemberList.contains(m)) { 66 newers.put(m, endpoints.get(m)); 67 } 68 if (m.opEquals(GossipManager.getInstance().getSelf())) { 69 newers.put(m, endpoints.get(m)); 70 } 71 } 72 AckMessage ackMessage = new AckMessage(olders, newers); 73 Buffer ackBuffer = GossipManager.getInstance().encodeAckMessage(ackMessage); 74 if (from !is null) { 75 string[] host = from.split(":"); 76 GossipManager.getInstance().getSettings().getMsgService().sendMsg(host[0], Integer.valueOf(host[1]), ackBuffer); 77 } 78 } catch (Throwable e) { 79 logError(e.msg); 80 } 81 } 82 } 83 84 private void compareDigest(GossipDigest g, GossipMember member, string cluster, List!(GossipDigest) olders, Map!(GossipMember, HeartbeatState) newers) { 85 86 try { 87 HeartbeatState hb = GossipManager.getInstance().getEndpointMembers().get(member); 88 long remoteHeartbeatTime = g.getHeartbeatTime(); 89 long remoteVersion = g.getVersion(); 90 if (hb !is null) { 91 long localHeartbeatTime = hb.getHeartbeatTime(); 92 long localVersion = hb.getVersion(); 93 94 if (remoteHeartbeatTime > localHeartbeatTime) { 95 olders.add(g); 96 } else if (remoteHeartbeatTime < localHeartbeatTime) { 97 newers.put(member, hb); 98 } else if (remoteHeartbeatTime == localHeartbeatTime) { 99 if (remoteVersion > localVersion) { 100 olders.add(g); 101 } else if (remoteVersion < localVersion) { 102 newers.put(member, hb); 103 } 104 } 105 } else { 106 olders.add(g); 107 } 108 } catch (Exception e) { 109 logError(e.msg); 110 } 111 } 112 }