Apache Camel快速入门【下】

梦想不抛弃苦心追求的人,只要不停止追求,你们会沐浴在梦想的光辉之中。再美好的梦想与目标,再完美的计划和方案,如果不能尽快在行动中落实,最终只能是纸上谈兵,空想一番。只要瞄准了大方向,坚持不懈地做下去,才能够扫除挡在梦想前面的障碍,实现美好的人生蓝图。Apache Camel快速入门【下】,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

3-5、Service与生命周期

Apache Camel中有一个比Endpoint、Component、CamelContext等元素更基础的概念元素:Service。Camel官方文档中对Service的解释是:

Camel uses a simple lifecycle interface called Service which has a single start() and stop() method.

Various classes implement Service such as CamelContext along with a number of Component and Endpoint classes.

When you use Camel you typically have to start the CamelContext which will start all the various components and endpoints and activate the routing rules until the context is stopped again.

……

包括Endpoint、Component、CamelContext等元素在内的大多数工作在Camel中的元素,都是一个一个的Service。例如,我们虽然定义了一个JettyHttpComponent(就是在代码中使用DSL定义的”jetty:http://0.0.0.0:8282/directCamel“头部所表示的Component),但是我们想要在Camel应用程序运行阶段使用这个Component,就需要利用start方法将这个Component启动起来。

实际上通过阅读org.apache.camel.component.jetty.JettyHttpComponent的源代码,读者可以发现JettyHttpComponent的启动过程起始大多数情况下什么都不会做,只是在org.apache.camel.support.ServiceSupport中更改了JettyHttpComponent对象的一些状态属性。倒是HttpConsumer这个Service,在启动的过程中启动了JettyHttpComponent对象的连接监听,并建立了若干个名为【qtp-*】的处理线程。下图为读者展示了org.apache.camel.Service接口的主要继承/实现体系:

这里写图片描述

Service有且只有两个接口方法定义:start()和stop(),这两个方法的含义显而易见,启动服务和终止服务。另外继承自Service的另外两个子级接口SuspendableService、ShutdownableService分别还定义了另外几个方法:suspend()、resume()和shutdown()方法,分别用来暂停服务、恢复服务和彻底停止服务(彻底停止服务意味着在Camel应用程序运行的有生之年不能再次启动了)。

Camel应用程序中的每一个Service都是独立运行的,各个Service的关联衔接通过CamelContext上下文对象完成。每一个Service通过调用start()方法被激活并参与到Camel应用程序的工作中,直到它的stop()方法被调用。也就是说,每个Service都有独立的生命周期。(http://camel.apache.org/lifecycle.html)

那么问题来了,既然每个Service都有独立的生命周期,我们启动Camel应用程序时就要启动包括Route、Endpoint、Component、Producer、Consumer、LifecycleStrategy等概念元素在内的无数多个Service实现,那么作为开发人员不可能编写代码一个一个的Service来进行启动(大多数开发人员不了解Camel的内部结构,也根本不知道要启动哪些Service)。那么作为Camel应用程序肯定需要提供一个办法,在应用程序启动时分析应用程序所涉及到的所有的Service,并统一管理这些Service启动和停止的动作。这就是CamelContext所设计的另一个功能。
 

4、CamelContext上下文

CamelContext从英文字面上理解,是Camel服务上下文的意思。CamelContext在Apache Camel中的重要性,就像ApplicationContext之于Spring、ServletContext之于Servlet…… 但是包括Camel官方文档在内的,所有读者能够在互联网上找到的资料对于CamelContext的介绍都只有聊聊数笔。

The context component allows you to create new Camel Components from a CamelContext with a number of routes which is then treated as a black box, allowing you to refer to the local endpoints within the component from other CamelContexts.

First you need to create a CamelContext, add some routes in it, start it and then register the CamelContext into the Registry (JNDI, Spring, Guice or OSGi etc).

………

以上是Camel官方文档(http://camel.apache.org/context.html)对于CamelContext作用的一些说明,大致的意思是说CamelContext横跨了Camel服务的整个生命周期,并且为Camel服务的工作环境提供支撑。

4-1、CamelContext实现结构

那么CamelContext中到底存储了哪些重要的元素,又是如何工作的呢?看样子官方的使用手册中并没有说明,我们还是通过分析CamelContext的源代码来看看它的一些什么内容吧。下面我们应用已经讲解过的Apache Camel相关知识,对org.apache.camel.CamelContext接口以及它的主要实现类进行分析,以便尽可能的去理解为什么CamelContext非常重要:

上图是Apache Camel中实现了org.apache.camel.CamelContext接口的主要类。其中有两个实现类需要特别说明一下:SpringCamelContext和DefaultCamelContext。Camel可以和Spring框架进行无缝集成,例如可以将您的某个Processor处理器以Spring Bean的形式注入到Spring Ioc容器中,然后Camel服务就可以通过在Spring Ioc容器中定义的bean id(XML方式或者注解方式都行)取得这个Processor处理器的实例。

为了实现以上描述的功能,需要Camel服务能够从Spring的ApplicationContext取得Bean,而SpringCamelContext可以帮助Camel服务完成这个关键动作:通过SpringCamelContext中重写的createRegistry方法创建一个ApplicationContextRegistry实例,并通过后者从ApplicationContext的“getBean”方法中获取Spring Ioc容器中符合指定的Bean id的实例。这就是Camel服务和Spring进行无缝集成的一个关键点,如以下代码片段所示:

public class SpringCamelContext extends DefaultCamelContext implements InitializingBean, DisposableBean, ApplicationContextAware {
    ......
     @Override
    protected Registry createRegistry() {
        return new ApplicationContextRegistry(getApplicationContext());
    }
    ......
}
 
public class ApplicationContextRegistry implements Registry {
    ......
 
    @Override
    public Object lookupByName(String name) {
        try {
            return applicationContext.getBean(name);
        } catch (NoSuchBeanDefinitionException e) {
            return null;
        }
    }
    ......
}

另外一个需要说明的是DefaultCamelContext类,这个类是我们在前文涉及到Camel示例代码时使用最多的CamelContext实现。而我们将要分析的CamelContext工作原理也基本上是在这个类中进行了完整的实现——其子类只是根据不同的Camel运行环境重写了其中某些方法(例如之前提到的createRegistry方法)。

4-2、DefaultCamelContext结构和启动过程

如果我们翻阅DefaultCamelContext的源代码,首先就会发现在其中定义了许多全局变量,数量在70个左右(实际上根据《代码大全》的描述,一个类中不应该有这么多全局变量。究竟这个类的作者当时是怎样的想法,就不清楚了)。其中一些变量负责记录CamelContext的状态属性、一些负责引用辅助工具还有一些记录关联的顶层工作对象(例如Endpoint、Servcie、Routes、)Components等等)。很明显我们无法对这些变量逐一进行深入分析讲解,但是经过前两篇文章的介绍至少以下变量信息我们是需要理解其作用的:

public class DefaultCamelContext extends ServiceSupport implements ModelCamelContext, SuspendableService {
    ......
 
    // java的基础概念:类加载器,一般进行线程操作时会用到它
    private ClassLoader applicationContextClassLoader;
    // 已定义的endpoint URI(完整的)和Endpoint对象的映射关系
    private Map<EndpointKey, Endpoint> endpoints;
    // 已使用的组件名称(即Endpoint URI头所代表的组件名称)和组件对象的对应关系
    private final Map<String, Component> components = new HashMap<String, Component>();
    // 针对原始路由编排所分析出的路由对象,路由对象是作为CamelContext从路由中的一个元素传递到下一个元素的依据
    //  路由对象中还包含了,将路由定义中各元素连接起来的其它Service。例如DefaultChannel
    private final Set<Route> routes = new LinkedHashSet<Route>();
    // 由DSL或者XML描述的原始路由编排。每一个RouteDefinition元素中都包含了参与这个路由的所有Service定义。
    private final List<RouteDefinition> routeDefinitions = new ArrayList<RouteDefinition>();
    // 生命周期策略,实际上是一组监听,文章后面的内容会重点讲到
    private List<LifecycleStrategy> lifecycleStrategies = new CopyOnWriteArrayList<LifecycleStrategy>();
    // 这是一个计数器,记录当前每一个不同的Routeid中正在运行的的Exchange数量
    private InflightRepository inflightRepository = new DefaultInflightRepository();
    // 服务停止策略
    private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy(this);
    ......
}

为了和本文3-5小节的内容向呼应,这里我们着重分析一下DefaultCamelContext的启动过程:DefaultCamelContext是如何帮助整个Camel应用程序中若干Service完成启动过程的?首先说明DefaultCamelContext 也是一个Service,所以它必须实现Service接口的start()方法和stop()方法。而DefaultCamelContext对于start()方法的实现就是“启动其它已知的Service”。

更具体的来说,DefaultCamelContext将所有需要启动的Service按照它们的作用类型进行区分,例如负责策略管理的Service、负责Components组件描述的Service、负责注册管理的Service等等,然后再按照顺序启动这些Service。以下代码片段提取自DefaultCamelContext的doStartCamel()私有方法,并加入了笔者的中文注释(原有作者的注释依然保留),这个私有方法由DefaultCamelContext中的start()方法间接调用,用于完成上述各Service启动操作。

// 为了调用该私有方法,之前的方法执行栈分别为:
// start()
// super.start()
// doStart()
......
private void doStartCamel() throws Exception {
    // 获取classloader是有必要的,这样保证了Camel服务中的classloader和环境中的其他组件(例如spring)一致
    if (applicationContextClassLoader == null) {
       // Using the TCCL as the default value of ApplicationClassLoader
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            // use the classloader that loaded this class
            cl = this.getClass().getClassLoader();
        }
        setApplicationContextClassLoader(cl);
    }
 
    ......
 
    // 首先启动的是ManagementStrategy策略管理器,它的默认实现是DefaultManagementStrategy。
    // 还记得我们在分析DUBBO时提到的Java spi机制吧,Camel-Core也使用了这个机制,并进行了二次封装。详见org.apache.camel.spi代码包。
    // 启动ManagementStrategy,可以帮助Camel实现第三方组件包(例如Camel-JMS)的动态加载
    // start management strategy before lifecycles are started
    ManagementStrategy managementStrategy = getManagementStrategy();
    // inject CamelContext if aware
    if (managementStrategy instanceof CamelContextAware) {
        ((CamelContextAware) managementStrategy).setCamelContext(this);
    }
    ServiceHelper.startService(managementStrategy);
 
    ......
    // 然后启动的是 生命周期管理策略 
    // 这个lifecycleStrategies变量是一个LifecycleStrategy泛型的List集合。
    // 实际上LifecycleStrategy是指是一组监听,详见代码片段后续的描述
    ServiceHelper.startServices(lifecycleStrategies);
 
    ......
    // 接着做一系列的Service启动动作
    // 首先是Endpoint注册管理服务,要进行重点介绍的是org.apache.camel.util.LRUSoftCache
    // 它使用了java.lang.ref.SoftReference进行实现,这是Java提供的
    endpoints = new EndpointRegistry(this, endpoints);
        addService(endpoints);
 
    ......
    // 启动线程池管理策略和一些列其它服务
    // 基本上这些Service已经在上文中提到过
    doAddService(executorServiceManager, false);
    addService(producerServicePool);
    addService(inflightRepository);
    addService(shutdownStrategy);
    addService(packageScanClassResolver);
    addService(restRegistry);
 
    ......
    // start components
    startServices(components.values());
    // 启动路由定义,路由定义RouteDefinition本身并不是Service,但是其中包含了参与路由的各种元素,例如Endpoint。
    // start the route definitions before the routes is started
    startRouteDefinitions(routeDefinitions);
 
    ......
}
......

以上代码片段已经做了比较详细的注释。下文中,我们将以上代码片段中无法用几句话在代码注释中表达的关键知识点再进行说明:

4-2-1、LifecycleStrategy

LifecycleStrategy接口按照字面的理解是一个关于Camel中元素生命周期的规则管理器,但实际上LifecycleStrategy接口的定义更确切的应该被描述成一个监听器

当Camel引用程序中发生诸如Route加载、Route移除、Service加载、Serivce移除、Context启动或者Context移除等事件时,DefaultCamelContext中已经被添加到集合“lifecycleStrategies”(java.util.List<LifecycleStrategy>)的LifecycleStrategy对象将会做相应的事件触发。

读者还应该注意到“lifecycleStrategies”集合是一个CopyOnWriteArrayList,我们随后对这个List的实现进行讲解。以下代码展示了在DefaultCamelContext添加Service时,DefaultCamelContext内部是如何触发“lifecycleStrategies”集合中已添加的监听的:

......
private void doAddService(Object object, boolean closeOnShutdown) throws Exception {
    ......
 
    // 只有以下条件成立,才需要将外部来源的Object作为一个Service处理
    if (object instanceof Service) {
        Service service = (Service) object;
 
        // 依次连续触发已注册的监听
        for (LifecycleStrategy strategy : lifecycleStrategies) {
            // 如果是一个Endpoint的实现,则触发onEndpointAdd方法
            if (service instanceof Endpoint) {
                // use specialized endpoint add
                strategy.onEndpointAdd((Endpoint) service);
            } 
            // 其它情况下,促发onServiceAdd方法
            else {
               strategy.onServiceAdd(this, service, null);
            }
       }
 
       // 其它后续处理
       ......
    }
}
......

4-2-2、CopyOnWriteArrayList与监听者模式
正如上一小节讲到的,已在DefaultCamelContext中注册的LifecycleStrategy对象存放于一个名叫“lifecycleStrategies”的集合中,后者是CopyOnWriteArrayList容器的实现,这是一个从JDK 1.5+ 版本开始提供的容器结构。

各位读者可以设想一下这样的操作:某个线程在对容器进行写操作的同时,还有另外的线程对容器进行读取操作。如果上述操作过程是在没有“线程安全”特性的容器中进行的,那么可能出现的情况就是:开发人员原本想读取容器中 i 位置的元素X,可这个元素已经被其它线程删除了,开发人员最后读取的 i 位置的元素变成了Y。但是在具有“写线程安全”特性的容器中进行这样的操作就不会有问题:因为写操作在另一个副本容器中进行,原容器中的数据大小、数据位置都不会受到影响。

如果上述操作过程是在有“线程安全”特性的容器中进行的,那么以上脏读的情况是可以避免的。但是又会出现另外一个问题:由于容器的各种读写操作都会加上锁(无论是悲观锁还是乐观锁),所以容器的读写性能又会收到影响。如果采用的是乐观锁,那么对性能的影响可能还不会太大,但是如果采用的是悲观锁,那么对性能的影响就有点具体了。

CopyOnWriteArrayList为我们提供了另一种线程安全的容器操作方式。CopyOnWriteArrayList的工作效果类似于java.util.ArrayList,但是它通过ReentrantLock实现了容器中写操作的线程安全性。CopyOnWriteArrayList最大的特点是:当进行容器中元素的修改操作时,它会首先将容器中的原有元素克隆到一个副本容器中,然后对副本容器中的元素进行修改操作。待这些操作完成后,再将副本中的元素集合重新会写到原有的容器中完成整个修改操作。这种工作机制称为Copy-On-Write(COW)。这样做的最主要目的是分离容器的读写操作。CopyOnWriteArrayList会对所有的写操作加锁,但是不会对任何容器的读操作加锁(因为写操作在一个副本中进行)。

另外CopyOnWriteArrayList还重新实现了一个新的迭代器:COWIterator。它是做什么的呢?举例说明:在ArrayList中我们如果在进行迭代时同时进行容器的写操作,那么就可能会因为下标超界等原因出现程序异常:

List<?> list = new ArrayList<?>();
// 省略了添加元素部分的代码
......
 
// ArrayList不支持这样的操作方式,会报错
for(Object item : list){
    list.remove(item);
}

但如果使用CopyOnWriteArrayList中重写的COWIterator迭代器,就不会出现的情况(开发人员还可以使用JDK 1.5+ 提供的另一个线程安全COW容器:CopyOnWriteArraySet):

List<?> list = new CopyOnWriteArrayList<?>();
// 省略了添加元素部分的代码
......
 
// COWIterator迭代器支持一边迭代一边进行容器的写操作
for(Object item : list){
    list.remove(item);
}

那么CopyOnWriteArrayList和监听器模式有什么关系呢?在书本上我们学到的监听器容器基本上都不是线程安全的,这基本上是出于两方面的考虑。首先对于设计模式的初学者来说最重要的理解模式所代表的设计思想,而非实现细节;另外,在这些示例中,设计模式的实现和操作一般为单一线程,不会出现多其它线程同时操作容器的情况。以下是我们常看到的监听者模式(代码片段):

/**
 * 为事件监听携带的业务对象
 * @author yinwenjie
 */
public class BusinessEventObject extends EventObject {
    public BusinessEventObject(Object source) {
        super(source);
    }
}
 
/**
 * 监听器,其中只有一个事件方法
 * @author yinwenjie
 */
public interface BusinessEventListener extends EventListener {
    public void onBusinessStart(BusinessEventObject eventObject);
}
 
/**
 * 业务级别的代码
 * @author yinwenjie
 */
public class BusinessOperation {
 
    /**
     * 已注册的监听器放在这里
     */
    private List<BusinessEventListener> listeners = new ArrayList<BusinessEventListener>();
 
    public void registeListener(BusinessEventListener eventListener) {
        this.listeners.add(eventListener);
    }
 
    ......  
 
    public void doOp() {
        //业务代码在这里运行后,接着促发监听
        for (BusinessEventListener businessEventListener : listeners) {
            businessEventListener.onBusinessStart(new BusinessEventObject(this));
        }
    }
    ......
}

以上代码无需做太多说明。请注意,由于我们使用ArrayList这样的非线程安全容器作为已注册监听的存储容器,所以开发人员在使用这个容器触发监听事件时需要格外小心:确保同一时间只会有一个线程对容器进行写操作、确保在一个迭代器内没有容器的写操作、还要确保每个监听器的具体实现不会把当前线程锁死(次要)——但作为开发人员真的能随时保证这些事情吗?

4-2-3、SoftReference
我们都知道JVM的内存是有上限的,JVM的垃圾回收线程进行工作时会将当前没有任何引用可达性的对象区域进行回收,以便保证JVM的内存空间能够被循环利用。当JVM的可用内存达到上限,且垃圾回收线程又无法找到任何可以回收的对象时,应用程序就会报错。JVM中某个线程的堆栈状态可能如下图所示:
在这里插入图片描述

上图中线程Thread1在执行时,在栈内存中创建了一个变量X。变量X指向堆内存为A类实例化对象分配的内存空间(后文称之为A对象)。注意,A对象中还对同样存在于堆内存区域中的B类、C类的实例化对象(后文称为B对象、C对象)有引用关系。那么如果JVM垃圾回收策略要对A对象、B对象、C对象三个内存区域进行回收,除非针对这些区域的引用可达性全部消失,否则以上所说到的对内存区域都不会被回收。这样的对象间引用方式被称为强引用(Strong Reference):JVM宁愿抛出OutOfMemoryError也不会在还存在引用可及性的情况下回收内存区域。

引用可达性,是JVM垃圾回收策略中确认哪些内存区域可以进行回收的判断算法。大致的定义是:从某个根引用开始进行引用图结构的深度遍历扫描,当遍历完成时那些没有被扫描到的一个(或者多个)内存区域就是失去引用可达性的区域。

引用可达性,是JVM垃圾回收策略中确认哪些内存区域可以进行回收的判断算法。大致的定义是:从某个根引用开始进行引用图结构的深度遍历扫描,当遍历完成时那些没有被扫描到的一个(或者多个)内存区域就是失去引用可达性的区域。
在这里插入图片描述
上如所示的引用关系和图A中的引用关系类似,只是我们在A对B、C的引用关系上都增加了一个SoftReference对象进行间接关联。代码片段如下所示:

package com.test;
 
import java.lang.ref.SoftReference;
 
public class A {
    /**
     * 软引用 B
     */
    private SoftReference<B> paramB;
 
    /**
     * 软引用 C
     */
    private SoftReference<C> paramC;
 
    /**
     * 构造函数中,建立和B、C的软引用
     * @param paramB
     * @param paramC
     */
    public A(B paramB , C paramC) {
        this.paramB = new SoftReference<B>(paramB);
        this.paramC = new SoftReference<C>(paramC);
    }
 
    /**
     * @return the paramB
     */
    public B getParamB() {
        return paramB.get();
    }
 
    /**
     * @return the paramC
     */
    public C getParamC() {
        return paramC.get();
    }
}

当出现“软引用”对象被垃圾回收线程回收时,例如B对象被回收时,A对象中的getB()方法将会返回null。那么原来进行B对象间接引用动作的SoftReference对象该怎么处理呢?要知道如果B对象被回收了,那么承载这个“软引用”的SoftReference对象就没有什么用处了。还好JDK中帮我们准备了名叫ReferenceQueue的队列,当SoftReference对象所承载的“软引用”对象被回收后,这个Reference对象将被送入ReferenceQueue中(当然你也可以不指定,如果不指定的话SoftReference对象会以“强引用”的回收策略被回收,不过SoftReference对象所占用的内存空间不大),开发人员可以随时扫描ReferenceQueue,并对其中的Reference对象进行清除。

注意,一个对象同一时间并不一定只被另一个对象引用,而是可能被若干个对象同时引用。只要对这个对象的引用中有一个没有使用“软引用”特性,那么垃圾回收策略对它的回收就不会采用“软引用”的回收策略进行。如下图所示:

这里写图片描述

上图中,有两个对象元素同时对B对象进行了引用(注意是同一个B对象,而不是对B类分别new了两次)。其中A对象对B对象的依赖通过“软引用”(SoftReference)间接完成,D对象对B对象的引用却是通过传统的“硬引用”完成的。当垃圾回收策略开始工作时它会发现这样的情况,并且即使在内存空间不够的情况下,也不会对B对象进行回收,直到针对B对象的所有引用可达性消失。

在JAVA中还有弱引用、虚引用两个概念(Camel中的LRUWeakCache就是基于弱引用实现的)。但是由于他们至少和我们重点说明的DefaultCamelContext没有太多关系,所以这里笔者就不再发散性的讲下去了。对这块还不太了解的读者可以自行参考JDK官方文档。

4-2-4、LRU算法简介
LRU的全称是Least Recently Used(最近最少使用),它是一种选择算法,有的文章中也把LRU算法称为“缓存淘汰算法”。在计算机技术实践中它被广泛用于缓存功能的开发,例如处理内存分页与虚拟内存的置换问题,或者又像Camel那样用于计算选择Endpoint对象将从缓存结构中被移除。下图的结构说明了LRU算法的大致工作过程:

这里写图片描述

上图中,我们可以看到几个关键点:

整个队列有一个阀值用于限制能够存放于队列容器中的最大元素个数,这个阀值我们暂且称为maxCacheSize。

当队列中的元素还没有达到这个maxCacheSize时,进入队列的元素将被放置在队列的最前面,队列会保持这种处理策略直到队列中的元素达到maxCacheSize为止。

当队列中的某个元素被选择时(一般来说,队列允许开发人员在选择元素时传入一个Key,队列会依据这个Key进行元素选择),被命中的元素又会重新排列到队列的最前面。这样一来,队列最尾部的元素就是近期使用最少的一个元素。

一旦当队列中的元素达到maxCacheSize后(不可能超过),新进入队列中的元素将会把队列最尾部的元素挤出队列,而它自己会排列到队列的最顶部。

4-2-5、Camel中的LRUSoftCache

那么我们介绍的SoftReference、LRU和我们本节正在讲述的DefaultCamelContext有什么联系呢?在DefaultCamelContext中,用来进行Endpoint注册存储管理的类称为EndpointRegistry,它就是依据LRU算法原则决定哪些Endpoint定义应该存放在缓存中。具体来说,EndpointRegistry中使用“软引用”方式,通过ConcurrentLinkedHashMap提供的既有LRU技术支持实现了存在于内存中的高效缓存。它在DefaultCamelContext的变量定义如下:

......
private Map<EndpointKey, Endpoint> endpoints = new EndpointRegistry(this, endpoints);
......

EndpointRegistry和它继承的父类LRUSoftCache,以及它更高层的父类LRUCache的主要结构如下所示:

这里写图片描述
LRUCache的主要结构

/**
 * A Least Recently Used Cache.
 * If this cache stores org.apache.camel.Service then this implementation will on eviction
 * invoke the {org.apache.camel.Service#stop()} method, to auto-stop the service.
 */
public class LRUCache<K, V> implements Map<K, V>, EvictionListener<K, V>, Serializable {
    // 这个值记录LRU队列的最大值
    private int maxCacheSize = 10000;
    // 一个布尔型,表示如果LRU队列中的元素被消除时,是否试着执行Service的stop方法
    // 因为存储在这个LRU中的元素一般来说是实现了Service接口的元素
    private boolean stopOnEviction;
    // 这个计数器用于统计LRU中元素的命中次数
    private final AtomicLong hits = new AtomicLong();
    // 这个计数器用于统计LRU中元素的未命中次数
    private final AtomicLong misses = new AtomicLong();
    // 这个计数器用于统计LRU中元素的移除数量
    private final AtomicLong evicted = new AtomicLong();
    // 由Google实现的一个数据结构,后文详细介绍
    private ConcurrentLinkedHashMap<K, V> map;
 
    ......
    // 这个构造函数有三个参数,分别是:
    // initialCapacity:LRU队列的初始化大小
    // maximumCacheSize:LRU队列的最大元素大小
    // stopOnEviction:这个布尔值表示是否试图对可能的Service元素进行stop操作
    public LRUCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) {
        // 构造函数主要的作用就是初始化ConcurrentLinkedHashMap对象
        map = new ConcurrentLinkedHashMap.Builder<K, V>()
                .initialCapacity(initialCapacity)
                .maximumWeightedCapacity(maximumCacheSize)
                .listener(this).build();
        this.maxCacheSize = maximumCacheSize;
        this.stopOnEviction = stopOnEviction;
    }
    ......
 
    /**
     * 该方法在元素被从LRU队列中注销时被触发。
     * 其中调用的stopService方法,将会试图停止service的运行,如果value实现了Service接口的话
     * */
    @Override
    public void onEviction(K key, V value) {
        evicted.incrementAndGet();
        LOG.trace("onEviction {} -> {}", key, value);
        // 如果条件则开始stop动作
        if (stopOnEviction) {
            try {
                // stop service as its evicted from cache
                ServiceHelper.stopService(value);
            } catch (Exception e) {
                LOG.warn("Error stopping service: " + value + ". This exception will be ignored.", e);
            }
        }
    }
    ......
}

