使用 AspectJ 和 JMX 进行消息流跟踪

工程 | Ben Hale | 2006 年 4 月 25 日 | ...

在我曾经工作的一个项目中,我们有一个系统可以从设备接收消息,并决定是否将该信息传递给用户。存在多个决策级别,我们总是发现自己会问的一个问题是,消息是否在通过系统的过程中丢失了。

在我们搬到 Spring 之前,几乎不可能知道那个问题的答案。我们尝试使用日志记录,但是因为有大量消息需要做出决策,这使得日志记录充其量也只是乏味的。其他尝试使用调试器,但由于消息量和时间变化的综合影响,只取得了间歇性的成功。

不幸的是,我在我们能够实现一个更合适的解决方案之前就离开了,但如果我还在,这大概就是它会是什么样子。最后,我将讨论在这种工作中可能有用的一些扩展。

首先,我们有一组接口及其实现


package flowtracingexample;

public interface Component1 {

	void forwardCall();

}

package flowtracingexample;

import java.util.Random;

public class DefaultComponent1 implements Component1 {
	
	private Component2 child;

	private Random r = new Random();
	
	public DefaultComponent1(Component2 child) {
		this.child = child;
	}

	public void forwardCall() {
		if (r.nextBoolean()) {
			child.forwardCall();
		}
	}

}

package flowtracingexample;

public interface Component2 {

	void forwardCall();

}

package flowtracingexample;

import java.util.Random;

public class DefaultComponent2 implements Component2 {
	
	private Component3 child;

	private Random r = new Random();
	
	public DefaultComponent2(Component3 child) {
		this.child = child;
	}

	public void forwardCall() {
		if (r.nextBoolean()) {
			child.forwardCall();
		}
	}

}

package flowtracingexample;

public interface Component3 {

	void forwardCall();

}

package flowtracingexample;

public class DefaultComponent3 implements Component3 {

	public void forwardCall() {
	}

}

这是一个非常简单的例子,但要点是使用 fowardCall() 方法将消息 50% 的时间传递给下一个子组件(在这种情况下按数字升序)。请注意,这些 POJO 中没有涉及跟踪的逻辑。

为了实现我们的跟踪行为,我们希望有一组计数器;每个组件一个。此外,我们希望有方法来重置计数器、启动和停止监视以及确定是否正在进行监视。为此,我们实现了一个带有计数器的类。


package flowtracingexample;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

@ManagedResource
public class FlowTracer {

	private long component1Count = 0;

	private long component2Count = 0;

	private long component3Count = 0;

	private boolean tracing = false;

	@ManagedAttribute
	public long getComponent1Count() {
		return this.component1Count;
	}

	@ManagedAttribute
	public long getComponent2Count() {
		return this.component2Count;
	}

	@ManagedAttribute
	public long getComponent3Count() {
		return this.component3Count;
	}

	@ManagedAttribute
	public boolean getTracing() {
		return this.tracing;
	}

	public void incrementComponent1Count() {
		if (this.tracing) {
			component1Count++;
		}
	}

	public void incrementComponent2Count() {
		if (this.tracing) {
			component2Count++;
		}
	}

	public void incrementComponent3Count() {
		if (tracing) {
			component3Count++;
		}
	}

	@ManagedOperation
	public void resetAllComponentCount() {
		resetComponent1Count();
		resetComponent2Count();
		resetComponent3Count();
	}

	@ManagedOperation
	public void resetComponent1Count() {
		this.component1Count = 0;
	}

	@ManagedOperation
	public void resetComponent2Count() {
		this.component2Count = 0;
	}

	@ManagedOperation
	public void resetComponent3Count() {
		this.component3Count = 0;
	}

	@ManagedOperation
	public void startTracing() {
		tracing = true;
	}

	@ManagedOperation
	public void stopTracing() {
		tracing = false;
	}
}

该类的方法及其内容都非常简单。对您来说可能比较新颖的是该类上的注解。这些注解由 Spring 的 JMX 支持使用,以便在每个 bean 部署到 JMX MBeanServer 时自动构建 MBean 管理接口。

  • ManagedResource:声明此类应作为 JMX MBean 公开
  • ManagedAttribute:声明此 getter/setter 表示的 JavaBean 属性应为 MBean 属性。如果您希望对此属性具有读写访问权限,则需要同时注解 getter 和 setter。
  • ManagedOperation:声明此方法应作为 MBean 操作公开

