springBoot+jpa+jta+atpmikos十分钟实现分布式事务,模拟多数据源

springBoot+jpa+jta+atpmikos十分钟实现分布式事务,模拟多数据源

2019-10-18 22:20:22发布 浏览数:5451
概述:jta: Java Transactio API,即是java中对事务处理的api,api即是接口的意思. atomikos:Atomikos TransactionsEssentials 是一个为Java平台提供增值服务的并且开源类事务管理器,基于2PC协议.不明白XA 2PC 3PC TCC的同学可以花几分钟的时间去看一下.

1.pom的dependency依赖

[这里由于是demo,我在其父工程中指明了版本号.各位同学在学习的时候,可以自己选择版本号]

  <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--jta-atomikos的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
    </dependencies>

2.spring配置文件application.yml

server:
  port: 8081  #应用端口号
spring:
  application:
    name: seata-storage #应用名称
  jpa:  #jpa相关配置
    hibernate:
      ddl-auto: none
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL5Dialect
        #format_sql: true
        transaction:
          jta:
            platform: com.ajc.seata.config.AtomikosJtaPlatform  #处理事务的类  hibernate不提供 需要自己写 需要继承 AbstractJtaPlatform[这里要注意 这里写的是这个类的全路径]
      javax:
        persistence:
          transactionType: JTA  #指明事务处理类型
  jta:  #jta相关配置 
    enabled: true #允许使用jta
    atomikos: #atomikos相关配置  主要是配置数据源
      datasource: #模拟多数据源
        jta-bank1: #数据源1  名字自定义  唯一就好
          xa-properties.url: jdbc:mysql://xx.xx.xx.xx:3306/bank1
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: jta-bank1
          uniqueResourceName: jta-bank1
          max-pool-size: 25
          min-pool-size: 3
          max-lifetime: 20000
          borrow-connection-timeout: 10000
        jta-bank2: #数据源2  名字自定义  唯一就好
          xa-properties.url: jdbc:mysql://xx.xx.xx.xx:3306/bank2
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: jta-bank2
          uniqueResourceName: jta-bank2
          max-pool-size: 25
          min-pool-size: 3
          max-lifetime: 20000
          borrow-connection-timeout: 10000

3.定义处理事务的类.[固定写法]

import org.hibernate.engine.transaction.jta.platform.internal.AbstractJtaPlatform;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

/**
 * 处理事务的类  hibernate不提供 需要自己写
 */
public class AtomikosJtaPlatform extends AbstractJtaPlatform {

    private static final long serialVersionUID = 1L;
      //事务管理器 atomikos的userTransactionManager
     public static TransactionManager transactionManager;
     //atomikos的事务管理的相关配置  比如超时时间 等
     public static UserTransaction transaction;

    @Override
    protected TransactionManager locateTransactionManager() {
        return transactionManager;
    }

    @Override
    protected UserTransaction locateUserTransaction() {
        return transaction;
    }
}

4.定义事务管理需要加载的Bean[固定写法,@Bean的name属性可以自定义]

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

/**
 * @Author: AnJunchao
 * @Description:
 * @Date: Create in 2019-10-11 15:01:34
 * @Modified By:
 */
@Configuration
@EnableConfigurationProperties
@EnableAutoConfiguration
@ComponentScan
@EnableTransactionManagement
public class DataSourceConfig {
    /**
     * transaction manager
     *
     * @return
     */
    @Bean(name="userTransaction")
    public UserTransaction userTransaction() {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        //配置事务超时时间
        try {
            userTransactionImp.setTransactionTimeout(500);
        } catch (SystemException e) {
            e.printStackTrace();
        }
        return userTransactionImp;
    }

