// Copyright (C) 2018-2019 HuntLabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module hunt.gossip.handler.AckMessageHandler;

import hunt.gossip.Buffer;
import std.json;
import hunt.gossip.GossipManager;
import hunt.gossip.model.Ack2Message;
import hunt.gossip.model.AckMessage;
import hunt.gossip.model.GossipDigest;
import hunt.gossip.model.GossipMember;
import hunt.gossip.model.HeartbeatState;
import hunt.gossip.handler.MessageHandler;
import hunt.collection.HashMap;
import hunt.collection.List;
import hunt.collection.Map;
import hunt.gossip.JsonObject;

/**
 * Handles an incoming ACK gossip message.
 *
 * The states the message reports as newer are applied to the local state, and
 * the locally known heartbeat states for the digests the message lists as
 * older are sent back to the sender in an Ack2 message.
 */
public class AckMessageHandler : MessageHandler {
    /**
     * Processes one ACK message.
     *
     * Params:
     *   cluster = cluster name; not used by this handler.
     *   data    = JSON text of the ACK message, parsed with `parseJSON` and
     *             decoded with `AckMessage.decode`.
     *   from    = address of the sender in `host:port` form; the Ack2 reply is
     *             sent there. When it is null or does not contain both parts,
     *             no reply is sent.
     */
    override
    public void handle(string cluster, string data, string from) {
        AckMessage ackMessage = AckMessage.decode(parseJSON(data));
        auto manager = GossipManager.getInstance();

        List!(GossipDigest) olders = ackMessage.getOlders();
        Map!(GossipMember, HeartbeatState) newers = ackMessage.getNewers();

        // Update local state. `newers` is null-checked the same way `olders`
        // is below, so a message without newer states cannot dereference null.
        if (newers !is null && newers.size() > 0) {
            manager.apply2LocalState(newers);
        }

        // Collect the local heartbeat state for every digest the sender asked about.
        Map!(GossipMember, HeartbeatState) deltaEndpoints = new HashMap!(GossipMember, HeartbeatState)();
        if (olders !is null) {
            foreach (GossipDigest d; olders) {
                GossipMember member = manager.createByDigest(d);
                HeartbeatState hb = manager.getEndpointMembers().get(member);
                if (hb !is null) {
                    deltaEndpoints.put(member, hb);
                }
            }
        }

        // Nothing to report, or nowhere to send it.
        if (deltaEndpoints.isEmpty() || from is null) {
            return;
        }

        import std.array;
        string[] host = from.split(":");
        // An address without a ':' separator (or an empty one) has no port part;
        // indexing host[1] would raise a RangeError, so skip the reply instead.
        if (host.length < 2) {
            return;
        }

        import hunt.Integer;
        // Encode only once a usable reply address is known.
        Ack2Message ack2Message = new Ack2Message(deltaEndpoints);
        Buffer ack2Buffer = manager.encodeAck2Message(ack2Message);
        manager.getSettings().getMsgService().sendMsg(host[0], Integer.valueOf(host[1]), ack2Buffer);
    }
}