// Copyright (C) 2018-2019 HuntLabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module hunt.gossip.GossipManager;

import hunt.gossip.Buffer;
import std.json;

import hunt.logging;
import hunt.gossip.event.GossipListener;
import hunt.gossip.model.Ack2Message;
import hunt.gossip.model.AckMessage;
import hunt.gossip.model.CandidateMemberState;
import hunt.gossip.model.GossipDigest;
import hunt.gossip.model.GossipMember;
import hunt.gossip.model.GossipState;
import hunt.gossip.model.HeartbeatState;
import hunt.gossip.model.MessageType;
import hunt.gossip.model.SeedMember;
import hunt.gossip.GossipMessageFactory;

// import java.net.UnknownHostException;
import hunt.collection.ArrayList;
import hunt.collection.Collections;
import hunt.collection.List;
import hunt.collection.Map;
import std.random;
import hunt.collection.Set;
import hunt.collection.HashMap;
import hunt.concurrency.Executors;
import hunt.concurrency.ScheduledExecutorService;
import hunt.concurrency.ScheduledThreadPoolExecutor;
import hunt.util.DateTime;
// import hunt.concurrency.locks.ReentrantReadWriteLock;
import core.sync.rwmutex;
import hunt.gossip.JsonObject;
import hunt.gossip.GossipSettings;
import hunt.Integer;
import hunt.util.Common;
import hunt.gossip.Serializer;
import hunt.util.DateTime;
import std.conv;
import hunt.Exceptions;
import core.time;

/**
 * Central coordinator of the gossip protocol for the local node.
 *
 * It owns the membership tables (all known endpoints, live members, dead
 * members and "candidate" members that are suspected to be down), runs the
 * periodic gossip round on a scheduled executor, and builds the wire messages
 * (sync / ack / ack2 / shutdown) that are handed to the message service
 * obtained from the settings.
 *
 * Instances are obtained through getInstance(); the constructor is private.
 */
public class GossipManager {
    // private static final Logger LOGGER = LoggerFactory.getLogger(GossipManager.class);

    // NOTE(review): a plain `static` field is thread-local in D, so every
    // thread that calls getInstance() lazily creates its own manager. If a
    // process-wide singleton is intended this probably needs `__gshared`
    // plus synchronization -- confirm against the callers.
    private static GossipManager instance;
    // Term added to (networkDelay * 3) in convictedTime(); presumably milliseconds -- TODO confirm.
    private long executeGossipTime = 500;
    private bool _isWorking = false;
    // private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    // Writer side is taken by up() and down() while they move members between lists.
    private ReadWriteMutex rwlock;
    private ScheduledThreadPoolExecutor doGossipExecutor;
    // private ScheduledExecutorService clearExecutor = Executors.newSingleThreadScheduledExecutor();

    private Map!(GossipMember, HeartbeatState) endpointMembers;
    private List!(GossipMember) liveMembers;
    private List!(GossipMember) deadMembers;
    private Map!(GossipMember, CandidateMemberState) candidateMembers;
    private GossipSettings settings;
    private GossipMember localGossipMember;
    private string cluster;
    private GossipListener listener;
    private Random random;

