本书是对SpringCloud/Netflix开源的Eureka组件分析系列文章,保证不太监,正在撰写中,欢迎各位Star与PR

面向读者

  • 了解过分布式系统,有Zookeeper, etcd等经验更佳
  • 已经掌握Eureka入门,最低要搭过Demo

为读者提供了

  • Eureka的中文介绍
  • Eureka的断点级源码分析
  • 遗留老系统改造方法

关于微服务

虽然本书主要讲的是SpringCloud,但是微服务改造是一个庞大的工程问题,并不是简单的引入几个Jar包就可以,而是需要Jenkins打包,Maven私服,配置中心,DevOps运维,日志管理等大批自动化维护工具,以及质量流程,测试QA结合使用,所以有心进行Eureka服务化的人在行动前,一定要计算投入产出比,避免上车后带不动。

同时,假如你要推进改造,那么必须有上级支持或者有小弟,否则自己单干也干不起来。

本书也仅仅达到工程水平,远不能达到"格物"水平。

如何划分微服务?

我的个人建议是按照混血型组织进行划分,格鲁夫的组织理论在服务化中依然存在

  • (横向的)业务服务:比如CURD、流程等业务,由各个模块负责人主导,它是横向的组织,如果大家权责不清晰,先在业务层上做WBS,再搞服务化。
  • (纵向的)公共服务:比如数据字典、权限管理、SSO等方案,它们都可以做到多租户/业务无关的SAAS实现,而且网上有开源实现,基本上就是纯搬砖工作量。

技术交流

邮件: miao1007@gmail.com

license

本作品采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可。

Eureka简介

Eureka与覆盖网络

Eureka在IP网络的基础上提供了覆盖网络(Overlay Network),所谓覆盖网络,实际上就是在网络上的网络,比如VoIP,DNS,DHT,VxN协议甚至热炒的区块链,它们的特点就是节点间通信与下层IP协议解藕(比如地址,端口,协议等): A用户给B用户打电话,只需要知道对方的电话号码,而不需要对方的IP地址,这里的“电话号码”就可以看作一个NodeId,而电话连线的过程对用户是透明的。

在Eureka中,通过VIP(Virtual IP,同义词,非真正的VIP协议)来表示一个节点集群的ID,节点间通信只需要考虑VIP,而不用考虑其下层网络的属性(是否可用,IP是否又变了等)。

Eureka的数据结构

Eureka实际就是维护了一个远程中心化的Map,key为Virtual IP,也就是Application的名称,Value为可用服务实例列表(Instance)

Map<String, List<ServiceInstance>>

它的函数原型大致如下

void put(KEY vip, List<ServiceInstance> services);  // 发送实例到中心服务器
List<ServiceInstance> get(KEY vip);  // 通过VIP获取实例

因此,当你看到“云”,“微服务”时,千万不要产生胆怯心理,也不要被忽悠了。Eureka相比于Dubbo等RPC框架简单了许多,更比通信领域简单了太多。普通开发可以在一下午上手,中级开发看完本书后即可了解70%的流程。

与其他组件的对比

从客户端的角度

如果说让我来做个类比的话,那么DNS中的特例“HTTP DNS”就是Eureka最好的类比。HTTP DNS在Android等客户端中广泛使用,客户端向DNS发送域名,服务端返回了一串解析后的IP列表

# 下为OpenDNS的例子
$ curl http://119.29.29.29/d?dn=gitbook.com
104.25.212.20;104.25.213.20                                                

而Eureka也一样,通过向EurekaServer发送VIP(Virtual IP,你可以把它看作内网域名)名称,也返回一串地址,Java代码中对应的请求是

# api-prod-sz1就是vip,它对应了很多Server实例
$ curl localhost:8761/eureka/vips/api-prod-sz1

返回了如下(以XML为例)

<applications>
    <versions__delta>-1</versions__delta>
    <apps__hashcode>UP_1_</apps__hashcode>
    <application>
        <name>API-PROD-SZ1</name>
        <instance>
            <instanceId>10.0.0.4:api-prod-sz1</instanceId>
            <hostName>10.0.0.4</hostName>
            <app>API-PROD-SZ1</app>
            <ipAddr>10.0.0.4</ipAddr>
            <status>UP</status>
            <overriddenstatus>UNKNOWN</overriddenstatus>
            <port enabled="true">8080</port>
            <securePort enabled="false">443</securePort>
            <countryId>1</countryId>
            <dataCenterInfo class="com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo">
                <name>MyOwn</name>
            </dataCenterInfo>
            <leaseInfo>
                ...
            </leaseInfo>
            <metadata>
                <management.port>8080</management.port>
                <jmx.port>53894</jmx.port>
            </metadata>
            ...
            <vipAddress>api-prod-sz1</vipAddress>
        </instance>
    </application>
</applications>

因此从客户端黑盒的角度来看,Eureka与HTTP DNS没有很大的差别

EurekaHTTP DNS
实例粒度节点级节点级
侵入式有,也可以使用RESTfulAPI
短板老项目不好改需要跨防火墙,维护成本高,背后隐藏着Nginx(upsource等)
控制域与命名服务直连与命名服务直连
用户域P2PP2P
传输语法HTTPHTTP
抽象语法按照Wiki的几个API来搞发送一个QueryString即可

所以千万不要畏惧Eureka这样的新技术,它们的使用并不需要非常高的编码水平。

网上有很多文章拿Eureka与Nginx进行对比,我认为这个是不公平的,Nginx本身定位是位于防火墙前面向客户的,而Eureka用于内部PRC的。对于我个人来说,Nginx/DNS这一套折腾下来最麻烦的就是防火墙的各种开通,需要各种跳板去调试配置,分析定位速度非常慢。

从CAP的角度

根据CAP定律,C——数据一致性,A——服务可用性,P——服务对网络分区故障的容错性,这三个无法同时满足,因此各个微服务框架均侧重选了2个

ZookeeperEureka
C(Consistence)YN
A(Avaliabilty)NY

上面说的只是理论问题,实际上Zookeeper在真正生产使用时,稍微请求多一点就跪了,雪崩(连接数太多)直接导致所有服务无法使用,比如在生产环境用遇到过

  • 断电(没想到吧) -> Zk持久化EOF错误了 -> 无法启动 -> 负载压到少数机器 -> 慢慢变慢最终爆掉
  • 负载过多 -> 连接数下不来 -> 线程卡在Netty的NIO等待,没法更新树->最终爆掉

此外还有Zk难以抓包定位,无可视化界面等问题,这些都需要自己维护定制工具

与Dubbo的关系

  • 正如上面的Curl例子所示,Eureka的命名服务主要是通过VIP帮你找到IP,不支持方法级服务的注册发现,因此比基于zk的Dubbo更简化(但是可以用Swagger+Feign来实现)。
  • Eureka的增量更新不是原子操作,但是保证最终一致性
  • 基于Eureka的RPC一般是HTTP调用,也就是服务端开一个RestController就可以了,比Dubbo等微服务框架侵入性更低

与SpringCloudFunction的关系

Eureka与Spring Cloud Function(FAAS)没有任何关系。Spring的FAAS开源实现目前还不成熟,内部直接用文件做的全局共享,不存在分布式特性

Eureka的主要概念

在Eureka中,Netflix基于AWS设计出了很多Java类

  • EurekaServer: 服务注册发现的实现,你可以把它看作Zookeeper, etcd等

  • EurekaClient: 一个对Eureka的RESTful规范的实现,心跳,注册等流程均通过这里以HTTP请求的形式发出,你暂时可以把它理解为轮询HTTP请求发生器。

  • VIP(Virtual IP): 虚拟IP,命名服务中的Key,类似于域名

  • Feign: 注解形式的HTTP客户端,API基本与Retrofit一致,内部通过动态代理拼装HTTP请求,我之前也写Retrofit的文章,这里就多不介绍了。

  • SpringCloud-xxx: 对上面的包装,重点在于它的AutoConfiguration帮你干了很多重复工作,此外Maven的依赖问题也帮你解决了(老项目的工作量就在这了)

Eureka客户端

SpringBoot的Eureka-client分析

虽然本文主要讲的是Eureka,但是绝大部分初级中级读者应该还是从SpringBoot开始上手,因此本部分将讲解SpringBoot下Eureka的启动流程。

首先要注意的是,由于作者不精通SpringBoot,下面断点方法有投机取巧的意味,仅用于拉通对齐端到端主流程,而不能让你深入理解SpringBoot的启动原理

结论

通过进一步的封装,将原生的eureka-client转换为SpringBoot常用的yaml,通过AutoConfiguration这种注解DSL帮你节约样板代码的配置时间

  • SpringBoot-starter等框架大部分均是一层包装,千万不要畏惧
  • 善于用工具解决问题,避免把时间耗在Spring的细节中
  • 充分利用构造函数来断点分析Bean

预先准备

  • 按照网上任意教程搭建Server与Client的环境,比如这里
  • 旗舰版IntellijIDEA工具

SpringBoot自动装箱扫盲

@Configuration注解的含义

我们首先复习一下传统Spring的Java注解。除了传统的XML标记Bean以外,专业书籍中更推荐使用JavaConfig配置来代替繁琐的Xml标记

