xiongzhu пре 4 година
родитељ
комит
5444020691

+ 4 - 0
src/main/java/com/izouma/nineth/config/GeneralProperties.java

@@ -17,4 +17,8 @@ public class GeneralProperties {
     private String updateStockTopic;
     private String updateSaleGroup;
     private String updateSaleTopic;
+    private String orderNotifyGroup;
+    private String orderNotifyTopic;
+    private String mintGroup;
+    private String mintTopic;
 }

+ 25 - 0
src/main/java/com/izouma/nineth/listener/MintListener.java

@@ -0,0 +1,25 @@
+package com.izouma.nineth.listener;
+
+import com.izouma.nineth.service.AssetMintService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+@AllArgsConstructor
+@RocketMQMessageListener(
+        consumerGroup = "${general.mint-group}",
+        topic = "${general.mint-topic}",
+        consumeMode = ConsumeMode.ORDERLY)
+public class MintListener implements RocketMQListener<Long> {
+    private AssetMintService assetMintService;
+
+    @Override
+    public void onMessage(Long assetId) {
+        assetMintService.mint(assetId);
+    }
+}

+ 3 - 3
src/main/java/com/izouma/nineth/listener/TestConsumer.java

@@ -14,10 +14,10 @@ import org.springframework.stereotype.Service;
 @AllArgsConstructor
 @RocketMQMessageListener(
         consumerGroup = "test-group", topic = "test-add-topic", consumeMode = ConsumeMode.CONCURRENTLY)
