使用 AspectJ 和 JMX 进行消息流追踪

工程 | Ben Hale | 2006年4月25日 | ...

在我曾经工作的一个项目中,我们有一个系统负责接收来自设备的消,并决定是否将该信息传递给用户。该系统有多个决策层级,我们遇到的问题之一始终是:消息在通过系统的过程中是否丢失了?

在我们转向 Spring 之前,几乎不可能回答这个问题。我们尝试使用日志记录,但由于做出决策的消息数量庞大,这种方法充其量只能说是繁琐。也尝试使用调试器,但庞大的数据量和时序变化导致成功率时有时无。

遗憾的是,我离开时我们还没能实现一个更合适的解决方案,但如果当时做了,它可能会是这样的。最后,我将讨论在这种工作中可能有用的一些扩展。

首先,我们有一组接口及其实现


package flowtracingexample;

public interface Component1 {

	void forwardCall();

}

package flowtracingexample;

import java.util.Random;

public class DefaultComponent1 implements Component1 {
	
	private Component2 child;

	private Random r = new Random();
	
	public DefaultComponent1(Component2 child) {
		this.child = child;
	}

	public void forwardCall() {
		if (r.nextBoolean()) {
			child.forwardCall();
		}
	}

}

package flowtracingexample;

public interface Component2 {

	void forwardCall();

}

package flowtracingexample;

import java.util.Random;

public class DefaultComponent2 implements Component2 {
	
	private Component3 child;

	private Random r = new Random();
	
	public DefaultComponent2(Component3 child) {
		this.child = child;
	}

	public void forwardCall() {
		if (r.nextBoolean()) {
			child.forwardCall();
		}
	}

}

package flowtracingexample;

public interface Component3 {

	void forwardCall();

}

package flowtracingexample;

public class DefaultComponent3 implements Component3 {

	public void forwardCall() {
	}

}

这是一个非常简单的例子,但要点是使用 fowardCall() 方法时,消息有 50% 的时间会传递给下一个子组件(在本例中按数字升序)。请注意,这些 POJO 中没有任何与追踪相关的逻辑。

为了实现我们的追踪行为,我们希望有一组计数器;每个组件一个。此外,我们希望能够重置计数器、启动和停止监控,并确定监控是否正在进行。为此,我们实现了一个包含这些计数器的类。


package flowtracingexample;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

@ManagedResource
public class FlowTracer {

	private long component1Count = 0;

	private long component2Count = 0;

	private long component3Count = 0;

	private boolean tracing = false;

	@ManagedAttribute
	public long getComponent1Count() {
		return this.component1Count;
	}

	@ManagedAttribute
	public long getComponent2Count() {
		return this.component2Count;
	}

	@ManagedAttribute
	public long getComponent3Count() {
		return this.component3Count;
	}

	@ManagedAttribute
	public boolean getTracing() {
		return this.tracing;
	}

	public void incrementComponent1Count() {
		if (this.tracing) {
			component1Count++;
		}
	}

	public void incrementComponent2Count() {
		if (this.tracing) {
			component2Count++;
		}
	}

	public void incrementComponent3Count() {
		if (tracing) {
			component3Count++;
		}
	}

	@ManagedOperation
	public void resetAllComponentCount() {
		resetComponent1Count();
		resetComponent2Count();
		resetComponent3Count();
	}

	@ManagedOperation
	public void resetComponent1Count() {
		this.component1Count = 0;
	}

	@ManagedOperation
	public void resetComponent2Count() {
		this.component2Count = 0;
	}

	@ManagedOperation
	public void resetComponent3Count() {
		this.component3Count = 0;
	}

	@ManagedOperation
	public void startTracing() {
		tracing = true;
	}

	@ManagedOperation
	public void stopTracing() {
		tracing = false;
	}
}

这个类的方法及其内容都相当直观。对您来说可能比较新的是这个类上的注解。当每个 bean 部署到 JMX MBeanServer 时,Spring 的 JMX 支持会使用这些注解自动构建 MBean 管理接口。

  • ManagedResource:声明该类应作为 JMX MBean 暴露
  • ManagedAttribute:声明此 getter/setter 表示的 JavaBean 属性应为 MBean 属性。如果需要对此属性进行读写访问,则需要同时注解 getter 和 setter。
  • ManagedOperation:声明此方法应作为 MBean 操作暴露

