领先一步
VMware 提供培训和认证,助您加速进步。
了解更多Spring 通过其 PlatformTransactionManager 接口和一系列实现,为事务管理提供了丰富的支持。Spring 的事务支持为众多 API 的事务语义提供了一致的接口。广义上,事务可以分为两大类:本地事务和全局事务。本地事务是指只影响一个事务资源的事务。大多数情况下,这些资源都有自己的事务 API,即使事务的概念没有被明确地暴露出来。通常,它会以会话(session)的概念来暴露,这是一个工作单元,带有用于告知资源何时提交缓冲工作的 API。全局事务是指跨越一个或多个事务资源的事务,并将它们全部包含在一个事务中。
JMS 和 JDBC API 中的本地事务是一些常见的例子。在 JMS 中,用户可以创建一个事务性会话(transacted Session),发送和接收消息,当消息上的工作完成后,调用 Session.commit() 来告知服务器可以完成工作。在数据库领域,JDBC Connection 默认自动提交查询。这对于一次性语句来说是可以的,但通常最好将几个相关的语句收集到一个批处理中,然后全部提交或都不提交。在 JDBC 中,您可以通过首先将 Connection 的 setAutoCommit() 方法设置为 false,然后在批处理结束时显式调用 Connection.commit() 来实现这一点。这些 API 以及许多其他 API 都提供了事务性工作单元的概念,可以在客户端酌情提交、完成、刷新或其他方式使其永久化。API 差异很大,但概念是相同的。
全局事务是完全不同的东西。当您希望多个资源参与一个事务时,应该使用它们。这有时是一个要求:也许您想发送一条 JMS 消息并写入数据库?或者,您想使用 JPA 的两个不同的持久化上下文?在全局事务设置中,第三方事务监视器将多个事务资源包含在一个事务中,为提交做准备——在这个阶段,资源通常会执行类似预演提交的操作——然后,最后,提交每个资源。这些步骤构成了大多数全局事务实现的基础,称为两阶段提交 (2PC)。如果其中一个提交失败(由于在准备提交时不存在的原因,如网络中断),那么事务监视器会要求每个资源撤销或回滚上一个事务。
全局事务比常规的本地事务复杂得多,因为根据定义,它们必须涉及一个第三方代理,其唯一功能是协调多个事务资源之间的事务状态。事务监视器还必须知道如何与每个事务资源进行通信。由于这个代理还必须保留状态——毕竟,不能信任单个事务资源来了解其他资源在做什么——它具有持久性要求和同步成本,需要维护一个一致的事务日志,就像数据库一样。当发生灾难性事件并且事务监视器关闭时,它必须能够重新启动并重放正在进行的事务,以确保所有资源相对于任何中断的事务都处于一致状态。为了与事务资源进行通信,事务监视器必须与事务资源使用一个通用的协议。通常,这个协议是称为 XA 的规范。
在企业 Java 世界中,为应用程序添加 XA 的典型方法是使用 JTA。Java Transaction API (JTA) 是一个规范,为用户描述了一个标准的全局事务监视器 API。您可以使用 JTA API 以标准方式处理全局事务,适用于支持它的资源类型——通常只有 JDBC 和 JMS。Java EE 应用程序服务器开箱即用地支持 JTA,并且有第三方独立实现的 JTA,您可以用来避免被困在 Java EE 应用程序服务器上。
要使用 JTA,您需要在事务代码中做出一个非常具体的选择来使用它,因为该 API 与 JDBC 暴露的事务 API 大不相同,而 JDBC 的 API 又与 JMS 的 API 大不相同。
在旧代码中看到针对 JTA API 编写的代码是很常见的,即使只涉及一个资源,因为如果将来需要 XA,至少不必完全重写代码来利用 XA。这通常是一个令人遗憾但可以理解的决定。以这种方式编写的代码不幸地也常常与容器绑定;它将无法工作,除非 JNDI 和所有服务器的机器都在运行来引导事务监视器和 JNDI 服务器。
幸运的是,通过使用 Spring,这种复杂性可以优雅地避免。
PlatformTransactionManager API 以及 Spring 框架中相关的事务管理 API。Spring 框架及周边项目提供了丰富的 PlatformTransactionManager 实现,支持 JDBC、JMS、Gemfire、AMQP、Hibernate、JPA、JDO、iBatis 等多种本地事务。
Spring 框架还提供了 TransactionTemplate,它与 PlatformTransactionManager 实现一起工作,可以自动将一个工作单元封装在一个事务中,这样您甚至不需要了解 PlatformTransactionManager API 本身,并且 PlatformTransactionManager 的使用契约也得到了满足:事务将被正确启动、执行、准备和提交,并且——如果在处理过程中抛出异常,事务将被回滚。
@Inject private PlatformTransactionManager txManager;
TransactionTemplate template = new TransactionTemplate(this.txManager);
template.execute( new TransactionCallback<Object>(){
public void doInTransaction(TransactionStatus status){
// work done here will be wrapped by a transaction and committed.
// the transaction will be rolled back if
// status.setRollbackOnly(true) is called or an exception is thrown
}
});
更进一步,Spring 框架通过简单地启用它来提供强大的基于 AOP 的支持,用于将方法调用封装在事务中。有了这种支持,您就不再需要 TransactionTemplate——事务管理会自动发生于任何用声明式事务管理注解的方法。如果您正在使用 Spring 的 XML 配置,那么使用这个
<tx:annotation-driven transaction-manager = “platformTransactionManagerReference” />
如果您正在使用 Spring 3.1,那么您可以简单地注解您的 Java 配置类,如下所示
@EnableTransactionManagement
@Configuration
public class MyConfiguration {
...
此注解将自动扫描您的 bean,并查找一个 PlatformTransactionManager 类型的 bean,然后使用它。然后在您的 Java bean 中,只需通过注解来声明方法为事务性的。
@Transactional
public void work() {
// the transaction will be rolled back if the
// method throws an Exception, otherwise committed
}
Spring 通过一个名为 JtaTransactionManager 的 PlatformTransactionManager 实现,为基于 JTA 的全局事务实现提供了支持。如果您在 JavaEE 应用程序服务器上使用它,它将自动从 JNDI 中查找正确的 javax.transaction.UserTransaction 引用。此外,它还将尝试在 9 个不同的应用程序服务器中查找特定于容器的 javax.transaction.TransactionManager 引用,以用于更高级的用例,如事务挂起。在后台,Spring 加载不同的 JtaTransactionManager 子类,以利用不同服务器中特定、额外的功能(如果可用),例如:WebLogicJtaTransactionManager、WebSphereUowTransactionManager 和 OC4JJtaTransactionManager。
因此,如果您在 Java EE 应用程序服务器中,并且无法逃脱,但又想使用 Spring 的 JTA 支持,那么很可能您可以使用以下命名空间配置支持来正确地(并自动地)创建 JtaTransactionManager
<tx:jta-transaction-manager />
或者,您可以根据需要注册一个 JtaTransactionManager bean 实例,无需构造函数参数,如下所示
@Bean
public PlatformTransactionManager platformTransactionManager(){
return new JtaTransactionManager();
}
无论哪种方式,在 JavaEE 应用程序服务器中的最终结果是,借助 Spring,您可以现在以统一的方式使用 JTA 来管理您的事务。
许多人选择在 Java EE 应用程序服务器之外使用 JTA,原因显而易见:Tomcat 或 Jetty 更轻量、更快、更便宜,可以进行测试(而且更容易),业务逻辑通常不生活在应用程序服务器中,等等。今天,在云环境中,这些原因比以往任何时候都更重要,轻量级、可组合的资源是常态,而臃肿的、单体的应用程序服务器根本无法扩展。
有许多开源和商业的独立 JTA 事务管理器。在开源社区中,您有多种选择,如 Java Open Transaction Manager (JOTM)、JBoss TS、Bitronix Transaction Manager (BTM) 和 Atomikos。
在本帖中,我们将介绍一种采用全局事务的简单方法。我们将重点介绍 Atomikos 特定的配置,但在源代码中也有一个演示使用 Bitronix 的相同配置的示例。
在这种情况下,事务方法是一个简单的基于 JPA 的服务,它必须同时提交一个 JMS 消息。代码是一个典型的在线零售商购物车中的假设的结账方法。代码如下所示
@Transactional
public void checkout(long purchaseId) {
Purchase purchase = getPurchaseById(purchaseId);
if (purchase.isFrozen())
throw new RuntimeException(
"you can't check out Purchase(#" + purchase.getId() + ") that's already been checked out!");
Date purchasedDate = new Date();
Set<LineItem> lis = purchase.getLineItems();
for (LineItem lineItem : lis) {
lineItem.setPurchasedDate(purchasedDate);
entityManager.merge(lineItem);
}
purchase.setFrozen(true);
this.entityManager.merge(purchase);
log.debug("saved purchase updates");
this.jmsTemplate.convertAndSend(this.ordersDestinationName, purchase);
log.debug("sent partner notification");
}
该方法使用 @Transactional 注解来告诉 Spring 将其调用包装在一个事务中。该方法同时使用 JPA——用于合并实体已更改的状态——和 JMS。因此,工作跨越了两个事务资源,并且必须在这两者之间保持一致:数据库更新操作和 JMS 发送操作要么都成功,要么都回滚。毕竟,您不希望发送一个 JMS 消息来触发一个尚未支付的产品的履行周期!
为了测试 JTA 配置是否有效,您可以在最后一行简单地抛出一个 RuntimeException,如下所示
if (true) throw new RuntimeException("Monkey wrench!");
届时,在正常操作下,数据库中的购买实体应该已经将其状态更改为冻结,并且 JMS 消息应该已被发送,从而触发履行。最后一行抛出的异常将回滚这两个更改。Spring 的声明式事务管理将拦截异常,并使用配置的 JtaTransactionManager 自动回滚事务。然后,您可以验证这两个事件从未发生过,并且它们没有反映在相应的资源中:没有 JMS 消息将被排队,JPA 实体的数据库记录也不会被更改。我用来测试这个的测试用例是
package org.springsource.jta.etailer.store.services;
import org.apache.commons.logging.*;
import org.junit.*;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.AnnotationConfigContextLoader;
import org.springsource.jta.etailer.store.config.*;
import org.springsource.jta.etailer.store.domain.*;
import javax.inject.Inject;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(loader = AnnotationConfigContextLoader.class,
classes = {AtomikosJtaConfiguration.class, StoreConfiguration.class})
public class JpaDatabaseCustomerOrderServiceTest {
private Log log = LogFactory.getLog(getClass().getName());
@Inject private CustomerOrderService customerOrderService;
@Inject private CustomerService customerService;
@Inject private ProductService productService;
@Test
public void testAddingProductsToCart() throws Exception {
Customer customer = customerService.createCustomer("A", "Customer");
Purchase purchase = customerOrderService.createPurchase(customer.getId());
Product product1 = productService.createProduct(
"Widget1", "a widget that slices (but not dices)", 12.0);
Product product2 = productService.createProduct(
"Widget2", "a widget that dices (but not slices)", 7.5);
LineItem one = customerOrderService.addProductToPurchase(
purchase.getId(), product1.getId());
LineItem two = customerOrderService.addProductToPurchase(
purchase.getId(), product2.getId());
purchase = customerOrderService.getPurchaseById(purchase.getId());
assertTrue(purchase.getTotal() == (product1.getPrice() + product2.getPrice()));
assertEquals(one.getPurchase().getId(), purchase.getId());
assertEquals(two.getPurchase().getId(), purchase.getId());
// this is the part that requires XA to work correctly
customerOrderService.checkout(purchase.getId());
}
}
这个测试是一个简单的事务性脚本:购物者创建一个账户,找到她喜欢的东西,将它们作为行项目添加到购物车,然后结账。结账方法将购物车已更改的状态保存到数据库,然后发送一个触发 JMS 消息来通知其他系统新订单。正是在这个结账方法中,JTA 至关重要。
我们有两个配置类——JTA 提供商特定的代码,它被设置为正确地构造 Spring 的 JtaTransactionManager 实例——以及其余的配置,无论选择哪种 PlatformTransactionManager 策略,它都应该保持静态。
我使用了 Spring 的模块化 Java 配置,将 JTA 提供商特定的配置类与其余配置分开,这样您就可以轻松地在 Atomikos 特定的 JTA 配置和 Bitronix 特定的 JTA 配置之间切换。
让我们看一下 StoreConfiguration 类——无论您使用哪种事务管理器实现,它都将是相同的。我只节选了相关部分,以便您可以看到哪些部分与 JTA 提供商特定的配置进行了交互。
package org.springsource.jta.etailer.store.config;
import org.hibernate.cfg.ImprovedNamingStrategy;
import org.hibernate.dialect.MySQL5Dialect;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.inject.Inject;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.util.Properties;
@EnableTransactionManagement
@Configuration
@ComponentScan(value = "org.springsource.jta.etailer.store.services")
public class StoreConfiguration {
// ...
// this is a reference to a specific Java configuration class for JTA
@Inject private AtomikosJtaConfiguration jtaConfiguration ;
@Bean
public JmsTemplate jmsTemplate() throws Throwable{
JmsTemplate jmsTemplate = new JmsTemplate( jtaConfiguration.connectionFactory() );
// ...
}
@Bean
public LocalContainerEntityManagerFactoryBean entityManager () throws Throwable {
LocalContainerEntityManagerFactoryBean entityManager =
new LocalContainerEntityManagerFactoryBean();
entityManager.setDataSource(jtaConfiguration.dataSource());
Properties properties = new Properties();
// ...
jtaConfiguration.tailorProperties(properties);
entityManager.setJpaProperties(properties);
return entityManager;
}
@Bean
public PlatformTransactionManager platformTransactionManager() throws Throwable {
return new JtaTransactionManager(
jtaConfiguration.userTransaction(), jtaConfiguration.transactionManager());
}
}
配置都是非常标准的——只是您为任何 JPA 或 JMS 应用程序可能配置的普通对象。配置类需要访问 javax.jms.ConnectionFactory、javax.sql.DataSource、javax.transaction.UserTransaction 和 javax.transaction.TransactionManager 才能完成工作。由于满足这些接口的对象的构造特定于每个事务管理器实现,因此这些 bean 的定义位于一个单独的 Java 配置类中,我们在 StoreConfiguration 类的顶部使用字段注入(@Inject private AtomikosJtaConfiguration jtaConfiguration)导入它。
我们的 StoreConfiguration 通过 @EnableTransactionManagement 注解开启自动事务处理。
我们使用 Spring 3.1 的 @PropertySource 注解——它连接到环境抽象——来访问 services.properties 中的键值对。属性文件如下所示
dataSource.url=jdbc:mysql://127.0.0.1/crm
dataSource.driverClassName=com.mysql.jdbc.Driver
dataSource.dialect=org.hibernate.dialect.MySQL5InnoDBDialect
dataSource.user=crm
dataSource.password=crm
jms.partnernotifications.destination=orders
jms.broker.url=tcp://:61616
任何 JTA 配置最终都会做的最重要的事情是提供对提供商特定的 UserTransaction 和提供商特定的 TransactionManager 的引用,后者用于创建 PlatformTransactionManager 实例,如下所示
@Bean
public PlatformTransactionManager platformTransactionManager() throws Throwable {
UserTransaction userTransaction = jtaConfiguration.userTransaction() ;
TransactionManager transactionManager = jtaConfiguration.transactionManager() ;
return new JtaTransactionManager( userTransaction, transactionManager );
}
我们将只看其中一个实现,而不是 Bitronix 和 Atomikos 的具体细节,因为 Atomikos 配置的源代码在这里,而 Bitronix 配置的源代码在这里。让我们来分析 Atomikos 实现,以便我们能够分解重要的参与者。一旦知道了参与者是谁,那么理解任何第三方 JTA 提供商的配置就很简单了。在运行代码时,切换使用哪种配置也很直接。
package org.springsource.jta.etailer.store.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import javax.inject.Inject;
import javax.jms.ConnectionFactory;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.util.Properties;
@Configuration
public class AtomikosJtaConfiguration {
@Inject private Environment environment ;
public void tailorProperties(Properties properties) {
properties.setProperty( "hibernate.transaction.manager_lookup_class",
TransactionManagerLookup.class.getName());
}
@Bean
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(1000);
return userTransactionImp;
}
@Bean(initMethod = "init", destroyMethod = "close")
public TransactionManager transactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean(initMethod = "init", destroyMethod = "close")
public DataSource dataSource() {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(this.environment.getProperty("dataSource.url"));
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(this.environment.getProperty("dataSource.password"));
mysqlXaDataSource.setUser(this.environment.getProperty ("dataSource.password"));
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("xads");
return xaDataSource;
}
@Bean(initMethod = "init", destroyMethod = "close")
public ConnectionFactory connectionFactory() {
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
activeMQXAConnectionFactory.setBrokerURL(this.environment.getProperty( "jms.broker.url") );
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
atomikosConnectionFactoryBean.setUniqueResourceName("xamq");
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
return atomikosConnectionFactoryBean;
}
}
Atomikos 提供了自己的 java.sql.DataSource 和 javax.jms.ConnectionFactory 包装器,它们将任何本地 java.sql.DataSource 或 javax.jms.ConnectionFactory 适配为 JTA(和 XA)感知的。
为了让 Hibernate 知道如何参与 Atomikos 事务,我们必须设置一个属性——hibernate.transaction.manager_lookup_class——在这种情况下,是一个名为 TransactionManagerLookup 的类。您需要为任何 JTA 实现执行此操作。
最后,我们需要提供一个 javax.transaction.TransactionManager 实现和一个 javax.transaction.UserTransaction 实现。类顶部的两个 bean 是 Atomikos 对这两个接口的实现,它们用于构造 Spring 的 JtaTransactionManager 实现,而 JtaTransactionManager 是 PlatformTransactionManager 的一个实现。
PlatformTransactionManager 实例随后被我们配置类上的 @EnableTransactionManagement 注解自动拾取,并在任何带有 @Transactional 的方法被调用时用于执行事务。
javax.transaction.UserTransaction 实现和 javax.transaction.TransactionManager 实现的职责相似:UserTransaction 是面向用户的 API,而 TransactionManager 是面向服务器的 API。所有 JTA 实现都指定了一个 UserTransaction0 实现,因为它是 JavaEE 所必需的最低要求。TransactionManager 不是必需的,并且并非在所有服务器或 JTA 实现中都可用。
对于熟悉 JTA 的人来说,使用 UserTransaction(就像在 JavaEE 中以编程方式控制事务一样)存在一些显著的差距,这可能是可以理解的,考虑到近十年前 J2EE 最初构想时,假设没有人会不使用 EJB 来进行事务管理。
问题在于,某些操作(如挂起事务(例如,获得“requires new”语义))只能在 TransactionManager 上进行。此接口在 JTA 规范中进行了标准化,但与 UserTransaction 不同,它没有提供一个众所周知的 JNDI 位置或其他获取它的方法。其他一些事情,例如控制隔离级别或服务器特定的“事务命名”(用于监控或其他目的),根本无法通过 JTA 实现。
TransactionManager 提供了事务挂起和恢复等高级功能,因此大多数提供商也支持它。事实上,许多 javax.transaction.TransactionManager 实现可以在运行时强制转换为 javax.transaction.UserTransaction 实现。Spring 知道这一点,并且在这方面非常智能。如果您只提供一个 javax.transaction.TransactionManager 的引用来定义 Spring 的 JtaTransactionManager 实现的实例,它将在运行时尝试从中强制转换出一个 javax.transaction.UserTransaction 实例。然而,Atomikos 不这样做,因此我们显式定义了一个 javax.transaction.UserTransaction 实例——并为了更好地利用 javax.transaction.TransactionManager 的增强功能——一个单独的 javax.transaction.TransactionManager 实例。
就是这样!您可以用 Spring 拥有并享用一切。Bitronix 配置看起来也很相似,因为它承担了类似的职责。您不太可能经常需要调整此代码。您很可能可以简单地重用此处提供的配置,只需相应地调整连接字符串和驱动程序。