-public class TestConsumer implements RocketMQListener<MyMqEvent> {
+public class TestConsumer implements RocketMQListener<Object> {
 
     @SneakyThrows
-    public void onMessage(MyMqEvent event) {
-        log.info("receive message: {}", event.getData());
+    public void onMessage(Object event) {
+        log.info("receive message: {}", event);
     }
 }

+ 33 - 31
src/main/java/com/izouma/nineth/service/AssetMintService.java

@@ -17,6 +17,8 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.context.ApplicationContext;
+import org.springframework.retry.annotation.Backoff;
+import org.springframework.retry.annotation.Retryable;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
@@ -31,9 +33,10 @@ public class AssetMintService {
     private NFTService         nftService;
     private ApplicationContext applicationContext;
 
-    @Async
-    public void mint(Long assetId, Long userId) {
-        User user = userRepo.findById(userId).orElseThrow(new BusinessException("用户不存在"));
+    @Retryable(maxAttempts = 10, backoff = @Backoff(delay = 1000))
+    public void mint(Long assetId) {
+        Asset asset = assetRepo.findById(assetId).orElseThrow(new BusinessException("asset不存在"));
+        User user = userRepo.findById(asset.getUserId()).orElseThrow(new BusinessException("用户不存在"));
         if (StringUtils.isEmpty(user.getPublicKey())) {
             NFTAccount account = nftService.createAccount(user.getUsername() + "_");
             user.setNftAccount(account.getAccountId());
@@ -42,7 +45,6 @@ public class AssetMintService {
             userRepo.save(user);
         }
         try {
-            Asset asset = assetRepo.findById(assetId).orElseThrow(new BusinessException("asset不存在"));
             NFT nft = nftService.createToken(user.getNftAccount(), asset.getTokenId());
             if (nft != null) {
                 asset.setTokenId(nft.getTokenId());
@@ -62,33 +64,33 @@ public class AssetMintService {
         }
     }
 
-    @Async
-    public void mint(Asset asset) {
-        User user = userRepo.findById(asset.getUserId()).orElseThrow(new BusinessException("用户不存在"));
-        if (StringUtils.isEmpty(user.getPublicKey())) {
-            NFTAccount account = nftService.createAccount(user.getUsername() + "_");
-            user.setNftAccount(account.getAccountId());
-            user.setKmsId(account.getAccountKmsId());
-            user.setPublicKey(account.getPublicKey());
-            userRepo.save(user);
-        }
-        try {
-            NFT nft = nftService.createToken(user.getNftAccount(), asset.getTokenId());
-            if (nft != null) {
-                asset.setTokenId(nft.getTokenId());
-                asset.setBlockNumber(nft.getBlockNumber());
-                asset.setTxHash(nft.getTxHash());
-                asset.setGasUsed(nft.getGasUsed());
-                if (asset.getIpfsUrl() == null) {
-                    asset.setIpfsUrl(ipfsUpload(asset.getPic().get(0).getUrl()));
-                }
-                assetRepo.save(asset);
-            }
-        } catch (Exception e) {
-            log.error("铸造失败", e);
-        }
-        applicationContext.publishEvent(new CreateAssetEvent(this, true, asset));
-    }
+//    @Async
+//    public void mint(Asset asset) {
+//        User user = userRepo.findById(asset.getUserId()).orElseThrow(new BusinessException("用户不存在"));
+//        if (StringUtils.isEmpty(user.getPublicKey())) {
+//            NFTAccount account = nftService.createAccount(user.getUsername() + "_");
+//            user.setNftAccount(account.getAccountId());
+//            user.setKmsId(account.getAccountKmsId());
+//            user.setPublicKey(account.getPublicKey());
+//            userRepo.save(user);
+//        }
+//        try {
+//            NFT nft = nftService.createToken(user.getNftAccount(), asset.getTokenId());
+//            if (nft != null) {
+//                asset.setTokenId(nft.getTokenId());
+//                asset.setBlockNumber(nft.getBlockNumber());
+//                asset.setTxHash(nft.getTxHash());
+//                asset.setGasUsed(nft.getGasUsed());
+//                if (asset.getIpfsUrl() == null) {
+//                    asset.setIpfsUrl(ipfsUpload(asset.getPic().get(0).getUrl()));
+//                }
+//                assetRepo.save(asset);
+//            }
+//        } catch (Exception e) {
+//            log.error("铸造失败", e);
+//        }
+//        applicationContext.publishEvent(new CreateAssetEvent(this, true, asset));
+//    }
 
     public String ipfsUpload(String url) {
         try {

+ 11 - 5
src/main/java/com/izouma/nineth/service/AssetService.java

@@ -1,6 +1,7 @@
 package com.izouma.nineth.service;
 
 import com.izouma.nineth.TokenHistory;
+import com.izouma.nineth.config.GeneralProperties;
 import com.izouma.nineth.domain.*;
 import com.izouma.nineth.dto.PageQuery;
 import com.izouma.nineth.dto.UserHistory;
@@ -17,6 +18,9 @@ import com.izouma.nineth.utils.TokenUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.springframework.beans.BeanUtils;
 import org.springframework.context.ApplicationContext;
 import org.springframework.data.domain.Page;
@@ -25,6 +29,7 @@ import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
@@ -44,8 +49,9 @@ public class AssetService {
     private ApplicationContext applicationContext;
     private OrderRepo          orderRepo;
     private TokenHistoryRepo   tokenHistoryRepo;
-    private AssetMintService   assetMintService;
     private SysConfigService   sysConfigService;
+    private RocketMQTemplate   rocketMQTemplate;
+    private GeneralProperties  generalProperties;
 
     public Page<Asset> all(PageQuery pageQuery) {
         return assetRepo.findAll(JpaUtils.toSpecification(pageQuery, Asset.class), JpaUtils.toPageRequest(pageQuery));
@@ -57,7 +63,7 @@ public class AssetService {
         asset.setNumber(number);
         asset.setOrderId(orderId);
         asset.setPrice(price);
-        assetRepo.save(asset);
+        assetRepo.saveAndFlush(asset);
 
         tokenHistoryRepo.save(TokenHistory.builder()
                 .tokenId(asset.getTokenId())
@@ -70,7 +76,7 @@ public class AssetService {
                 .operation(type)
                 .price(price)
                 .build());
-        assetMintService.mint(asset);
+        rocketMQTemplate.syncSend(generalProperties.getMintTopic(), asset.getId());
         return asset;
     }
 
@@ -93,7 +99,7 @@ public class AssetService {
                 .operation(type)
                 .price(price)
                 .build());
-        assetMintService.mint(asset.getId(), user.getId());
+        rocketMQTemplate.syncSend(generalProperties.getMintTopic(), asset.getId());
         return asset;
     }
 
@@ -367,7 +373,7 @@ public class AssetService {
 
     public String mint() {
         for (Asset asset : assetRepo.findByTxHashIsNullAndTokenIdNotNullAndCreatedAtBefore(LocalDateTime.now())) {
-            assetMintService.mint(asset);
+            rocketMQTemplate.syncSend(generalProperties.getMintTopic(), asset.getId());
         }
         return "ok";
     }

+ 1 - 1
src/main/java/com/izouma/nineth/utils/TokenUtils.java

@@ -7,7 +7,7 @@ import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class TokenUtils {
-    public static String genTokenId() {
+    public synchronized static String genTokenId() {
         try {
             Random random = ThreadLocalRandom.current();
             byte[] r = new byte[32];

+ 11 - 0
src/main/resources/application.yaml

@@ -145,6 +145,10 @@ general:
   update-stock-topic: update-stock-topic-dev
   update-sale-group: update-sale-group-dev
   update-sale-topic: update-sale-topic-dev
+  order-notify-group: order-notify-group-dev
+  order-notify-topic: order-notify-topic-dev
+  mint-group: mint-group-dev
+  mint-topic: mint-topic-dev
 mychain:
   rest:
     bizid: a00e36c5
@@ -183,6 +187,7 @@ rocketmq:
   name-server: 120.24.204.226:9876
   producer:
     group: my-producer-dev
+    send-message-timeout: 30000
 ---
 
 spring:
@@ -225,6 +230,12 @@ general:
   create-order-topic: create-order-topic
   update-stock-group: update-stock-group
   update-stock-topic: update-stock-topic
+  update-sale-group: update-sale-group
+  update-sale-topic: update-sale-topic
+  order-notify-group: order-notify-group
+  order-notify-topic: order-notify-topic
+  mint-group: mint-group
+  mint-topic: mint-topic
 wx:
   pay:
     notify-url: https://www.raex.vip/notify/order/weixin

+ 1 - 1
src/test/java/com/izouma/nineth/service/AssetServiceTest.java

@@ -60,7 +60,7 @@ class AssetServiceTest extends ApplicationTests {
     @Test
     public void mint() {
         Asset asset = assetRepo.findById(4622L).get();
-        assetMintService.mint(asset);
+        assetMintService.mint(asset.getId());
     }
 
     @Test