比如在《Spring In Action》中,作者是这样推荐的 "I’d favor the type-safe and more powerful JavaConfig over XML."

通过在传统项目的web.xml(WebApplicationInitializer)中配置@ComponentScan或者SpringBoot中配置@SpringBootApplication进行扫描,带有@Configuration的Class将被自动注入,比如下面就是一个Bean。

@Configuration
public class WebConfig{
    @Bean
    public SSOFilter ssofilter(){
        ....
    }
}

@EnableAutoConfiguration的含义

@SpringBootApplication注解中,最重要的是EnableAutoConfiguration这个注解,它就是SpringBoot自动装配各种配置的关键

// EnableAutoConfiguration注解对应的代码
@Import({EnableAutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
   ...
}

我们对它的最终实现类进行如下断点

org.springframework.boot.autoconfigure.AutoConfigurationImportSelector#selectImports

可以发现这里调用了SpringFactoriesLoader读取factories并返回了一个数组

org.springframework.core.io.support.SpringFactoriesLoader#loadSpringFactories

SpringFactoriesLoader是老Spring项目中已经存在的工具类(比如Dubbo中对Bean的xsd进行定制以支持Zookeeper),而非SpringBoot新造的轮子。它硬编码了META-INF/spring.factories作为properties配置读取并反序列化,并通过反射实例化标记为@Configuration的JavaConfig

—— 此部分可以参考《SpringBoot揭秘: 快速构建微服务》

所以说 ,EnableAutoConfiguration并不是什么黑科技,它只是反序列化了properties配置并读取JavaConfig,并没有很高深的技巧。

这里就不详细讲了,在本场景中返回了70多个bean,这些将后续被实例化并加载到上下文后进行Refresh。

如果读者有兴趣的话,可以参考mybatis-starter中的配置,这个比Eureka依赖更简单,更加适合入门学习

Eureka都自动注入了啥

Eureka中的AutoConfiguration

我们接着去查看Eureka中的配置文件(spring-cloud-netflix-eureka-client-1.4.2.RELEASE.jar!/META-INF/spring.factories),可以发现内部有如下配置

# 注意: value的前缀被我节约版面干掉了
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
EurekaClientConfigServerAutoConfiguration,\
EurekaDiscoveryClientConfigServiceAutoConfiguration,\
EurekaClientAutoConfiguration,\
RibbonEurekaAutoConfiguration,\
EurekaDiscoveryClientConfiguration
...

这么多配置如何分析呢,对于这类问题我们可以考虑购买高效工具(IntellijIDEA)来提高生产力

首先我们最常见使用的对象是这个

@Autowired
private DiscoveryClient discoveryClient;

使用IDEA的行左边绿色的Bean分析功能,可以发现是CompositeDiscoveryClientAutoConfiguration注入的。

打开这个Config,在构造compositeDiscoveryClient下打断点,可以发现实现类分别是EurekaDiscoveryClientSimpleDiscoveryClient,明显前一个才是真正负责RPC业务的客户端。

接着在EurekaDiscoveryClient的构造函数上打断点,发现构造这个是在EurekaClientAutoConfiguration被调用,两个入参均为netflix的包名,说明我们已经走到Spring封装的尽头

@Bean
public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
  return new EurekaDiscoveryClient(config, client);
}

接下来依次分析config与client的构造函数

EurekaInstanceConfig

第一个比较好分析,它只是一个Properties类

@ConfigurationProperties("eureka.instance")
public class EurekaInstanceConfigBean {
  //可以发现这个类就是一个单纯的反序列化类
}

EurekaClient

这部分在断点中为动态代理,实现类是CloudEurekaClient,我们在它的构造函数上打断点

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                         EurekaClientConfig config,
                         AbstractDiscoveryClientOptionalArgs<?> args,
                         ApplicationEventPublisher publisher) {
  //这里进行了耗时的注册初始化等流程,后续我们会在neflix中单独分析
  super(applicationInfoManager, config, args);
  this.applicationInfoManager = applicationInfoManager;
  this.publisher = publisher;
  this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
  ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

可以发现,在如下位置进行@Lazy初始化Bean

org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration#eurekaClient

这里注入了3个类

  • ApplicationInfoManager: Spring 直接注入的Singleton
  • EurekaClientConfigBean: 反序列化eureka.client的配置信息
  • EurekaInstanceConfig: 同上,反序列化eureka.instance的配置信息

这样EurekaClient的启动流程就分析完了

总结

说白了SpringBoot就是自动帮你反序列化并new出来一个Client,并配置了很多默认值。

关于@EnableDiscoveryClient注解

读者可能会问:为什么这个注解放到最后写呢,它明明应该是“入口”啊。

关于这个入口,读者可以做一个黑盒实验,把@EnableDiscoveryClient注解给去掉重新跑。发现无论是否加这个注解,在默认的配置下,都会自动注入EurekaClient这些Bean,EurekaClient也照样会发送RPC心跳请求。

首先配置日志等级

logging.level.org.springframework.boot.autoconfigure=DEBUG

我们点击进入这个注解,这个注解实际实现类是EnableDiscoveryClientImportSelector,在这里打上断点

org.springframework.cloud.client.discovery.EnableDiscoveryClientImportSelector#selectImports

然后启动SpringBoot,在Edgware.SR1版本下,断点进入,返回

org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration

这个类基本啥也没干,只是一个反序列化类

因此网上大部分的Eureka源码分析文章从第一步入口就得出了错误的结论,这个注解与@EnableAutoConfiguration虽然用的都是一个SpringFactoriesLoader,但是并不是它帮你组装了各种类,也不是一个开关,返回的并不是同一个值。

因此建议各位读者无论看我的文章还是其他的博文,都要有批判性思维。除了精确校对的技术书籍,博客中的内容都是需要验证的。

EurekaClient的RPC与心跳分析

Eureka是SpringCloud中推荐的NamingServer,自然就少不了心跳。如果要是来类比的话,我认为网络中的DDNS,或者LTE中的控制侧平面协议与本文所讲比较类似。

本文结论

Eureka的客户端启动后开了2个轮询线程池

  • 通过定时PUT发送实例心跳
  • 通过定时GET获取增量信息
  • 上述心跳与增量信息均不是原子操作,但是有最终一致性

说完了结论,我们现在就可以开始动手抓包了

抓包准备工作

本文直接上Wireshark进行抓包分析(再发个感慨,Eureka的几个HTTP包相比LTE通信中的RRC等信令抓包实在是太简单了,同样也是服务发现,通信专业比所谓的微服务难很多,但工资却不如IT)

准备工作

  • 配置netflix的日志级别为DEBUG的多台Client/Server程序
  • Wireshark抓包工具,并配置好监听的网卡(如果你用的是localhost,一般是选择Loopback)

术语概念

appID is the name of the application and instanceID is the unique id associated with the instance. In AWS cloud, instanceID is the instance id of the instance and in other data centers, it is the hostname of the instance.

心跳抓包

抓包准备

启动EurekaServer,打开Wireshark,选择你要监听的网卡(如果你用的是localhost,一般是选择Loopback),然后配置如下过滤器

http&&tcp.port==8761

单机Client启动与心跳(RENEW)场景

如下是Server早已经启动后,Client从零开始发送的所有请求

TimeInfo
26.609128GET /eureka/apps/ HTTP/1.1
26.610332HTTP/1.1 200 (application/json)
26.78631POST /eureka/apps/API-PROD-SZ1 HTTP/1.1 (application/json)
56.406603HTTP/1.1 204
56.673179GET /eureka/apps/ HTTP/1.1
56.67615HTTP/1.1 200 (application/json)
56.676931PUT /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1?status=UP&lastDirtyTimestamp=1519311933974 HTTP/1.1
56.679674HTTP/1.1 200
86.688146PUT /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1?status=UP&lastDirtyTimestamp=1519311933974 HTTP/1.1
86.691646HTTP/1.1 200
86.725409GET /eureka/apps/delta HTTP/1.1
86.729934HTTP/1.1 200 (application/json)

其中获取增量更新Delta的返回如下,也就是一个Diff操作

{"applications":{"versions__delta":"11","apps__hashcode":"UP_1_","application":[]}}

而心跳就更简单了,只是一个PUT操作更新实例

节点的正常上线

在当前负载下额外启动一个Client,可以发现如下现象

  • 新增Client通过POST发送当前实例信息给Server
  • 其它Client通过GET增量信息接受新增Client的信息
  • 当POST发布实例操作没有完成时,其它Client获取的delta是空白的;操作完成后,其它Client获取的delta有了新增的实例

节点的正常下线

清空Wireshark日志,然后Kill 15关闭Eureka的某个Client,可以发现有如下请求发出

首先POST报文将JSON中的状态配置为DOWN

POST /eureka/apps/API-PROD-SZ1 HTTP/1.1  (application/json)

接着删除了此APP的实例(无论剩下的Client有几个都发送了)

DELETE /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1 HTTP/1.1 

其中

  • appID: API-PROD-SZ1
  • instanceID: 10.0.0.4:api-prod-sz1

节点的异常下线

异常下线后,判断逻辑肯定在Server端,本文暂时不分析。当然默认是90s后自动下线。

抓包结论

由上面可以得出如下结论

  • POST发布app全量实例的操作不是一个(阻塞的)原子操作
  • 通过轮询全量或增量同步应用信息,但是Eureka不保证各个节点的Consistence(也就是CAP的C没法保证),但是在多次轮询后可以达到最终一致性
  • 心跳本身很简单,只是PUT应用的实例信息

Java侧请求

在EurekaClient的构造函数中,主要有两步操作:第一步反序列化配置文件,第二步启动定时线程池(心跳与更新缓存),下文简要提供相关断点位置

首先进入构造函数

com.netflix.discovery.DiscoveryClient#DiscoveryClient

通过分析,可以发现在本地缓存如下

// 本地通过CAS实现
private final AtomicReference<Applications> localRegionApps = new AtomicReference<>();

通过REST接口反查第一次RPC请求断点位于,将获取全量的APP信息

com.netflix.discovery.DiscoveryClient#fetchRegistry

后续将通过轮询进行增量更新与心跳

全量更新

当本地缓存为空时,将进行全量更新

com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry

通过CAS保证本地线程安全

 if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
     // 此处的过滤操作类似于groupBy操作符过滤出状态为up的实例
     localRegionApps.set(this.filterAndShuffle(apps));
     logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
 } else {
     logger.warn("Not updating applications as another thread is updating it already");
 }