最后,是把整个东西连接起来。首先,我们连接构成流的组件。接下来,我们声明将跟踪器放置在每个组件上的切面。在这种情况下,我们使用的是非常棒的 AspectJ 切入点语言。最后,我们设置 JMX 导出器以自动检测带有 @ManagedResource 注解的类的实例。


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/aop 
       http://www.springframework.org/schema/aop/spring-aop.xsd">

	<!-- Components -->
	<bean id="component3" class="flowtracingexample.DefaultComponent3" />

	<bean id="component2"
		class="flowtracingexample.DefaultComponent2">
		<constructor-arg ref="component3" />
	</bean>

	<bean id="component1"
		class="flowtracingexample.DefaultComponent1">
		<constructor-arg ref="component2" />
	</bean>

	<!-- Aspect -->
	<bean id="flowTracer" class="flowtracingexample.FlowTracer" />

	<aop:config>
		<aop:aspect id="component1Aspect" ref="flowTracer">
			<aop:before method="incrementComponent1Count"
				pointcut="execution(public void flowtracingexample.Component1.forwardCall())" />
		</aop:aspect>

		<aop:aspect id="component2Aspect" ref="flowTracer">
			<aop:before method="incrementComponent2Count"
				pointcut="execution(public void flowtracingexample.Component2.forwardCall())" />
		</aop:aspect>

		<aop:aspect id="component3Aspect" ref="flowTracer">
			<aop:before method="incrementComponent3Count"
				pointcut="execution(public void flowtracingexample.Component3.forwardCall())" />
		</aop:aspect>
	</aop:config>

	<!-- JMX -->
	<bean class="org.springframework.jmx.export.MBeanExporter">
		<property name="autodetectModeName" value="AUTODETECT_ALL" />
		<property name="assembler">
			<bean
				class="org.springframework.jmx.export.assembler.MetadataMBeanInfoAssembler">
				<property name="attributeSource">
					<bean
						class="org.springframework.jmx.export.annotation.AnnotationJmxAttributeSource" />
				</property>
			</bean>
		</property>
		<property name="namingStrategy">
			<bean
				class="org.springframework.jmx.export.naming.IdentityNamingStrategy" />
		</property>
	</bean>

</beans>

接下来我们需要做的是有一个驱动类。在这种情况下,驱动类只是以小于 750 毫秒的随机延迟发送一条消息。


package flowtracingexample;

import java.io.IOException;
import java.util.Random;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class FlowTracingExample {

	public static void main(String[] args) throws InterruptedException,
			IOException {
		ApplicationContext ctx = new ClassPathXmlApplicationContext(
				"classpath:flowtracingexample/applicationContext.xml");

		Component1 comp = (Component1) ctx.getBean("component1");
		Random r = new Random();

		System.out.print("Ready...");
		 System.in.read();

		for (;;) {
			comp.forwardCall();
			Thread.sleep(r.nextInt(750));
		}
	}
}

在我的情况下,我将运行带有 Java VM 管理的这个应用程序,因为它为我提供了一个免费的 MBean 服务器(而且我喜欢漂亮的内存图)。如果你没有听说过,它是Java 5 VM 中的一个系统属性,它使 VM 使用 JMX 来管理自身。它有关于内存消耗、线程和无数其他东西的 bean。你只需在运行应用程序的命令行上添加 -Dcom.sun.management.jmxremote 即可启动它。在另一个方便的 Java 5 补充中,我将使用 jconsole 来显示我的结果。

根据我不怎么灵光的数学技能,从长远来看,我预计组件 1 被调用 100%,组件 2 被调用 50%,组件 3 被调用 25%。让我们看看

Tracing Screen Shot

很高兴看到我记对了我的概率。最好的部分是这仍然符合良好的设计原则。例如,这些组件都不知道任何关于追踪的事情,因为那不是它们的工作。同样,这个子系统的所有追踪要求都包含在一个类中,并且有一个实现满足 AOP 的 1:1 要求到实现目标。最后,通过关闭追踪的能力,任何性能影响都或多或少地被抵消了。我知道,我知道增加一个整数并不昂贵,但是如果你的追踪做了昂贵的事情,拥有它很好,而且你不必担心是否要将其投入生产;你可以简单地禁用监控,直到你的客户打电话寻求支持。

所以图表确实很漂亮,如果你知道你预期的百分比,甚至可能会告诉你一些东西,但你还能做些什么呢?那么最近的 100 条消息及其决策呢?那么消息被丢弃的原因日志呢?那么丢弃决策和管道末端消息缺失之间的关联呢?如果一条消息因为你从未故意丢弃它但在其进入 500 毫秒内未能到达末端而丢失(可能是由于线程问题),那知道这一点难道不好吗?沿着这条线,如果从管道一端到另一端所需的时间超过 250 毫秒,那么给管理员发送一封电子邮件怎么样?

跟踪/监控的可能性是无限的(而且是可插拔的!)。你会用它做什么?

当然,还有源代码

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有