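Client-Side Load Balancing

What Is Load Balancing

Load balancing is a computing technique for distributing load across multiple computers (a cluster), network links, CPUs, disk drives, or other resources, with the goal of optimizing resource use, maximizing throughput, minimizing response time, and avoiding overload. Replacing a single component with multiple load-balanced server components also improves reliability through redundancy. Load balancing is usually provided by dedicated software or hardware. Its main job is to spread a large volume of work sensibly across multiple processing units, which addresses the high-concurrency and high-availability problems of Internet architectures.

Common Load-Balancing Strategies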

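  • Round robin / weighted round robin

    Round robin hands requests to each server in turn. It works best when all servers in the pool have the same capacity and each request costs roughly the same to process. In practice servers rarely have equal capacity, which is where weighted round robin comes in: requests are rotated in proportion to per-server weights so that the load on each server can be tuned.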

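  • Random (weighted)

    Each server is picked with a probability set by its weight. With few calls, collisions within a short time window are likely (requests pile up on one server); as the call volume grows, the distribution evens out and approaches the configured weight ratio.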

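  • Least active / weighted least active

    Least active picks the provider with the fewest in-flight tasks (the lowest current active call count). The weighted variant applies weights on top of that: when several providers are tied on the active count, one of them is chosen at random in proportion to its weight.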

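  • Hash / consistent hash

    Requests are hashed by some rule and routed to the server the hash maps to. Consistent hashing is a special kind of hashing: when the number of slots (the table size) changes, only K/n keys need to be remapped on average, where K is the number of keys and n is the number of slots. In a traditional hash table, adding or removing a single slot forces nearly all keys to be remapped.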

  • Response time

    The provider is chosen according to how quickly it responds.
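
To make the remapping cost of a traditional hash table concrete, the sketch below counts how many keys change slot under plain modulo placement when the slot count grows from 4 to 5. The class and method names are made up for illustration, and consecutive integers stand in for real hash values.

```java
class ModuloRemapSketch {
    // Counts the keys in [0, keyCount) whose slot changes when a table that
    // places keys with "key % slots" is resized from oldSlots to newSlots.
    static int remappedKeys(int keyCount, int oldSlots, int newSlots) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (key % oldSlots != key % newSlots) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // With modulo placement most of the 1000 keys move; a consistent hash
        // would be expected to move only about K/n = 1000/5 = 200 of them.
        System.out.println(remappedKeys(1000, 4, 5));
    }
}
```

Only the keys whose remainder modulo 20 is below 4 keep their slot in this setup, which is why the modulo scheme moves the large majority of keys.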

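Load-Balancing Implementations

Dubbo

Dubbo's load-balancing code lives in the com.alibaba.dubbo.rpc.cluster.loadbalance package of the dubbo-cluster module (Dubbo 2.6.6). It implements four strategies: consistent hash, least active, random, and weighted round robin.

Weight

While a Dubbo service is warming up, its weight is computed rather than taken directly from the configuration. The longer the service has been up, the closer the computed value gets to the configured weight, and only after the warm-up period does it equal the configured weight.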

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    // Compute the weight; the expression below is logically equivalent to (uptime / warmup) * weight.
    // As the service uptime grows, the computed weight ww gradually approaches the configured weight.
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
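
As a worked example, the sketch below wraps the same formula in a small class and evaluates it at a few uptimes. The 10-minute warm-up period and the configured weight of 100 are illustrative values chosen for the example, not values taken from the text above.

```java
class WarmupWeightSketch {
    // Same formula as the method shown above.
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    public static void main(String[] args) {
        int warmup = 600_000; // assumed warm-up period: 10 minutes, in milliseconds
        int weight = 100;     // assumed configured weight
        // The weight is clamped to at least 1 and at most the configured weight.
        for (int uptime : new int[] {0, 60_000, 300_000, 900_000}) {
            System.out.println(uptime + " ms -> " + calculateWarmupWeight(uptime, warmup, weight));
        }
    }
}
```

With these inputs the weight is 1 at startup, 10 after one minute, 50 after five minutes, and stays capped at 100 once the uptime passes the warm-up period.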
Consistent Hashing
private static final class ConsistentHashSelector<T> {
    // Virtual nodes on the ring
    private final TreeMap<Long, Invoker<T>> virtualInvokers;
    // Number of virtual nodes per invoker
    private final int replicaNumber;
    // Used to detect changes in the real nodes (coming online, going offline, etc.)
    private final int identityHashCode;
    // Indexes of the arguments that take part in the hash
    private final int[] argumentIndex;

    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
        this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        // Number of virtual nodes, 160 by default
        this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
        // Indexes of the arguments to hash; by default only the first argument is hashed
        String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
        argumentIndex = new int[index.length];
        for (int i = 0; i < index.length; i++) {
            argumentIndex[i] = Integer.parseInt(index[i]);
        }
        // Spread the virtual nodes around the ring
        for (Invoker<T> invoker : invokers) {
            String address = invoker.getUrl().getAddress();
            for (int i = 0; i < replicaNumber / 4; i++) {
                // MD5 of address + i yields a 16-byte array
                byte[] digest = md5(address + i);
                // Hash each 4-byte slice of the digest
                for (int h = 0; h < 4; h++) {
                    long m = hash(digest, h);
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }

    ...

    private Invoker<T> selectForKey(long hash) {
        // hash is derived from the MD5 of the call arguments
        // Find the first node whose value is greater than or equal to the hash
        Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
        // null means the hash lies past the last node, so wrap around to the first node
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
        return entry.getValue();
    }

    ...
}
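
The ring idea can be tried out without Dubbo. The following is a minimal standalone sketch, not Dubbo's class: it places plain string node names on a TreeMap ring using the same "MD5, then four 4-byte slices" layout described in the comments above. The class name, the little-endian slice hash, and the node addresses used in the usage note are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    // Places "replicas" virtual nodes per real node on the ring.
    HashRingSketch(List<String> nodes, int replicas) {
        for (String node : nodes) {
            for (int i = 0; i < replicas / 4; i++) {
                byte[] digest = md5(node + i);
                for (int h = 0; h < 4; h++) {
                    ring.put(hash(digest, h), node);
                }
            }
        }
    }

    // Walks clockwise from the key's hash to the next virtual node,
    // wrapping around to the first node at the end of the ring.
    String select(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(md5(key), 0));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    static byte[] md5(String value) {
        try {
            return MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads the 4 bytes starting at offset h * 4 as an unsigned 32-bit value.
    static long hash(byte[] d, int h) {
        return (((long) (d[3 + h * 4] & 0xFF) << 24)
                | ((long) (d[2 + h * 4] & 0xFF) << 16)
                | ((long) (d[1 + h * 4] & 0xFF) << 8)
                | (d[h * 4] & 0xFF)) & 0xFFFFFFFFL;
    }
}
```

Building one ring from three addresses and another from only the first two, a key that did not land on the removed node selects the same node in both rings; only the keys owned by the removed node move.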
Weighted Least Active
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size();
    // Lowest active count seen so far (fewest in-flight tasks)
    int leastActive = -1;
    // Number of providers sharing that lowest active count
    int leastCount = 0;
    // Indexes (into invokers) of the providers sharing the lowest active count
    int[] leastIndexs = new int[length];
    // Sum of the weights of all least-active providers
    int totalWeight = 0;
    // Weight of the first least-active provider
    int firstWeight = 0;
    // Whether all least-active providers have the same weight
    boolean sameWeight = true;
    for (int i = 0; i < length; i++) {
        Invoker<T> invoker = invokers.get(i);
        int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
        // Get the weight; during warm-up this is the warm-up weight
        int afterWarmup = getWeight(invoker, invocation);
        // Found a provider with a lower active count: reset the bookkeeping
        if (leastActive == -1 || active < leastActive) {
            leastActive = active;
            leastCount = 1;
            leastIndexs[0] = i;
            totalWeight = afterWarmup;
            firstWeight = afterWarmup;
            sameWeight = true;

        // Found a provider tied with the current lowest active count: update the bookkeeping
        } else if (active == leastActive) {
            leastIndexs[leastCount++] = i;
            totalWeight += afterWarmup;
            if (sameWeight && i > 0
                    && afterWarmup != firstWeight) {
                sameWeight = false;
            }
        }
    }

    // Exactly one least-active provider
    if (leastCount == 1) {
        return invokers.get(leastIndexs[0]);
    }
    // Several least-active providers with different weights: pick in proportion to weight
    if (!sameWeight && totalWeight > 0) {
        int offsetWeight = random.nextInt(totalWeight) + 1;
        for (int i = 0; i < leastCount; i++) {
            int leastIndex = leastIndexs[i];
            offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
            if (offsetWeight <= 0)
                return invokers.get(leastIndex);
        }
    }
    // Several least-active providers with equal weights: return one at random
    return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
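
The first half of the method, collecting every provider tied on the lowest active count, can be isolated into a few lines. This is a simplified sketch over a plain array of active counts; the class and method names are hypothetical, and weights are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;

class LeastActiveSketch {
    // Returns the indexes of all providers tied on the lowest active count.
    static List<Integer> leastActiveIndexes(int[] activeCounts) {
        List<Integer> ties = new ArrayList<>();
        int least = Integer.MAX_VALUE;
        for (int i = 0; i < activeCounts.length; i++) {
            if (activeCounts[i] < least) {
                // A strictly lower count discards the earlier candidates.
                least = activeCounts[i];
                ties.clear();
                ties.add(i);
            } else if (activeCounts[i] == least) {
                ties.add(i);
            }
        }
        return ties;
    }
}
```

When the returned list has more than one entry, a weighted or uniform random pick among those indexes would complete the selection.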
Random
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size();
    int totalWeight = 0;
    // Whether all providers have the same weight
    boolean sameWeight = true;
    // Sum the weights and check whether all providers have the same weight
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight;
        if (sameWeight && i > 0
                && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false;
        }
    }
    // Weights differ: pick at random in proportion to weight
    if (totalWeight > 0 && !sameWeight) {
        int offset = random.nextInt(totalWeight);
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // Weights are equal: return one at random
    return invokers.get(random.nextInt(length));
}
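
The subtract-until-negative loop is easier to follow with the random draw factored out. The sketch below takes the offset as a parameter so that the mapping from offset to provider can be checked by hand; the class and method names are hypothetical.

```java
class WeightedPickSketch {
    // Maps an offset in [0, sum of weights) to the index of the provider
    // whose weight interval contains it.
    static int pickByOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be less than the total weight");
    }
}
```

For weights [3, 2, 1], offsets 0 to 2 select index 0, offsets 3 and 4 select index 1, and offset 5 selects index 2, so a uniformly random offset in [0, 6) picks each provider in proportion to its weight.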
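Weighted Round Robin

The weighted round-robin implementation here resembles nginx's weighted round robin. Each node has two attributes: the weight we configured, and a current weight used for the rotation, which starts at 0. For each request:

  1. Iterate over all available providers in the cluster, apply currentWeight += weight to each one, and add up the total weight totalWeight.
  2. Pick the provider with the largest currentWeight to handle the request.
  3. Apply currentWeight -= totalWeight to the provider that was picked.

Consider an example with weights configured as [A: 3, B: 2, C: 1]. The currentWeight array starts at [0, 0, 0] and totalWeight = 6.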

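| Request | currentWeight array | Selected | currentWeight array after selection |
|---|---|---|---|
| 1 | [3, 2, 1] | A | [-3, 2, 1] |
| 2 | [0, 4, 2] | B | [0, -2, 2] |
| 3 | [3, 0, 3] | A | [-3, 0, 3] |
| 4 | [0, 2, 4] | C | [0, 2, -2] |
| 5 | [3, 4, -1] | B | [3, -2, -1] |
| 6 | [6, 0, 0] | A | [0, 0, 0] |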

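This strategy still rotates through the providers in proportion to their weights, but it interleaves the picks so that a large batch of consecutive requests does not land on the same machine.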
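
The three steps can be replayed in a few lines. This is a single-threaded sketch with hypothetical names that works on a plain weight array and assumes at least one provider; ties on currentWeight go to the earliest provider, which matches the table above.

```java
class SmoothWrrSketch {
    private final int[] weights;
    private final long[] current;

    SmoothWrrSketch(int... weights) {
        this.weights = weights.clone();
        this.current = new long[weights.length];
    }

    // Performs one selection round and returns the index of the chosen provider.
    int next() {
        int total = 0;
        int selected = -1;
        for (int i = 0; i < weights.length; i++) {
            current[i] += weights[i];              // step 1: currentWeight += weight
            total += weights[i];
            if (selected < 0 || current[i] > current[selected]) {
                selected = i;                      // step 2: track the largest currentWeight
            }
        }
        current[selected] -= total;                // step 3: currentWeight -= totalWeight
        return selected;
    }
}
```

Calling next() six times on new SmoothWrrSketch(3, 2, 1) yields the indexes 0, 1, 0, 2, 1, 0, that is A, B, A, C, B, A, after which the current weights are back at zero and the cycle repeats.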

protected static class WeightedRoundRobin {
    private int weight;
    // currentWeight
    private AtomicLong current = new AtomicLong(0);
    // Timestamp of the last currentWeight update
    private long lastUpdate;
    public int getWeight() {
        return weight;
    }
    public void setWeight(int weight) {
        this.weight = weight;
        current.set(0);
    }
    // currentWeight += weight
    public long increaseCurrent() {
        return current.addAndGet(weight);
    }
    // currentWeight -= totalWeight
    public void sel(int total) {
        current.addAndGet(-1 * total);
    }
    public long getLastUpdate() {
        return lastUpdate;
    }
    public void setLastUpdate(long lastUpdate) {
        this.lastUpdate = lastUpdate;
    }
}

// {"serviceKey.methodName": {"provider URL": WeightedRoundRobin}}
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
// Update lock, used when removing stale nodes
private AtomicBoolean updateLock = new AtomicBoolean();


protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // First-level key: "serviceKey.methodName"
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    // Get {"provider URL": WeightedRoundRobin}, creating it if it does not exist
    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
    if (map == null) {
        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
        map = methodWeightMap.get(key);
    }

    int totalWeight = 0;
    // Largest currentWeight seen so far
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    Invoker<T> selectedInvoker = null;
    WeightedRoundRobin selectedWRR = null;

    // Iterate over the providers: currentWeight += weight, sum totalWeight, find the node with the largest currentWeight
    for (Invoker<T> invoker : invokers) {
        // Look up the WeightedRoundRobin that belongs to this provider
        String identifyString = invoker.getUrl().toIdentityString();
        WeightedRoundRobin weightedRoundRobin = map.get(identifyString);

        // Get the weight (with warm-up applied)
        int weight = getWeight(invoker, invocation);
        if (weight < 0) {
            weight = 0;
        }

        // Create the WeightedRoundRobin if it does not exist yet
        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            weightedRoundRobin.setWeight(weight);
            map.putIfAbsent(identifyString, weightedRoundRobin);
            weightedRoundRobin = map.get(identifyString);
        }

        // The weight changed: reset weight and currentWeight
        if (weight != weightedRoundRobin.getWeight()) {
            //weight changed
            weightedRoundRobin.setWeight(weight);
        }
        // currentWeight += weight
        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        // Replace the candidate if this node's currentWeight exceeds the current maximum
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
        }
        totalWeight += weight;
    } // for end

    // The provider count differs from the recorded count: remove stale nodes
    if (!updateLock.get() && invokers.size() != map.size()) {
        if (updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                newMap.putAll(map);
                Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, WeightedRoundRobin> item = it.next();
                    if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                        it.remove();
                    }
                }
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
    }

    // currentWeight -= totalWeight
    if (selectedInvoker != null) {
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    }
    // should not happen here
    return invokers.get(0);
}
Spring Cloud Ribbon
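Ribbon load-balancing components
  • Rule: decides which provider to use.
  • Ping: checks that providers are alive.
  • ServerList: a static or dynamic list of addresses; if it is dynamic, a background task refreshes and filters the list at regular intervals.
Configuring through a properties file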
  • <clientName>.ribbon.NFLoadBalancerClassName: Should implement ILoadBalancer
  • <clientName>.ribbon.NFLoadBalancerRuleClassName: Should implement IRule
  • <clientName>.ribbon.NFLoadBalancerPingClassName: Should implement IPing
  • <clientName>.ribbon.NIWSServerListClassName: Should implement ServerList
  • <clientName>.ribbon.NIWSServerListFilterClassName: Should implement ServerListFilter

Example: setting the load-balancing rule for the users service:

users:
  ribbon:
    NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerList
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule

Load-Balancing Rules
RoundRobinRule (round robin)

Plain round robin.

// Position of the next provider in the list
private AtomicInteger nextServerCyclicCounter;

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }
    // The provider that gets picked
    Server server = null;
    // Retry counter
    int count = 0;
    // Round robin
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        // All providers
        List<Server> allServers = lb.getAllServers();
        // Number of reachable providers
        int upCount = reachableServers.size();
        // Number of all providers
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        // nextServerIndex = (nextServerIndex + 1) % total
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        // If the provider was removed from the list for some reason (for example it went down),
        // yield and then pick again
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        // Return the provider if it is alive and ready to serve
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    // Too many retries
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}
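
The listing calls incrementAndGetModulo without showing it. One way such a helper could be written is a compare-and-set loop over an AtomicInteger, as sketched below; treat this as an assumption about its shape rather than a copy of Ribbon's source. The class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CyclicCounterSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Atomically advances the counter and returns the new value in [0, modulo).
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            // Retry if another thread moved the counter in the meantime.
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}
```

Keeping the stored value already reduced modulo the server count avoids integer overflow, at the cost of a retry loop under contention.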
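AvailabilityFilteringRule (round robin over available servers)

Builds on plain round robin and filters providers by availability. It skips:

  1. Circuit-tripped servers. By default, a server is put into the "circuit tripped" state after 3 failed connection attempts. The tripped state lasts 30 seconds; if the connection fails again, the duration grows geometrically.

  2. Servers with too many concurrent connections. If a server's concurrent connection count exceeds the limit (default Integer.MAX_VALUE), clients configured with AvailabilityFilteringRule ignore it as well.

Related configuration: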

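# Number of connection failures before the circuit trips
niws.loadbalancer.<clientName>.connectionFailureCountThreshold

# Maximum duration of the circuit-tripped state, in seconds
niws.loadbalancer.<clientName>.circuitTripMaxTimeoutSeconds

# Concurrent connection limit
<clientName>.<clientConfigNameSpace>.ActiveConnectionsLimit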

[Figure: UML class diagram of AvailabilityFilteringRule]

public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
    // Plain round robin by default
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        roundRobinRule = new RoundRobinRule();
    }

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        roundRobinRule.setLoadBalancer(lb);
    }

}
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
    // AbstractServerPredicate is an abstract class offering filtering methods, based on Google Guava
    public abstract AbstractServerPredicate getPredicate();

    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // Go through all providers, keep those that pass the filter, and pick among them by round robin (filter first, then pick)
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
}
public class AvailabilityFilteringRule extends PredicateBasedRule {
    public AvailabilityFilteringRule() {
        super();
        // The filter is a CompositePredicate (a composite of filter conditions);
        // AvailabilityPredicate is the condition that actually does the checking here
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    @Override
    // Compared with PredicateBasedRule, this saves one pass over the server list
    public Server choose(Object key) {
        // Retry counter
        int count = 0;
        // Pick directly by round robin
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {
            // Return the server if it passes the filter
            if (predicate.apply(new PredicateKey(server))) {
                return server;
            }
            server = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }
}
public class AvailabilityPredicate extends AbstractServerPredicate {
    ...
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }


    private boolean shouldSkipServer(ServerStats stats) {
        // Check the circuit-tripped state and the concurrent request count
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
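WeightedResponseTimeRule (random selection weighted by response time)

Each server is assigned a weight: the longer a server's response time, the smaller its weight. The rule picks servers at random, with the weight influencing the choice. The weights are refreshed on a timer (every 30 s by default).
The procedure is:

  1. Iterate over the provider list and sum the average response times: totalResponseTime += avgResponseTime.
  2. Iterate over the provider list again, compute each provider's weight as weight = totalResponseTime - avgResponseTime, and record the running total used for the random pick: accumulatedWeights[i] = accumulatedWeights[i - 1] + weight.
  3. Generate a random number and use it to pick a provider; if that fails, fall back to plain round robin.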
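
A small numeric example makes the weight formula concrete. The sketch below applies steps 1 to 3 to an array of average response times; the class and method names are hypothetical, the random number is passed in so the result can be checked by hand, and the response times in the usage note are illustrative.

```java
class ResponseTimeWeightSketch {
    // Steps 1 and 2: running totals of (totalResponseTime - avgResponseTime).
    static double[] accumulate(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        double[] accumulated = new double[avgResponseTimes.length];
        double soFar = 0;
        for (int i = 0; i < avgResponseTimes.length; i++) {
            soFar += total - avgResponseTimes[i];
            accumulated[i] = soFar;
        }
        return accumulated;
    }

    // Step 3: the first index whose accumulated weight is >= randomWeight.
    static int pick(double[] accumulated, double randomWeight) {
        for (int i = 0; i < accumulated.length; i++) {
            if (accumulated[i] >= randomWeight) {
                return i;
            }
        }
        // Not reached when randomWeight is below the last accumulated value.
        return accumulated.length - 1;
    }
}
```

For average response times of 10, 20, and 70 ms the total is 100, the weights are 90, 80, and 30, and the accumulated weights are [90, 170, 200]. A random value of 45 selects server 0, 100 selects server 1, and 180 selects server 2, so the slowest server receives the smallest share.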
public class WeightedResponseTimeRule extends RoundRobinRule {
    ...

    // Accumulated weights: [0] holds the weight of server [0], [1] holds the sum of the weights of servers [0] to [1],
    // and [n] holds the sum of the weights of servers [0] to [n]
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();

    // Timer used to refresh the weights
    protected Timer serverWeightTimer = null;

    // Lock guarding the weight refresh
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);

    ...

    // Set up the weight-maintenance task
    void initialize(ILoadBalancer lb) {
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        // Start the weight-maintenance task
        // DynamicServerWeightTask runs new ServerWeight().maintainWeights()
        // serverWeightTaskTimerInterval defaults to 30 * 1000
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);

        // Compute the initial weights
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                logger
                        .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                + name);
                serverWeightTimer.cancel();
            }
        }));
    }

    class ServerWeight {
        // Maintain the weights
        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }

            // Acquire the lock
            if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                return;
            }

            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }

                // Sum of the response times of all providers
                double totalResponseTime = 0;
                // Add up the average response time of every provider
                for (Server server : nlb.getAllServers()) {
                    // this will automatically load the stats if not in cache
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }

                // Running weight total
                Double weightSoFar = 0.0;

                // create new list and hot swap the reference
                // Accumulated weights, to be stored in accumulatedWeights
                List<Double> finalWeights = new ArrayList<Double>();

                // Compute each provider's weight and the running total
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }

        }
    }

    @Override
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // get hold of the current reference in case it is changed from the other thread
            List<Double> currentWeights = accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();

            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;

            // Total weight of all servers
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
            // Fall back to plain round robin when the weights are stale or not yet initialized
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server = super.choose(getLoadBalancer(), key);
                if (server == null) {
                    return server;
                }
            } else {
                // Generate a random number and use it to pick a provider
                // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // pick the server index based on the randomIndex
                int n = 0;
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            server = null;
        }
        return server;
    }

    ...
}

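The wiki covers the three rules above. The rules below are implemented in Ribbon but not mentioned on the wiki.

BestAvailableRule (best available)

Skips circuit-tripped providers, then picks the provider with the fewest concurrent requests (least active).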

public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
    ...
    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        List<Server> serverList = getLoadBalancer().getAllServers();
        // Lowest active count seen so far
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        for (Server server: serverList) {
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            // Only providers whose circuit is not tripped are compared
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                // Compare the current active count and keep the provider if it is lower
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }
    ...
}
RandomRule (random): do not use, see the GitHub issue referenced below
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;

    while (server == null) {
        if (Thread.interrupted()) {
            return null;
        }
        // List of reachable providers
        List<Server> upList = lb.getReachableServers();
        List<Server> allList = lb.getAllServers();

        int serverCount = allList.size();
        if (serverCount == 0) {
            /*
             * No servers. End regardless of pass, because subsequent passes
             * only get more restrictive.
             */
            return null;
        }

        // index = ThreadLocalRandom.current().nextInt(serverCount)
        int index = chooseRandomInt(serverCount);
        // Indexes into upList with an index drawn from the size of allList: may go out of bounds
        server = upList.get(index);

        if (server == null) {
            /*
             * The only time this should happen is if the server list were
             * somehow trimmed. This is a transient condition. Retry after
             * yielding.
             */
            Thread.yield();
            continue;
        }

        if (server.isAlive()) {
            return (server);
        }

        // Shouldn't actually happen.. but must be transient or a bug.
        server = null;
        Thread.yield();
    }

    return server;

}

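Do not use this rule. It fetches the provider with server = upList.get(index); where upList is the list of reachable providers, while index is a random number generated from the count of all providers. This can throw an index-out-of-bounds error whenever some server is not UP. When I checked GitHub, the bug had still not been fixed: RandomRule incorrect index bug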
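
The mismatch is easy to reproduce with plain lists. The sketch below shows one possible safe variant, which draws the index from the size of the list it actually indexes; it is an illustration with hypothetical names, not a patch taken from Ribbon.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class SafeRandomPickSketch {
    // Picks a random element, drawing the index from the list's own size.
    static <T> T pick(List<T> upList) {
        if (upList.isEmpty()) {
            return null;
        }
        int index = ThreadLocalRandom.current().nextInt(upList.size());
        return upList.get(index);
    }
}
```

With three servers in total and one reachable, an index drawn from the total count can be 1 or 2, and calling get with it on the one-element reachable list throws IndexOutOfBoundsException; the variant above never produces such an index.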

RetryRule

A retry mechanism; it has to be combined with another rule.

public class RetryRule extends AbstractLoadBalancerRule {
    // The sub-rule defaults to round robin and can be changed
    IRule subRule = new RoundRobinRule();
    // Default retry window, in milliseconds
    long maxRetryMillis = 500;

    ...

    public Server choose(ILoadBalancer lb, Object key) {
        // Time of the request
        long requestTime = System.currentTimeMillis();
        // Deadline
        long deadline = requestTime + maxRetryMillis;

        Server answer = null;
        // Pick a provider with the sub-rule
        answer = subRule.choose(key);

        // If the pick failed and the deadline has not passed, retry
        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {
            // Schedule a timer task that interrupts this thread so the loop exits on timeout
            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());

            while (!Thread.interrupted()) {
                answer = subRule.choose(key);

                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    Thread.yield();
                } else {
                    // The pick succeeded
                    break;
                }
            }
            // Cancel the timer
            task.cancel();
        }

        if ((answer == null) || (!answer.isAlive())) {
            return null;
        } else {
            return answer;
        }
    }
}
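
The retry-until-deadline pattern can be shown without Ribbon types. The sketch below is a simplified variant that polls the clock instead of scheduling an interrupt task, so it is not how RetryRule itself bounds the loop; the class name and the Supplier-based chooser are assumptions made for the example.

```java
import java.util.function.Supplier;

class DeadlineRetrySketch {
    // Calls the chooser until it returns non-null or maxRetryMillis have elapsed.
    static <T> T chooseWithRetry(Supplier<T> chooser, long maxRetryMillis) {
        long deadline = System.nanoTime() + maxRetryMillis * 1_000_000L;
        T answer = chooser.get();
        while (answer == null && System.nanoTime() - deadline < 0) {
            // Pause briefly and retry, hoping the failure is transient.
            Thread.yield();
            answer = chooser.get();
        }
        return answer;
    }
}
```

A chooser that fails a couple of times and then succeeds returns its result well inside the window, while a chooser that always returns null makes the method give up with null once the window has elapsed.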
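ZoneAvoidanceRule (zone-filtered round robin)

ZoneAvoidanceRule extends PredicateBasedRule (see AvailabilityFilteringRule). Because it does not override the choose method, all it does is filter every provider with its newly defined predicate and then pick among the survivors by round robin.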

public class ZoneAvoidanceRule extends PredicateBasedRule {
    // Filter condition
    private CompositePredicate compositePredicate;

    public ZoneAvoidanceRule() {
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        // Composite filter with ZoneAvoidancePredicate as the primary condition and AvailabilityPredicate as the secondary one
        return CompositePredicate.withPredicates(p1, p2)
                .addFallbackPredicate(p2)
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();

    }

    ...
}
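
The fallback chain can be pictured as "try the strictest filter first and loosen it when nothing survives". The sketch below expresses that idea with java.util.function.Predicate over a generic list. It is a simplification with hypothetical names: Ribbon's CompositePredicate may use different criteria for deciding when to fall back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class FallbackFilterSketch {
    // Applies the predicates in order and returns the first non-empty result.
    @SafeVarargs
    static <T> List<T> filter(List<T> servers, Predicate<T>... chain) {
        for (Predicate<T> predicate : chain) {
            List<T> kept = new ArrayList<>();
            for (T server : servers) {
                if (predicate.test(server)) {
                    kept.add(server);
                }
            }
            if (!kept.isEmpty()) {
                return kept;
            }
        }
        return new ArrayList<>();
    }
}
```

Ending the chain with a predicate that always returns true mirrors the alwaysTrue() fallback in the listing above: when both stricter filters reject every server, the full list is still available for round robin.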

References