    /// Creates the lock, the single-threaded gossip scheduler and the empty membership tables.
    private this() {
        rwlock = new ReadWriteMutex();
        doGossipExecutor = cast(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
        endpointMembers = new HashMap!(GossipMember, HeartbeatState)();
        liveMembers = new ArrayList!(GossipMember)();
        deadMembers = new ArrayList!(GossipMember)();
        candidateMembers = new HashMap!(GossipMember, CandidateMemberState)();
    }

    /**
     * Returns the manager instance, creating it on first use.
     *
     * The lazy creation is not synchronized (see the review note on `instance`).
     */
    public static GossipManager getInstance() {
        if (instance is null) {
            instance = new GossipManager();
        }
        return instance;
    }

    /**
     * Configures the local member and registers it in the endpoint table.
     *
     * The local member is created in state JOIN with a fresh HeartbeatState,
     * the seed list is stored into `settings`, and a JOIN event for the local
     * member is delivered to `listener` (when it is not null).
     *
     * @param cluster     cluster name used for the local member and for outgoing messages
     * @param ipAddress   address of the local member
     * @param port        port of the local member
     * @param id          identifier of the local member
     * @param seedMembers seed list, stored via settings.setSeedMembers
     * @param settings    gossip settings (intervals, message service, seeds)
     * @param listener    receiver of membership events; may be null
     */
    public void init(string cluster, string ipAddress, Integer port, string id, List!(SeedMember) seedMembers, GossipSettings settings, GossipListener listener) {
        this.cluster = cluster;
        this.localGossipMember = new GossipMember();
        this.localGossipMember.setCluster(cluster);
        this.localGossipMember.setIpAddress(ipAddress);
        this.localGossipMember.setPort(port);
        this.localGossipMember.setId(id);
        this.localGossipMember.setState(GossipState.JOIN);
        this.endpointMembers.put(localGossipMember, new HeartbeatState());
        this.listener = listener;
        this.settings = settings;
        this.settings.setSeedMembers(seedMembers);
        fireGossipEvent(localGossipMember, GossipState.JOIN);
    }

    /**
     * Starts gossiping: binds the message service to the local address,
     * schedules GossipTask at the configured gossip interval (the same value
     * is used as initial delay and as period) and starts the message service.
     */
    public void start() {
        logInfof("Starting gossip! cluster[%s] ip[%s] port[%d] id[%s]", localGossipMember.getCluster(), localGossipMember.getIpAddress(), localGossipMember.getPort().intValue(), localGossipMember.getId());
        _isWorking = true;
        settings.getMsgService().listen(getSelf().getIpAddress(), getSelf().getPort().intValue);
        doGossipExecutor.scheduleAtFixedRate(new GossipTask(), msecs(settings.getGossipInterval()), msecs(settings.getGossipInterval()));
        settings.getMsgService().start();
    }

    /// Returns the internal list of live members (no copy is made).
    public List!(GossipMember) getLiveMembers() {
        return liveMembers;
    }

    /// Returns the internal list of dead members (no copy is made).
    public List!(GossipMember) getDeadMembers() {
        return deadMembers;
    }

    /// Returns the settings passed to init().
    public GossipSettings getSettings() {
        return settings;
    }

    /// Returns the member object describing the local node.
    public GossipMember getSelf() {
        return localGossipMember;
    }

    /// Returns the id of the local member.
    public string getID() {
        return getSelf().getId();
    }

    /// True between start() and the end of shutdown().
    public bool isWorking() {
        return _isWorking;
    }

    /// Returns the internal endpoint table (no copy is made).
    public Map!(GossipMember, HeartbeatState) getEndpointMembers() {
        return endpointMembers;
    }

    /// Returns the cluster name passed to init().
    public string getCluster() {
        return cluster;
    }

    /**
     * Appends one digest per known endpoint to `digests`, in random order.
     *
     * Endpoints without a heartbeat state get heartbeat time and version 0.
     */
    private void randomGossipDigest(List!(GossipDigest) digests) /* throws UnknownHostException */ {
        // Snapshot the keys into an array so they can be shuffled.
        GossipMember[] gms;
        foreach (GossipMember k, HeartbeatState v; endpointMembers) {
            gms ~= k;
        }
        // List!(GossipMember) endpoints = new ArrayList!(GossipMember)(gms);
        randomShuffle(gms);
        foreach (GossipMember ep; gms) {
            HeartbeatState hb = endpointMembers.get(ep);
            long hbTime = 0;
            long hbVersion = 0;
            if (hb !is null) {
                hbTime = hb.getHeartbeatTime();
                hbVersion = hb.getVersion();
            }
            digests.add(new GossipDigest(ep, hbTime, hbVersion));
        }
    }

    /**
     * One gossip round, scheduled by start().
     *
     * Bumps the local heartbeat version, marks the local member UP while it
     * is still JOIN/DOWN, sends a sync message to a random live member, a
     * random dead member and (conditionally) a seed, then re-evaluates the
     * status of all known endpoints.
     */
    class GossipTask : Runnable {

        override
        public void run() {
            // Update local member version
            long newversion = endpointMembers.get(getSelf()).updateVersion();
            if (isDiscoverable(getSelf())) {
                up(getSelf());
            }
            version(HUNT_DEBUG) {
                trace("sync data");
                tracef("Now my heartbeat version is %ld", newversion);
            }

            List!(GossipDigest) digests = new ArrayList!(GossipDigest)();
            try {
                randomGossipDigest(digests);
                if (digests.size() > 0) {
                    Buffer syncMessageBuffer = encodeSyncMessage(digests);
                    // step 1. gossip to a random live member
                    bool b = gossip2LiveMember(syncMessageBuffer);

                    // step 2. gossip to a random dead member
                    gossip2UndiscoverableMember(syncMessageBuffer);

                    // step 3. gossip to a seed when step 1 did not reach one,
                    // or while there are no more live members than seeds
                    if (!b || liveMembers.size() <= settings.getSeedMembers().size()) {
                        gossip2Seed(syncMessageBuffer);
                    }

                }
                checkStatus();
                version(HUNT_DEBUG) {
                    trace("live member : " ~ getLiveMembers().toString);
                    trace("dead member : " ~ getDeadMembers().toString);
                    trace("endpoint : " ~ getEndpointMembers().toString);
                }
            } catch (Throwable e) {
                // Log and continue; the failure is not rethrown to the scheduler.
                logError(e.msg);
            }

        }
    }

    /// Encodes the digests as a JSON array wrapped in a SYNC_MESSAGE envelope.
    private Buffer encodeSyncMessage(List!(GossipDigest) digests) {
        Buffer buffer = Buffer.buffer();
        JSONValue[] array;
        foreach (GossipDigest e; digests) {
            array ~= e.encode()/* JSONValue(Serializer.getInstance().encode!(GossipDigest)(e).toString()) */;
        }
        if (buffer !is null)
            buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.SYNC_MESSAGE, JSONValue(array).toString, getCluster(), getSelf().ipAndPort()).encode());
        return buffer;
    }

    /// Encodes `ackMessage` as JSON wrapped in an ACK_MESSAGE envelope.
    public Buffer encodeAckMessage(AckMessage ackMessage) {
        Buffer buffer = Buffer.buffer();
        JsonObject ackJson = JsonObject.mapFrom(ackMessage);
        buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.ACK_MESSAGE, ackJson.encode(), getCluster(), getSelf().ipAndPort()).encode());
        return buffer;
    }

    /// Encodes `ack2Message` as JSON wrapped in an ACK2_MESSAGE envelope.
    public Buffer encodeAck2Message(Ack2Message ack2Message) {
        Buffer buffer = Buffer.buffer();
        JsonObject ack2Json = JsonObject.mapFrom(ack2Message);
        buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.ACK2_MESSAGE, ack2Json.encode(), getCluster(), getSelf().ipAndPort()).encode());
        return buffer;
    }

    /// Encodes the local member as JSON wrapped in a SHUTDOWN envelope.
    private Buffer encodeShutdownMessage() {
        Buffer buffer = Buffer.buffer();
        JsonObject self = JsonObject.mapFrom(getSelf());
        buffer.appendString(GossipMessageFactory.getInstance().makeMessage(MessageType.SHUTDOWN, self.encode(), getCluster(), getSelf().ipAndPort()).encode());
        return buffer;
    }

    /**
     * Merges remotely reported heartbeat states into the local endpoint table.
     *
     * For every member other than the local one, the remote state replaces
     * the local state when the member is unknown locally, when the remote
     * heartbeat time is newer, or when the heartbeat times are equal and the
     * remote version is higher.
     *
     * @param endpointMembers remote view (member -> heartbeat state); note
     *        that this parameter shadows the field of the same name
     */
    public void apply2LocalState(Map!(GossipMember, HeartbeatState) endpointMembers) {
        GossipMember[] keys;
        foreach (GossipMember k, HeartbeatState v; endpointMembers) {
            keys ~= k;
        }
        // Set!(GossipMember) keys = endpointMembers.keySet();
        foreach (GossipMember m; keys) {
            if (getSelf().opEquals(m)) {
                continue;
            }

            try {
                HeartbeatState localState = getEndpointMembers().get(m);
                HeartbeatState remoteState = endpointMembers.get(m);

                if (localState !is null) {
                    long localHeartbeatTime = localState.getHeartbeatTime();
                    long remoteHeartbeatTime = remoteState.getHeartbeatTime();
                    if (remoteHeartbeatTime > localHeartbeatTime) {
                        remoteStateReplaceLocalState(m, remoteState);
                    } else if (remoteHeartbeatTime == localHeartbeatTime) {
                        long localVersion = localState.getVersion();
                        long remoteVersion = remoteState.getVersion();
                        if (remoteVersion > localVersion) {
                            remoteStateReplaceLocalState(m, remoteState);
                        }
                    }
                } else {
                    remoteStateReplaceLocalState(m, remoteState);
                }
            } catch (Exception e) {
                logError(e.msg);
            }
        }
    }

    /**
     * Adopts `remoteState` for `member`: applies the member's UP/DOWN state to
     * the live/dead lists and re-inserts the entry in the endpoint table.
     */
    private void remoteStateReplaceLocalState(GossipMember member, HeartbeatState remoteState) {
        if (member.getState() == GossipState.UP) {
            up(member);
        }
        if (member.getState() == GossipState.DOWN) {
            down(member);
        }
        // Remove first so that the stored key object is replaced as well as the value.
        if (endpointMembers.containsKey(member)) {
            endpointMembers.remove(member);
        }
        endpointMembers.put(member, remoteState);
    }

    /**
     * Builds a member from the endpoint of `digest` and the local cluster
     * name. When an equal member is already in the endpoint table, its id and
     * state are copied onto the new object.
     */
    public GossipMember createByDigest(GossipDigest digest) {
        GossipMember member = new GossipMember();
        member.setPort(new Integer(digest.getEndpoint().getPort()));
        member.setIpAddress(digest.getEndpoint().getIp());
        member.setCluster(cluster);
        GossipMember[] keys;
        auto em = getEndpointMembers();
        foreach (GossipMember k, HeartbeatState v; em) {
            keys ~= k;
        }
        // Set!(GossipMember) keys = getEndpointMembers().keySet();
        foreach (GossipMember m; keys) {
            if (m.opEquals(member)) {
                member.setId(m.getId());
                member.setState(m.getState());
                break;
            }
        }

        return member;
    }

    /**
     * send sync message to a live member
     *
     * @param buffer sync data
     * @return if send to a seed member then return TRUE
     */
    private bool gossip2LiveMember(Buffer buffer) {
        int liveSize = liveMembers.size();
        if (liveSize <= 0) {
            return false;
        }
        int index = (liveSize == 1) ? 0 : uniform(0, liveSize);
        return sendGossip(buffer, liveMembers, index);
    }

    /**
     * send sync message to a dead member
     *
     * @param buffer sync data
     */
    private void gossip2UndiscoverableMember(Buffer buffer) {
        int deadSize = deadMembers.size();
        if (deadSize <= 0) {
            return;
        }
        int index = (deadSize == 1) ? 0 : uniform(0, deadSize);
        sendGossip(buffer, deadMembers, index);
    }

    /**
     * Sends the sync message to a randomly chosen seed.
     *
     * Nothing is sent when there are no seeds or when the only seed is the
     * local member. With at most one live member the seed is always
     * contacted; otherwise it is contacted with probability
     * (seed count / live member count).
     *
     * @param buffer sync data
     */
    private void gossip2Seed(Buffer buffer) {
        int size = settings.getSeedMembers().size();
        if (size <= 0) {
            return;
        }
        if (size == 1 && settings.getSeedMembers().contains(gossipMember2SeedMember(getSelf()))) {
            return;
        }
        int index = (size == 1) ? 0 : uniform(0, size);
        // logInfo("size : ",size," index : ",index);
        int liveCount = liveMembers.size();
        if (liveCount <= 1) {
            // Also covers an empty live list, which previously divided by zero.
            sendGossip2Seed(buffer, settings.getSeedMembers(), index);
            return;
        }
        // Promote to double BEFORE dividing: `size / liveCount * 1.0` performs
        // integer division first, which truncated the probability to 0
        // whenever there were fewer seeds than live members.
        double prob = cast(double) size / liveCount;
        if (uniform(0.0f, 1.0f) < prob) {
            sendGossip2Seed(buffer, settings.getSeedMembers(), index);
        }
    }

    /**
     * Sends `buffer` to members[index]; when that entry is the local member
     * the next entry (wrapping around) is used instead, and nothing is sent
     * if the local member is the only entry.
     *
     * @return true when the message was handed to the message service and the
     *         target is contained in the seed list; false otherwise,
     *         including when sending raised an Exception
     */
    private bool sendGossip(Buffer buffer, List!(GossipMember) members, int index) {
        if (buffer !is null && index >= 0) {
            try {
                GossipMember target = members.get(index);
                if (target.opEquals(getSelf())) {
                    int m_size = members.size();
                    if (m_size == 1) {
                        return false;
                    } else {
                        target = members.get((index + 1) % m_size);
                    }
                }
                settings.getMsgService().sendMsg(target.getIpAddress(), target.getPort(), buffer);
                return settings.getSeedMembers().contains(gossipMember2SeedMember(target));
            } catch (Exception e) {
                logError(e.msg);
            }
        }
        return false;
    }

    /**
     * Seed-list counterpart of sendGossip().
     *
     * @return true when the message was handed to the message service
     */
    private bool sendGossip2Seed(Buffer buffer, List!(SeedMember) members, int index) {
        if (buffer !is null && index >= 0) {
            try {
                SeedMember target = members.get(index);
                int m_size = members.size();
                // NOTE(review): this compares a SeedMember with a GossipMember;
                // whether that can ever be true depends on SeedMember.opEquals -- confirm.
                if (target.opEquals(getSelf())) {
                    if (m_size <= 1) {
                        return false;
                    } else {
                        target = members.get((index + 1) % m_size);
                    }
                }
                settings.getMsgService().sendMsg(target.getIpAddress(), target.getPort(), buffer);
                return true;
            } catch (Exception e) {
                logError(e.msg);
            }
        }
        return false;
    }

    /// Builds a SeedMember from the cluster, address, port and id of `member`.
    private SeedMember gossipMember2SeedMember(GossipMember member) {
        SeedMember seed = new SeedMember(member.getCluster(), member.getIpAddress(), member.getPort(), member.getId());
        return seed;
    }

    /**
     * Re-evaluates every remote endpoint against the conviction timeout.
     *
     * Members whose heartbeat is older than convictedTime() and that are
     * still considered alive are registered as down-candidates; members with
     * a recent heartbeat that are JOIN/DOWN or on the dead list are marked
     * UP. Finally the candidates are checked for conviction.
     */
    private void checkStatus() {
        try {
            GossipMember local = getSelf();
            Map!(GossipMember, HeartbeatState) endpoints = getEndpointMembers();
            GossipMember[] gms;
            foreach (GossipMember k, HeartbeatState v; endpoints) {
                gms ~= k;
            }
            // Set!(GossipMember) epKeys = endpoints.keySet();

            // Computed once: its inputs (endpoint count and settings) are not
            // modified inside the loop. The local deliberately does not reuse
            // the method's name, which the original shadowed.
            long convictedMillis = convictedTime();
            foreach (GossipMember k; gms) {
                if (!k.opEquals(local)) {
                    HeartbeatState state = endpoints.get(k);
                    long now = DateTimeHelper.currentTimeMillis();
                    long duration = now - state.getHeartbeatTime();
                    logInfo("check : " ~ k.toString() ~ " state : " ~ state.toString() ~ " duration : " ~ duration.to!string ~ " convictedTime : " ~ convictedMillis.to!string);
                    if (duration > convictedMillis && (isAlive(k) || getLiveMembers().contains(k))) {
                        downing(k, state);
                    }
                    if (duration <= convictedMillis && (isDiscoverable(k) || getDeadMembers().contains(k))) {
                        up(k);
                    }
                }
            }
            checkCandidate();
        } catch (Exception e) {
            logError(e.msg);
        }
    }

    /// Returns floor(log10(n) + ln(n) + 1), where n is the number of known endpoints.
    private int convergenceCount() {
        int size = getEndpointMembers().size();
        import std.math;
        int count = cast(int) floor(log10(size) + log(size) + 1);
        return count;
    }

    /**
     * Silence threshold after which a member becomes a down-candidate:
     * 2 * convergenceCount() * (3 * networkDelay + executeGossipTime) + gossipInterval.
     */
    private long convictedTime() {
        return ((convergenceCount() * (settings.getNetworkDelay() * 3 + executeGossipTime)) << 1) + settings.getGossipInterval();
    }

    /// True when the member's state is JOIN or DOWN.
    private bool isDiscoverable(GossipMember member) {
        return member.getState() == GossipState.JOIN || member.getState() == GossipState.DOWN;
    }

    /// True when the member's state is UP.
    private bool isAlive(GossipMember member) {
        return member.getState() == GossipState.UP;
    }

    /// Returns the listener passed to init(); may be null.
    public GossipListener getListener() {
        return listener;
    }

    /// Delivers a membership event to the listener, if one is registered.
    private void fireGossipEvent(GossipMember member, GossipState state) {
        if (getListener() !is null) {
            getListener().gossipEvent(member, state);
        }
    }

    // private void clearMember(GossipMember member) {
    //     rwlock.writeLock().lock();
    //     try {
    //         endpointMembers.remove(member);
    //     } finally {
    //         rwlock.writeLock().unlock();
    //     }
    // }

    /**
     * Marks `member` as DOWN: moves it from the live list to the dead list
     * and fires a DOWN event. Runs under the writer lock.
     */
    public void down(GossipMember member) {
        logInfo("down ~~");
        // Acquire before the try block so that `finally` only ever releases a
        // lock that is actually held.
        rwlock.writer.lock();
        try {
            member.setState(GossipState.DOWN);
            liveMembers.remove(member);
            if (!deadMembers.contains(member)) {
                deadMembers.add(member);
            }
            // clearExecutor.schedule(() -> clearMember(member), getSettings().getDeleteThreshold() * getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
            fireGossipEvent(member, GossipState.DOWN);
        } catch (Exception e) {
            logError(e.msg);
        } finally {
            rwlock.writer.unlock();
        }
    }

    /**
     * Marks `member` as UP: ensures it is on the live list and drops it from
     * the candidate table. An UP event is fired only when the member was
     * previously on the dead list and is not the local member. Runs under the
     * writer lock.
     */
    private void up(GossipMember member) {
        // Acquire before the try block so that `finally` only ever releases a
        // lock that is actually held.
        rwlock.writer.lock();
        try {
            member.setState(GossipState.UP);
            if (!liveMembers.contains(member)) {
                liveMembers.add(member);
            }
            if (candidateMembers.containsKey(member)) {
                candidateMembers.remove(member);
            }
            if (deadMembers.contains(member)) {
                deadMembers.remove(member);
                logInfo("up ~~");
                if (!member.opEquals(getSelf())) {
                    fireGossipEvent(member, GossipState.UP);
                }
            }

        } catch (Exception e) {
            logError(e.msg);
        } finally {
            rwlock.writer.unlock();
        }

    }

    /**
     * Registers or updates `member` as a down-candidate.
     *
     * A new candidate is stored with the current heartbeat time. For an
     * existing candidate the downing count is bumped while the heartbeat time
     * is unchanged; a newer heartbeat removes the candidate again.
     */
    private void downing(GossipMember member, HeartbeatState state) {
        logInfo("downing ~~");
        try {
            if (candidateMembers.containsKey(member)) {
                CandidateMemberState cState = candidateMembers.get(member);
                if (state.getHeartbeatTime() == cState.getHeartbeatTime()) {
                    cState.updateCount();
                } else if (state.getHeartbeatTime() > cState.getHeartbeatTime()) {
                    candidateMembers.remove(member);
                }
            } else {
                candidateMembers.put(member, new CandidateMemberState(state.getHeartbeatTime()));
            }
        } catch (Exception e) {
            logError(e.msg);
        }
    }

    /// Marks as DOWN every candidate whose downing count reached convergenceCount().
    private void checkCandidate() {
        // Iterate over a snapshot of the keys because entries are removed inside the loop.
        GossipMember[] keys;
        foreach (GossipMember k, CandidateMemberState v; candidateMembers) {
            keys ~= k;
        }
        // Set!(GossipMember) keys = candidateMembers.keySet();
        foreach (GossipMember m; keys) {
            if (candidateMembers.get(m).getDowningCount() >= convergenceCount()) {
                down(m);
                candidateMembers.remove(m);
            }
        }
    }


    /**
     * Stops the message service and the gossip scheduler, waits one gossip
     * interval, then sends a SHUTDOWN message for each entry of the live list.
     *
     * NOTE(review): the loop goes through sendGossip(), which redirects the
     * local member's own slot to the next entry, so one peer may receive the
     * shutdown message twice -- confirm whether that is acceptable.
     */
    public void shutdown() {
        getSettings().getMsgService().stop();
        doGossipExecutor.shutdown();
        try {
            import core.thread;
            Thread.sleep(dur!("msecs")(getSettings().getGossipInterval()));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        Buffer buffer = encodeShutdownMessage();
        for (int i = 0; i < getLiveMembers().size(); i++) {
            sendGossip(buffer, getLiveMembers(), i);
        }
        _isWorking = false;
    }

}