1 // Copyright (C) 2018-2019 HuntLabs. All rights reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 module hunt.gossip.GossipManager;
16 
17 import hunt.gossip.Buffer;
18 import std.json;
19 
20 import hunt.logging;
21 import hunt.gossip.event.GossipListener;
22 import hunt.gossip.model.Ack2Message;
23 import hunt.gossip.model.AckMessage;
24 import hunt.gossip.model.CandidateMemberState;
25 import hunt.gossip.model.GossipDigest;
26 import hunt.gossip.model.GossipMember;
27 import hunt.gossip.model.GossipState;
28 import hunt.gossip.model.HeartbeatState;
29 import hunt.gossip.model.MessageType;
30 import hunt.gossip.model.SeedMember;
31 import hunt.gossip.GossipMessageFactory;
32 
33 // import java.net.UnknownHostException;
34 import hunt.collection.ArrayList;
35 import hunt.collection.Collections;
36 import hunt.collection.List;
37 import hunt.collection.Map;
38 import std.random;
39 import hunt.collection.Set;
40 import hunt.collection.HashMap;
41 import hunt.concurrency.Executors;
42 import hunt.concurrency.ScheduledExecutorService;
43 import hunt.concurrency.ScheduledThreadPoolExecutor;
44 import hunt.util.DateTime;
45 // import hunt.concurrency.locks.ReentrantReadWriteLock;
46 import core.sync.rwmutex;
47 import hunt.gossip.JsonObject;
48 import hunt.gossip.GossipSettings;
49 import hunt.Integer;
50 import hunt.util.Common;
51 import hunt.gossip.Serializer;
52 import hunt.util.DateTime;
53 import std.conv;
54 import hunt.Exceptions;
55 import core.time;
56 
57 public class GossipManager {
58     // private static final Logger LOGGER = LoggerFactory.getLogger(GossipManager.class);
    // Singleton reference handed out by getInstance(); created on first request.
    // NOTE(review): a plain `static` class member in D is thread-local by default —
    // confirm whether one instance per thread is intended or `__gshared` is required.
    private static GossipManager instance;
    // Added to the per-round delay term inside convictedTime().
    // Presumably milliseconds, like the other timing settings — TODO confirm.
    private long executeGossipTime = 500;
    // Set to true by start() and back to false at the end of shutdown().
    private bool _isWorking = false;
    // private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    // Its writer side is taken by up() and down() while they edit the member lists.
    private ReadWriteMutex rwlock;
    // Runs GossipTask at a fixed rate once start() has been called.
    private ScheduledThreadPoolExecutor doGossipExecutor;
//    private ScheduledExecutorService clearExecutor = Executors.newSingleThreadScheduledExecutor();

    // Every known member (the local one included after init()) with its latest heartbeat state.
    private Map!(GossipMember, HeartbeatState) endpointMembers ;
    // Members currently marked UP; maintained by up() and down().
    private List!(GossipMember) liveMembers;
    // Members currently marked DOWN; maintained by up() and down().
    private List!(GossipMember) deadMembers;
    // Members suspected of being down, with the bookkeeping used by downing()/checkCandidate().
    private Map!(GossipMember, CandidateMemberState) candidateMembers;
    // Supplied through init(); provides seed members, message service and timing values.
    private GossipSettings settings;
    // The member describing this node; built in init().
    private GossipMember localGossipMember;
    // Cluster name passed to init().
    private string cluster;
    // Optional callback notified through fireGossipEvent(); may be null.
    private GossipListener listener;
    // NOTE(review): not referenced anywhere in the visible code — the random picks
    // go through std.random's uniform()/randomShuffle() directly.
    private Random random;
76 
77     private this() {
78         rwlock = new ReadWriteMutex();
79         doGossipExecutor = cast(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
80         endpointMembers = new HashMap!(GossipMember, HeartbeatState)();
81         liveMembers = new ArrayList!(GossipMember)();
82         deadMembers = new ArrayList!(GossipMember)();
83         candidateMembers = new HashMap!(GossipMember, CandidateMemberState)();
84     }
85 
86     public static GossipManager getInstance() {
87         if(instance is null)
88         {
89             instance = new GossipManager();
90         }
91         return instance;
92     }
93 
94     public void init(string cluster, string ipAddress, Integer port, string id, List!(SeedMember) seedMembers, GossipSettings settings, GossipListener listener) {
95         this.cluster = cluster;
96         this.localGossipMember = new GossipMember();
97         this.localGossipMember.setCluster(cluster);
98         this.localGossipMember.setIpAddress(ipAddress);
99         this.localGossipMember.setPort(port);
100         this.localGossipMember.setId(id);
101         this.localGossipMember.setState(GossipState.JOIN);
102         this.endpointMembers.put(localGossipMember, new HeartbeatState());
103         this.listener = listener;
104         this.settings = settings;
105         this.settings.setSeedMembers(seedMembers);
106         fireGossipEvent(localGossipMember, GossipState.JOIN);
107     }
108 
109     public  void start() {
110         logInfof("Starting gossip! cluster[%s] ip[%s] port[%d] id[%s]", localGossipMember.getCluster(), localGossipMember.getIpAddress(), localGossipMember.getPort().intValue(), localGossipMember.getId());
111         _isWorking = true;
112         settings.getMsgService().listen(getSelf().getIpAddress(), getSelf().getPort().intValue);
113         doGossipExecutor.scheduleAtFixedRate(new GossipTask(), msecs(settings.getGossipInterval()), msecs(settings.getGossipInterval()));
114         settings.getMsgService().start();
115     }
116 
    /// Returns the internal list of members currently marked UP (not a copy).
    public List!(GossipMember) getLiveMembers() {
        return liveMembers;
    }

    /// Returns the internal list of members currently marked DOWN (not a copy).
    public List!(GossipMember) getDeadMembers() {
        return deadMembers;
    }

    /// Returns the settings object passed to init(); null before init() was called.
    public GossipSettings getSettings() {
        return settings;
    }

    /// Returns the member that represents the local node; null before init() was called.
    public GossipMember getSelf() {
        return localGossipMember;
    }

    /// Returns the id of the local member.
    public string getID() {
        return getSelf().getId();
    }

    /// Reports the flag set by start() and cleared by shutdown().
    public bool isWorking() {
        return _isWorking;
    }

    /// Returns the internal member -> heartbeat map (not a copy).
    public Map!(GossipMember, HeartbeatState) getEndpointMembers() {
        return endpointMembers;
    }

    /// Returns the cluster name given to init().
    public string getCluster() {
        return cluster;
    }
148 
149     private void randomGossipDigest(List!(GossipDigest) digests) /* throws UnknownHostException */ {
150         GossipMember[] gms;
151         foreach(GossipMember k , HeartbeatState v ; endpointMembers)
152         {
153             gms ~= k;
154         }
155         // List!(GossipMember) endpoints = new ArrayList!(GossipMember)(gms);
156         randomShuffle(gms);
157         foreach(GossipMember ep ; gms) {
158             HeartbeatState hb = endpointMembers.get(ep);
159             long hbTime = 0;
160             long hbVersion = 0;
161             if (hb !is null) {
162                 hbTime = hb.getHeartbeatTime();
163                 hbVersion = hb.getVersion();
164             }
165             digests.add(new GossipDigest(ep, hbTime, hbVersion));
166         }
167     }
168 
169     class GossipTask : Runnable {
170 
171         override
172         public void run() {
173             //Update local member version
174             long newversion = endpointMembers.get(getSelf()).updateVersion();
175             if (isDiscoverable(getSelf())) {
176                 up(getSelf());
177             }
178             version(HUNT_DEBUG) {
179                 trace("sync data");
180                 tracef("Now my heartbeat version is %ld", newversion);
181             }
182 
183             List!(GossipDigest) digests = new ArrayList!(GossipDigest)();
184             try {
185                 randomGossipDigest(digests);
186                 if (digests.size() > 0) {
187                     Buffer syncMessageBuffer = encodeSyncMessage(digests);
188                     //step 1. goosip to a random live member
189                     bool b = gossip2LiveMember(syncMessageBuffer);
190 
191                     //step 2. goosip to a random dead memeber
192                     gossip2UndiscoverableMember(syncMessageBuffer);
193 
194                     //step3.
195                     if (!b || liveMembers.size() <= settings.getSeedMembers().size()) {
196                         gossip2Seed(syncMessageBuffer);
197                     }
198 
199                 }
200                 checkStatus();
201                 version(HUNT_DEBUG) {
202                     trace("live member : " ~ getLiveMembers().toString);
203                     trace("dead member : " ~ getDeadMembers().toString);
204                     trace("endpoint : " ~ getEndpointMembers().toString);
205                 }
206             } catch (Throwable e) {
207                 logError(e.msg);
208             }
209 
210         }
211     }
212 
213     private Buffer encodeSyncMessage(List!(GossipDigest) digests) {
214         Buffer buffer = Buffer.buffer();
215         JSONValue[] array ;
216         foreach(GossipDigest e ; digests) {
217             array ~= e.encode()/* JSONValue(Serializer.getInstance().encode!(GossipDigest)(e).toString()) */;
218         }
219         if(buffer !is null)
220             buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.SYNC_MESSAGE, JSONValue(array).toString, getCluster(), getSelf().ipAndPort()).encode());
221         return buffer;
222     }
223 
224     public Buffer encodeAckMessage(AckMessage ackMessage) {
225         Buffer buffer = Buffer.buffer();
226         JsonObject ackJson = JsonObject.mapFrom(ackMessage);
227         buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.ACK_MESSAGE, ackJson.encode(), getCluster(), getSelf().ipAndPort()).encode());
228         return buffer;
229     }
230 
231     public Buffer encodeAck2Message(Ack2Message ack2Message) {
232         Buffer buffer = Buffer.buffer();
233         JsonObject ack2Json = JsonObject.mapFrom(ack2Message);
234         buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.ACK2_MESSAGE, ack2Json.encode(), getCluster(), getSelf().ipAndPort()).encode());
235         return buffer;
236     }
237 
238     private Buffer encodeShutdownMessage() {
239         Buffer buffer = Buffer.buffer();
240         JsonObject self = JsonObject.mapFrom(getSelf());
241         buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.SHUTDOWN, self.encode(), getCluster(), getSelf().ipAndPort()).encode());
242         return buffer;
243     }
244 
245     public void apply2LocalState(Map!(GossipMember, HeartbeatState) endpointMembers) {
246         GossipMember[] keys;
247         foreach(GossipMember k, HeartbeatState v; endpointMembers)
248         {
249             keys ~= k;
250         }
251         // Set!(GossipMember) keys = endpointMembers.keySet();
252         foreach(GossipMember m ; keys) {
253             if (getSelf().opEquals(m)) {
254                 continue;
255             }
256 
257             try {
258                 HeartbeatState localState = getEndpointMembers().get(m);
259                 HeartbeatState remoteState = endpointMembers.get(m);
260 
261                 if (localState !is null) {
262                     long localHeartbeatTime = localState.getHeartbeatTime();
263                     long remoteHeartbeatTime = remoteState.getHeartbeatTime();
264                     if (remoteHeartbeatTime > localHeartbeatTime) {
265                         remoteStateReplaceLocalState(m, remoteState);
266                     } else if (remoteHeartbeatTime == localHeartbeatTime) {
267                         long localVersion = localState.getVersion();
268                         long remoteVersion = remoteState.getVersion();
269                         if (remoteVersion > localVersion) {
270                             remoteStateReplaceLocalState(m, remoteState);
271                         }
272                     }
273                 } else {
274                     remoteStateReplaceLocalState(m, remoteState);
275                 }
276             } catch (Exception e) {
277                 logError(e.msg);
278             }
279         }
280     }
281 
282     private void remoteStateReplaceLocalState(GossipMember member, HeartbeatState remoteState) {
283         if (member.getState() == GossipState.UP) {
284             up(member);
285         }
286         if (member.getState() == GossipState.DOWN) {
287             down(member);
288         }
289         if (endpointMembers.containsKey(member)) {
290             endpointMembers.remove(member);
291         }
292         endpointMembers.put(member, remoteState);
293     }
294 
295     public GossipMember createByDigest(GossipDigest digest) {
296         GossipMember member = new GossipMember();
297         member.setPort(new Integer(digest.getEndpoint().getPort()));
298         member.setIpAddress(digest.getEndpoint().getIp());
299         member.setCluster(cluster);
300         GossipMember[] keys;
301         auto em = getEndpointMembers();
302         foreach(GossipMember k, HeartbeatState v; em)
303         {
304             keys ~= k;
305         }
306         // Set!(GossipMember) keys = getEndpointMembers().keySet();
307         foreach(GossipMember m ; keys) {
308             if (m.opEquals(member)) {
309                 member.setId(m.getId());
310                 member.setState(m.getState());
311                 break;
312             }
313         }
314 
315         return member;
316     }
317 
318     /**
319      * send sync message to a live member
320      *
321      * @param buffer sync data
322      * @return if send to a seed member then return TURE
323      */
324     private bool gossip2LiveMember(Buffer buffer) {
325         int liveSize = liveMembers.size();
326         if (liveSize <= 0) {
327             return false;
328         }
329         int index = (liveSize == 1) ? 0 : uniform(0,liveSize);
330         return sendGossip(buffer, liveMembers, index);
331     }
332 
333     /**
334      * send sync message to a dead member
335      *
336      * @param buffer sync data
337      */
338     private void gossip2UndiscoverableMember(Buffer buffer) {
339         int deadSize = deadMembers.size();
340         if (deadSize <= 0) {
341             return;
342         }
343         int index = (deadSize == 1) ? 0 : uniform(0,deadSize);
344         sendGossip(buffer, deadMembers, index);
345     }
346 
347     private void gossip2Seed(Buffer buffer) {
348         int size = settings.getSeedMembers().size();
349         if (size > 0) {
350             if (size == 1 && settings.getSeedMembers().contains(gossipMember2SeedMember(getSelf()))) {
351                 return;
352             }
353             int index = (size == 1) ? 0 : uniform(0,size);
354             // logInfo("size : ",size," index : ",index);
355             if (liveMembers.size() == 1) {
356                 sendGossip2Seed(buffer, settings.getSeedMembers(), index);
357             } else {
358                 double prob = size / /* Double.valueOf */(liveMembers.size())*1.0;
359                 if (uniform(0.0f, 1.0f) < prob) {
360                     sendGossip2Seed(buffer, settings.getSeedMembers(), index);
361                 }
362             }
363         }
364     }
365 
366     private bool sendGossip(Buffer buffer, List!(GossipMember) members, int index) {
367         if (buffer !is null && index >= 0) {
368             try {
369                 GossipMember target = members.get(index);
370                 if (target.opEquals(getSelf())) {
371                     int m_size = members.size();
372                     if (m_size == 1) {
373                         return false;
374                     } else {
375                         target = members.get((index + 1) % m_size);
376                     }
377                 }
378                 settings.getMsgService().sendMsg(target.getIpAddress(), target.getPort(), buffer);
379                 return settings.getSeedMembers().contains(gossipMember2SeedMember(target));
380             } catch (Exception e) {
381                 logError(e.msg);
382             }
383         }
384         return false;
385     }
386 
387     private bool sendGossip2Seed(Buffer buffer, List!(SeedMember) members, int index) {
388         if (buffer !is null && index >= 0) {
389             try {
390                 SeedMember target = members.get(index);
391                 int m_size = members.size();
392                 if (target.opEquals(getSelf())) {
393                     if (m_size <= 1) {
394                         return false;
395                     } else {
396                         target = members.get((index + 1) % m_size);
397                     }
398                 }
399                 settings.getMsgService().sendMsg(target.getIpAddress(), target.getPort(), buffer);
400                 return true;
401             } catch (Exception e) {
402                 logError(e.msg);
403             }
404         }
405         return false;
406     }
407 
408     private SeedMember gossipMember2SeedMember(GossipMember member) {
409         SeedMember seed = new SeedMember(member.getCluster(), member.getIpAddress(), member.getPort(), member.getId());
410         return seed;
411     }
412 
413     private void checkStatus() {
414         try {
415             GossipMember local = getSelf();
416             Map!(GossipMember, HeartbeatState) endpoints = getEndpointMembers();
417             GossipMember[] gms;
418             foreach(GossipMember k, HeartbeatState v; endpoints)
419             {
420                 gms ~= k;
421             }
422             // Set!(GossipMember) epKeys = endpoints.keySet();
423             foreach(GossipMember k ; gms) {
424                 if (!k.opEquals(local)) {
425                     HeartbeatState state = endpoints.get(k);
426                     long now = DateTimeHelper.currentTimeMillis();
427                     long duration = now - state.getHeartbeatTime();
428                     long convictedTime = convictedTime();
429                     logInfo("check : " ~ k.toString() ~ " state : " ~ state.toString() ~ " duration : " ~ duration.to!string ~ " convictedTime : " ~ convictedTime.to!string);
430                     if (duration > convictedTime && (isAlive(k) || getLiveMembers().contains(k))) {
431                         downing(k, state);
432                     }
433                     if (duration <= convictedTime && (isDiscoverable(k) || getDeadMembers().contains(k))) {
434                         up(k);
435                     }
436                 }
437             }
438             checkCandidate();
439         } catch (Exception e) {
440             logError(e.msg);
441         }
442     }
443 
444     private int convergenceCount() {
445         int size = getEndpointMembers().size();
446         import std.math;
447         int count = cast(int) floor(log10(size) + log(size) + 1);
448         return count;
449     }
450 
451     private long convictedTime() {
452         return ((convergenceCount() * (settings.getNetworkDelay() * 3 + executeGossipTime)) << 1) + settings.getGossipInterval();
453     }
454 
455     private bool isDiscoverable(GossipMember member) {
456         return member.getState() == GossipState.JOIN || member.getState() == GossipState.DOWN;
457     }
458 
    /// True when the member's state is UP.
    private bool isAlive(GossipMember member) {
        return member.getState() == GossipState.UP;
    }