ConcurrentLinkedHashMap是Google提供的一个数据结构,其使用特性和java.util.LinkedHashMap一致,但是它是线程安全的。更重要的是ConcurrentLinkedHashMap的工作方式就是一个已经实现好的LRU的算法。LRUSoftCache的主要结构

/**
 * A Least Recently Used Cache which uses SoftReference. 
 *
 * This implementation uses java.lang.ref.SoftReference for stored values in the cache
 * to support the JVM when it wants to reclaim objects when it's running out of memory.
 * Therefore this implementation does not support all the java.util.Map methods. 
 * */
public class LRUSoftCache<K, V> extends LRUCache<K, V> {
 
    ......
    /**
     * 构造函数
     * */
    public LRUSoftCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) {
        // 这是调用父级LRUCache的构造函数
        super(initialCapacity, maximumCacheSize, stopOnEviction);
    }
    ......
 
    /**
     * SoftReference是LRUSoftCache最关键的地方,后文介绍
     * */
    @Override
    @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        SoftReference<V> put = new SoftReference<V>(value);
        SoftReference<V> prev = (SoftReference<V>) super.put(key, (V) put);
        return prev != null ? prev.get() : null;
    }
 
    ......
 
    /**
     * 取出Key所对应的“软引用”,并且从“软引用”中视图获取value本身
     * */
    @Override
    @SuppressWarnings("unchecked")
    public V get(Object o) {
        SoftReference<V> ref = (SoftReference<V>) super.get(o);
        return ref != null ? ref.get() : null;
    }
    ......
}