    /**
     * jta transactionManager  spring对jta的支持
     * @return AtomikosJtaPlatform.transactionManager = userTransactionManager;
     */
    @Bean(destroyMethod = "close", initMethod = "init",name = "transactionManager")
    public TransactionManager userTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);//设置为true 网络出现问题时 强制断掉
        AtomikosJtaPlatform.transactionManager = userTransactionManager;
        return userTransactionManager;
    }
    /**
     *  事务管理器  注意这个bean的名字 一会儿还要用到
     */
    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "transactionManager"})
    public PlatformTransactionManager transactionManager() throws Throwable {
        UserTransaction userTransaction = userTransaction();
        AtomikosJtaPlatform.transaction = userTransaction;
        TransactionManager atomikosTransactionManager = userTransactionManager();
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }
}

5.多个数据源相关配置

该demo中我在spring的配置文件中定义了两个数据库,模拟一下多数据源jta-bank1和jta-bank2

每个数据源有一个相关的配置类,看代码:

jta-bank1配置配置类 Bank1Config:

import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;

/**
 * @Author: AJC
 * @Description:
 * @Date: Create in 2019-10-12 09:33:59
 * @Modified By:
 */
@Configuration
@EnableTransactionManagement
@DependsOn("transactionManager")
@EnableJpaRepositories(
        entityManagerFactoryRef="bank1ManagerFactory",//指明实体的管理Factory 名字在本类配置的LocalContainerEntityManagerFactoryBean
        transactionManagerRef="transactionManager",//这里注意了 这里的name就是上面DataSourceConfig这个类中事务管理器@Bean的name
        basePackages=  "com.ajc.seata.repository.bank1") //设置jta-bank1对应的Repository所在位置 
public class Bank1Config {
  /**
   * 配置数据源 可以配置多个但必须用@Qualifier("xxx")指明
   * @return
   */
  @Primary//由多个数据源的时候 用这个指定默
  @Bean
  @ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.jta-bank1") //以什么开头的spring文件中的配置路径
  @Qualifier("dataSourceJTABank1")
  public DataSource dataSourceJTABank1() {
    return new AtomikosDataSourceBean();
  }

    @Autowired
    JpaProperties jpaProperties;
    //*--------- atomikos -----------*//*
    @Primary //自动装配时当出现多个Bean候选者时,被注解为@Primary的Bean将作为首选者,否则将抛出异常
    @Bean(name = "bank1ManagerFactory")
    @DependsOn("transactionManager")
    public LocalContainerEntityManagerFactoryBean bannk1ManagerFactoryDemo(EntityManagerFactoryBuilder builder) {
    //如果在spring的配置文件中不指定这里可以用这种方式指明
    // jpaProperties.getProperties().put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        return builder
                .dataSource(dataSourceJTABank1())
                .properties(jpaProperties.getProperties())
                .packages("com.ajc.seata.entity.bank1") //设置实体类所在位置:类或包
                .persistenceUnit("jta-bank1") //持久化单元名称
                .build();
    }

}

jta-bank1配置配置类 Bank2Config:

同样的代码,只需要把相关包路径改成bank2

import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;

/**
 * @Author: AJC
 * @Description:
 * @Date: Create in 2019-10-12 09:33:59
 * @Modified By:
 */
@Configuration
@EnableTransactionManagement
@DependsOn("transactionManager")
@EnableJpaRepositories(
        entityManagerFactoryRef="bank2ManagerFactory",
        transactionManagerRef="transactionManager",
        basePackages=  "com.ajc.seata.repository.bank2")
public class Bank2Config {
    @Autowired
    JpaProperties jpaProperties;
    @Bean
    @ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.jta-bank2")
    @Qualifier("dataSourceJTABank2")
    public DataSource dataSourceJTABank2() {
        return new AtomikosDataSourceBean();
    }

    /*--------- atomikos -----------*/
    @Bean(name = "bank2ManagerFactory")
    @DependsOn("transactionManager")
    public LocalContainerEntityManagerFactoryBean bannk2ManagerFactoryDemo(EntityManagerFactoryBuilder builder) {
        //jpaProperties.getProperties().put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        return builder
                .dataSource(dataSourceJTABank2())
                .properties(jpaProperties.getProperties())
                .packages("com.ajc.seata.entity.bank2") //设置实体类所在位置:类或包
                .persistenceUnit("jta-bank2") //持久化单元名称
                .build();
    }

}