增量更新

当通过定时轮询从服务端获取到增量更新(Applications对象)后,将在本地CAS锁(ReentrantLock)更新

//com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    String reconcileHashCode = "";
    if (fetchRegistryUpdateLock.tryLock()) {
        try {
            // 通过ActionType更新hashSet,此处内部也有锁
            updateDelta(delta);
            reconcileHashCode = getReconcileHashCode(applications);
        } finally {
            fetchRegistryUpdateLock.unlock();
        }
    } else {
        logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    }
}

updateDelta

此部分比较简单,通过遍历并进行ActionType模式匹配更新Application列表中的状态

本地Hash校验

此处虽然有个所谓的一致性Hash的唬人名字,实际上就是本地与远程的对比,类似于分段下载完iso文档后进行CheckSum校验,它与环型的一致性Hash不是一个东西

举一个例子,比如当前有如下的机器

[
    "vip1": [UP, UP]
    "vip2": [UP, UP]
    "appHashCode": "UP_4_"
]

其中某一台挂了,服务端返回如下

[
    "vip1": [UP, UP]
    "vip2": [UP, DONW]
    "appHashCode": "DOWN_1_UP_3_"
]

本地增量更新远程的状态(updateDelta)后,也将进行Hash计算(getReconcileHashCode),并与远程的计算结果相对比。

此处校验算法虽然有一堆for循环,但是它实际上是类似Groovy中函数式编程countBy的实现,伪代码如下

// getReconcileHashCode 的伪代码
// 本地增量后的 localRegionApps
def localRegionApps = ["UP",'UP','DOWN','DOWN','UP','UP','UP','UP'];
getReconcileHashCode(list){
    def map = list.countBy {it}
    //=>{UP=6, DOWN=2}
    def list = map.collect { k, v -> k + "_" + v + "_" }
    //=>[UP_6_, DOWN_2_]
    // 注意,此处仅仅为伪代码,因为真实使用KetSet遍历的是基于TreeMap(按照Key)进行排序
    def hash = list.inject("") { old, it -> it.concat(old) }
    //=>DOWN_2_UP_6_
}
getReconcileHashCode(localRegionApps)
// => DOWN_2_UP_6_

如果hashCode相同,那么此次更新就成功了;如果hashCode不相同(我还没有见过,肯定是在本地合并的那一步),将会进行全量更新

Hash碰撞特例

上面的Hash算法太简单了,比如

[
    "vip1": [UP, UP]
    "vip2": [UP, DONW]
    "appHashCode": "DOWN_1_UP_3_"
]

变成了

[
    "vip1": [UP, DONW]
    "vip2": [UP, UP]
    "appHashCode": "DOWN_1_UP_3_"
]

此时本地updateDelta也更新失败的话,那么这次增量更新校验却被认为是更新成功了,这里的就存在碰撞问题。

此处待确认。

附录

服务端代码位置

此部分的服务端代码在如下位置

eureka-core-1.8.6.jar!/com/netflix/eureka/resources

RPC实现

Eureka在内部均采用了sun的jersey作为HTTP请求客户端,你可以把它类比为OkHttp或者HttpClient

例如获取Application就调用了如下

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications

注意这里的Java代码也是通过Override闭包回掉的形式来实现分层的,与ServiceComb有点类似,因此读者在打断点时要明白代码并不一定是从上到下走的

如果你第一次看Eureka的源码,建议将下的所有AbstractJerseyEurekaHttpClient下的Jersey HTTP字符串日志相关的行都打上断点,先分析再把断点读薄。

NodeJS中的Client

在NodeJS等其它平台中,如果希望集成到Eureka的服务发现中,可以使用EurekaClient实现,内部原理很简单,就是一个Timer定时请求。我在部分项目中也有使用,但是NodeJS圈子有一个常见的问题就容易撒手不管,所以如果使用的话可能需要进行定制。

参考

聊聊分布式散列表(DHT)的原理——以 Kademlia(Kad) 和 Chord 为例

Eureka集成遗留项目

How to use eureka without spring boot?

虽然SpringCloud开发效率很爽,但是现有项目很多是遗留老项目(比如Struts等),基本上不可能升级到SpringBoot。本文将介绍SpringCloud中Eureka组件与老项目的集成方法。

一听到最新的SpringCloud组件与老项目集成,有经验的老码农可能会说“不可能吧,Jar包兼容性就一堆坑”。针对这个问题,本文已经解决了。

本文目标

  • 保持现有非SpringBoot项目结构不变,确保兼容性
  • 集成Eureka服务

清理Maven依赖遗留问题

使用eureka很简单,只用引入一个Jar包maven依赖就可以了。实际上并不然,对于老项目来说,拔出萝卜带出泥,引入一个Jar包,就像npm一样下载了一堆文件,这些同名不同版本的jar包混到一起,让开发者无从下手。

maven的runtime依赖介绍

在eureka的scope中有一种叫runtime的依赖,这些包在打包(比如War包)时将被导入,而在使用时如果你没有显示申明依赖了它,它在IDE中仍然是红的代码

引入pom后代码中是否仍然变红是否打入war包
compileNY
provided(比如私有Framework)NN
runtime(比如JDBC)YY

因此如果我们代码中使用Eureka中依赖的jar包库(比如javax.inject),需要在dependency中重复再次显性声明依赖。

maven中的systemPath处理

有些很老的系统是从Jar包迁移到Maven来的,经历过几次转手,项目源码中本身就有很多Jar包,比如有如下结构

- common
	-lib
		- spring-4.0.9.jar
		- mybatis.jar
    -src
    	- java
    		...

这些jar包在maven中是这样定义的

<!-- bad practice in maven -->
<dependency>
  <groupId>xxxx</groupId>
  <artifactId>xxxx</artifactId>
  <version>1.3.2</version>
  <type>jar</type>
  <scope>system</scope>
  <systemPath>${project.basedir}/libs/spring-4.0.9.jar</systemPath>
</dependency>

对于这类万年大坑,建议花专门的时间去把这些本地的jar包换成maven正常依赖的形式,以免产生jar包版本冲突,这类问题与Eureka无关,反正迟早要做,不如趁现在改了,对于这类问题,建议最小化分批替换原则,并多留出测试时间,建议发完版本立刻开搞

至于版本兼容性,可以考虑对Jar包进行MD5判断

使用Dependency Management管理版本号

由于eureka引入了很多第三方的jar包(比如guava),而这些第三方的Jar包对我来说,我不希望一点点去exclude,并折腾各种版本兼容性。因此可以使用Dependency Management来统一管理此问题,在老项目中使用spring-cloudspring-boot作为依赖管理。

<project>
    <!-- 虽然这里是Parent,但是不会引入任何包 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath/>
    </parent>
    
    <properties>
        <!-- 这里可以覆盖parent中的版本 -->
        <spring-cloud.version>Edgware.SR5</spring-cloud.version>
    </properties>
    
    <dependencies>
      <dependency>
        <groupId>com.netflix.eureka</groupId>
        <artifactId>eureka-client</artifactId>
      </dependency>
      <dependency>
        <groupId>com.netflix.archaius</groupId>
        <artifactId>archaius-core</artifactId>
      </dependency>
    </dependencies>
    <dependencyManagement>
      	<dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement> 
</project>

这样,你在项目中就复用了SpringBoot的版本号管理维护工作量,以后你只用修改这里的version与其它同级微服务中的SpringCloud版本一致,内部的Eureka,Jackson等版本号就不用自己折腾了。

Intellij中的暗坑

在Intellij中,当你从SVN中删除某个文件,这个文件只是Scheduled to remove from svn。并没有真的从FS删除,因此当你删除了Jar包后,需要立刻提交到SVN中,这样FS中才会被删除掉。这个地方坑了我一天,导致出现多个Jackson,而导致反序列化失败

如果你反复删不掉,可以使用Everything等工具,jar包可能放在你意想不到的位置(比如WEB-INF中)

集成Eureka

集成Eureka的难度不大,它内部也就是2个定时发请求的线程池,并不存在高大上的东西。

Log4j调试配置