SoftReference是LRUSoftCache最关键的地方,请注意以上代码片段中的put方法。该方法就是向ConcurrentLinkedHashMap送入一个新的K-V的元素,但是注意了,该方法并不是把Value直接送入ConcurrentLinkedHashMap,而是创建一个针对Value的“软引用”SoftReference,并将其作为Value送入ConcurrentLinkedHashMap。通过get方法获取Key对应的Value时,也是从ConcurrentLinkedHashMap中首先获取“软引用”对象。需要注意的是,这时的“软引用”中是否还存在真实的值并不清楚,所以要进行一下判断再进行返回。

EndpointRegistry的主要结构

/**
 * A Least Recently Used Cache which uses SoftReference. 
 *
 * This implementation uses java.lang.ref.SoftReference for stored values in the cache
 * to support the JVM when it wants to reclaim objects when it's running out of memory.
 * Therefore this implementation does not support all the java.util.Map methods. 
 * */
public class LRUSoftCache<K, V> extends LRUCache<K, V> {
 
    ......
    /**
     * 构造函数
     * */
    public LRUSoftCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) {
        // 这是调用父级LRUCache的构造函数
        super(initialCapacity, maximumCacheSize, stopOnEviction);
    }
    ......
 
    /**
     * SoftReference是LRUSoftCache最关键的地方,后文介绍
     * */
    @Override
    @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        SoftReference<V> put = new SoftReference<V>(value);
        SoftReference<V> prev = (SoftReference<V>) super.put(key, (V) put);
        return prev != null ? prev.get() : null;
    }
 
    ......
 
    /**
     * 取出Key所对应的“软引用”,并且从“软引用”中视图获取value本身
     * */
    @Override
    @SuppressWarnings("unchecked")
    public V get(Object o) {
        SoftReference<V> ref = (SoftReference<V>) super.get(o);
        return ref != null ? ref.get() : null;
    }
    ......
}

