β

聊聊Dubbo(五):核心源码-SPI扩展

Harries Blog™ 104 阅读

站在一个框架作者的角度来说,定义一个接口,自己默认给出几个接口的实现类,同时 允许框架的使用者也能够自定义接口的实现 。现在一个简单的问题就是: 如何优雅的根据一个接口来获取该接口的所有实现类呢?

JDK SPI 正是为了优雅解决这个问题而生,SPI 全称为 ( Service Prov id er Interface),即服务提供商接口,是JDK内置的一种服务提供发现机制。目前有不少框架用它来做服务的扩展发现, 简单来说,它就是一种动态替换发现服务实现者的机制

所以,Dubbo如此被广泛接纳的其中的 一个重要原因就是基于SPI实现的强大灵活的扩展机制 开发者 可自定义 插件 嵌入Dubbo,实现灵活的业务 需求

有人会觉得这就是建立在面向接口编程下的一种为了使组件可扩展或动态变更实现的规范,常见的类SPI的设计有 JDBC、JNDI、JAXP 等,很多 开源 框架的内部实现也采用了SPI。例如JDBC的架构是由一套 API 组成,用于给Java应用提供访问不同 数据库 的能力,而 数据 库提供商的驱动 软件 各不相同,JDBC通过提供一套通用行为的API接口,底层可以由提供商自由实现,虽然JDBC的设计没有指明是SPI,但也和SPI的设计类似。

1 JDK SPI扩展

JDK为SPI的实现提供了工具类,即 java .util.ServiceLoader,ServiceLoader中定义的SPI规范没有什么特别之处, 只需要有一个提供者 配置 文件( provider -configuration file),该文件需要在resource 目录 META-INF/services下,文件名就是服务接口的全限定名 。

  1. 文件内容是提供者Class的全限定名列表,显然提供者Class都应该实现服务接口;
  2. 文件必须使用UTF-8编码;

1.1 JDK SPI 示例

聊聊Dubbo(五):核心源码-SPI扩展
/**
 * SPI服务接口
 */
public interface Cmand {
    public void execute();
}
public class ShutdownCommand implements Cmand {
    public void execute() {
        System.out.println("shutdown....");  
    }
}
public class StartCommand implements Cmand {
    public void execute() {
        System.out.println("start....");
    }
}
public class SPIMain {
    public static void main(String[] args) {
        ServiceLoader<Cmand> loader = ServiceLoader.load(Cmand.class);
        System.out.println(loader);
 
        for (Cmand Cmand : loader) {
            Cmand.execute();
        }
    }
}

META-INF/services/com.unei.serviceloader.Cmand配置:

com.unei.serviceloader.impl.ShutdownCommand  
com.unei.serviceloader.impl.StartCommand

运行结果:

java.util.ServiceLoader[com.unei.serviceloader.Cmand]
shutdown....
start....

1.2 JDK SPI 原理

  1. 配置文件为什么要放在META-INF/services下面?

ServiceLoader类定义如下:

private static final String PREFIX = "META-INF/services/"; (JDK已经写死了)

但是如果ServiceLoader在load时提供Cla ssl oader,则可以从其他的目录读取。

  1. ServiceLoader读取实现类是什么时候 实例 化的 ?来看下ServiceLoader的几个重要属性:
private static final String PREFIX = "META-INF/services/";

// 要加载的接口
private Class<S> service;

// The class loader used to locate, load, and instantiate providers
private ClassLoader loader;

// 用于缓存已经加载的接口实现类,其中key为实现类的完整类名
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// 用于延迟加载接口的实现类
private LazyIterator lookupIterator;

public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}
private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}

private class LazyIterator implements Iterator<S> {

    Class<S> service;
    ClassLoader loader;
    Enumeration<URL> configs = null;
    Iterator<String> pending = null;
    String nextName = null;

    private LazyIterator(Class<S> service, ClassLoader loader) {
        this.service = service;
        this.loader = loader;
    }

    public boolean hasNext() {
        if (nextName != null) {
            return true;
        }
        if (configs == null) {
            try {
                String fullName = PREFIX + service.getName();
                if (loader == null)
                    configs = ClassLoader.getSystemResources(fullName);
                else
                   configs = loader.getResources(fullName);
            } catch (IOException x) {
                fail(service, "Error locating configuration files", x);
            }
        }
        while ((pending == null) || !pending.hasNext()) {
            if (!configs.hasMoreElements()) {
                return false;
            }
            pending = parse(service, configs.nextElement());
        }
        nextName = pending.next();
        return true;
    }

