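What is load balancing

Load balancing is a computing technique for distributing load across multiple computers (a computer cluster), network links, CPUs, disk drives, or other resources, with the goals of optimizing resource use, maximizing throughput, minimizing response time, and avoiding overload. Replacing a single component with several load-balanced server components also improves reliability through redundancy. The load balancing service itself is usually provided by dedicated software or hardware. Its main job is to spread a large number of tasks sensibly across multiple processing units, which addresses the high-concurrency and high-availability problems of internet architectures.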
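Common load balancing strategies

Round robin and weighted round robin: requests are handed to each server in turn. This works best when every server in the group has the same processing capacity and individual requests differ little in cost. In practice servers rarely have similar performance, which is where weighted round robin comes in: requests are rotated according to per-server weights so that the load on each server can be tuned.

Random (weighted): the selection probability is set by weight. At low call volume the chance of collisions within a short window is high (requests pile up on one server), but the larger the call volume, the more even the distribution becomes (the closer it gets to the configured weight ratio).

Least active and weighted least active: least active picks the provider with the fewest current tasks (the lowest number of in-flight calls). The weighted variant additionally applies weights, for example when two providers have the same active count, one of them is chosen at random according to weight.

Hash and consistent hashing: requests are hashed by some rule and assigned to the corresponding server. Consistent hashing is a special kind of hashing. With consistent hashing, a change in the number of slots (the table size) requires remapping only K/n keys on average, where K is the number of keys and n is the number of slots. In a traditional hash table, adding or removing a slot requires remapping almost all keys.

Response time: the provider is chosen according to how quickly it responds.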
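The remark about weighted random selection (uneven at low call volume, close to the weight ratio at high call volume) can be checked with a small simulation. This is only a sketch: the class name `WeightedRandomDemo`, the weights, and the seed are assumptions made for illustration and are not taken from any framework.

```java
import java.util.Random;

class WeightedRandomDemo {
    // Picks an index with probability proportional to its weight.
    static int pick(int[] weights, Random random) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        int offset = random.nextInt(total);
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable when total > 0");
    }

    // Returns the fraction of calls that each index received.
    static double[] shares(int[] weights, int calls, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[weights.length];
        for (int i = 0; i < calls; i++) {
            counts[pick(weights, random)]++;
        }
        double[] result = new double[weights.length];
        for (int i = 0; i < counts.length; i++) {
            result[i] = (double) counts[i] / calls;
        }
        return result;
    }
}
```

Calling `WeightedRandomDemo.shares(new int[]{3, 2, 1}, 10, 42L)` and then the same call with `600000` calls should show the second result sitting much closer to 1/2, 1/3, 1/6 than the first.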
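Load balancing implementations

Dubbo

Dubbo's load balancing code lives in the com.alibaba.dubbo.rpc.cluster.loadbalance package of the dubbo-cluster module (Dubbo 2.6.6). It implements four strategies: consistent hashing, least active, random, and weighted round robin.

Weight

While a Dubbo service is warming up, its weight is a computed value. The longer the service has been running, the closer this value gets to the configured weight, and it equals the configured weight only once the warm-up period is over.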
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        // Compute the weight; the expression below is logically similar to (uptime / warmup) * weight.
        // As the service uptime grows, the computed weight ww slowly approaches the configured weight.
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }
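To make the warm-up formula concrete, the sketch below evaluates it at a few points in time. The formula is copied from the snippet above so that the example is self-contained; the class name `WarmupWeightDemo`, the 10-minute warm-up, and the weight of 100 are values assumed here for illustration.

```java
class WarmupWeightDemo {
    // Same formula as the Dubbo snippet above.
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    // Effective weight at 0%, 20%, 50%, 100% and 200% of the warm-up period.
    static int[] sample(int warmupMillis, int weight) {
        int[] uptimes = {0, warmupMillis / 5, warmupMillis / 2, warmupMillis, 2 * warmupMillis};
        int[] result = new int[uptimes.length];
        for (int i = 0; i < uptimes.length; i++) {
            result[i] = calculateWarmupWeight(uptimes[i], warmupMillis, weight);
        }
        return result;
    }
}
```

With `sample(600000, 100)` the result is `[1, 20, 50, 100, 100]`: the weight never drops below 1, grows linearly during warm-up, and is capped at the configured value afterwards.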
Consistent hashing

    private static final class ConsistentHashSelector<T> {

        // Virtual nodes
        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        // Number of virtual nodes
        private final int replicaNumber;

        // Used to detect changes in the real nodes (going online or offline, etc.)
        private final int identityHashCode;

        // Indexes of the arguments used to locate a node
        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // Number of virtual nodes, 160 by default
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            // Indexes of the arguments that take part in hashing; by default only the first argument is hashed
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            // Spread the nodes over the ring
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    // MD5 of address + i yields a 16-byte array
                    byte[] digest = md5(address + i);
                    // Hash each 4-byte group of the digest
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        ...

        private Invoker<T> selectForKey(long hash) {
            // hash is derived from the MD5 of the call arguments
            // Find the first node whose value is greater than or equal to hash
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
            // null means hash lies past the last node, so take the first node
            if (entry == null) {
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

        ...
    }
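The claim that consistent hashing remaps only a fraction of the keys when a node disappears can be illustrated with a stripped-down ring. This is a sketch rather than Dubbo's code: the class `HashRingDemo`, the `node#i` naming of virtual nodes, and the choice of the first 8 MD5 bytes as the hash are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HashRingDemo {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    // Assumes a non-empty node list.
    HashRingDemo(List<String> nodes, int virtualNodesPerNode) {
        for (String node : nodes) {
            for (int i = 0; i < virtualNodesPerNode; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // Finds the first virtual node at or after the key's hash,
    // wrapping around to the start of the ring when there is none.
    String nodeFor(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // First 8 bytes of the MD5 digest packed into a long (illustrative choice).
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Counts the keys "key-0" .. "key-(keyCount-1)" whose owner differs between two rings.
    static int moved(HashRingDemo before, HashRingDemo after, int keyCount) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "key-" + i;
            if (!before.nodeFor(key).equals(after.nodeFor(key))) {
                moved++;
            }
        }
        return moved;
    }
}
```

Building one ring from nodes A, B, C and another from A, B, then calling `moved(before, after, 10000)`, should report roughly a third of the keys as moved, and every moved key is one that previously belonged to C. Keys owned by A or B stay where they were.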
Weighted least active

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        // Smallest active count (fewest in-flight tasks)
        int leastActive = -1;
        // Number of providers that share the smallest active count
        int leastCount = 0;
        // Indexes in the list of the providers that share the smallest active count
        int[] leastIndexs = new int[length];
        // Sum of the weights of all least-active providers
        int totalWeight = 0;
        // Weight of the first least-active provider
        int firstWeight = 0;
        // Whether all least-active providers have the same weight
        boolean sameWeight = true;
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // Get the weight; during warm-up this is the warm-up weight
            int afterWarmup = getWeight(invoker, invocation);
            // Found a provider with a smaller active count: reset the records
            if (leastActive == -1 || active < leastActive) {
                leastActive = active;
                leastCount = 1;
                leastIndexs[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;
            // Found a provider with the same smallest active count: update the records
            } else if (active == leastActive) {
                leastIndexs[leastCount++] = i;
                totalWeight += afterWarmup;
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // Exactly one least-active provider
        if (leastCount == 1) {
            return invokers.get(leastIndexs[0]);
        }
        // Several least-active providers with different weights: choose by weight
        if (!sameWeight && totalWeight > 0) {
            int offsetWeight = random.nextInt(totalWeight) + 1;
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // Several least-active providers with equal weights: return one at random
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }
Random

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        int totalWeight = 0;
        // Whether all providers have the same weight
        boolean sameWeight = true;
        // Compute the total weight and check whether all providers have the same weight
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight;
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
        // Weights differ: pick at random according to weight
        if (totalWeight > 0 && !sameWeight) {
            int offset = random.nextInt(totalWeight);
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // Weights are equal: return one at random
        return invokers.get(random.nextInt(length));
    }
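Weighted round robin

The weighted round robin implementation here is similar to nginx's. Each node has two attributes: the configured weight, and the currentWeight used for the rotation. currentWeight starts at 0. For each request:

1. Iterate over all available providers in the cluster, apply currentWeight += weight to each of them, and compute the sum of the weights, totalWeight.
2. Select the provider with the largest currentWeight to handle the request.
3. Apply currentWeight -= totalWeight to the selected provider.

Consider an example with the weights [A: 3, B: 2, C: 1]. The initial currentWeight array is [0, 0, 0] and totalWeight = 6.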
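| Request | currentWeight array | Selected | currentWeight array after selection |
|---------|---------------------|----------|-------------------------------------|
| 1       | [3, 2, 1]           | A        | [-3, 2, 1]                          |
| 2       | [0, 4, 2]           | B        | [0, -2, 2]                          |
| 3       | [3, 0, 3]           | A        | [-3, 0, 3]                          |
| 4       | [0, 2, 4]           | C        | [0, 2, -2]                          |
| 5       | [3, 4, -1]          | B        | [3, -2, -1]                         |
| 6       | [6, 0, 0]           | A        | [0, 0, 0]                           |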
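This strategy still rotates according to the weights, but it avoids sending a large number of requests to the same machine in a row.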
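The three steps and the table can be reproduced with a few lines of code. The following is a minimal single-process sketch, not Dubbo's implementation: the class name `SmoothWeightedRoundRobin` is made up for the example, weights are fixed at construction time, and ties are resolved in favor of the earlier node, which matches request 3 in the table.

```java
class SmoothWeightedRoundRobin {
    private final String[] names;
    private final int[] weights;
    private final int[] current;
    private final int totalWeight;

    SmoothWeightedRoundRobin(String[] names, int[] weights) {
        this.names = names.clone();
        this.weights = weights.clone();
        this.current = new int[weights.length];
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        this.totalWeight = total;
    }

    // One round: raise every currentWeight by its weight, pick the largest
    // (earliest node wins a tie), then lower the winner by totalWeight.
    synchronized String next() {
        int best = 0;
        for (int i = 0; i < current.length; i++) {
            current[i] += weights[i];
            if (current[i] > current[best]) {
                best = i;
            }
        }
        current[best] -= totalWeight;
        return names[best];
    }
}
```

With names `{"A", "B", "C"}` and weights `{3, 2, 1}`, six calls to `next()` return A, B, A, C, B, A, after which every currentWeight is back at 0 and the cycle repeats.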
    protected static class WeightedRoundRobin {
        private int weight;
        // currentWeight
        private AtomicLong current = new AtomicLong(0);
        // Timestamp of the last currentWeight update
        private long lastUpdate;

        public int getWeight() {
            return weight;
        }

        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }

        // currentWeight += weight
        public long increaseCurrent() {
            return current.addAndGet(weight);
        }

        // currentWeight -= totalWeight
        public void sel(int total) {
            current.addAndGet(-1 * total);
        }

        public long getLastUpdate() {
            return lastUpdate;
        }

        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }

    // {"serviceName.methodName": {"url (provider)": WeightedRoundRobin}}
    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    // Update lock, used when removing expired nodes
    private AtomicBoolean updateLock = new AtomicBoolean();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // First-level key: "serviceName.methodName"
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // Get {"url (provider)": WeightedRoundRobin}, creating it if absent
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        // Largest currentWeight seen so far
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;

        // Iterate over the providers: currentWeight += weight, accumulate totalWeight,
        // and find the node with the largest currentWeight
        for (Invoker<T> invoker : invokers) {
            // Look up the WeightedRoundRobin that belongs to this provider
            String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            // Get the weight (with the warm-up mechanism applied)
            int weight = getWeight(invoker, invocation);
            if (weight < 0) {
                weight = 0;
            }
            // Create the WeightedRoundRobin if it does not exist yet
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(identifyString, weightedRoundRobin);
                weightedRoundRobin = map.get(identifyString);
            }
            // The weight has changed: reset weight and currentWeight
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            // currentWeight += weight
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            // Larger than the current maximum: this node becomes the candidate
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        } // for end

        // The number of providers differs from the recorded number: remove expired nodes
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    newMap.putAll(map);
                    Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                            it.remove();
                        }
                    }
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }

        // currentWeight -= totalWeight
        if (selectedInvoker != null) {
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }
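Spring Cloud Ribbon

Ribbon load balancing components

Rule: decides which provider to use.
Ping: checks that a provider is alive.
ServerList: a static or dynamic list of addresses. If it is dynamic, a background task periodically refreshes and filters the list.

Configuring through a configuration file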
<clientName>.ribbon.NFLoadBalancerClassName: Should implement ILoadBalancer
<clientName>.ribbon.NFLoadBalancerRuleClassName: Should implement IRule
<clientName>.ribbon.NFLoadBalancerPingClassName: Should implement IPing
<clientName>.ribbon.NIWSServerListClassName: Should implement ServerList
<clientName>.ribbon.NIWSServerListFilterClassName: Should implement ServerListFilter
Example: setting the load balancing strategy for the users service:

    users:
      ribbon:
        NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerList
        NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule
Load balancing strategies

RoundRobinRule (round robin)

Simple round robin.

    // Position of the next provider in the list
    private AtomicInteger nextServerCyclicCounter;

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }
        // The selected provider
        Server server = null;
        // Retry counter
        int count = 0;
        // Round robin
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            // All providers
            List<Server> allServers = lb.getAllServers();
            // Number of reachable providers
            int upCount = reachableServers.size();
            // Number of all providers
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            // nextServerIndex = (nextServerIndex + 1) % total
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);
            // If the provider was removed from the list for some reason (e.g. it went down),
            // yield for a moment and then pick again
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            // Return the provider if it is alive and ready to serve
            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }
        // Too many retries
        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }
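AvailabilityFilteringRule (round robin over available servers)

Builds on simple round robin and filters the providers by availability. Two kinds of servers are skipped:

Circuit-tripped servers. By default, a server is put into the "circuit tripped" state after 3 failed connections. The state lasts 30 seconds, and if the connection fails again, the duration grows geometrically.

Servers with too many concurrent connections. A client configured with AvailabilityFilteringRule also ignores a server whose number of concurrent connections is too high (the default limit is Integer.MAX_VALUE).

Related configuration:

    # Number of connection failures before the circuit trips
    niws.loadbalancer.<clientName>.connectionFailureCountThreshold

    # Duration of the circuit-tripped state
    niws.loadbalancer.<clientName>.circuitTripMaxTimeoutSeconds

    # Concurrent connection limit
    <clientName>.<clientConfigNameSpace>.ActiveConnectionsLimit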
    public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
        // Simple round robin by default
        RoundRobinRule roundRobinRule = new RoundRobinRule();

        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            roundRobinRule = new RoundRobinRule();
        }

        @Override
        public void setLoadBalancer(ILoadBalancer lb) {
            super.setLoadBalancer(lb);
            roundRobinRule.setLoadBalancer(lb);
        }
    }
    public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
        // AbstractServerPredicate is an abstract class that provides filtering methods, based on Google Guava
        public abstract AbstractServerPredicate getPredicate();

        @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            // Go through all providers, keep the ones that match, and pick among them
            // by round robin (filter first, then select)
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }
        }
    }
    public class AvailabilityFilteringRule extends PredicateBasedRule {

        public AvailabilityFilteringRule() {
            super();
            // CompositePredicate combines several filter conditions;
            // AvailabilityPredicate is the condition that actually applies here
            predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                    .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                    .build();
        }

        @Override
        // Compared with PredicateBasedRule, this saves one traversal of the server list
        public Server choose(Object key) {
            // Retry counter
            int count = 0;
            // Pick directly by round robin
            Server server = roundRobinRule.choose(key);
            while (count++ <= 10) {
                // Return the server if it passes the filter
                if (predicate.apply(new PredicateKey(server))) {
                    return server;
                }
                server = roundRobinRule.choose(key);
            }
            return super.choose(key);
        }
    }
    public class AvailabilityPredicate extends AbstractServerPredicate {
        ...

        @Override
        public boolean apply(@Nullable PredicateKey input) {
            LoadBalancerStats stats = getLBStats();
            if (stats == null) {
                return true;
            }
            return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
        }

        private boolean shouldSkipServer(ServerStats stats) {
            // Check the circuit-tripped state and the number of concurrent requests
            if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                    || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
                return true;
            }
            return false;
        }
    }
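WeightedResponseTimeRule (random selection weighted by response time)

Each server is assigned a weight: the longer its response time, the smaller its weight. The rule selects a server at random, with the weights influencing the choice. The weights are refreshed by a timer (every 30 s by default). The procedure is:

1. On each refresh, iterate over the provider list and sum the average response times: totalResponseTime += avgResponseTime.
2. Iterate over the list again, compute each provider's weight as weight = totalResponseTime - avgResponseTime, and record the running sum used for the random pick: accumulatedWeights[i] = accumulatedWeights[i - 1] + weight.
3. On each request, generate a random number and select a provider with it. If that is not possible, the rule falls back to simple round robin.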
    public class WeightedResponseTimeRule extends RoundRobinRule {
        ...

        // Accumulated weights: [0] holds the weight of server [0], [1] holds the sum of the
        // weights of servers [0] to [1], and [n] holds the sum of the weights of servers [0] to [n]
        private volatile List<Double> accumulatedWeights = new ArrayList<Double>();

        // Timer used to refresh the weights
        protected Timer serverWeightTimer = null;
        // Lock for weight updates
        protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);

        ...

        // Sets up the weight maintenance task
        void initialize(ILoadBalancer lb) {
            if (serverWeightTimer != null) {
                serverWeightTimer.cancel();
            }
            serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                    + name, true);
            // Start the weight maintenance task.
            // DynamicServerWeightTask runs new ServerWeight().maintainWeights();
            // serverWeightTaskTimerInterval defaults to 30 * 1000
            serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                    serverWeightTaskTimerInterval);
            // Initialize the weights
            ServerWeight sw = new ServerWeight();
            sw.maintainWeights();

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    logger
                            .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                    + name);
                    serverWeightTimer.cancel();
                }
            }));
        }

        class ServerWeight {
            // Maintains the weights
            public void maintainWeights() {
                ILoadBalancer lb = getLoadBalancer();
                if (lb == null) {
                    return;
                }
                // Acquire the lock
                if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    return;
                }

                try {
                    logger.info("Weight adjusting job started");
                    AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                    LoadBalancerStats stats = nlb.getLoadBalancerStats();
                    if (stats == null) {
                        // no statistics, nothing to do
                        return;
                    }
                    // Sum of the response times of all providers
                    double totalResponseTime = 0;
                    // Sum up the average response time of every provider
                    for (Server server : nlb.getAllServers()) {
                        // this will automatically load the stats if not in cache
                        ServerStats ss = stats.getSingleServerStat(server);
                        totalResponseTime += ss.getResponseTimeAvg();
                    }

                    // Accumulated weight
                    Double weightSoFar = 0.0;

                    // create new list and hot swap the reference
                    // Holds the accumulated weights (becomes accumulatedWeights)
                    List<Double> finalWeights = new ArrayList<Double>();
                    // Compute each provider's weight and the accumulated weight
                    for (Server server : nlb.getAllServers()) {
                        ServerStats ss = stats.getSingleServerStat(server);
                        double weight = totalResponseTime - ss.getResponseTimeAvg();
                        weightSoFar += weight;
                        finalWeights.add(weightSoFar);
                    }
                    setWeights(finalWeights);
                } catch (Exception e) {
                    logger.error("Error calculating server weights", e);
                } finally {
                    serverWeightAssignmentInProgress.set(false);
                }
            }
        }

        @Override
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;

            while (server == null) {
                // get hold of the current reference in case it is changed from the other thread
                List<Double> currentWeights = accumulatedWeights;
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> allList = lb.getAllServers();

                int serverCount = allList.size();

                if (serverCount == 0) {
                    return null;
                }

                int serverIndex = 0;

                // Sum of the weights of all servers
                double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
                // Weights are stale or not initialized yet: fall back to simple round robin
                if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                    server = super.choose(getLoadBalancer(), key);
                    if (server == null) {
                        return server;
                    }
                } else {
                    // Generate a random number and use it to select a provider
                    // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                    double randomWeight = random.nextDouble() * maxTotalWeight;
                    // pick the server index based on the randomIndex
                    int n = 0;
                    for (Double d : currentWeights) {
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        } else {
                            n++;
                        }
                    }

                    server = allList.get(serverIndex);
                }

                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }

                if (server.isAlive()) {
                    return (server);
                }

                // Next.
                server = null;
            }
            return server;
        }

        ...
    }
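A small numeric example may make the weight computation easier to follow. The sketch below repeats only the arithmetic of the rule (weight = total minus own average, stored as a running sum) outside of Ribbon; the class `ResponseTimeWeightsDemo` and the response times are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ResponseTimeWeightsDemo {
    // weight_i = totalResponseTime - avgResponseTime_i, stored as a running sum.
    static List<Double> accumulate(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> accumulated = new ArrayList<>();
        double soFar = 0;
        for (double t : avgResponseTimes) {
            soFar += total - t;
            accumulated.add(soFar);
        }
        return accumulated;
    }

    // Index of the first accumulated weight that is >= randomWeight
    // (falls back to the last index if none is).
    static int indexFor(List<Double> accumulated, double randomWeight) {
        for (int i = 0; i < accumulated.size(); i++) {
            if (accumulated.get(i) >= randomWeight) {
                return i;
            }
        }
        return accumulated.size() - 1;
    }
}
```

For average response times of 10, 20 and 70 ms the total is 100, the weights are 90, 80 and 30, and the accumulated list is [90, 170, 200]. A random value drawn from [0, 200) therefore lands on the three servers about 45%, 40% and 15% of the time, so the slowest server receives the smallest share.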
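The three rules above are the ones mentioned on the wiki. The rules below are not mentioned there but are implemented in Ribbon.

BestAvailableRule (best available)

Ignores circuit-tripped providers and then selects the provider with the lowest number of concurrent requests (least active).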
    public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
        ...
        @Override
        public Server choose(Object key) {
            if (loadBalancerStats == null) {
                return super.choose(key);
            }
            List<Server> serverList = getLoadBalancer().getAllServers();
            // Smallest active count
            int minimalConcurrentConnections = Integer.MAX_VALUE;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            for (Server server: serverList) {
                ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
                // Only providers that are not circuit-tripped are compared
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    // Compare the current active count and keep the smaller one
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }
            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }
        ...
    }
RandomRule (random)

Do not use this rule; see the GitHub issue mentioned below.

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            // List of reachable providers
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            // index = ThreadLocalRandom.current().nextInt(serverCount)
            int index = chooseRandomInt(serverCount);
            // Reads a reachable instance from upList; beware, the index may be out of bounds
            server = upList.get(index);

            if (server == null) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;

    }

The reason not to use this rule is the line server = upList.get(index);. upList is the list of reachable providers, while index is a random number generated from the number of all providers, so the call can go out of bounds whenever some server is not UP. A look at GitHub showed that this bug had still not been fixed at the time of writing: RandomRule incorrect index bug.
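The index mismatch can be shown in isolation. The sketch below uses plain string lists instead of Ribbon's `Server` objects; `RandomIndexDemo`, `unsafePick` and `safePick` are hypothetical names, and `safePick` is just one possible way of keeping the index and the list consistent, not a fix taken from Ribbon.

```java
import java.util.List;
import java.util.Random;

class RandomIndexDemo {
    // Mirrors the pattern described above: the index range comes from allList,
    // but the element is read from upList.
    static String unsafePick(List<String> upList, List<String> allList, Random random) {
        int index = random.nextInt(allList.size());
        return upList.get(index); // throws when index >= upList.size()
    }

    // Draws the index from the same list that is read.
    static String safePick(List<String> upList, Random random) {
        if (upList.isEmpty()) {
            return null;
        }
        return upList.get(random.nextInt(upList.size()));
    }

    // Counts how many of the given number of unsafe picks fail with an out-of-bounds error.
    static int countFailures(List<String> upList, List<String> allList, int attempts, long seed) {
        Random random = new Random(seed);
        int failures = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                unsafePick(upList, allList, random);
            } catch (IndexOutOfBoundsException e) {
                failures++;
            }
        }
        return failures;
    }
}
```

With three servers of which only two are reachable, about one in three unsafe picks fails, while `safePick` always returns one of the reachable servers.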
RetryRule

A retry mechanism that has to be combined with another rule.
    public class RetryRule extends AbstractLoadBalancerRule {
        // The default sub-rule is round robin; it can be replaced
        IRule subRule = new RoundRobinRule();
        // Default retry window
        long maxRetryMillis = 500;

        ...

        public Server choose(ILoadBalancer lb, Object key) {
            // Time of the request
            long requestTime = System.currentTimeMillis();
            // Deadline
            long deadline = requestTime + maxRetryMillis;

            Server answer = null;
            // Select a provider with the sub-rule
            answer = subRule.choose(key);

            // If the selection failed and the deadline has not passed, retry
            if (((answer == null) || (!answer.isAlive()))
                    && (System.currentTimeMillis() < deadline)) {
                // Schedule a task that interrupts this thread so the loop exits on timeout
                InterruptTask task = new InterruptTask(deadline
                        - System.currentTimeMillis());

                while (!Thread.interrupted()) {
                    answer = subRule.choose(key);

                    if (((answer == null) || (!answer.isAlive()))
                            && (System.currentTimeMillis() < deadline)) {
                        /* pause and retry hoping it's transient */
                        Thread.yield();
                    } else {
                        // Selection succeeded
                        break;
                    }
                }
                // Cancel the timer
                task.cancel();
            }

            if ((answer == null) || (!answer.isAlive())) {
                return null;
            } else {
                return answer;
            }
        }
        ...
    }
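The idea of retrying a sub-rule until a deadline can be reduced to a few lines. Note that this sketch swaps in a simpler technique: it polls the clock in the loop condition instead of using an interrupting timer task as RetryRule does. `DeadlineRetryDemo` and `chooseWithRetry` are hypothetical names, and a `Supplier` stands in for the sub-rule.

```java
import java.util.function.Supplier;

class DeadlineRetryDemo {
    // Calls chooser until it returns a non-null value or the retry window has passed.
    // Returns null if no value was obtained in time.
    static <T> T chooseWithRetry(Supplier<T> chooser, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T answer = chooser.get();
        while (answer == null && System.currentTimeMillis() < deadline) {
            // Pause briefly and retry, hoping the failure is transient.
            Thread.yield();
            answer = chooser.get();
        }
        return answer;
    }
}
```

A chooser that fails twice and then succeeds yields its value well within a 500 ms window, whereas a chooser that always returns null makes the method return null once the window has elapsed.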
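ZoneAvoidanceRule (round robin with zone filtering)

ZoneAvoidanceRule extends PredicateBasedRule (see AvailabilityFilteringRule). Because it does not override the choose method, all it does is filter the full provider list with its newly defined predicate and then apply round robin.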
    public class ZoneAvoidanceRule extends PredicateBasedRule {
        // Filter condition
        private CompositePredicate compositePredicate;

        public ZoneAvoidanceRule() {
            super();
            ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
            AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
            compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
        }

        private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
            // Composite filter with ZoneAvoidancePredicate as the primary condition
            // and AvailabilityPredicate as the secondary one
            return CompositePredicate.withPredicates(p1, p2)
                                 .addFallbackPredicate(p2)
                                 .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                                 .build();
        }
        ...
    }