EndpointRegistry的主要结构

......
/**
 * Endpoint registry which is a based on a {@link org.apache.camel.util.LRUSoftCache}.
 * <p/>
 * We use a soft reference cache to allow the JVM to re-claim memory if it runs low on memory.
 */
public class EndpointRegistry extends LRUSoftCache<EndpointKey, Endpoint> implements StaticService {
    ......
}
......

实际上EndpointRegistry在代码结构中最主要的作用就是确定K-V的泛型结构,因为主要的LRU结构已经通过LRUCache实现了,另外基于“软引用”的技术逻辑也都已经通过LRUSoftCache实现了。所以我们用一句话总结整个EndpointRegistry的实现:通过LRUCache保证已经注册并且最近使用频繁的Endpoint对象一定存在于缓存中,通过LRUSoftCache保证所有已保存在内存中Endpoint对象不会导致JVM内存溢出。

5、使用XML形式编排路由
除了上文中我们一直使用的DSL进行路由编排的操作方式以外,Apache Camel也支持使用XML文件描述进行路由编排。通过XML文件开发人员还可以将Camel和Spring结合起来使用——两者本来就可以进行无缝集成。下面我们对这种方式的使用大致进行一下介绍。首先我们创建一个XML文件,和Spring结合使用的:

<?xml version="1.0" encoding="UTF-8"?>
 
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:camel="http://camel.apache.org/schema/spring" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring-2.14.1.xsd ">
 
    <camel:camelContext xmlns="http://camel.apache.org/schema/spring">
        <camel:endpoint id="jetty_from" uri="jetty:http://0.0.0.0:8282/directCamel"/>
        <camel:endpoint id="log_to" uri="log:helloworld2?showExchangeId=true"/>
 
        <camel:route>
            <camel:from ref="jetty_from"/>
            <camel:to ref="log_to"/>
        </camel:route>
    </camel:camelContext>
 
    ......