    public S next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String cn = nextName;
        nextName = null;
        Class<?> c = null;
        try {
            // 遍历时,查找类对象
            c = Class.forName(cn, false, loader);
        } catch (ClassNotFoundException x) {
            fail(service,  "Provider " + cn + " not found");
        }
        if (!service.isAssignableFrom(c)) {
            fail(service, "Provider " + cn  + " not a subtype");
        }
        try {
            // 遍历时,才会初始化类实例对象
            S p = service.cast(c.newInstance());
            providers.put(cn, p);
            return p;
        } catch (Throwable x) {
            fail(service, "Provider " + cn + " could not be instantiated", x);
        }
        throw new Error();
    }

    public void remove() {
        throw new UnsupportedOperationException();
    }
}

1.3 JDK SPI ServiceLoader缺点

  1. 虽然ServiceLoader也算是使用的延迟加载, 但是基本只能通过遍历全部获取,也就是接口的实现类全部加载并实例化一遍 。如果你并不想用某些实现类,它也被加载并实例化了,这就造成了浪费。
  2. 获取某个实现类的方式不够灵活, 只能通过Iterator形式获取 ,不能根据某个 参数 来获取对应的实现类。

2 Dubbo SPI扩展

Dubbo对JDK SPI进行了扩展,对服务提供者配置文件中的内容进行了改造, 由原来的提供者类的全限定名列表改成了KV形式的列表,这也导致了Dubbo中无法直接使用JDK ServiceLoader ,所以,与之对应的,在Dubbo中有ExtensionLoader, ExtensionLoader是扩展点载入器,用于载入Dubbo中的各种可配置组件,比如动态代理方式(ProxyFactory)、 负载均衡 策略(LoadBalance)、RCP 协议 (Protocol)、拦截器(Filter)、容器类型(Container)、 集群 方式(Cluster)和 注册中心 类型(RegistryFactory)等 。

总之,Dubbo为了应对各种场景, 它的所有内部组件都是通过这种SPI的方式来 管理 ,这也是为什么Dubbo需要将服务提供者配置文件设计成KV键值对形式, 这个K就是我们在Dubbo配置文件或注解中用到的K,Dubbo直接通过服务接口(上面提到的ProxyFactory、LoadBalance、Protocol、Filter等)和配置的K从ExtensionLoader拿到服务提供的实现类 。

同时,由于Dubbo使用了URL总线的设计, 即很多参数通过URL对象来传递,在实际中,具体要用到哪个值,可以通过URL中的参数值来指定 。

2.1 扩展功能介绍

Dubbo对SPI的扩展是通过ExtensionLoader来实现的,查看ExtensionLoader的 源码 ,可以看到Dubbo对JDK SPI 做了三个方面的扩展:

  1. 方便获取扩展实现:JDK SPI仅仅通过接口类名获取所有实现, 而ExtensionLoader则通过接口类名和key值获取一个实现

  2. IOC依赖注入功能:Adaptive实现,就是生成一个代理类, 这样就可以根据实际调用时的一些参数动态决定要调用的类了 。

    举例来说:接口A,实现者A1、A2。接口B,实现者B1、B2。

    现在实现者A1含有setB()方法,会自动注入一个接口B的实现者,此时注入B1还是B2呢?都不是, 而是注入一个动态生成的接口B的实现者B$Adpative,该实现者能够根据参数的不同,自动引用B1或者B2来完成相应的功能 ;

  3. 采用装饰器模式进行功能增强,自动包装实现,这种实现的类一般是自动激活的,常用于包装类,比如Protocol的两个实现类:ProtocolFilterWrapper、ProtocolListenerWrapper。

    还是第2个的例子,接口A的另一个实现者AWrapper1。大体内容如下:

    private A a;
    AWrapper1(A a){
    	  this.a=a;
    }

    因此,我们在获取某一个接口A的实现者A1的时候,已经自动被AWrapper1包装了。

2.2 扩展源码分析

2.2.1 ExtensionLoader初始化

以获取DubboProtocol为例:

@SPI("dubbo")
public interface Protocol {
    
    /**
     * 获取缺省端口,当用户没有配置端口时使用。
     * 
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露远程服务:<br>
     * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
     * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
     * 
     * @param <T> 服务的类型
     * @param invoker 服务的执行体
     * @return exporter 暴露服务的引用,用于取消暴露
     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务:<br>
     * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
     * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
     * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
     * 
     * @param <T> 服务的类型
     * @param type 服务的类型
     * @param url 远程服务的URL地址
     * @return invoker 服务的本地代理
     * @throws RpcException 当连接服务提供方失败时抛出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 释放协议:<br>
     * 1. 取消该协议所有已经暴露和引用的服务。<br>
     * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
     * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
     */
    void destroy();

}

public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";
    ...
    ...
}

// 示例:
ExtensionLoader<Protocol> protocolLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol dubboProtocol = protocolLoader.getExtension(DubboProtocol.NAME);
  1. ExtensionLoader.getExtensionLoader(Protocol.class):获取ExtensionLoader实例

    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
         if (type == null)
             throw new IllegalArgumentException("Extension type == null");
         if(!type.isInterface()) {
             throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
         }
         // 0. 判断是否为通过SPI注解定义的可扩展接口
         if(!withExtensionAnnotation(type)) {
             throw new IllegalArgumentException("Extension type(" + type + 
                     ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
         }
         // 1. 先从EXTENSION_LOADERS中,根据传入可扩展类型type查找
         ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
         if (loader == null) {
             // 2. 不存在,则新建ExtensionLoader实例
             EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
             loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
         }
         return loader;
    }
    
     private ExtensionLoader(Class<?> type) {
         this.type = type;
         objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
     }
    
     private static <T> boolean withExtensionAnnotation(Class<T> type) {
         return type.isAnnotationPresent(SPI.class);
     }

    ExtensionLoader充当插件工厂角色,提供了一个私有的构造器。其入参type为扩展接口类型。 Dubbo通过SPI注解定义了可扩展的接口 ,如Filter、 Transport er等。 每个类型的扩展对应一个ExtensionLoader。SPI的 value 参数决定了默认的扩展实现 。

    当扩展类型是ExtensionFactory时,不指定objectFactory, 否则初始化ExtensionFactory的ExtensionLoader并获取一个扩展适配器 。

  2. protocolLoader.getExtension(DubboProtocol.NAME):根据Key获取相应的扩展实现类实例

    /**
      * 返回指定名字的扩展。如果指定名字的扩展不存在,则抛异常 {@link IllegalStateException}.
      *
      * @param name
      * @return
      */
     @SuppressWarnings("unchecked")
     public T getExtension(String name) {
         if (name == null || name.length() == 0)
             throw new IllegalArgumentException("Extension name == null");
         if ("true".equals(name)) {
             return getDefaultExtension();
         }
         // 1. 先从缓存中取相应的扩展实现类实例
         Holder<Object> holder = cachedInstances.get(name);
         if (holder == null) {
             cachedInstances.putIfAbsent(name, new Holder<Object>());
             holder = cachedInstances.get(name);
         }
         Object instance = holder.get();
         if (instance == null) {
             synchronized (holder) {
                 instance = holder.get();
                 if (instance == null) {
                     // 2. 创建相应的扩展实现类实例
                     instance = createExtension(name);
                     holder.set(instance);
                 }
             }
         }
         return (T) instance;
    }
    
     @SuppressWarnings("unchecked")
     private T createExtension(String name) {
         // 3. 根据name获取相应扩展类的类实例
         Class<?> clazz = getExtensionClasses().get(name);
         if (clazz == null) {
             throw findException(name);
         }
         try {
             T instance = (T) EXTENSION_INSTANCES.get(clazz);
             if (instance == null) {
                 EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                 instance = (T) EXTENSION_INSTANCES.get(clazz);
             }
             injectExtension(instance);
             Set<Class<?>> wrapperClasses = cachedWrapperClasses;
             if (wrapperClasses != null && wrapperClasses.size() > 0) {
                 for (Class<?> wrapperClass : wrapperClasses) {
                     instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                 }
             }
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                     type + ")  could not be instantiated: " + t.getMessage(), t);
         }
     }

    createExtension方法就是创建相应的扩展类实例,具体里面的创建步骤下文会具体说到, 接下来先继续深入看getExtensionClasses方法是如何从配置文件中找到相应的扩展类的类配置 。