最后,是将所有内容连接起来的问题。首先,我们将构成流程的组件连接起来。接下来,我们声明将追踪器应用于每个组件的切面。在本例中,我们使用了非常出色的 AspectJ 切入点语言。最后,我们设置 JMX exporter,使其能够自动检测带有 @ManagedResource 注解的类实例。


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/aop 
       http://www.springframework.org/schema/aop/spring-aop.xsd">

	<!-- Components -->
	<bean id="component3" class="flowtracingexample.DefaultComponent3" />

	<bean id="component2"
		class="flowtracingexample.DefaultComponent2">
		<constructor-arg ref="component3" />
	</bean>

	<bean id="component1"
		class="flowtracingexample.DefaultComponent1">
		<constructor-arg ref="component2" />
	</bean>

	<!-- Aspect -->
	<bean id="flowTracer" class="flowtracingexample.FlowTracer" />

	<aop:config>
		<aop:aspect id="component1Aspect" ref="flowTracer">
			<aop:before method="incrementComponent1Count"
				pointcut="execution(public void flowtracingexample.Component1.forwardCall())" />
		</aop:aspect>

		<aop:aspect id="component2Aspect" ref="flowTracer">
			<aop:before method="incrementComponent2Count"
				pointcut="execution(public void flowtracingexample.Component2.forwardCall())" />
		</aop:aspect>

		<aop:aspect id="component3Aspect" ref="flowTracer">
			<aop:before method="incrementComponent3Count"
				pointcut="execution(public void flowtracingexample.Component3.forwardCall())" />
		</aop:aspect>
	</aop:config>

	<!-- JMX -->
	<bean class="org.springframework.jmx.export.MBeanExporter">
		<property name="autodetectModeName" value="AUTODETECT_ALL" />
		<property name="assembler">
			<bean
				class="org.springframework.jmx.export.assembler.MetadataMBeanInfoAssembler">
				<property name="attributeSource">
					<bean
						class="org.springframework.jmx.export.annotation.AnnotationJmxAttributeSource" />
				</property>
			</bean>
		</property>
		<property name="namingStrategy">
			<bean
				class="org.springframework.jmx.export.naming.IdentityNamingStrategy" />
		</property>
	</bean>

</beans>

接下来我们需要做的是有一个驱动类。在本例中,驱动类只是以小于 750ms 的随机延迟发送消息。


package flowtracingexample;

import java.io.IOException;
import java.util.Random;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class FlowTracingExample {

	public static void main(String[] args) throws InterruptedException,
			IOException {
		ApplicationContext ctx = new ClassPathXmlApplicationContext(
				"classpath:flowtracingexample/applicationContext.xml");

		Component1 comp = (Component1) ctx.getBean("component1");
		Random r = new Random();

		System.out.print("Ready...");
		 System.in.read();

		for (;;) {
			comp.forwardCall();
			Thread.sleep(r.nextInt(750));
		}
	}
}

在我的情况下,我将运行此应用程序并启用 Java VM 管理功能,因为它为我提供了一个免费的 MBean 服务器(而且我喜欢那些漂亮的内存图)。如果您没听说过它,它是 Java 5 VM 中的一个系统属性,可以让 VM 使用 JMX 管理自身。它包含了内存消耗、线程等许多内容的 bean。您只需在运行应用程序的命令行中添加 -Dcom.sun.management.jmxremote 即可启动它。作为 Java 5 的另一个巧妙补充,我将使用 jconsole 来显示结果。

根据我生疏的数学技能,从长远来看,我期望看到 Component 1 被调用 100%,Component 2 被调用 50%,Component 3 被调用 25% 的时间。让我们看看

Tracing Screen Shot

很高兴我的概率记对了。最好的部分是这仍然符合良好的设计原则。例如,所有组件都不知道任何关于追踪的事情,因为那不是它们的职责。此外,这个子系统的所有追踪需求都包含在一个类中,并且有一个实现,符合 AOP 的需求与实现 1:1 的目标。最后,有了关闭追踪的功能,任何性能影响都或多或少地被抵消了。我知道,我知道增加一个整数并不昂贵,但如果你的追踪做了昂贵的事情,这个功能就很有用,而且你无需担心是否要将其投入生产;你可以简单地禁用监控,直到客户来电寻求支持。

所以这些图表确实很漂亮,如果你知道你预期的百分比,它们甚至可以告诉你一些信息,但你还能做些什么呢?比如最后 100 条通过的消息及其决策?比如记录消息被丢弃的原因?比如丢弃决策与消息未到达管道末端之间的关联?如果你从未故意丢弃某条消息,但它在进入 500ms 内未能到达末端(可能是由于线程问题),能够知道该消息已丢失,难道不好吗?沿着同样的思路,如果消息从管道一端到达另一端的时间超过 250ms,给管理员发送一封电子邮件怎么样?

追踪/监控的可能性是无限的(并且是可插拔的!)。你会用它做什么呢?

当然,还有源代码

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

抢先一步

VMware 提供培训和认证,助您加速发展。

了解更多

获取支持

Tanzu Spring 通过一项简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部