</beans>

以上xml文件中我们定义了一个Camel路由过程。请注意xml文件中所使用的schema xsd路径,不同的Apache Camel版本所使用的xsd路径是不一样的,这在Camel的官方文档中有详细说明:http://camel.apache.org/xml-reference.html。在示例代码中笔者使用的Camel版本是V2.14.1。

XML文件描述中,笔者定义了两个endpoint:id为“jetty_from”的Endpoint将作为route的入口,接着传来的Http协议信息将到达id为“log_to”endpoint中。后者是一个Log4j的操作,最终Exchange中的In Message Body信息将打印在控制台上。接下来我们启动测试程序:

......
/**
* 日志
 */
private static final Log LOGGER = LogFactory.getLog(SpringXML.class);
 
public static void main(String[] args) throws Exception {
    /*
     * 这里是测试代码
     * 作为架构师,您应该知道在应用程序中如何进行Spring的加载、如果在Web程序中进行加载、如何在OSGI中间件中进行加载
     * 
     * Camel会以SpringCamelContext类作为Camel上下文对象
     * */
    ApplicationContext ap = new ClassPathXmlApplicationContext("application-config.xml");
    SpringXML.LOGGER.info("初始化....." + ap);
 
    // 没有具体的业务含义,只是保证主线程不退出
    synchronized (SpringXML.class) {
        SpringXML.class.wait();
    }
}
...... 

