1 // Copyright (C) 2018-2019 HuntLabs. All rights reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 module hunt.gossip.handler.AckMessageHandler;
16 
17 import hunt.gossip.Buffer;
18 import std.json;
19 import hunt.gossip.GossipManager;
20 import hunt.gossip.model.Ack2Message;
21 import hunt.gossip.model.AckMessage;
22 import hunt.gossip.model.GossipDigest;
23 import hunt.gossip.model.GossipMember;
24 import hunt.gossip.model.HeartbeatState;
25 import hunt.gossip.handler.MessageHandler;
26 import hunt.collection.HashMap;
27 import hunt.collection.List;
28 import hunt.collection.Map;
29 import hunt.gossip.JsonObject;
30 
/**
 * Message handler for ACK messages.
 *
 * Decodes the incoming ACK, applies the "newers" entries it carries to the
 * local state, and replies to the sender with an ACK2 message containing the
 * locally known heartbeat state for every digest listed in "olders".
 */
public class AckMessageHandler : MessageHandler {
    /**
     * Processes one ACK message.
     *
     * Params:
     *   cluster = cluster name; not used by this handler.
     *   data    = JSON text of the ACK message, parsed with `parseJSON` and
     *             decoded via `AckMessage.decode`.
     *   from    = address of the sender in `host:port` form. When it is null
     *             or does not contain both parts, no ACK2 reply is sent.
     */
    override
    public void handle(string cluster, string data, string from) {
        AckMessage ackMessage = AckMessage.decode(parseJSON(data));

        List!(GossipDigest) olders = ackMessage.getOlders();
        Map!(GossipMember, HeartbeatState) newers = ackMessage.getNewers();

        // Update local state. Guard against a missing map the same way the
        // `olders` list is guarded below.
        if (newers !is null && newers.size() > 0) {
            GossipManager.getInstance().apply2LocalState(newers);
        }

        // Collect the local heartbeat state for each digest the sender listed
        // in `olders`; members without a local entry are skipped.
        Map!(GossipMember, HeartbeatState) deltaEndpoints = new HashMap!(GossipMember, HeartbeatState)();
        if (olders !is null) {
            foreach(GossipDigest d ; olders) {
                GossipMember member = GossipManager.getInstance().createByDigest(d);
                HeartbeatState hb = GossipManager.getInstance().getEndpointMembers().get(member);
                if (hb !is null) {
                    deltaEndpoints.put(member, hb);
                }
            }
        }

        // Nothing to send back, or nobody to send it to.
        if (deltaEndpoints.isEmpty() || from is null) {
            return;
        }

        import std.array;
        string[] host = from.split(":");
        if (host.length < 2) {
            // Malformed sender address (no "host:port"): skip the reply rather
            // than index past the end of the split result.
            return;
        }

        // Encode only once a usable destination is known.
        Ack2Message ack2Message = new Ack2Message(deltaEndpoints);
        Buffer ack2Buffer = GossipManager.getInstance().encodeAck2Message(ack2Message);

        import hunt.Integer;
        // NOTE(review): Integer.valueOf presumably rejects a non-numeric port — confirm how it fails.
        GossipManager.getInstance().getSettings().getMsgService().sendMsg(host[0], Integer.valueOf(host[1]), ack2Buffer);
    }
}