分析任何框架第一件事就是配置Log

比较偷懒的方法是在Log4j中配置如下为Debug

com.netflix=DEBUG

配置后就可以看到200,204,400等各种RPC回掉了

Java侧配置

集成Eureka主要需要控制它的生命周期,如果你的项目还在用web.xml,那么可以把它挂到Listener中,或者用纯Java的@PostConstruct来控制Bean。此处可以参考Github配置

@Configuration
public EurekaClassicConfig{
    @PostConstruct
    public void init(){
        // 详见GitHub配置, 这一步会阻塞30s
        EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
    }
}

唯一注意的就是client初始化非常久,需要多等待

Properties的配置

配置文件名一定是: eureka-client.properties,因为这个是硬编码写死的,不能自定义

另外,Properties中最重要的是配置VIP(Virtual IP, 类似Nginx的UpSource)名称,而SpringBoot中已经默认配置好了,如果没配置将导致后续注册后仍然无法使用,下面是最简配置

eureka.vipAddress="miaomiao"
eureka.name="miaomiao"
# 与Tomcat一致
eureka.port="8080"
eureka.serviceUrl.default=http://localhost:8080/eureka/v2

官网的Wiki比较旧,可以参考如下对象进行配置

  • PropertyBasedClientConfigConstants

此外,虽然在官网中的Wiki中的url是以“/”结尾的,但是我发现请求时出现了//这样的问题,因此我建议在serviceUrl不要加入/后缀,具体可以打Log验证

功能验证

通过下面的Controller获取ServiceInstance,打断点查看是否能获取它的host与port信息

@RestController
class ServiceInstanceRestController {

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private LoadBalancerClient client;

    @RequestMapping("/service-instances/{applicationName}")
    public List<ServiceInstance> serviceInstancesByApplicationName(
    @PathVariable String applicationName) {
        // 打断点
        return this.discoveryClient.getInstances(applicationName);
    }

    @RequestMapping("/service-instance/{applicationName}")
    public ServiceInstance serviceInstanceByApplicationName(
    @PathVariable String applicationName) {
        return this.client.choose(applicationName);
    }
}

任意客户端进行curl调用

GET /service-instance/miaomiao

如果获取到了JSON数据,就说明注册成功了。其它组件就可以通过调用RestController的形式来请求了。

集成后常见问题

启动速度变慢

由于默认注册的间隔是30s,因此测试环境中可以把它改小点

注册中心单点故障

当注册中心挂掉后,启动就会失败。更深的问题是EurekaServer没有实现HA(High Availability)。针对这种问题,最简单的是配置多台Server,它们之间通过P2P进行Replicate

怎么输入鉴权密码

在URL中直接配置,输入方法同配置HTTP代理,如果你不希望明文放密码,可以使用 {cipher}进行加密

http://USERNAME:PASSWORD@PROXYIP:PROXYPORT

使用Swagger生成Feign桩

在Eureka中,默认通过RestController进行RPC调用,虽然使用比较简单,但是纯手写还是比较麻烦的。目前一般有如下调用方法

RPC StubPROSCONS
HttpClient/OkHttphight customizedmaintains urls, serialization
Feign/RetrofitDynamic Proxy, Interceptorsmaintains source code
Swagger CodegenAuto generated maven jarmaintains templates

其中Swagger Codegen方法中可以通过手动更新到VSC与通过Jenkins部署到MVN上实现,在实际项目中,你可以首先以源码形式进行管理,后面在切到全部自动化管理

graph LR
	a2--mvn deploy-->b1
    subgraph Swagger
    a1(API)--swagger codegen maven plugin-->a2(POM Source)
    end
    subgraph Maven nexus
    b1(Jar)
    end

虽然网上讲Swagger代码生成很多 ,但是中文似乎研究的人并不多,而且官方的生成器模版有一堆定制问题,因此本文进行一下介绍

简易生成代码桩

这个仅用于没有负担的入门项目

  • 没有鉴权(no auth)
  • 没有多环境配置(no env profiles)

代码如下

# download the stable release jar
wget http://central.maven.org/maven2/io/swagger/swagger-codegen-cli/2.3.1/swagger-codegen-cli-2.3.1.jar -O swagger-codegen-cli.jar
# run codegen with petstore
java -jar swagger-codegen-cli.jar generate -i http://petstore.swagger.io/v2/swagger.yaml -o gen -l spring --library spring-cloud

Do not use codegen3, it not works now.

高度定制生成代码桩

虽然Swagger看似一键生成了代码,但是生成的内容效果都不是很好,因此建议自己维护一份模板,原因如下

  • 在业务中一般会定制Encoder与鉴权插件,此部分仅仅修改模版是不够的
  • 由于SpringCloud更新速度很快的原因,如果你使用SpringBoot2.0,那么需要自己定制模版中的import。

具体教程如下

配置Maven plugin

首先在Maven中配置插件,其中插件参数如下

<profiles>
    <profile>
        <id>swagger-gen</id>
        <!-- eg: passed by mvn -Dkey=value -->
        <properties>
            <url>http://petstore.swagger.io/v2/swagger.yaml</url>
            <package>com.github.miao1007</package>
            <timestamp>${maven.build.timestamp}</timestamp>
            <maven.build.timestamp.format>yyyyMMdd</maven.build.timestamp.format>
        </properties>
        <build>
            <plugins>
                <plugin>
                    <groupId>io.swagger</groupId>
                    <artifactId>swagger-codegen-maven-plugin</artifactId>
                    <version>2.3.1</version>
                    <executions>
                        <execution>
                            <!-- see https://stackoverflow.com/a/3169340/4016014 -->
                            <id>default-cli</id>
                            <goals>
                                <goal>generate</goal>
                            </goals>
                            <configuration>
                                <inputSpec>${url}</inputSpec>
                                <language>spring</language>
                                <apiPackage>${package}.api</apiPackage>
                                <modelPackage>${package}.model</modelPackage>
                                <groupId>${package}</groupId>
                                <artifactId>sdk</artifactId>
                                <artifactVersion>${timestamp}-SNAPSHOTS</artifactVersion>
                                <invokerPackage>${package}.invoker</invokerPackage>
                                <templateDirectory>template</templateDirectory>
                                <configOptions>
                                    <library>spring-cloud</library>
                                    <dateLibrary>java8</dateLibrary>
                                </configOptions>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </profile>
</profiles>

注意这里专门开了一个profile,避免与其他打包产生影响。

生成源码(Generate source code)

执行如下

# please replace the url to your own's
mvn -P swagger-gen -Durl=http://petstore.swagger.io/v2/swagger.yaml -f pom.xml

部署到Maven(Deploy to maven)

执行如下

# it won't work now because we havn't configure the template file in the pom
mvn clean deploy -f target/generated-sources/swagger/pom.xml

此处deploy需要一个Nexus私服,这个自己搭

使用桩

在其他需要此项目的pom文件中加入如下

<dependencies>
    <dependency>
        <groupId>com.github.miao1007</groupId>
        <artifactId>sdk</artifactId>
        <name>sdk</name>
        <!-- you can customize your own timestamp -->
        <version>20180724-SNAPSHOTS</version>
    </dependency>
</dependencies>