到此配置结束;

6.定义entity,repository,service ,controller 基本和原来还是一样,就是不同的数据源分包

由于是多个数据源,所以不同的数据源的对应的entity和repository不能放在一起

注意 这里如果用本demo测试的话,看着实体的字段,自行创建数据表


这里bank1与bank2我用的表识一样的 这里为了分别区分,在实体名字后加了1和2;

AccountInfoBank1:

@Table(name = "account_info", schema = "bank1")
@Entity
@Data
public class AccountInfoBank1 implements Serializable {
    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    @Column(name = "account_name")
    private String accountName;
    @Column(name = "account_no")
    private String accountNo;
    @Column(name = "account_password")
    private String accountPassword;
    @Column(name = "account_balance")
    private Double accountBalance;
}

AccountInfoBank1:

@Table(name = "account_info", schema = "bank2")
@Entity
@Data
public class AccountInfoBank2 implements Serializable {
    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    @Column(name = "account_name")
    private String accountName;
    @Column(name = "account_no")
    private String accountNo;
    @Column(name = "account_password")
    private String accountPassword;
    @Column(name = "account_balance")
    private Double accountBalance;
}

AccountRepositoryBank1:

import com.ajc.seata.entity.bank1.AccountInfoBank1;
import org.springframework.data.jpa.repository.JpaRepository;

public interface AccountRepositoryBank1 extends JpaRepository<AccountInfoBank1,Long> {
}

AccountRepositoryBank2:

import com.ajc.seata.entity.bank2.AccountInfoBank2;
import org.springframework.data.jpa.repository.JpaRepository;

public interface AccountRepositoryBank2 extends JpaRepository<AccountInfoBank2,Long> {
}

service:

@Service
public class BankService {
    @Autowired
    private AccountRepositoryBank1 accountRepositoryBank1;
    @Autowired
    private AccountRepositoryBank2 accountRepositoryBank2;
    @Transactional(rollbackFor = Exception.class)//事务注解不可少
    public  String updateBankofAccount(Double amount)  {
        //张三账户减少余额
        AccountInfoBank1 accountInfoBank1 = accountRepositoryBank1.findById(2L).get();
        accountInfoBank1.setAccountBalance(accountInfoBank1.getAccountBalance()-amount);
        AccountInfoBank1 save = accountRepositoryBank1.saveAndFlush(accountInfoBank1);
        if(amount==2){ //人为模拟异常
            throw new RuntimeException("张三账户无法减少余额,访问失败");

        }
        //李四账户增加余额
        AccountInfoBank2 accountInfoBank2 = accountRepositoryBank2.findById(3L).get();
        accountInfoBank2.setAccountBalance(accountInfoBank2.getAccountBalance()+amount);
        AccountInfoBank2 accountInfoBank21 = accountRepositoryBank2.saveAndFlush(accountInfoBank2);
        if(amount==3){//人为模拟异常
            throw new RuntimeException("李四账户无法减少余额,访问失败");
        }
    return "success";
    }
}

controlelr:

@RestController
@RequestMapping("/bank")
public class BankController {
@Autowired
private BankService bankService;
    @Autowired
    JpaProperties jpaProperties;
    @GetMapping("/updateBank")
    public String updateBankofAccount(@RequestParam("amount") Double amount) throws HeuristicRollbackException, HeuristicMixedException, NotSupportedException, RollbackException, SystemException {
        return  bankService.updateBankofAccount(amount);
    }
}

application:

@SpringBootApplication
@EnableDiscoveryClient
public class StorageApplication {
    public static void main(String[] args) {
        SpringApplication.run(StorageApplication.class,args);
    }
}

到此启动测试就可以了.

demo代码地址:https://gitee.com/anjunchao/jta-atomikos

请先
登录
后评论
0 条评论
暂时没有评论
最新文章
更多