那么在Camel中如何使用Spring中业已存在的Bean对象呢?我们再将本小计中以上示例进行深入,在Route中加入一个由Spring托管的处理器对象(Processor),并在Processor中引用Spring托管的另一个Bean对象:DoSomethingService。这是一个由Spring容器管理的Bean。书写方式就像您某个Spring工程中书写一个Spring Bean一样:

/**
 * 这是一个服务层接口定义
 * @author yinwenjie
 */
public interface DoSomethingService {
    public void doSomething(String userid);
}
 
==================================以上是接口,下面是接口实现
 
/**
 * 实现了定义的DoSomethingService接口,并且交由Spring Ioc容器托管
 * @author yinwenjie
 */
@Component("DoSomethingServiceImpl")
public class DoSomethingServiceImpl implements DoSomethingService {
    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(DoSomethingServiceImpl.class);
 
    /* (non-Javadoc)
     * @see com.yinwenjie.test.cameltest.helloworld.spring.DoSomethingService#doSomething(java.lang.String)
     */
    @Override
    public void doSomething(String userid) {
        DoSomethingServiceImpl.LOGGER.info("doSomething(String userid) ...");
    }
}

自定义的Processor处理器,交给Spring托管。可以看到和之前大家书写的Processor没有太大区别。无非是多出了一个Spring提供的“Component”注解标记:

/**
 * 自定义的处理器,处理器本身交由Spring Ioc容器管理
 * 并且其中注入了一个DoSomethingService接口的实现
 * @author yinwenjie
 */
@Component("defineProcessor")
public class DefineProcessor implements Processor {
    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(DefineProcessor.class);
 
    @Autowired
    private DoSomethingService somethingService;
 
    @Override
    public void process(Exchange exchange) throws Exception {
        // 调用somethingService,说明它正常工作
        this.somethingService.doSomething("yinwenjie");
        // 这里在控制台打印一段日志,证明这个Processor正常工作了,就行
        DefineProcessor.LOGGER.info("process(Exchange exchange) ... ");
    }
}

接下来我们就可以在原来的XML文件中修改Route的编排,加入这个被Spring Ioc容器托管的Processor处理器。以上代码已经明示,这个自定义的Processor处理器在Spring Ioc容器中的id为“defineProcessor”:
 

......
<camel:camelContext xmlns="http://camel.apache.org/schema/spring">
    <camel:endpoint id="jetty_from" uri="jetty:http://0.0.0.0:8282/directCamel"/>
    <camel:endpoint id="log_to" uri="log:helloworld2?showExchangeId=true"/>
 
    <camel:route>
        <camel:from ref="jetty_from"/>
        <camel:to ref="log_to"/>
 
        <!-- 这是新加的processor处理器 -->
        <camel:process ref="defineProcessor"></camel:process>
    </camel:route>
</camel:camelContext>
......

以下为控制台显示的执行效果:

[2016-07-11 19:37:36] INFO  qtp405711462-19 Exchange[Id: ID-yinwenjie-240-55321-1468237049924-0-1, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)

[2016-07-11 19:37:36] INFO  qtp405711462-19 doSomething(String userid) … (DoSomethingServiceImpl.java:24)

[2016-07-11 19:37:36] INFO  qtp405711462-19 process(Exchange exchange) …  (DefineProcessor.java:31)

注意控制台打印的第一句日志还是由原来的Log4j-endpoint打印的,接着路由会执行defineProcessor处理器中的somethingService.doSomething()方法,打印出第二句日志。最后由defineProcessor中的Log4j打印出最后一句——整个由Camel和Spring集成的Route工作是正常的。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/150484.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!