通过上述流程,你的代码生成器应该就可以用了,但是默认模版还有以下问题

  • 不支持Eureka的value属性(Do not support Eureka's dynamic naming service),而是hard coding url
  • pom过于简单,不支持上传源码(maven-source-plugin)

定制模版

首先在根目录下创建文件夹template

然后,你需要覆盖文件的形式定制,从这里下载需要定制的文件,并放到刚刚的template目录

# 注意不需要文件夹层次
https://github.com/swagger-api/swagger-codegen/tree/master/modules/swagger-codegen/src/main/resources/JavaSpring

定制apiClient动态模版

举个例子,需要支持基于yaml获取Eureka的name,那么需要进行如下定制,此处path相当于tomcat的contextPath,原版的模版中并不支持

File: template/apiClient.mustache

package {{package}};

import org.springframework.cloud.netflix.feign.FeignClient;
import {{configPackage}}.ClientConfiguration;

{{=<% %>=}}
@FeignClient(name="${<%groupId%>.name}", path="${<%groupId%>.path}")
<%={{ }}=%>
public interface {{classname}}Client extends {{classname}} {
}

然后生成的代码如下

//generated file by mustache
@FeignClient(name="${com.github.miao1007.name}", path="${com.github.miao1007.path}")
public interface PetApiClient extends PetApi {
}

接着我们在客户机的application.yaml中配置即可

# eureka client config example 
io:
  github:
  	miao1007:
  	  sdk:
  	  	# eureka's name
        name: EUREKA-ORDER-PROD
        # tomcat's context path
        path: /context

然后就可以像往常一样注入服务即可

@Autowired
private PetApiClient client;
//use
client.queryBy...

如果你想明白底层原理的话,可以看这里

定制POM源码模版

同理,由于默认模版中只上传了jar,导致用户使用时参数可能是var1, var2,这里可以通过配置源码插件实现

File: template/pom.mustache

+ <plugin>  
+    <groupId>org.apache.maven.plugins</groupId>  
+    <artifactId>maven-source-plugin</artifactId>  
+    <version>2.1.1</version>  
+    <executions>  
+        <execution>  
+            <id>attach-sources</id>  
+            <phase>package</phase>
+            <goals>  
+                <goal>jar-no-fork</goal>  
+            </goals>  
+        </execution>  
+    </executions>  
+ </plugin>

以及nexus上传定制

+ <distributionManagement>
+     <repository>
+         <id>releases</id>
+         <!-- your nexus url -->
+         <url>http://127.0.0.1:8081/nexus/content/repositories/releases</url>
+     </repository>
+     <snapshotRepository>
+         <id>snapshots</id>
+         <url>http://127.0.0.1:8081/nexus/content/repositories/snapshots</url>
+     </snapshotRepository>
+ </distributionManagement>

这个不属于本文范畴,可以自行学习

Feign File upload

这个地方简直天坑了

  • 首先Swagger生成的代码有问题,没有@ParamPart,导致上传无法使用,详见解决办法#8419
  • Feign代码写的水平远不如Retrofit/OkHttp优雅,它不支持免配置上传二进制文件,我目前的解决如下

依赖如下

<dependency>
    <groupId>io.github.openfeign.form</groupId>
    <artifactId>feign-form-spring</artifactId>
    <version>3.3.0</version>
</dependency>

全局配置如下,需要被扫描

// Feign client config
@Configuration
class FeignConfig {
    
    @Autowired
	private ObjectFactory<HttpMessageConverters> messageConverters;
    
    @Bean
    public Encoder feignEncoder() {
        Encoder dft = new SpringEncoder(this.messageConverters);
        Encoder form = new SpringFormEncoder();
        return new Encoder(){
          public void encode(Object object, Type bodyType, RequestTemplate template) {
              if (bodyType == MultipartFile.class) {
                form.encode(object, bodyType, template);
              } else {
                dft.encode(object, bodyType, template);
              }
            }
        };
    }
}

为什么不单独在接口中独立写Encoder呢?这样写的问题是,Feign内部的FeignContext使用name作为key,configuration作为value,因此如果你这里定制了不同的configuration,那么相同name下的configuration将被覆盖,详见FactoryBean中实现。

总结与建议

Feign与Swagger的结合可以说是一堆问题,当然网上并没有像Dubbo那么完善的方案,因此需要注意

  • 如果使用Controller作为RPC的实现,那么在写Controller时一定不要用Map作为入参出参,这样RPC序列化时将无法使用。我在项目中发现了很多外包这样写,导致后期维护成本较高。再次感慨招人与静态检测的重要性。
  • 如果需要鉴权,那么不用桩中支持,而是直接外部全局配置拦截器即可,在服务端的业务代码中也不要加入鉴权相关的@ApiParams
  • 如果使用基于编码TCP的形式进行RPC,那么需要自己定制模版,但是SpringCloud的调用链,日志均等生态就无法使用了

基于SpringCloud的分布式定时调度任务

本文讲如何设计一款对开发者友好的定时任务调度框架。

根据相关规定,具体实现将不开源。不过本文可以介绍此框架的分析过程以及使用的技术。

如果当前有PaaS,那么基于PaaS实现最推荐:比如Kubernetes、Nomad进行容器级、无状态的调度。不推荐阅读下文。

1. Backgroud

定时任务几乎在每个项目都广泛存在,普通项目中一般通过注解或者Bean等形式进行管理Job。不过当你的任务比较多时,你就会发现来坑了(开源软件啥都有,但啥都不全) ——

  • Quartz调度过程为黑盒,无法查看历史,也不支持各种调度管理(暂停/新增/下线/立刻执行等)功能
  • 基于Xml/Java的Bean很难统一维护,后面接手的人谁也不敢动
  • Quartz如同它的名字(QuartzCrystal,晶振,也就是时钟发生器,类似CPU中的PLL电路),只是触发器而已,它本身不要与负载均衡混到一起(内部本身通过竞争Row锁实现,连均分都做不到)

虽然当当等调度开源框架进行了二次定制,但是它们仍然属于库文件,侵入式太强(需要继承某个类或者注解等代码污染),因此把上面的逻辑SAAS(Schedule-As-A-Service)化可能是更好的选择

2. Requirement

针对以上问题,做出如下愿景

服务化

将原有的Java方法调用换为RPC,调度逻辑(触发实现,同步还是异步)与业务分离

Quartz新定时任务
调度逻辑硬编码到Java/XML中,修改需要重启通过外部DSL(比如JSON)以Web界面的形式实时管理(暂停/新增/下线/立刻执行等),作为调度客户端RPC请求执行器
侵入性与业务框架强耦合连Quartz都不用引入,通过Eureka以覆盖网络的形式进行节点间通信。甚至连Eureka都可以不用,而使用Nginx作为命名服务。
运维特性黑盒能够记录JobHistory,以便以后有据可查

用户侧(执行器)需要做什么改造?

  1. 用户侧暴露任务的Endpoint,最简单的是暴露为RestController
  2. 用户在定时任务Web管理界面上传配置,如下,其中url中的host是VIP,VIP的可用性由执行器自己保证
{
    "name": "SendMail",
    "url": "http://eureka-busi-sz1/app/task/sendmail.task",
    "cron": " */2 * * * *",
    "fail": "miao1007@gitbook.com"
}

这样,定时任务调度就会在相应的时间间隔发送RPC请求,客户自己就不用维护调度逻辑了。

3. Action

针对上面的需求,我们可以把

3.1. 定时任务后台RPC请求流程图

总体设计如下,只要服务注册到Eureka即可获取到IP,并执行RPC

sequenceDiagram
  	Scheduler->>EurekeServer: "eureka-busi-sz1"的IP是多少
    activate EurekeServer
    EurekeServer-->>Scheduler: 它的IP是[a,b,c]
    deactivate EurekeServer
    loop 客户端负载均衡
        Scheduler->>Scheduler: 选出一个IP
    end
    Scheduler ->> Business: RPC call
    activate Business
    Business -->> Scheduler: 完成业务
    deactivate Business

高可用方案

  • 定时任务本身复用Quartz的统一数据库中心,多台横向部署即可。部署时建议在相同的Region中,时间间隔必须小于1s
  • EurekaServer: 参考Netflix的HA教程,配置多台即可
  • Business高可用: 取决于业务,一般来说横向部署多台,注册到Eureka即可。

3.2. Eureka+OkHttp+Quartz解决方案

针对上述选型,实现如下功能

  • Web界面: 对于编程人员全部通过鼠标或者脚本导入导出定时任务,我个人通过 Angular搞了一套简单的表格式管理页面
  • 调度管理: 通过对Quartz的Scheduler进行封装暴露,使用JobStoreTX(持久化)
  • 分布式锁: 通过Quartz默认内置Row级Lock(SELECT FOR UPDATE)
  • 命名服务: 通过Eureka实现服务实例的获取(代码中利用OkHttp的DNS进行Override)
  • 负载均衡: 使用Ribbon,如果你不喜欢它的数组轮询方式,可以自己实现IRule。这里需要结合业务实现,无状态的数组轮询/Hash即可,有状态的需要维护一个结构体
  • RPC: 默认HTTP阻塞调用,不过受限于SpringMVC/Tomcat的超时,此方案不支持获取阻塞时间过长的任务结果(这类更应该用MQ来实现,比如索引任务,这类可以参考SofaBoot的实现),当然如果你只要求能发送请求而不在乎返回结果,那么这个功能就已经够了,可以考虑加入回写。这个是方案的短板,长期阻塞IO将导致线程并发数能力弱。
  • RPC鉴权: 此部分与调度是无关的,但是可以采用定制Header的Interceptor等方法实现

最终通过SpringBoot一个Jar包进行多机部署,此架构单点故障主要在中心的JobStore(比如数据库)上

3.3. Quartz的定制

本文不会过多涉及到非Eureka的篇章,因此建议如下

  • 使用Plugin注册Job监听器,并模仿LogBack的DBAppender实现历史日志记录,如果你的日志是非结构化的(比如返回了JSON报文),我建议放到ElasticSearch上
  • 多看多学Quartz的调度API,并封装为RESTful接口
  • 由于是给所有小组用,而且很多人都喜欢放在6点跑定时任务,因此Quartz的执行线程池必须调大,而不是默认的10条,否则会Missfire

3.4. 定制OkHttp的Dns接口实现命名服务

Eureka从客户端来看,它非常像HTTP DNS。虽然在Android中经常使用,不过我们的Quartz也相当于是客户端,因此可以在Quartz中的请求Job的RPC中,定制如下

LoadBalancerClient client;
final Dns eurekaDns = new Dns() {
    @Override
    List<InetAddress> lookup(String vip) throws UnknownHostException {
        if (!vip?.trim()){
            throw new UnknownHostException("hostname is null")
        }
        ServiceInstance instance = client.choose(vip)
        if (instance){
            String realHost = instance.getHost()
            return InetAddress.getAllByName(realHost)
        } else {//这种场景仅用于输入的不是VIP
            return Dns.SYSTEM.lookup(vip)
        }
    }
}
OkHttpClient ok = new OkHttpClient.Builder().dns(eurekaDns).build()
//注意这里输入的HostName是Eureka中的VIP(Virtual IP)
Request request = new Request.Builder().url("http://eureka-instance/api/xxx.json").build()
ok.newCall(request)

这样,非常简单复用了各种开源软件的接口,没有造一点轮子,就搞定了找服务的问题

  • OkHttp自动帮你Parse了URL,因此你不用自己手动获取HostName,OkHttp内部的双端Dequeue队列可以保证所有任务按序发送。
  • Ribbon帮你实现了负载均衡,注意这里是客户端负载均衡算法,本文场景仅供应用服务层调度,不支持cgroup/namespace层的调度(比如nomad/k8s)。
  • Eureka帮你实现了VIP的服务发现(这里被Ribbon给封装了看不见)
  • InetAddress.getAllByName这个调用只是本地拼装IP,没有再次进行DNS请求

此方案的好处是,假如后续你的服务换成了Consul/etcd/kubernertes,也能无损切换。定时任务就要做到功能单一,不要和“调度”混到一起。

4. Summary

项目收益

上述方案已经在生产环境中使用,目前收益如下

  • 比较俗的,就是个人搞到一个演讲分享的机会,然后部门内评了一个奖并推广
  • 通过定时执行日志统计出了很多特殊业务场景下的失败BUG并改进,晚上睡觉再也不心虚了
  • 任务调度与业务真正解藕,没有代码污染,业务代码中可以只留一份JSON作为存档
  • 统一邮件警报功能,任务挂掉后马上掌握

特别发现了一些严重的编码问题,比如某些不负责的员工代码中把异常全局try/catch住,打印一个报错,就认为“我已经处理好异常”了,这种代码导致了很多任务看似成功实则失败,这种行为的确让人心累,以后招聘一定要好好把关!

方案缺点

虽然本方案在小项目组内(也就是200多个任务)完全ok,但是

  • 阻塞架构:受限于被请求端的超时配置,不支持长阻塞任务获取结果;过多的长阻塞任务占用数据库大量连接池。需要用MQ/Webhook来进行回写处理(没做)
  • 无法检测到Missfired的任务(因为只配置了Job监听器,而没有配置Trigger监听器)
  • 受限于Quartz本身处理Missfired的逻辑(可以看那个For循环源码),重启后cron任务会一次性跑完,导致可能影响业务,比如邮件集中发一堆被客户吊(这个需要定制Quartz源码)

5. 附录

其它技术调研

通过技术调研(搜索开源项目),有如下方法

  • 最原始的crontab+curl脚本实现Trigger: 这样一套做下来,基本上除了开发本人,谁也不敢碰代码了。缺点也非常明显,Shell维护性差
  • 使用Quartz+分布式锁实现调度(现有状态): 目前还算是比较主流的实现,毕竟不是所有的业务都需要“云化”,很可能几台就够用了,缺点是每台机子都要折腾自己的Bean,只要有人离职就心慌
  • 使用zk+Quartz实现: 比较完善的一种方案,当然这套系统需要自研要耗费人力,而网上似乎只有当当网的分布式作业框架elastic-job进行了开源,这个写的很有水平,但是当当的是通过zk注册Bean实现调用Java类,对老系统侵入较强: 需要引入新的Jar包(开源合规/信息安全等等),还要继承一个黑盒Class,同时它对zk暴露的是Class,不支持微服务RPC调用。
  • 某闭源zk实现: 某电信级项目中间件,通过定制Zk服务发现,自研线程池与DSL脚本实现,此方案优点是高度定制适配业务,缺点是普通业务团队养不起。
  • 其它国产框架: XXL-Job等框架. 虽然架构框图画一堆,评奖也很多,但是并没有看出针对上述Quartz缺点(比如阻塞,抽取Loadbalancer,Misfire,上下文持久化)的定制改造,说白了就是一个Wrapper而已,没有达到中间件团队的能力。虽然我认可作者的能力,但我对国产(包括阿里的)开源还是比较谨慎的。(国产开源特点: 面相KPI与个人品牌而开源)

最后结论是,现有的开源调度系统都有侵入式,而且都不支持微服务,因此只好自己进行开发了,上面是我的探索思路,希望会有帮助。我个人还是建议项目组自行定制定时任务,因为每个项目的鉴权,微服务框架,业务执行时间都是不同的,这也是业界好用的不开源,开源的不好用的原因。

附加题:调度器实现(Bin Packing Systen Design)

某项目需要专门的编译器集群去跑编译任务

  • 任务是一堆代码,它将消耗一种特殊的硬件进行运行。
  • 某种硬件设备集群,有两个指标,一个是存放软件源码的磁盘容量【S】,一个是编译速度【V】MB/H;编译器始终高可用,但是可以中途新增编译器;不考虑磁盘加载等非编译时间消耗。
  • 软件项目不能分拆到两个编译器上跑;同一个编译器可以跑多个项目。

求给出两个调度设计(均分负载到多台/优先堆满一台)的SASS的方案。如果读者有兴趣的话,可以联系本人内推。

Eureka服务端

本部分主要是对Eureka内部数据结构的维护与集群复制的分析,相比于其它源码分析类文章,本文更像是实验过程,而不是粘一堆代码后得出结论。

搭建环境

配置单机版Host

编辑本机的Hosts(/etc/hosts)

127.0.0.1 peer1
127.0.0.1 peer2
127.0.0.1 peer3 

搭建测试环境

配置如下配置属性,详见High Availability, Zones and Regions,注意defaultZone在Server侧需要写全,在Client侧可以写部分,否则没写的不会收到心跳信息,导致Server报错"Renews threshold"

eureka:
  client:
    serviceUrl:
      defaultZone: http://peer1:8081/eureka/,http://peer2:8082/eureka/,http://peer3:8083/eureka/
  instance:
    appname: eureka-server
---
spring:
  profiles: peer1
eureka:
  instance:
    hostname: peer1
    instance-id: peer-id1
server:
  port: 8081
---
spring:
  profiles: peer2
eureka:
  instance:
    hostname: peer2
    instance-id: peer-id2
server:
  port: 8082
---
spring:
  profiles: peer3
eureka:
  instance:
    hostname: peer3
    instance-id: peer-id3
server:
  port: 8083

然后再加入@EnableEurekaServer注解,就完成搭建一个Server了,依此启动这三个profile的SpringBoot程序

如果搭建成功,应该可以发现下面三个地址可以访问了

http://localhost:8081/
http://localhost:8082/
http://localhost:8083/

在启动过程中,不断刷新如下地址,可以发现apps__hashcode会不断更新

http://localhost:8081/eureka/apps

执行分析

打开Wireshark,分析localhost网卡,进行如下过滤

# 发送与接收
http&&(tcp.port==8081 || tcp.port==8082 || tcp.port==8083)
# 接收
http&&(tcp.dstport==8081 || tcp.dstport==8082 || tcp.dstport==8083)

EurekaServer如何维护命名服务

本文只讲EurekaServer如何装箱的过程,后续再详细讲Eureka本身的细节。

启动分析

@startuml
Bob -> Alice : hello
@enduml

从注解开始分析

打开@EnableEurekaServer这个注解,可以发现了一个空的Class

org.springframework.cloud.netflix.eureka.server.EurekaServerMarkerConfiguration.Marker

通过以往经验,这个Marker可能是一个flag标记,对它进行findUsage分析

可以发现它是如下JavaConfig的前置条件

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration {}

这里导入的Bean太多,如果强行分析也是可以,但是很容易陷入Spring的细节中,投入时间划不来。仔细看可以发现javax.ws这种涉及到RESTful接口的,打上断点

EurekaServerAutoConfiguration#jerseyApplication

分析它内部扫描的Class,整理所有接口如下

@Path(value=/{version}/instances)
@Path(value=/{version}/status)
@Path(value=/{version}/vips)
@Path(value=/serverinfo)
@Path(value=/{version}/svips)
@Path(value=/{version}/peerreplication)
@Path(value=/{version}/apps)
@Path(value=/{version}/asg)

在之前的Client抓包分析中,客户端通过HTTP请求进行RPC,所以只要分析断点上述Server端的接口,即可明白其内部工作原理了。

Eureka这里使用了jersey作为它的路由实现,在Filter中代替了Servlet的工作,这点类似Struts框架。

此外进行全局搜索,发现Spring也用mvc包装了部分Controller

org.springframework.cloud.netflix.eureka.server.EurekaController

断点请求

获取APP列表

比如最简单的接口

GET /eureka/apps/ HTTP/1.1

通过上面的jerseyApplication可以发现如下类负责此接口

ApplicationsResource#ApplicationsResource(EurekaServerContext)

在这个类中的构造函数与路由位置,各打上断点后启动,发现没有断上,分析可能是lasyload模式,接着上curl

curl localhost:8761/eureka/apps

这次断点成功了,接着按照调用栈,在如下位置打上断点,可以发现Eureka在维护Apps使用了如下结构

ResponseCacheImpl#get(Key, boolean)

获取Instance列表

EurekaServer的集群间复制(Replicate)

首先按照上一章节搭建环境并用wireshark抓包后,可以发现它们内部通过三个轮询请求进行同步

POST /eureka/peerreplication/batch/
PUT /eureka/apps/EUREKA-SERVER/pc_name:8081?status=UP&lastDirtyTimestamp=15575xxxxxxxx
GET /eureka/apps/delta

PeerReplication流程

我们依然不先看源码,先搞清楚RPC,以peer1为例

# 找到peer1的连接信息
lsof -i -n -P |grep  `ps aux|grep java|grep peer1| awk '{print $2}'`

可以看出EurekaServer连接的关系是P2P关系,三个Peer两两相连

用Wireshark进行调试

# 所有发送给Peer1的请求
http.request.uri contains "peerreplication" && tcp.dstport==8081

可以发现这是一个30s间隔的轮询(scheduled updates)

然后在接收侧打上断点

com.netflix.eureka.resources.PeerReplicationResource#batchReplication

通过断点照成的超时,找到发送侧的报错stacktrace

com.netflix.eureka.cluster.ReplicationTaskProcessor#process

这样就可以迅速分析源码了

renew流程

对客户端的ClientUrl进行追踪,可以发现拼装侧在如下位置,来自配置文件

com.netflix.eureka.cluster.PeerEurekaNodes#resolvePeerUrls

以peer1为例,它将for循环依此请求peer2与peer3,在接收侧接收HTTP请求后,同Client一样的方法,更新自己的Memory内存中的信息,这些过程是没锁的对所有地址的广播( broadcast replication)。

好吧,就写到这吧,我个人认为没有必要继续分析下去,这个方案太浩大了,个人目前只能给出工程断点,缺少视野,否则接着只能写出粘贴源码类的文章了。

第三方组件

本部分介绍Eureka与第三方组件的集成

  • 基于注解的Feign与负载均衡

Feign简介与Spring代理自动注入

Feign是一款Java的基于注解的HTTP客户端,主要在服务端使用较广,属于Netflix系列。本文从动态代理开始介绍,然后推广到如何分离硬编码,到最后基于Spring的扫描装配实现。

面向读者

  • 有一定的Feign, Retrofit, Mybatis等基础
  • 了解Spring,反射,动态代理等技术

读者将学到

  • Feign的关键代码位置
  • 如何持续改进项目,降低硬编码
  • 如何自己实现Spring的自动注入

Feign的简单介绍

注解与动态代理

Feign本身只是一个注解的Parser,并没有负载均衡的功能。它与Retrofit类似,通过注解这种外部DSL拼装出容易理解的HTTP请求,并通过JDK动态代理实现

feign.ReflectiveFeign#newInstance

默认的Handler实现

feign.InvocationHandlerFactory.Default#create

默认动态代理调用实现是

feign.SynchronousMethodHandler#invoke

网络请求客户端

网络传输Client是通过对OkHttp/自带/RxJava等客户端的包装实现,只用实现Client接口,就可以对业务进行定制。比如自带的如下

// 自带网络请求实现
feign.Client.Default#execute

再比如SpringCloud中的负载均衡客户端如下

// Ribbon负载均衡
org.springframework.cloud.netflix.feign.ribbon.LoadBalancerFeignClient#execute

总的来说,有读过Retrofit的人再看这个难度不大,相反由于使用了Java8,代码量进一步减小

上述过程可以参考Retrofit的相关资料,比如参考这里

使用Feign一步步简化业务硬编码

在IT业务开发中,一般会涉及到与其它小组进行对接,其它组可能并没有用Eureka,而是采用了传统的HTTP接口。这时问题就来了,如果有多个业务需要拉通,就需要维护多个URL,传统的手段是采用多个HTTPClient类进行拼装与解析JSON,但是效率非常低(比如某些外包,拿着上万的工资,宁可复制20份url,写20个HTTPClient,也不想办法去思考改进,害人害己,30岁就废了),这个代码例子就不举例了。

对此,我们可以参考MybatisSpring的Mapper搜索,SpringCloud等源码中的实现方法,并进行学习吸收,实现简化与第三方的拉通。

使用Feign干掉模版代码

假如说,我们现有业务系统需要集成对接一个HTTP DNS的服务

# 下为DNSPod的例子
$ curl http://119.29.29.29/d?dn=gitbook.com
104.25.212.20;104.25.213.20  

现在我们可以仿照Feign官网的例子,写一个Demo

DNSPodService service = Feign.builder().decoder(new Decoder() {
    @Override
    public List<String> decode(Response response, Type type) throws IOException, DecodeException, FeignException {
        // 下面没有校验,为了节约版面
        String s = Util.toString(response.body().asReader());
        if (s.contains(";")) {
            return Arrays.asList(s.split(";"));
        }
        return Collections.singletonList(s);
    }
}).target(DNSPodService.class, "http://119.29.29.29");
List<String> ipInfo = service.getIpInfo("gitbook.io");

接口如下

// 下面的dn是硬编码,不要在意这些细节,因为我没有引入复杂的Encoder
public interface DNSPodService {
    @RequestLine("GET /d?dn={domain}")
    List<String> getIpInfo(@Param("domain") String domain);
}

这样,我们的一个服务就完成了,当接口中的方法比较多时,相对于纯HTTPClient的写法,这种收益就比较明显了。

通过注解替换Java中的地址硬编码

上面的代码中的Url是在Java中硬编码写死的,这样肯定不好维护。我们考虑加入如下注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface HTTPUrl {
    String value() default "";
}

然后加到接口中

@HTTPUrl("http://119.29.29.29")
public interface DNSPodService {
	...
}

接着,使用反射获取值

HTTPUrl anno = DNSPodService.class.getAnnotation(HTTPUrl.class);
// 此处可以用CucurrentHashMap<Class,String>来做缓存
String url = anno.value();
// 下文省略了Decoder
Feign.builder().target(DNSPodService.class, url)

通过注解标记,看起来更加连贯,读者心理上阅读代码会更加顺畅,看到这个接口就能想到有哪些服务需要维护。

通过Properties/配置中心/数据字典干掉注解硬编码

虽然这样修改有了一定改进,但是这样做仍然没有解决硬编码的问题,我们首先修改接口为如下

+ @HTTPUrl("dnspod.addr")
- @HTTPUrl("http://119.29.29.29")
public interface DNSPodService {

}

接着新增一个命名服务的接口

// 此接口将读取"dnspod.addr",并返回真正的url
public interface Resolvable {
    String resolve(String key);
}

然后我们先实现一个基于properties的实现类

public class PropertiesResolvable implements  Resolvable {
    Environment env;
    public PropertiesResolvable(Environment env) {
        this.env = env;
    }
    @Override
    public String resolve(String key) {
        return env.getProperty(key);
    }
}

然后在application.properties中配置好这个kv,接着跑起来

// 你的SpringBoot项目实现CommandLineRunner接口
@Autowired
Environment env;

@Override
public void run(String... args) throws Exception {
    HTTPUrl anno = DNSPodService.class.getAnnotation(HTTPUrl.class);
    String value = anno.value();
    Resolvable resolvable = new PropertiesResolvable(env);
    String realAddr = resolvable.resolve(value);
    // 下文省略了Decoder
    DNSPodService service = Feign.builder().target(DNSPodService.class, realAddr);
    List<String> ipInfo = service.getIpInfo("gitbook.io");
    System.out.println("ipInfo = " + ipInfo);
}

可以发现,目前我们实现了将硬编码管理转移到了Properties上,同时没有丢失Java代码的可读性

使用配置中心/数据字典的例子同上,只要实现了Resolvable接口即可热部署。

使用Spring与扫描器实现自动注册

制定目标----通过@Autowired自动生成Feign实例

我们希望通过自动/主动装配实现Feign自动生成接口

@Autowired
private DNSPodService service;
//然后直接使用

在本文开始时,曾经讲过可以参考Mybatis扫描Mapper的形式,或者SpringCloud的形式实现Feign的自动注入。

比如Mybatis通过如下注解实现注入

org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration.AutoConfiguredMapperScannerRegistrar

再比如SpringCloud中通过如下注解实现注入

org.springframework.cloud.netflix.feign.FeignClientsRegistrar#registerFeignClients

准备工作

我们首先仿照Mybatis等框架,写一个@Enable的注解

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(ClientRegistrar.class)
public @interface EnableScanFeign {
    String path() default "";
}

并注解到Application中

@SpringBootApplication
@EnableScanFeign(path = "com.example.demo.feign")
public class DemoApplication{}

@import这个注解除了直接注入各种Config外,也可以使用自定义的Selector

接着实现ClientRegistrar,它将在启动时读取注解的上下文


public class ClientRegistrar implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
        String path = ((String) metadata.getAnnotationAttributes(EnableScanFeign.class.getName()).get("path"));
        ResolverUtil<Class<?>> resolverUtil = new ResolverUtil<Class<?>>();
        resolverUtil.findAnnotated(HTTPUrl.class, path);
        Set<Class<? extends Class<?>>> classes = resolverUtil.getClasses();
        System.out.println("classes = " + classes);
    }
}

