1 // Copyright (C) 2018-2019 HuntLabs. All rights reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 module hunt.gossip.handler.SyncMessageHandler;
16 
17 import hunt.gossip.Buffer;
18 import hunt.gossip.GossipManager;
19 import hunt.gossip.Serializer;
20 import hunt.gossip.model.AckMessage;
21 import hunt.gossip.model.GossipDigest;
22 import hunt.gossip.model.GossipMember;
23 import hunt.gossip.model.HeartbeatState;
24 import hunt.gossip.handler.MessageHandler;
25 import hunt.collection.ArrayList;
26 import hunt.collection.HashMap;
27 import hunt.collection.List;
28 import hunt.collection.Map;
29 import hunt.collection.Set;
30 import std.json;
31 import hunt.Integer;
32 import std.array;
33 import hunt.logging;
34 
/**
 * Handler for gossip SYNC messages.
 *
 * A SYNC message carries a JSON array of digests. Each digest is compared
 * against the heartbeat state held by the local GossipManager, and the result
 * is sent back to the sender as an AckMessage built from two collections:
 * `olders` (digests where the remote side is ahead of, or unknown to, the
 * local node) and `newers` (members whose local heartbeat state is ahead of,
 * or missing from, the remote digest list).
 */
public class SyncMessageHandler : MessageHandler {
    /**
     * Processes one SYNC message and replies with an ACK.
     *
     * Params:
     *   cluster = cluster name assigned to every member built from a digest
     *   data    = JSON text expected to be an array of encoded GossipDigest
     *             values; when null the call is a no-op
     *   from    = sender address in "host:port" form; when null no reply is
     *             sent
     *
     * Any failure while handling the message is logged and not propagated.
     */
    override
    public void handle(string cluster, string data, string from) {
        if (data is null) {
            return;
        }
        try {
            JSONValue digests = parseJSON(data);
            List!(GossipDigest) olders = new ArrayList!(GossipDigest)();
            Map!(GossipMember, HeartbeatState) newers = new HashMap!(GossipMember, HeartbeatState)();
            List!(GossipMember) gMemberList = new ArrayList!(GossipMember)();
            foreach (JSONValue e; digests.array) {
                GossipDigest g = GossipDigest.decode(e);
                if (g is null) {
                    logError("GossipDigest decode error : ", e);
                    // Skip this entry: dereferencing a null digest below would
                    // crash instead of raising something the catch can handle.
                    continue;
                }
                GossipMember member = new GossipMember();
                member.setCluster(cluster);
                member.setIpAddress(g.getEndpoint().getIp());
                member.setPort(new Integer(g.getEndpoint().getPort()));
                member.setId(g.getId());
                gMemberList.add(member);

                compareDigest(g, member, cluster, olders, newers);
            }

            // I have, you don't have
            Map!(GossipMember, HeartbeatState) endpoints = GossipManager.getInstance().getEndpointMembers();
            // Snapshot the keys first so the lookups below do not run while
            // the map itself is being iterated.
            GossipMember[] epKeys;
            foreach (GossipMember k, HeartbeatState v; endpoints) {
                epKeys ~= k;
            }
            GossipMember self = GossipManager.getInstance().getSelf();
            foreach (GossipMember m; epKeys) {
                // Report members missing from the remote digest list, and
                // always report this node's own state.
                if (!gMemberList.contains(m) || m.opEquals(self)) {
                    newers.put(m, endpoints.get(m));
                }
            }

            AckMessage ackMessage = new AckMessage(olders, newers);
            Buffer ackBuffer = GossipManager.getInstance().encodeAckMessage(ackMessage);
            if (from !is null) {
                string[] host = from.split(":");
                if (host.length < 2) {
                    // Without a port there is nowhere to send the reply.
                    logError("Invalid sender address, expected host:port : ", from);
                    return;
                }
                GossipManager.getInstance().getSettings().getMsgService().sendMsg(host[0], Integer.valueOf(host[1]), ackBuffer);
            }
        } catch (Throwable e) {
            // Best effort: a bad message must not take the handler down.
            logError(e.msg);
        }
    }

    /**
     * Compares one remote digest with the local heartbeat state of `member`
     * and records the outcome.
     *
     * The heartbeat time is compared first; the version is only consulted
     * when both heartbeat times are equal. If the remote side is ahead, or
     * the member is unknown locally, `g` is appended to `olders`. If the
     * local side is ahead, the local state is stored in `newers`. When both
     * values are equal nothing is recorded.
     *
     * Params:
     *   g       = digest received from the remote node
     *   member  = member built from `g`, used as the lookup key
     *   cluster = cluster name (not used by the comparison)
     *   olders  = receives digests for which the remote side is ahead
     *   newers  = receives local states for which the local side is ahead
     */
    private void compareDigest(GossipDigest g, GossipMember member, string cluster, List!(GossipDigest) olders, Map!(GossipMember, HeartbeatState) newers) {
        try {
            HeartbeatState hb = GossipManager.getInstance().getEndpointMembers().get(member);
            long remoteHeartbeatTime = g.getHeartbeatTime();
            long remoteVersion = g.getVersion();
            if (hb is null) {
                // Member unknown locally: the remote side knows more.
                olders.add(g);
                return;
            }

            long localHeartbeatTime = hb.getHeartbeatTime();
            long localVersion = hb.getVersion();

            if (remoteHeartbeatTime > localHeartbeatTime) {
                olders.add(g);
            } else if (remoteHeartbeatTime < localHeartbeatTime) {
                newers.put(member, hb);
            } else {
                // Same heartbeat time: fall back to the version number.
                if (remoteVersion > localVersion) {
                    olders.add(g);
                } else if (remoteVersion < localVersion) {
                    newers.put(member, hb);
                }
            }
        } catch (Exception e) {
            logError(e.msg);
        }
    }
}