之前对Dubbo服务注册的流程细节进行了梳理,但是不能跳出细节对代码有个总体的概览,最近抽时间经过一个漫长的周期,对Dubbo服务注册进行了从上到下的整理,并且对各个对象中属性的作用进行了标注,本次是从 RegistryProtocol.export(final Invoker<T> originInvoker) 开始。
服务注册这里拆分为六部份
RegistryProtocol.export(final Invoker<T> originInvoker) 开始
//--------------//--------------//--------------//--------------//-------------
一、RegistryProtocol.doLocalExport(originInvoker);
1.RegistryProtocol INSTANCE; 本单例对象
2.Map<URL, NotifyListener> overrideListeners : 订阅的URL及订阅信息的回调对象 包含originInvoker对象信息
3.Map<String, ExporterChangeableWrapper<?>> bounds : 绑定关系集合
Key 为服务提供者 URL 对应的字符串
value : ExporterChangeableWrapper 对象 ExporterChangeableWrapper()构造参数是ListenerExporterWrapper 和 之前的originInvoker对象 其中ListenerExporterWrapper 为 DubboProtocol.export 返回的对象
DubboProtocol.export(Invoker<T> invoker)
1.Map<String, Exporter<?>> exporterMap 对象:
key 为 接口名:端口 com.alibaba.dubbo.demo.DemoService:20880
value: 为 DubboExporter 对象 , 1.包含key信息2.封装了原originInvoker和Filter链的Invoker对象 3.DubboProtocol.exporterMap对象的钩子
openServer(URL url)
2.ExchangeHandler requestHandler 为dubbo服务的请求处理器
3.Map<String, ExchangeServer> serverMap
key 为 暴露的本机 IP:PORT 如 10.8.0.49:20880
value 为 ExchangeServer对象 实际为HeaderExchangeServer
从里到外的责任链包装关系为:
1.ExchangeHandler.requestHandler
2.HeaderExchangeHandler 头信息事件处理器
3.DecodeHandler 解码器
4.public NettyServer(URL url, ChannelHandler handler)
-->super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
// ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) 为url设置 "threadname" -> "DubboServerHandler-10.8.0.49:20880"
-->ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)) //再次包装DecodeHandler和URL信息
1.AllChannelHandler 其中有1.DecodeHandler 和2.服务暴露的URL信息 3.executor 名为threadname对应的线程池 用于多线程启动数据处理
(再把创建的线程池加入DataStore默认扩展对象中,对象中维护着private ConcurrentMap<String, ConcurrentMap<String, Object>> data key为java.util.concurrent.ExecutorService v为 HashMap key为暴露端口号,value为线程池对象)
2.HeartbeatHandler 心跳处理器 (处理心跳的处理)
3.MultiMessageHandler 批量信息处理器
-->super()
初始化NettyServer属性 url , handler, codec=DubboCountCodec , timeout=5000 , connectTimeout=3000, localAddress=/10.8.0.49:20880(本地暴露服务地址),bindAddress=/0.0.0.0:20880(绑定IP:PORT) , accepts=1000 , idleTimeout=600000心跳,ServerBootstrap bootstrap , channels , executor线程池 :从 DataStore中获取之前创建的线程池对象。
5.HeaderExchangeServer 属性信息:scheduled定时器 从server中获取channel 发送心跳消息 时间间隔heartbeat=60000,server=NettyServer ,heartbeat=60000,heartbeatTimeout=180000
serverMap.put key : 10.8.0.49:20880 value : HeaderExchangeServer
4. 最后返回 DubboExporter 对象
5. 经过ProtocolListenerWrapper包装 成ListenerExporterWrapper对象()DubboExporter对象和exporterListener对象
最终返回ListenerExporterWrapper对象
//--------------//--------------//--------------//--------------//-------------//--------------//--------------//
二、RegistryProtocol.getRegistry(originInvoker); 根据invoker的地址获取registry实例
//--------------//--------------//--------------//--------------//-------------//--------------//--------------//
三、URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// dubbo://10.8.0.49:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&default.accepts=1000&default.threadpool=fixed&default.threads=100&default.timeout=5000&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=uce&pid=19772&side=provider×tamp=1533183044031
//--------------//--------------//--------------//--------------//-------------//--------------//--------------//
四、registry.register(registedProviderUrl); 在zk节点上创建注册信息
ZookeeperRegistry.register(URL url)
1.registered.add(url)
2.doRegister(URL url)
1.1.zkClient.create /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.8.0.49%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26default.accepts%3D1000%26default.threadpool%3Dfixed%26default.threads%3D100%26default.timeout%3D5000%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26owner%3Duce%26pid%3D19772%26side%3Dprovider%26timestamp%3D1533183044031 创建节点
//--------------//--------------//--------------//--------------//-------------//--------------//--------------//
五、URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl) 获取重写订阅节点URL
provider://10.8.0.49:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&default.accepts=1000&default.threadpool=fixed&default.threads=100&default.timeout=5000&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=uce&pid=19772&side=provider×tamp=1533183044031
//--------------//--------------//--------------//--------------//-------------//--------------//--------------//
六、OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
OverrideListener 实现了NotifyListener, 当注册中心的订阅的URL发生变化时 回调重新的 export
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
-->ZookeeperRegistry.subscribe(URL url, NotifyListener listener)
-->1.subscribed ConcurrentMap<URL, Set<NotifyListener>> putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()) 每个URL都对应着n个NotifyListener , 在 subscribed中添加 url 和 listener的对应关系
-->2.doSubscribe(url, listener); // 向服务器端发送订阅请求
--->zkListeners ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>>
public void childChanged(String parentPath, List<String> currentChilds) { // ChildListener 类型通知信息
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
--> listener.notify(categoryList);
}
--->zkClient.create(path, false); 创建 configurators 配置相关的持久化节点
--->ZkClientZookeeperClient.addChildListener(path, zkListener); zkListener为ChildListener类型 向zk发起节点订阅
path: /dubbo/com.alibaba.dubbo.demo.DemoService/configurators ,
zkListener为ChildListener的实例
// key 为/group/interface/configures 字符串, 上面创建的 ChildListener对象 value为 与注册中心监听对象 泛型
--> childListeners ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>>
--> createTargetChildListener(path, listener) 仅仅创建与注册中心监听对象适配的TargetChildListener 监听对象
--> addTargetChildListener(path, targetListener); // 向 Zookeeper ,真正发起订阅
---> notify(url, listener, urls); //首次订阅发出一次通知,让消费者完成初始化
--> 1.1UrlUtils.isMatch(url, u) //对interface,category,group 信息进行匹配
--> 1.2筛选出 匹配的u信息的 筛出 category 信息 例如(configurations) , 将其加入 Map<String, List<URL>> result 信息中
--> 2.1将 notified 中url相关信息取出 Map<String, List<URL>> categoryNotified = notified.get(url);
--> 2.2将 result中信息以 key为单位 覆盖 notify 中的信息。
--> 2.3saveProperties(url); 保存url信息
--> listener.notify(categoryList); List<URL> categoryList
--> 从originInvoker中获取原始的服务暴露的URL信息与 configurators里信息进行合并, 与bounds中获取Exporter中的URL信息进行对比,重新进行服务暴露 RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
实体属性定义
ZookeeperRegistryFactory 注册中心工厂
Map<String, Registry> REGISTRIES //注册中心信息
key : zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService
value : new ZookeeperRegistry(url, zookeeperTransporter);
ZookeeperRegistry 注册中心信息
AbstractRegistry
1.properties 发布服务接口 key 接口名称 value 对应配置信息 empty://192.168.20.218:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&default.accepts=1000&default.threadpool=fixed&default.threads=100&default.timeout=5000&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=uce&pid=13492&side=provider×tamp=1532953268537
2.registryCacheExecutor 文件缓存定时写入
3.syncSaveFile:异步保存文件的标识
4.lastCacheChanged
5.registered Set<URL> : 注册中心已经注册过的URL列表
6.ConcurrentMap<URL, Set<NotifyListener>> subscribed : 订阅者列表 订阅者Entry里每个URL都对应着n个NotifyListener
7.ConcurrentMap<URL, Map<String, List<URL>>> notified : 回调通知数据存储
8.registryUrl : 注册中心地址 zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=&dubbo=2.0.0&file=&interface=com.alibaba.dubbo.registry.RegistryService&owner=uce&pid=×tamp=
FailbackRegistry
9.retryExecutor ScheduledExecutorService : 重试机制的线程池,当有失败的连接或者监听时,会进行重试 (默认5秒一次)
10.failedRegistered Set<URL>
11.failedUnregistered Set<URL>
12.failedSubscribed ConcurrentMap<URL, Set<NotifyListener>>
13.failedUnsubscribed ConcurrentMap<URL, Set<NotifyListener>>
14.failedNotified ConcurrentMap<URL, Map<NotifyListener, List<URL>>>
ZookeeperRegistry
15.anyServices Set<String>
16.zkListeners ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>>
17.root 根分组地址
18.zkClient zk客户端
ZkclientZookeeperClient 注册中心客户端
//--------------//--------------//--------------//--------------//-------------//--------------//--------------//
ZkclientZookeeperClient
1.url : zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&file=.cache/dubbo-registry.cache&interface=com.alibaba.dubbo.registry.RegistryService&owner=uce&pid=19772×tamp=1533183043787
2.stateListeners Set<StateListener> : 状态监听器
3.childListeners ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>>
// 保存 父节点监听器 String:节点url一般为/group/interface/configurators , ChildListener子节点监听器 ,TargetChildListener 监听器对应监听对象,注册中心相关
4.client : 实际注册中心客户端对象
5.state : 状态