这样如果顺利的话,你就可以看到被扫描的接口了,不过目前这些只是接口,而没有实现类

为了降低文章难度,我们这里借用了Mybatis的VFS工具类,因此暂时需要导入Mybatis的包,你可以学习扫描jar包是如何实现的。此外,还有更简单的ClassPathMapperScanner或者更简单的ClassPathScanningCandidateComponentProvider也可以学习一个

Bean的注册流程

在Spring第三方库的开发中,我们可以从中学到常见的注册方法如下

// 接着上文的System.out来写
classes.stream()
    .map(this::generateHolder)// todo 生成BeanDefinitionHolder
    .forEach(holder ->
         BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry)
);

其中generateHolder就是我们抽出来的需要完善的方法,本文没有直接用Spring自带的来生成

配置FactoryBean

FactoryBean也是一种Bean,在真实项目中一般定制的就是这里

public static class ClientFactoryBean implements FactoryBean {

    // 这里通过properties写入
    private Class type;
    private String path;

    public Class getType() {
        return type;
    }
    public void setType(Class type) {
        this.type = type;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    @Override
    public Object getObject() throws Exception {
        return Feign.builder()
            .decoder(/*同上*/)
            .target(DNSPodService.class, path);
    }
    @Override
    public Class<?> getObjectType() {
        return type;
    }
    @Override
    public boolean isSingleton() {
        return false;
    }
}

BeanHolder的实现

这里其实很简单,就是首先构造一个beanDefinition,接着通过property传递参数给Factory即可

// 带 Aware 的继承,都可以在生命周期中获得某个上下文对象
public class ClientRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

    Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    // 除了样板代码外,主要就是传递property
    private BeanDefinitionHolder generateHolder(Class<? extends Class<?>> aClass) {
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ClientFactoryBean.class);
        beanDefinitionBuilder.addPropertyValue("type", aClass);
        String value = aClass.getAnnotation(HTTPUrl.class).value();
        beanDefinitionBuilder.addPropertyValue("path", environment.getProperty(value));
        BeanDefinition beanDefinition = beanDefinitionBuilder.getBeanDefinition();
        return new BeanDefinitionHolder(beanDefinition, aClass.getName());
    }
}