462 
    /// Returns the listener given to init(); may be null.
    public GossipListener getListener() {
        return listener;
    }
466 
467     private void fireGossipEvent(GossipMember member, GossipState state) {
468         if (getListener() !is null) {
469             getListener().gossipEvent(member, state);
470         }
471     }
472 
473 //    private void clearMember(GossipMember member) {
474 //        rwlock.writeLock().lock();
475 //        try {
476 //            endpointMembers.remove(member);
477 //        } finally {
478 //            rwlock.writeLock().unlock();
479 //        }
480 //    }
481 
482     public void down(GossipMember member) {
483         logInfo("down ~~");
484         try {
485             rwlock.writer.lock();
486             member.setState(GossipState.DOWN);
487             liveMembers.remove(member);
488             if (!deadMembers.contains(member)) {
489                 deadMembers.add(member);
490             }
491 //            clearExecutor.schedule(() -> clearMember(member), getSettings().getDeleteThreshold() * getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
492             fireGossipEvent(member, GossipState.DOWN);
493         } catch (Exception e) {
494             logError(e.msg);
495         } finally {
496             rwlock.writer.unlock();
497         }
498     }
499 
500     private void up(GossipMember member) {
501         try {
502             rwlock.writer.lock();
503             member.setState(GossipState.UP);
504             if (!liveMembers.contains(member)) {
505                 liveMembers.add(member);
506             }
507             if (candidateMembers.containsKey(member)) {
508                 candidateMembers.remove(member);
509             }
510             if (deadMembers.contains(member)) {
511                 deadMembers.remove(member);
512                 logInfo("up ~~");
513                  if (!member.opEquals(getSelf())) {
514                     fireGossipEvent(member, GossipState.UP);
515                 }
516             }
517            
518         } catch (Exception e) {
519             logError(e.msg);
520         } finally {
521             rwlock.writer.unlock();
522         }
523 
524     }
525 
526     private void downing(GossipMember member, HeartbeatState state) {
527         logInfo("downing ~~");
528         try {
529             if (candidateMembers.containsKey(member)) {
530                 CandidateMemberState cState = candidateMembers.get(member);
531                 if (state.getHeartbeatTime() == cState.getHeartbeatTime()) {
532                     cState.updateCount();
533                 } else if (state.getHeartbeatTime() > cState.getHeartbeatTime()) {
534                     candidateMembers.remove(member);
535                 }
536             } else {
537                 candidateMembers.put(member, new CandidateMemberState(state.getHeartbeatTime()));
538             }
539         } catch (Exception e) {
540             logError(e.msg);
541         }
542     }
543 
544     private void checkCandidate() {
545         GossipMember[] keys;
546         foreach(GossipMember k, CandidateMemberState v; candidateMembers)
547         {
548             keys ~= k;
549         }
550         // Set!(GossipMember) keys = candidateMembers.keySet();
551         foreach(GossipMember m ; keys) {
552             if (candidateMembers.get(m).getDowningCount() >= convergenceCount()) {
553                 down(m);
554                 candidateMembers.remove(m);
555             }
556         }
557     }
558 
559 
    /**
     * Stops gossiping and notifies the live members.
     *
     * Stops the message service and the gossip scheduler, waits one gossip
     * interval, then sends a SHUTDOWN message for each index of the live list
     * and finally clears the working flag.
     */
    public void shutdown() {
        // NOTE(review): the message service is stopped here, yet sendGossip() below
        // still sends through it — confirm that stop() only stops listening.
        getSettings().getMsgService().stop();
        doGossipExecutor.shutdown();
        try {
            import core.thread;
            // Wait one gossip interval; presumably to let a running round finish — TODO confirm.
            Thread.sleep(dur!("msecs")(getSettings().getGossipInterval()));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        Buffer buffer = encodeShutdownMessage();
        // sendGossip() redirects the slot holding the local node to the next entry.
        for (int i = 0; i < getLiveMembers().size(); i++) {
            sendGossip(buffer, getLiveMembers(), i);
        }
        _isWorking = false;
    }
575 
576 }