EurekaClient的RPC与心跳分析
Eureka是SpringCloud中推荐的NamingServer,自然就少不了心跳。如果要是来类比的话,我认为网络中的DDNS,或者LTE中的控制侧平面协议与本文所讲比较类似。
本文结论
Eureka的客户端启动后开了2个轮询线程池
- 通过定时PUT发送实例心跳
- 通过定时GET获取增量信息
- 上述心跳与增量信息均不是原子操作,但是有最终一致性
说完了结论,我们现在就可以开始动手抓包了
抓包准备工作
本文直接上Wireshark进行抓包分析(再发个感慨,Eureka的几个HTTP包相比LTE通信中的RRC等信令抓包实在是太简单了,同样也是服务发现,通信专业比所谓的微服务难很多,但工资却不如IT)
准备工作
- 配置netflix的日志级别为DEBUG的多台Client/Server程序
- Wireshark抓包工具,并配置好监听的网卡(如果你用的是localhost,一般是选择Loopback)
术语概念
appID is the name of the application and instanceID is the unique id associated with the instance. In AWS cloud, instanceID is the instance id of the instance and in other data centers, it is the hostname of the instance.
心跳抓包
抓包准备
启动EurekaServer,打开Wireshark,选择你要监听的网卡(如果你用的是localhost,一般是选择Loopback),然后配置如下过滤器
http&&tcp.port==8761
单机Client启动与心跳(RENEW)场景
如下是Server早已经启动后,Client从零开始发送的所有请求
Time | Info |
---|---|
26.609128 | GET /eureka/apps/ HTTP/1.1 |
26.610332 | HTTP/1.1 200 (application/json) |
26.78631 | POST /eureka/apps/API-PROD-SZ1 HTTP/1.1 (application/json) |
56.406603 | HTTP/1.1 204 |
56.673179 | GET /eureka/apps/ HTTP/1.1 |
56.67615 | HTTP/1.1 200 (application/json) |
56.676931 | PUT /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1?status=UP&lastDirtyTimestamp=1519311933974 HTTP/1.1 |
56.679674 | HTTP/1.1 200 |
86.688146 | PUT /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1?status=UP&lastDirtyTimestamp=1519311933974 HTTP/1.1 |
86.691646 | HTTP/1.1 200 |
86.725409 | GET /eureka/apps/delta HTTP/1.1 |
86.729934 | HTTP/1.1 200 (application/json) |
其中获取增量更新Delta的返回如下,也就是一个Diff操作
{"applications":{"versions__delta":"11","apps__hashcode":"UP_1_","application":[]}}
而心跳就更简单了,只是一个PUT操作更新实例
节点的正常上线
在当前负载下额外启动一个Client,可以发现如下现象
- 新增Client通过POST发送当前实例信息给Server
- 其它Client通过GET增量信息接受新增Client的信息
- 当POST发布实例操作没有完成时,其它Client获取的delta是空白的;操作完成后,其它Client获取的delta有了新增的实例
节点的正常下线
清空Wireshark日志,然后Kill 15
关闭Eureka的某个Client,可以发现有如下请求发出
首先POST报文将JSON中的状态配置为DOWN
POST /eureka/apps/API-PROD-SZ1 HTTP/1.1 (application/json)
接着删除了此APP的实例(无论剩下的Client有几个都发送了)
DELETE /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1 HTTP/1.1
其中
- appID: API-PROD-SZ1
- instanceID: 10.0.0.4:api-prod-sz1
节点的异常下线
异常下线后,判断逻辑肯定在Server端,本文暂时不分析。当然默认是90s后自动下线。
抓包结论
由上面可以得出如下结论
- POST发布app全量实例的操作不是一个(阻塞的)原子操作
- 通过轮询全量或增量同步应用信息,但是Eureka不保证各个节点的Consistence(也就是CAP的C没法保证),但是在多次轮询后可以达到最终一致性
- 心跳本身很简单,只是PUT应用的实例信息
Java侧请求
在EurekaClient的构造函数中,主要有两步操作:第一步反序列化配置文件,第二步启动定时线程池(心跳与更新缓存),下文简要提供相关断点位置
首先进入构造函数
com.netflix.discovery.DiscoveryClient#DiscoveryClient
通过分析,可以发现在本地缓存如下
// 本地通过CAS实现
private final AtomicReference<Applications> localRegionApps = new AtomicReference<>();
通过REST接口反查第一次RPC请求断点位于,将获取全量的APP信息
com.netflix.discovery.DiscoveryClient#fetchRegistry
后续将通过轮询进行增量更新与心跳
全量更新
当本地缓存为空时,将进行全量更新
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry
通过CAS保证本地线程安全
if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 此处的过滤操作类似于groupBy操作符过滤出状态为up的实例
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
增量更新
当通过定时轮询从服务端获取到增量更新(Applications对象)后,将在本地CAS锁(ReentrantLock)更新
//com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 通过ActionType更新hashSet,此处内部也有锁
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
}
updateDelta
此部分比较简单,通过遍历并进行ActionType模式匹配更新Application列表中的状态
本地Hash校验
此处虽然有个所谓的一致性Hash的唬人名字,实际上就是本地与远程的对比,类似于分段下载完iso文档后进行CheckSum校验,它与环型的一致性Hash不是一个东西。
举一个例子,比如当前有如下的机器
[
"vip1": [UP, UP]
"vip2": [UP, UP]
"appHashCode": "UP_4_"
]
其中某一台挂了,服务端返回如下
[
"vip1": [UP, UP]
"vip2": [UP, DONW]
"appHashCode": "DOWN_1_UP_3_"
]
本地增量更新远程的状态(updateDelta)后,也将进行Hash计算(getReconcileHashCode),并与远程的计算结果相对比。
此处校验算法虽然有一堆for循环,但是它实际上是类似Groovy中函数式编程countBy
的实现,伪代码如下
// getReconcileHashCode 的伪代码
// 本地增量后的 localRegionApps
def localRegionApps = ["UP",'UP','DOWN','DOWN','UP','UP','UP','UP'];
getReconcileHashCode(list){
def map = list.countBy {it}
//=>{UP=6, DOWN=2}
def list = map.collect { k, v -> k + "_" + v + "_" }
//=>[UP_6_, DOWN_2_]
// 注意,此处仅仅为伪代码,因为真实使用KetSet遍历的是基于TreeMap(按照Key)进行排序
def hash = list.inject("") { old, it -> it.concat(old) }
//=>DOWN_2_UP_6_
}
getReconcileHashCode(localRegionApps)
// => DOWN_2_UP_6_
如果hashCode相同,那么此次更新就成功了;如果hashCode不相同(我还没有见过,肯定是在本地合并的那一步),将会进行全量更新
Hash碰撞特例
上面的Hash算法太简单了,比如
[
"vip1": [UP, UP]
"vip2": [UP, DONW]
"appHashCode": "DOWN_1_UP_3_"
]
变成了
[
"vip1": [UP, DONW]
"vip2": [UP, UP]
"appHashCode": "DOWN_1_UP_3_"
]
此时本地updateDelta也更新失败的话,那么这次增量更新校验却被认为是更新成功了,这里的就存在碰撞问题。
此处待确认。
附录
服务端代码位置
此部分的服务端代码在如下位置
eureka-core-1.8.6.jar!/com/netflix/eureka/resources
RPC实现
Eureka在内部均采用了sun的jersey作为HTTP请求客户端,你可以把它类比为OkHttp或者HttpClient
例如获取Application就调用了如下
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications
注意这里的Java代码也是通过Override闭包回掉的形式来实现分层的,与ServiceComb有点类似,因此读者在打断点时要明白代码并不一定是从上到下走的
如果你第一次看Eureka的源码,建议将下的所有AbstractJerseyEurekaHttpClient
下的Jersey HTTP
字符串日志相关的行都打上断点,先分析再把断点读薄。
NodeJS中的Client
在NodeJS等其它平台中,如果希望集成到Eureka的服务发现中,可以使用EurekaClient实现,内部原理很简单,就是一个Timer定时请求。我在部分项目中也有使用,但是NodeJS圈子有一个常见的问题就容易撒手不管,所以如果使用的话可能需要进行定制。