测试用例

测试如下,发现只要是Autowired即可自动注解

@SpringBootApplication
@EnableScanFeign(path = "com.example.demo.feign")
public class DemoApplication implements ApplicationRunner {

    @Autowired
    DNSPodService service;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        List<String> ipInfo = service.getIpInfo("www.qq.com");
        System.out.println("ipInfo = " + ipInfo);
    }
}

Ribbon与客户端Load balance

Qualifications Before Read

  • Experience in RxJava/Promise
  • Experience in Feign/Retrofit' inteceptor
  • Familiarity with debug tools

SpringCloud层分析

我们基于拦截器机制并直接进入到了如下位置

org.springframework.cloud.netflix.feign.ribbon.LoadBalancerFeignClient

最终实现类是

com.netflix.client.AbstractLoadBalancerAwareClient#executeWithLoadBalancer

如下,非常类似于NodeJS/Netty中的await Promise,也就是基于事件队列的实现机制,这类代码的特点就是写起来非常爽,读起来需要一定阅读量(如果阅读RxJava困难的话,可以学习前端的Promise加速理解)。

为了节约版面,我将所有的异常处理全部去掉了,代码核心在submit中,下面的代码只是一个柯里化

LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
// 提交后返回的是一个Promise,并没有立刻获取到
return command.submit(
    // 这里的入参是一个Operation接口,初学者可能看不懂代码是怎么来回跳转的
    // 实际上它本质是柯里化(Currying),用Interface模拟函数
    new ServerOperation<T>() {
        @Override
        public Observable<T> call(Server server) {
            // 此处的Server已经是**负载均衡后**的了,后面请求为原生HTTP请求
            URI finalUri = reconstructURIWithServer(server, request.getUri());
            S requestForServer = (S) request.replaceUri(finalUri);
            return Observable
                .just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
        }
    })
    // 类似于前端的await,在Java中用CountDownLatch实现
    .toBlocking()
    .single();

