wangwenxiong 4 лет назад
Родитель
Сommit
2eca98823e

+ 7 - 1
o2server/configSample/mq.json

@@ -23,8 +23,14 @@
   "activeMQ":{
 	   "url":"tcp://127.0.0.1:61616",
 	   "queueName":"queue-test",
+	   "keyStore":"C:/Users/wwx/client.ks",
+	   "trustStore":"C:/Users/wwx/client.ts",
+	   "keyStorePassword":"password",
 	   "###url": "服务地址,端口默认61616.###",
-	   "###queueName": "要创建的消息名称###"
+	   "###queueName": "要创建的消息名称###",
+	   "###keyStore": "密钥文件存储路径###",
+	   "###trustStore": "证书文件存储路径###",
+	   "###keyStorePassword": "密钥密码###"
   },
   "###enable": "是否启用.###",
   "###mq": "消息服务类型.###"

+ 33 - 0
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java

@@ -22,6 +22,15 @@ public class MQActive extends ConfigObject {
 	@FieldDescribe("消息队列名")
 	private String queueName;
 	
+	@FieldDescribe("密钥文件存储路径")
+	private String keyStore;
+	
+	@FieldDescribe("证书文件存储路径")
+	private String trustStore;
+	
+	@FieldDescribe("密钥密码")
+	private String keyStorePassword;
+	
 	public static MQActive defaultInstance() {
 		return new MQActive();
 	}
@@ -50,6 +59,30 @@ public class MQActive extends ConfigObject {
 	public void setQueueName(String queueName) {
 		this.queueName = queueName;
 	}
+
+	public String getKeyStore() {
+		return keyStore;
+	}
+
+	public void setKeyStore(String keyStore) {
+		this.keyStore = keyStore;
+	}
+
+	public String getTrustStore() {
+		return trustStore;
+	}
+
+	public void setTrustStore(String trustStore) {
+		this.trustStore = trustStore;
+	}
+
+	public String getKeyStorePassword() {
+		return keyStorePassword;
+	}
+
+	public void setKeyStorePassword(String keyStorePassword) {
+		this.keyStorePassword = keyStorePassword;
+	}
 	
 
 	

+ 85 - 15
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java

@@ -10,6 +10,8 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
+
 import com.google.gson.Gson;
 import com.x.base.core.project.config.Config;
 import com.x.base.core.project.config.MQActive;
@@ -17,6 +19,13 @@ import com.x.base.core.project.logger.Logger;
 import com.x.base.core.project.logger.LoggerFactory;
 import com.x.message.core.entity.Message;
 
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
 public class ActiveMQ implements MQInterface {
 	
 	private static Logger logger = LoggerFactory.getLogger(ActiveMQ.class);
@@ -28,16 +37,37 @@ public class ActiveMQ implements MQInterface {
 		try {
 			    MQActive configMQ = Config.mq().getActiveMQ();
 			    logger.info("MqActive initialize.....");
-			    
-			    String url=configMQ.getUrl();
 			    String queueName=configMQ.getQueueName();
-	
-				ConnectionFactory factory=new ActiveMQConnectionFactory(url);
-			    this.connection= factory.createConnection();
-				this.connection.start();
-				this.session= this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-				Destination destination=session.createQueue(queueName);
-				this.producer = session.createProducer(destination);
+			    String url=configMQ.getUrl();
+			    url = url.trim();
+			    
+			    String protocol = url.substring(0, 3);
+			   if(protocol.equalsIgnoreCase("tcp")) {
+					ConnectionFactory factory=new ActiveMQConnectionFactory(url);
+				    this.connection= factory.createConnection();
+					this.connection.start();
+					
+					this.session= this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+					Destination destination=session.createQueue(queueName);
+					this.producer = session.createProducer(destination);
+					
+			   }else {
+				    String keyStore = configMQ.getKeyStore();
+				    String keyStorePassword = configMQ.getKeyStorePassword();
+				    String trustStore = configMQ.getTrustStore();
+				    
+			        ActiveMQSslConnectionFactory sslConnectionFactory = new ActiveMQSslConnectionFactory();
+			        sslConnectionFactory.setBrokerURL(url);
+			        sslConnectionFactory.setKeyAndTrustManagers(this.loadKeyManager(keyStore, keyStorePassword), this.loadTrustManager(trustStore),
+			                new java.security.SecureRandom());
+			        this.connection = sslConnectionFactory.createConnection();
+			        this.connection.start();
+			        
+			        this.session =  this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			        Destination destination = session.createQueue(queueName);
+			        this.producer = session.createProducer(destination);
+			   }
+			   
 		} catch (Exception e) {
 			e.printStackTrace();
 			logger.error(e);
@@ -57,12 +87,12 @@ public class ActiveMQ implements MQInterface {
 	 
 	 public static void main(String[] args) {
 		   ActiveMQ MQClient = getInstance();
-		   //System.out.println(MQClient.getTopic());
 		   Message msg = new Message();
 		   msg.setBody("body");
 		   msg.setConsumed(false);
 		   msg.setCreateTime(new Date());
 		   msg.setPerson("person");
+		   MQClient.sendMessage(msg);
 	 }
 
 	@Override
@@ -83,13 +113,53 @@ public class ActiveMQ implements MQInterface {
 	}
 
 	public void destroy() {
-		 System.out.println("MqActive destroy.....");
-		  try {
+		try {
+			logger.info("MqActive destroy.....");
 			this.connection.close();
 		} catch (JMSException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
+			 e.printStackTrace();
 			 logger.error(e);
 		}
-	   }
+	}
+	
+	
+	  /**
+        * 加载证书文件
+     * @param trustStore
+     * @return
+     * @throws java.security.NoSuchAlgorithmException
+     * @throws java.security.KeyStoreException
+     * @throws java.io.IOException
+     * @throws java.security.GeneralSecurityException
+     */
+    public static TrustManager[] loadTrustManager(String trustStore) throws java.security.NoSuchAlgorithmException, java.security.KeyStoreException,
+               java.io.IOException, java.security.GeneralSecurityException {
+          KeyStore ks = KeyStore. getInstance("JKS");
+          ks.load( new FileInputStream(trustStore), null);
+          TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory. getDefaultAlgorithm());
+          tmf.init(ks);
+          return tmf.getTrustManagers();
+    }
+
+    /**
+         * 加载密钥文件
+     * @param keyStore
+     * @param keyStorePassword
+     * @return
+     * @throws java.security.NoSuchAlgorithmException
+     * @throws java.security.KeyStoreException
+     * @throws java.security.GeneralSecurityException
+     * @throws java.security.cert.CertificateException
+     * @throws java.io.IOException
+     * @throws java.security.UnrecoverableKeyException
+     */
+    public static KeyManager[] loadKeyManager(String keyStore, String keyStorePassword) throws java.security.NoSuchAlgorithmException,
+               java.security.KeyStoreException, java.security.GeneralSecurityException, java.security.cert.CertificateException, java.io.IOException,
+               java.security.UnrecoverableKeyException {
+          KeyStore ks = KeyStore. getInstance("JKS");
+          ks.load( new FileInputStream(keyStore), keyStorePassword.toCharArray());
+          KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory. getDefaultAlgorithm());
+          kmf.init(ks, keyStorePassword.toCharArray());
+          return kmf.getKeyManagers();
+    }
 }