2.22 配置文件扫描

Dubbo默认依次扫描 META-INF/dubbo/internal/、META-INF/dubbo/、META-INF/services/三个 classpath 目录下的配置文件 。 配置文件以具体扩展接口全名命名 ,如com.a lib aba.dubbo.rpc.Filter,内容如下: 等号前为扩展名,其后为扩展实现类全路径名

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter

从上一小节的,getExtensionClasses()方法源码看起:

private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
	}

    // 此方法已经getExtensionClasses方法同步过。
    private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if(defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if(value != null && (value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if(names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                if(names.length == 1) cachedDefaultName = names[0];
            }
        }
        
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadFile(extensionClasses, DUBBO_DIRECTORY);
        loadFile(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }

    private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                // 1. 逐行读取配置文件,提取出扩展名或扩展类路径
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();
                                            line = line.substring(i + 1).trim();
                                        }
                                        if (line.length() > 0) {
                                            // 2. 利用Class.forName方法进行类加载
                                            Class<?> clazz = Class.forName(line, true, classLoader);
                                            if (! type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class " 
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }
                                            // 3. 处理Adaptive注解,若存在则将该实现类保存至cachedAdaptiveClass属性
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                if(cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                            } else {
                                                try {
                                                    // 4. 尝试获取参数类型为当前扩展类型的构造器方法,若成功则表明存在该扩展的封装类型,将封装类型存入wrappers集合;否则转入第五步
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    clazz.getConstructor();
                                                    if (name == null || name.length() == 0) {
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }

                                                    // 5. 处理active注解,将扩展名对应active注解存入cachedActivates
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }
                                                        for (String n : names) {
                                                            if (! cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                            type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

以上配置文件加载步骤如下:

  1. 逐行读取配置文件,提取出扩展名或扩展类路径;

  2. 利用Class.forName方法进行类加载;

    Class<?> clazz = Class.forName(line, true, classLoader);
  3. 处理Adaptive注解,若存在则将该实现类保存至cachedAdaptiveClass属性

    if (clazz.isAnnotationPresent(Adaptive.class)) {
       if(cachedAdaptiveClass == null) {
           cachedAdaptiveClass = clazz;
       } else if (! cachedAdaptiveClass.equals(clazz)) {
           throw new IllegalStateException("More than 1 adaptive class found:"    + cachedAdaptiveClass.getClass().getName()
                 + ", " + clazz.getClass().getName());
       }
    }
  4. 尝试获取参数类型为当前扩展类型的构造器方法,若成功则表明存在该扩展的封装类型,将封装类型存入wrappers集合;否则抛出异常转入第五步;

    try {
        // 扩展类型参数的构造器是封装器的约定特征,目前dubbo中默认的只有Filter和Listener的封装器
        clazz.getConstructor(type); 
        Set<Class<?>> wrappers = cachedWrapperClasses;
        if (wrappers == null) {
            cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
            wrappers = cachedWrapperClasses;
        }
        wrappers.add(clazz);
    } catch (NoSuchMethodException e) {
        // 第五步
    }
  5. 处理active注解,将扩展名对应active注解存入cachedActivates;

    Activate activate = clazz.getAnnotation(Activate.class);
    if (activate != null) {
        cachedActivates.put(names[0], activate);
    }

2.2.3 扩展适配器

在dubbo扩展中,适配器模式被广泛使用, 其作用在于为同一扩展类型下的多个扩展实现的调用提供路由功能,如指定优先级等 。dubbo提供了两种方式来生成扩展适配器:

  1. 静态适配器扩展

    所谓的静态适配器扩展就是提前通过编码的形式确定扩展的具体实现,且该实现类由Adaptive注解标注,如:AdaptiveCompiler。 在加载配置文件的loadFile方法中,已经描述过处理该类型扩展的逻辑,具体可参考上一小节loadFile()方法源码 。

    @Adaptive
    public class AdaptiveCompiler implements Compiler {
    
        private static volatile String DEFAULT_COMPILER;
    
        public static void setDefaultCompiler(String compiler) {
            DEFAULT_COMPILER = compiler;
        }
    
        public Class<?> compile(String code, ClassLoader classLoader) {
            Compiler compiler;
            ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
            String name = DEFAULT_COMPILER; // copy reference
            if (name != null && name.length() > 0) {
                compiler = loader.getExtension(name);
            } else {
                compiler = loader.getDefaultExtension();
            }
            return compiler.compile(code, classLoader);
        }
    }
  2. 动态适配器扩展

    动态适配器扩展即通过动态代理生成扩展类的动态代理类,在dubbo中是通过javassist技术生成的。与传统的jdk动态代理、cglib不同,javassist提供封装后的API对字节码进行间接操作,简单易用,不关心具体字节码,灵活性更高,且处理效率也较高,是dubbo默认的 编译 器。

首先,从ExtensionLoader构造器中会调用getAdaptiveExtension()方法触发为当前扩展类型生成适配器,源码如下:

private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

    public T getAdaptiveExtension() {
        // 1. 首先,检查是否存在当前扩展类静态适配器
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if(createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            // 2. 创建当前扩展类动态适配器
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            }
            else {
                throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
            }
        }

        return (T) instance;
    }

    private T createAdaptiveExtension() {
        try {
            // IOC属性注入
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
        }
    }
    
    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
    
    private Class<?> createAdaptiveExtensionClass() {
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        // 得到Adaptive类代码内容,通过Compiler进行编译和类加载
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }
    
    // 创建当前扩展动态适配器
    private String createAdaptiveExtensionClassCode() {
        StringBuilder codeBuidler = new StringBuilder();
        Method[] methods = type.getMethods();
        boolean hasAdaptiveAnnotation = false;
        // 1. 检查是否至少有一个方法有Adaptive注解,若不存在则抛出异常,即要完成动态代理,必须有方法标注了Adaptive注解
        for(Method m : methods) {
            if(m.isAnnotationPresent(Adaptive.class)) {
                hasAdaptiveAnnotation = true;
                break;
            }
        }
        // 完全没有Adaptive方法,则不需要生成Adaptive类
        if(! hasAdaptiveAnnotation)
            throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
        
        codeBuidler.append("package " + type.getPackage().getName() + ";");
        codeBuidler.append("/nimport " + ExtensionLoader.class.getName() + ";");
        codeBuidler.append("/npublic class " + type.getSimpleName() + "$Adpative" + " implements " + type.getCanonicalName() + " {");
        
        for (Method method : methods) {
            Class<?> rt = method.getReturnType();
            Class<?>[] pts = method.getParameterTypes();
            Class<?>[] ets = method.getExceptionTypes();

            Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
            StringBuilder code = new StringBuilder(512);
            if (adaptiveAnnotation == null) {
                code.append("throw new UnsupportedOperationException(/"method ")
                        .append(method.toString()).append(" of interface ")
                        .append(type.getName()).append(" is not adaptive method!/");");
            } else {
                int urlTypeIndex = -1;
                for (int i = 0; i < pts.length; ++i) {
                    if (pts[i].equals(URL.class)) {
                        urlTypeIndex = i;
                        break;
                    }
                }
                // 对于有Adaptive注解的方法,判断其入参中是否有URL类型的参数,或者复杂参数中是否有URL类型的属性,若没有则抛出异常。
                // 这里体现出了为什么dubbo要提供动态适配器生成机制。dubbo中的URL总线提供了服务的全部信息,而开发者可以定义差异化的服务配置,因此生成的URL差异化也较大,若全部靠用户硬编码静态适配器的话效率太低。
                // 有了动态代理,dubbo可以根据URL参数动态地生成适配器的适配逻辑,确定扩展实现的获取优先级。因此,URL作为参数直接或间接传入是必须的,否则失去了动态生成的凭据。
                // 有类型为URL的参数
                if (urlTypeIndex != -1) {
                    // Null Point check
                    String s = String.format("/nif (arg%d == null) throw new IllegalArgumentException(/"url == null/");",
                                    urlTypeIndex);
                    code.append(s);
                    
                    s = String.format("/n%s url = arg%d;", URL.class.getName(), urlTypeIndex); 
                    code.append(s);
                }
                // 参数没有URL类型
                else {
                    String attribMethod = null;
                    
                    // 找到参数的URL属性
                    LBL_PTS:
                    for (int i = 0; i < pts.length; ++i) {
                        Method[] ms = pts[i].getMethods();
                        for (Method m : ms) {
                            String name = m.getName();
                            if ((name.startsWith("get") || name.length() > 3)
                                    && Modifier.isPublic(m.getModifiers())
                                    && !Modifier.isStatic(m.getModifiers())
                                    && m.getParameterTypes().length == 0
                                    && m.getReturnType() == URL.class) {
                                urlTypeIndex = i;
                                attribMethod = name;
                                break LBL_PTS;
                            }
                        }
                    }
                    if(attribMethod == null) {
                        throw new IllegalStateException("fail to create adative class for interface " + type.getName()
                        		+ ": not found url parameter or url attribute in parameters of method " + method.getName());
                    }
                    
                    // Null point check
                    String s = String.format("/nif (arg%d == null) throw new IllegalArgumentException(/"%s argument == null/");",
                                    urlTypeIndex, pts[urlTypeIndex].getName());
                    code.append(s);
                    s = String.format("/nif (arg%d.%s() == null) throw new IllegalArgumentException(/"%s argument %s() == null/");",
                                    urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
                    code.append(s);

                    s = String.format("%s url = arg%d.%s();",URL.class.getName(), urlTypeIndex, attribMethod); 
                    code.append(s);
                }
                
                String[] value = adaptiveAnnotation.value();
                // 没有设置Key,则使用“扩展点接口名的点分隔 作为Key
                if(value.length == 0) {
                    char[] charArray = type.getSimpleName().toCharArray();
                    StringBuilder sb = new StringBuilder(128);
                    for (int i = 0; i < charArray.length; i++) {
                        if(Character.isUpperCase(charArray[i])) {
                            if(i != 0) {
                                sb.append(".");
                            }
                            sb.append(Character.toLowerCase(charArray[i]));
                        }
                        else {
                            sb.append(charArray[i]);
                        }
                    }
                    value = new String[] {sb.toString()};
                }
                
                boolean hasInvocation = false;
                for (int i = 0; i < pts.length; ++i) {
                    if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
                        // Null Point check
                        String s = String.format("/nif (arg%d == null) throw new IllegalArgumentException(/"invocation == null/");", i);
                        code.append(s);
                        s = String.format("/nString methodName = arg%d.getMethodName();", i); 
                        code.append(s);
                        hasInvocation = true;
                        break;
                    }
                }
                
                String defaultExtName = cachedDefaultName;
                String getNameCode = null;
                for (int i = value.length - 1; i >= 0; --i) {
                    if(i == value.length - 1) {
                        if(null != defaultExtName) {
                            if(!"protocol".equals(value[i]))
                                if (hasInvocation) 
                                    getNameCode = String.format("url.getMethodParameter(methodName, /"%s/", /"%s/")", value[i], defaultExtName);
                                else
                                    getNameCode = String.format("url.getParameter(/"%s/", /"%s/")", value[i], defaultExtName);
                            else
                                getNameCode = String.format("( url.getProtocol() == null ? /"%s/" : url.getProtocol() )", defaultExtName);
                        }
                        else {
                            if(!"protocol".equals(value[i]))
                                if (hasInvocation) 
                                    getNameCode = String.format("url.getMethodParameter(methodName, /"%s/", /"%s/")", value[i], defaultExtName);
                                else
                                    getNameCode = String.format("url.getParameter(/"%s/")", value[i]);
                            else
                                getNameCode = "url.getProtocol()";
                        }
                    }
                    else {
                        if(!"protocol".equals(value[i]))
                            if (hasInvocation) 
                                getNameCode = String.format("url.getMethodParameter(methodName, /"%s/", /"%s/")", value[i], defaultExtName);
                            else
                                getNameCode = String.format("url.getParameter(/"%s/", %s)", value[i], getNameCode);
                        else
                            getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
                    }
                }
                code.append("/nString extName = ").append(getNameCode).append(";");
                // check extName == null?
                String s = String.format("/nif(extName == null) " +
                		"throw new IllegalStateException(/"Fail to get extension(%s) name from url(/" + url.toString() + /") use keys(%s)/");",
                        type.getName(), Arrays.toString(value));
                code.append(s);
                
                s = String.format("/n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
                        type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
                code.append(s);
                
                // return statement
                if (!rt.equals(void.class)) {
                    code.append("/nreturn ");
                }

                s = String.format("extension.%s(", method.getName());
                code.append(s);
                for (int i = 0; i < pts.length; i++) {
                    if (i != 0)
                        code.append(", ");
                    code.append("arg").append(i);
                }
                code.append(");");
            }
            
            codeBuidler.append("/npublic " + rt.getCanonicalName() + " " + method.getName() + "(");
            for (int i = 0; i < pts.length; i ++) {
                if (i > 0) {
                    codeBuidler.append(", ");
                }
                codeBuidler.append(pts[i].getCanonicalName());
                codeBuidler.append(" ");
                codeBuidler.append("arg" + i);
            }
            codeBuidler.append(")");
            if (ets.length > 0) {
                codeBuidler.append(" throws ");
                for (int i = 0; i < ets.length; i ++) {
                    if (i > 0) {
                        codeBuidler.append(", ");
                    }
                    codeBuidler.append(ets[i].getCanonicalName());
                }
            }
            codeBuidler.append(" {");
            codeBuidler.append(code.toString());
            codeBuidler.append("/n}");
        }
        codeBuidler.append("/n}");
        if (logger.isDebugEnabled()) {
            logger.debug(codeBuidler.toString());
        }
        return codeBuidler.toString();
    }

根据adaptive注解的value数组值,及SPI注解定义的默认扩展名,确定适配逻辑,即扩展获取的优先级,这里不再罗列代码,下面为一个具体生成的适配器源码:

package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
    public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "netty")); // 处理顺序
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.connect(arg0, arg1);
    }
    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("server", url.getParameter("transporter", "netty")); // 处理顺序
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.bind(arg0, arg1);
    }
}

可以看到, 核心逻辑是获取扩展名extName ,以bind方法为例, 其获取优先级是server,transporter,netty ,可参见URL的getParameter方法源码。其中netty是Transporter接口的SPI注解确定的默认值,而server和transporter是bind方法的Adaptive注解定义的。

@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

拿到扩展名后,再从ExtensionLoader获取到扩展实例,调用具体的bind方法。

源码生成后, ExtensionLoader再调用默认的JavassitCompiler进行编译和类加载 ,其具体实现原理不在本文讨论范围,有机会的话后续会介绍这部分内容。

综上可知,ExtensionLoader提供了获取扩展适配器的方法,优先查看是否有静态适配器,否则会使用动态适配器。

2.2.4 封装类

dubbo中存在 一种对于扩展的封装类,其功能是将各扩展实例串联起来,形成扩展链,比如过滤器链,监听链 。当调用ExtensionLoader的getExtension方法时,会做拦截处理, 如果存在封装器,则返回封装器实现,而将真实实现通过构造方法注入到封装器中 。

@SuppressWarnings("unchecked")
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            // IOC 注入
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ")  could not be instantiated: " + t.getMessage(), t);
        }
    }
    
    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                // 遍历当前实例所有方法,判断是否需要进行set属性注入
                for (Method method : instance.getClass().getMethods()) {
                    if (method.getName().startsWith("set")
                            && method.getParameterTypes().length == 1
                            && Modifier.isPublic(method.getModifiers())) {
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            // 通过ExtensionFactory获取被注入set属性实例
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("fail to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

这里有个injectExtension方法,其作用是:

如果当前扩展实例存在其他的扩展属性,则通过反射调用其set方法设置扩展属性。若该扩展属性是适配器类型,也是通过ExtensionLoader获取的。

所以,ExtensionLoader作为一个IOC插件容器,为dubbo的插件体系运作提供了保障,可以说是dubbo中的核心。掌握了其基本原理,才有助于我们更好地分析dubbo源码。

原文

https ://juejin.im/post/5acee93b51882555731c82ba

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。 PS:如果您想和业内技术大牛交流的话,请加qq群(474807195)或者关注微信公众号(AskHarries),谢谢!

转载请注明原文出处: Harries Blog™ » 聊聊Dubbo(五):核心源码-SPI扩展

作者:Harries Blog™
追心中的海,逐世界的梦
原文地址:聊聊Dubbo(五):核心源码-SPI扩展, 感谢原作者分享。

发表评论