整理后,获取单个Server的关键代码如下,主要在submit中这一行

// 获取当前的服务列表xml,并缓存到本地
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);

本人不会全屏贴代码供读者参考的,因此跳转分析技巧需要自己多练习


ILoadBalancer分析

在Ribbon中,请求关键伪代码如下

a-->b: getLoadBalancer
b-->s: chooseServer
s-->Server: IRule

整个流程与ElasticSearch等客户端框架相差不大,比如最常见的com.netflix.loadbalancer.BaseLoadBalancer

默认负载均衡算法(RoundRobinRule)与Elastic完全一样,也是通过数组取余进行计算

  • RoundRobinRule: 通过数组环与Atomic自增取余

Sleuth与服务调用日志收集

在SpringCloud中,官方推荐使用Sleuth与Zipkin实现日志的收集与可视化,本文将介绍一下日志收集的主要流程

注意: Zipkin仅对RPC通信过程进行记录,注意它与业务代码日志是无关的,如果你希望找到一款LogAppender来分析所有Log4j留下的日志,那么建议还是使用Kakfa+ELK这种传统的方法来实现。

术语

  • Span/Trace: 来自Google的Dapper论文,Span表示单个调用的间隔描述,Trace表示Span的集合,在Zipskin中作为Span的JSON数组
  • spring-cloud-starter-sleuth: 英文名是侦探,它的功能是在项目中自动为日志加入Tag与序列号
  • Zipkin: 来自Twitte的分布式日志收集工具,分为上传端(spring-cloud-starter-zipkin,集成到项目中)与服务端(独立部署,默认将数据存到内存中)

内部实现

综述

  • 调用侧请求中加入额外的Span序列号等上下文信息放入Header中(通过注入Feign定制Client实现)
  • 被调用侧通过全局Filter模拟AOP记录执行情况,计算执行情况与耗时,并存入定制的ByteBoundedQueue队列中,然后通过HTTP等将信息异步发送到Zipkin收集器中
  • Zipkin收集器通过UI显示调用详情

考虑到可能有通信的小伙伴,Zipkin实现与电话计费中的CDR(Call Detail Record, 呼叫详情记录)比较类似

使用前

假设有如下调用UpperCase的微服务调用,其中Feign是请求,UpperCaseService是被调用的微服务

sequenceDiagram
  	Feign-->>UpperCaseService: POST /uppercase?s=Test
    activate UpperCaseService
    UpperCaseService-->>Feign: HTTP 1.1/OK
    deactivate UpperCaseService

(上述过程具体可以去Feign中查看)

可以发现,如果服务出现异常/过载问题,我们只能配置Feign的LogLevel,翻本地Log文件了,效率比较低

使用Sleuth与Zip后

首先我们按照Tutorial搭建环境后,并在Zipkin上下载收集器后(此部分自行搭建),执行服务调用,在Zipkin的本地页面即可看到请求详细路线图。

总体修改后的流程如下

sequenceDiagram
  	Feign->>modifiedRequest: TraceFeignClient
  	modifiedRequest-->>TraceFilter: POST /uppercase?s=Test
  	TraceFilter->>UpperCaseService: 
    activate UpperCaseService
    UpperCaseService-->>Feign: HTTP 1.1/OK
    deactivate UpperCaseService
    TraceFilter->>Zipkin: async send log to collector


其中添加了如下组件

  • TraceFeignClient: 请求端注入的FeignClient,为Request的Header添加SpanID, TraceID等信息
  • TraceFilter: 接收端注入的定制Filter,它将解析Request中的Header,执行业务,计算耗时,最终算出一个完整的JSON格式的Span,通过队列异步发送到收集器ZipKin中
  • ZipKin: 日志收集器,读取JSON格式的SPAN信息,并存储与展示

源码分析

分析请求端源码,发现在请求端为Feign集成了新的Client,为请求加入了Span的Header编码

// add span http headers in http request
org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignClient#execute

值得注意的是,这里添加Header是在Client中进行,而不是在Inteceptors中,因此就算Feign的日志级别配置为FULL,也无法看到真实发送的请求

分析接收端源码,发现新增了一个全局Filter以拦截Span请求,注意这里不是AOP,而是try/finally包装实现的

// org.springframework.cloud.sleuth.instrument.web.TraceFilter
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse,
                     FilterChain filterChain) throws IOException, ServletException {
    //...
    String name = HTTP_COMPONENT + ":" + uri;
    Throwable exception = null;
    try {
        // parse Span headers from request
        spanFromRequest = createSpan(request, skip, spanFromRequest, name);
        // do business
        filterChain.doFilter(request, new TraceHttpServletResponse(response, spanFromRequest));
    } finally {
        // add span to Queue with BoundedAsyncReporter#report
        // it will be sent in another thread
        detachOrCloseSpans(request, response, spanFromRequest, exception);
    }
}

接收侧在AsyncReporter专门开了一个消费者线程,用于异步发送缓存的Span信息

// 接收侧通过新开一个线程消费队列(ByteBoundedQueue)
final Thread flushThread = new Thread("AsyncReporter{" + sender + "}") {
    @Override public void run() {
         //...
         // 循环执行 zipkin2.reporter.AsyncReporter.BoundedAsyncReporter#flush
         // 通过 Condition 进行阻塞轮循获取Filter刚刚放入的信息
         while (!result.closed.get()) {
             result.flush(consumer);
         }
         //...
    }
};

总的来说,还是一套比较复杂的方案,原理挺好懂,但是让自己来写队列,异步就困难了。

REFFERENCE

  • http://tech.lede.com/2017/04/19/rd/server/SpringCloudSleuth/
  • https://wu-sheng.github.io/me/articles/metrics-tracing-and-logging

参考文献与Wiki

本部分搜集了Eureka的第三方资料

参考书籍

有关入门使用可以参考如下书籍