Просмотр исходного кода

'增加发送到MQ,kafka,activeMQ'

o2wwx 5 лет назад
Родитель
Сommit
907104a285
21 измененных файлов с 902 добавлено и 50 удалено
  1. 8 1
      o2server/configSample/communicate.json
  2. 10 0
      o2server/configSample/messages.json
  3. 31 0
      o2server/configSample/mq.json
  4. 46 0
      o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Communicate.java
  5. 18 0
      o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java
  6. 80 0
      o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQ.java
  7. 57 0
      o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java
  8. 146 0
      o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQKafka.java
  9. 10 10
      o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Messages.java
  10. 2 0
      o2server/x_base_core_project/src/main/java/com/x/base/core/project/message/MessageConnector.java
  11. 37 16
      o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPaging.java
  12. 40 20
      o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPagingManage.java
  13. 29 0
      o2server/x_message_assemble_communicate/pom.xml
  14. 12 0
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ExceptionMQMessage.java
  15. 76 0
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/MQConsumeQueue.java
  16. 15 2
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ThisApplication.java
  17. 20 1
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/jaxrs/connector/ActionCreate.java
  18. 95 0
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java
  19. 105 0
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/KafkaMQ.java
  20. 7 0
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/MQInterface.java
  21. 58 0
      o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/schedule/TriggerMq.java

+ 8 - 1
o2server/configSample/communicate.json

@@ -9,9 +9,16 @@
   "clean": {
     "enable": true,
     "cron": "30 30 6 * * ?",
-    "keep": 7.0,
+    "keep": 7,
     "###enable": "是否启用###",
     "###cron": "定时cron表达式###",
     "###keep": "消息保留天数###"
+  },
+  "###cronMq": "定时触发发送到消息队列MQ.###",
+  "cronMq": {
+    "enable": true,
+    "cron": "0 */5 * * * ?",
+    "###enable": "是否启用###",
+    "###cron": "定时cron表达式###"
   }
 }

+ 10 - 0
o2server/configSample/messages.json

@@ -9,6 +9,7 @@
   "attachment_editor": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -20,6 +21,7 @@
   "attachment_editorCancel": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -31,6 +33,7 @@
   "attachment_editorModify": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -42,6 +45,7 @@
   "attachment_share": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -53,6 +57,7 @@
   "attachment_shareCancel": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -95,6 +100,7 @@
   "meeting_delete": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -106,6 +112,7 @@
   "meeting_invite": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -139,6 +146,7 @@
   "read_create": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -162,6 +170,7 @@
   "task_create": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",
@@ -177,6 +186,7 @@
   "task_press": {
     "consumers": [],
     "consumersV2": {
+      "mq": "",
       "pms": "",
       "zhengwuDingding": "",
       "qiyeweixin": "",

+ 31 - 0
o2server/configSample/mq.json

@@ -0,0 +1,31 @@
+{
+  "enable": true,
+  "mq":"kafka",
+  "kafka":{
+	  "bootstrap_servers": "localhost:9092",
+	  "topic":"topic-test",
+	  "acks": "all",
+	  "retries": 0,
+	  "batch_size": 16384,
+	  "linger_ms": 1,
+	  "buffer_memory": 33554432,
+	  "key_deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+	  "value_deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+	  "###bootstrap_servers": "服务器地址###",
+	  "###acks": "指定必须有多少个分区副本接收消息,生产者才认为消息写入成功,用户检测数据丢失的可能性###",
+	  "###retries": "生产者从服务器收到的错误有可能是临时性的错误的次数###",
+	  "###batch_size": "该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。###",
+	  "###linger_ms": "该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,增加延迟,提高吞吐量###",
+	  "###buffer_memory": "该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息###",
+	  "###key_deserializer": "key值的序列化类###",
+	  "###value_deserializer": "value的序列化类###"
+  },
+  "activeMQ":{
+	   "url":"tcp://127.0.0.1:61616",
+	   "queueName":"queue-test",
+	   "###url": "服务地址,端口默认61616.###",
+	   "###queueName": "要创建的消息名称###"
+  },
+  "###enable": "是否启用.###",
+  "###mq": "消息服务类型.###"
+}

+ 46 - 0
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Communicate.java

@@ -43,6 +43,51 @@ public class Communicate extends ConfigObject {
 		return BooleanUtils.isTrue(calendarEnable);
 	}
 
+	@FieldDescribe("定时触发发送到消息队列MQ.")
+	private CronMq cronMq;
+	
+	
+	public CronMq cronMq() {
+		return this.cronMq == null ? new CronMq() : this.cronMq;
+	}
+	
+	public static class CronMq extends ConfigObject {
+		
+		public static CronMq defaultInstance() {
+			CronMq o = new CronMq();
+			return o;
+		}
+		
+		public final static Boolean DEFAULT_ENABLE = false;
+		public final static String DEFAULT_CRON = "0 0 * * * ? *"; //每小时运行一次
+		
+		@FieldDescribe("是否启用")
+		private Boolean enable = DEFAULT_ENABLE;
+
+		@FieldDescribe("定时cron表达式")
+		private String cron = DEFAULT_CRON;
+		
+		public String getCron() {
+			if (StringUtils.isNotEmpty(this.cron) && CronExpression.isValidExpression(this.cron)) {
+				return this.cron;
+			} else {
+				return DEFAULT_CRON;
+			}
+		}
+
+		public Boolean getEnable() {
+			return BooleanUtils.isTrue(this.enable);
+		}
+
+		public void setCron(String cron) {
+			this.cron = cron;
+		}
+
+		public void setEnable(Boolean enable) {
+			this.enable = enable;
+		}
+	}
+	
 	@FieldDescribe("清理设置.")
 	private Clean clean;
 
@@ -51,6 +96,7 @@ public class Communicate extends ConfigObject {
 	}
 
 	public static class Clean extends ConfigObject {
+		private static final long serialVersionUID = 1L;
 
 		public static Clean defaultInstance() {
 			Clean o = new Clean();

+ 18 - 0
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java

@@ -64,6 +64,7 @@ public class Config {
 	public static final String PATH_CONFIG_WELINK = "config/welink.json";
 	public static final String PATH_CONFIG_ZHENGWUDINGDING = "config/zhengwuDingding.json";
 	public static final String PATH_CONFIG_QIYEWEIXIN = "config/qiyeweixin.json";
+	public static final String PATH_CONFIG_MQ = "config/mq.json";
 	public static final String PATH_CONFIG_LOGLEVEL = "config/logLevel.json";
 	public static final String PATH_CONFIG_BINDLOGO = "config/bindLogo.png";
 	public static final String PATH_CONFIG_SLICE = "config/slice.json";
@@ -1095,6 +1096,23 @@ public class Config {
 		return instance().zhengwuDingding;
 	}
 
+	private MQ mq;
+
+	public static MQ mq() throws Exception {
+		if (null == instance().mq) {
+			synchronized (Config.class) {
+				if (null == instance().mq) {
+					MQ obj = BaseTools.readConfigObject(PATH_CONFIG_MQ, MQ.class);
+					if (null == obj) {
+						obj = MQ.defaultInstance();
+					}
+					instance().mq = obj;
+				}
+			}
+		}
+		return instance().mq;
+	}
+	
 	private Vfs vfs;
 
 	public static Vfs vfs() throws Exception {

+ 80 - 0
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQ.java

@@ -0,0 +1,80 @@
+package com.x.base.core.project.config;
+
+import java.io.File;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.x.base.core.project.annotation.FieldDescribe;
+import com.x.base.core.project.connection.HttpConnection;
+import com.x.base.core.project.gson.GsonPropertyObject;
+import com.x.base.core.project.gson.XGsonBuilder;
+import com.x.base.core.project.tools.DefaultCharset;
+
+public class MQ extends ConfigObject {
+
+	@FieldDescribe("是否启用.")
+	private Boolean enable;
+	
+	@FieldDescribe("消息服务类型")
+	private String mq;
+	
+	@FieldDescribe("Kafka服务器配置")
+	private MQKafka kafka;
+	
+	@FieldDescribe("ActiveMQ服务器配置")
+	private MQActive activeMQ;
+	
+	public static MQ defaultInstance() {
+		return new MQ();
+	}
+
+	public static final Boolean default_enable = false;
+	public static final String default_mq = "kafka";
+	
+	public MQ() {
+		this.enable = default_enable;
+		this.mq = default_mq;
+		
+	}
+	
+	public Boolean getEnable() {
+		return BooleanUtils.isTrue(this.enable);
+	}
+
+	public void setEnable(Boolean enable) {
+		this.enable = enable;
+	}
+
+	public String getMq() {
+		return mq;
+	}
+
+	public void setMq(String mq) {
+		this.mq = mq;
+	}
+
+	public MQKafka getKafka() {
+		return kafka;
+	}
+
+	public void setKafka(MQKafka kafka) {
+		this.kafka = kafka;
+	}
+
+	public MQActive getActiveMQ() {
+		return activeMQ;
+	}
+
+	public void setActiveMQ(MQActive activeMQ) {
+		this.activeMQ = activeMQ;
+	}
+
+
+
+	
+	
+}

+ 57 - 0
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java

@@ -0,0 +1,57 @@
+package com.x.base.core.project.config;
+
+import java.io.File;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.x.base.core.project.annotation.FieldDescribe;
+import com.x.base.core.project.connection.HttpConnection;
+import com.x.base.core.project.gson.GsonPropertyObject;
+import com.x.base.core.project.gson.XGsonBuilder;
+import com.x.base.core.project.tools.DefaultCharset;
+
+public class MQActive extends ConfigObject {
+
+	@FieldDescribe("服务器地址")
+	private String url;
+	
+	@FieldDescribe("消息队列名")
+	private String queueName;
+	
+	public static MQActive defaultInstance() {
+		return new MQActive();
+	}
+
+	public static final String default_url = "tcp://127.0.0.1:61616";
+	public static final String default_queueName = "queue-test";
+	
+	public MQActive() {
+		this.url = default_url;
+		this.queueName = default_queueName;
+
+	}
+
+	public String getUrl() {
+		return url;
+	}
+
+	public void setUrl(String url) {
+		this.url = url;
+	}
+
+	public String getQueueName() {
+		return queueName;
+	}
+
+	public void setQueueName(String queueName) {
+		this.queueName = queueName;
+	}
+	
+
+	
+	
+}

+ 146 - 0
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQKafka.java

@@ -0,0 +1,146 @@
+package com.x.base.core.project.config;
+
+import java.io.File;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.x.base.core.project.annotation.FieldDescribe;
+import com.x.base.core.project.connection.HttpConnection;
+import com.x.base.core.project.gson.GsonPropertyObject;
+import com.x.base.core.project.gson.XGsonBuilder;
+import com.x.base.core.project.tools.DefaultCharset;
+
+public class MQKafka extends ConfigObject {
+	
+	@FieldDescribe("服务器地址")
+	private String bootstrap_servers;
+	
+	@FieldDescribe("主题")
+	private String topic;
+	
+	@FieldDescribe("指定必须有多少个分区副本接收消息,生产者才认为消息写入成功")
+	private String acks;
+	
+	@FieldDescribe("错误的次数")
+	private Integer retries;
+
+	@FieldDescribe("批次可以使用的内存大小")
+	private Integer batch_size;
+	
+	@FieldDescribe("等待更多消息加入批次的时间")
+	private Integer linger_ms;
+	
+	@FieldDescribe("生产者内存缓冲区的大小")
+	private Integer buffer_memory;
+	
+	@FieldDescribe("key值的序列化类")
+	private String key_deserializer;
+	
+	@FieldDescribe("value的序列化类")
+	private String value_deserializer;
+	
+	public static MQKafka defaultInstance() {
+		return new MQKafka();
+	}
+
+	public static final String default_bootstrap_servers = "localhost:9092";
+	public static final String default_topic = "topic-test";
+	public static final String default_acks = "all";
+	public static final Integer default_retries = 0;
+	public static final Integer default_batch_size = 16384;
+	public static final Integer default_linger_ms= 1;
+	public static final Integer default_buffer_memory = 33554432;
+	public static final String default_key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+	public static final String default_value_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+	
+	public MQKafka() {
+		this.bootstrap_servers = default_bootstrap_servers;
+		this.topic = default_topic;
+		this.acks = default_acks;
+		this.retries = default_retries;
+		this.batch_size = default_batch_size;
+		this.linger_ms = default_linger_ms;
+		this.buffer_memory= default_buffer_memory;
+		this.key_deserializer = default_key_deserializer;
+		this.value_deserializer = default_value_deserializer;
+	}
+	
+
+	public String getBootstrap_servers() {
+	      return StringUtils.isEmpty(bootstrap_servers) ? default_bootstrap_servers : this.bootstrap_servers; 
+	}
+	
+	public void setBootstrap_servers(String bootstrap_servers) {
+		this.bootstrap_servers = bootstrap_servers;
+	}
+
+	public String getTopic() {
+		   return StringUtils.isEmpty(topic) ? default_topic : this.topic; 
+	}
+
+	public void setTopic(String topic) {
+		this.topic = topic;
+	}
+
+	public String getAcks() {
+	    return StringUtils.isEmpty(acks) ? default_acks : this.acks; 
+
+	}
+
+	public void setAcks(String acks) {
+		this.acks = acks;
+	}
+
+	public Integer getRetries() {
+		return this.retries; 
+	}
+
+	public void setRetries(Integer retries) {
+		this.retries = retries;
+	}
+
+	public Integer getBatch_size() {
+		return  this.batch_size; 
+	}
+
+	public void setBatch_size(Integer batch_size) {
+		this.batch_size = batch_size;
+	}
+
+	public Integer getLinger_ms() {
+		return this.linger_ms; 
+	}
+
+	public void setLinger_ms(Integer linger_ms) {
+		this.linger_ms = linger_ms;
+	}
+
+	public Integer getBuffer_memory() {
+		return  this.buffer_memory; 
+	}
+
+	public void setBuffer_memory(Integer buffer_memory) {
+		this.buffer_memory = buffer_memory;
+	}
+
+	public String getKey_deserializer() {
+		return StringUtils.isEmpty(key_deserializer) ? default_key_deserializer : this.key_deserializer ;
+	}
+
+	public void setKey_deserializer(String key_deserializer) {
+		this.key_deserializer = key_deserializer;
+	}
+
+	public String getValue_deserializer() {
+		return StringUtils.isEmpty(value_deserializer) ? default_value_deserializer : this.value_deserializer ;
+	}
+
+	public void setValue_deserializer(String value_deserializer) {
+		this.value_deserializer = value_deserializer;
+	}
+	
+}

+ 10 - 10
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Messages.java

@@ -28,51 +28,51 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
 		o.put(MessageConnector.TYPE_ATTACHMENT_SHARE,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 
 		o.put(MessageConnector.TYPE_ATTACHMENT_EDITOR,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 
 		o.put(MessageConnector.TYPE_ATTACHMENT_SHARECANCEL,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 
 		o.put(MessageConnector.TYPE_ATTACHMENT_EDITORCANCEL,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 
 		o.put(MessageConnector.TYPE_ATTACHMENT_EDITORMODIFY,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 		/* 文件通知结束 */
 
 		/* 会议通知 */
 		o.put(MessageConnector.TYPE_MEETING_INVITE,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 
 		o.put(MessageConnector.TYPE_MEETING_DELETE,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 		/* 会议通知结束 */
 
 		/* 待办已办通知 */
 		o.put(MessageConnector.TYPE_TASK_CREATE,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 		/* 待办提醒通知 */
 		o.put(MessageConnector.TYPE_TASK_PRESS,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 
 		o.put(MessageConnector.TYPE_TASK_DELETE, new Message());
 
@@ -85,7 +85,7 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
 		o.put(MessageConnector.TYPE_READ_CREATE,
 				new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
 						MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
-						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
+						MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
 
 		o.put(MessageConnector.TYPE_READ_DELETE, new Message());
 

+ 2 - 0
o2server/x_base_core_project/src/main/java/com/x/base/core/project/message/MessageConnector.java

@@ -135,6 +135,8 @@ public class MessageConnector {
 
 	public static final String CONSUME_QIYEWEIXIN = "qiyeweixin";
 
+	public static final String CONSUME_MQ = "mq";
+	
 	private static Context context;
 
 	private static LinkedBlockingQueue<Wrap> connectQueue = new LinkedBlockingQueue<>();

+ 37 - 16
o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPaging.java

@@ -8,6 +8,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 import javax.persistence.criteria.CriteriaBuilder;
 import javax.persistence.criteria.CriteriaQuery;
+import javax.persistence.criteria.Expression;
 import javax.persistence.criteria.Order;
 import javax.persistence.criteria.Predicate;
 import javax.persistence.criteria.Root;
@@ -25,7 +26,6 @@ import com.x.base.core.project.http.ActionResult;
 import com.x.base.core.project.http.EffectivePerson;
 import com.x.base.core.project.logger.Logger;
 import com.x.base.core.project.logger.LoggerFactory;
-import com.x.base.core.project.tools.SortTools;
 import com.x.meeting.assemble.control.Business;
 import com.x.meeting.assemble.control.WrapTools;
 import com.x.meeting.assemble.control.wrapout.WrapOutMeeting;
@@ -51,8 +51,9 @@ class ActionPaging extends BaseAction {
 			Root<Meeting> root = cq.from(Meeting.class);
 			
 			Predicate p = cb.equal(root.get(Meeting_.applicant), effectivePerson.getDistinguishedName());
-			
-			p = cb.or(p, cb.isMember(effectivePerson.getDistinguishedName(),root.get(Meeting_.invitePersonList)));
+			//p = cb.or(p, cb.isMember(effectivePerson.getDistinguishedName(),root.get(Meeting_.invitePersonList)));
+			Expression<List<String>> expression = root.get(Meeting_.invitePersonList);
+			p = cb.or(p, expression.in(effectivePerson.getDistinguishedName()));
 			
 			if(!StringUtils.isBlank(wi.getSubject())) {
 				p = cb.and(p, cb.like(root.get(Meeting_.subject), "%" + wi.getSubject() + "%"));
@@ -96,6 +97,10 @@ class ActionPaging extends BaseAction {
 		    	p = cb.and(p, cb.equal(root.get(Meeting_.confirmStatus), ConfirmStatus.valueOf(wi.getConfirmStatus().trim())));
 			}
 			
+			if(!StringUtils.isBlank(wi.getApplicant())) {
+				p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getApplicant()));
+			 }
+			
 			if(!StringUtils.isBlank(wi.getInvitePersonList())) {
 				p = cb.and(p, cb.isMember( wi.getInvitePersonList().trim(),root.get(Meeting_.invitePersonList)));
 			 }
@@ -127,28 +132,32 @@ class ActionPaging extends BaseAction {
 	        }
 	        
 			cq.select(root.get(Meeting_.id)).where(p).orderBy(order);
-		
+			cq.distinct(true);
+			
 			 TypedQuery<String> typedQuery = em.createQuery(cq);
 			 int pageIndex = (page-1)*size;
-			 int pageSize = page*size;
+			 int pageSize = size;
 			 typedQuery.setFirstResult(pageIndex);
 			 typedQuery.setMaxResults(pageSize);
-			    
-			 //logger.info("typedQuery="+  typedQuery.toString()); 
 			 ids =  typedQuery.getResultList();
-			
-			 CriteriaQuery<Long> cqCount = cb.createQuery(Long.class);
-			 Root<Meeting> rootCount = cqCount.from(Meeting.class);
-			 cqCount.select(cb.countDistinct(rootCount)).where(p);
-			 Long count = em.createQuery(cqCount).getSingleResult().longValue();
-			 //logger.info("count="+  count); 
+			 //logger.info("pagingtypedQuery="+  typedQuery.toString()); 
+			 
+			 TypedQuery<String> tqCount = em.createQuery( cq.select(root.get(Meeting_.id)).where(p).distinct(true));
+			 List<String> allid = tqCount.getResultList();
+			 Long  tpsize =  (long) allid.size();
+			 //logger.info("ids count="+  tpsize); 
 			 
-			List<Wo> wos = Wo.copier.copy(emc.list(Meeting.class, ids));
+			 CriteriaQuery<Meeting> cqMeeting = cb.createQuery(Meeting.class);		
+			 Predicate pMeeting = cb.isMember(root.get(Meeting_.id), cb.literal(ids));
+			 Root<Meeting> rootMeeting = cqMeeting.from(Meeting.class);
+			 cqMeeting.select(rootMeeting).where(pMeeting).orderBy(order);
+		     List<Meeting> os = em.createQuery(cqMeeting).getResultList();
+		    
+			List<Wo> wos = Wo.copier.copy(os);
 			WrapTools.decorate(business, wos, effectivePerson);
 			WrapTools.setAttachment(business, wos);
-			SortTools.desc(wos, Meeting.startTime_FIELDNAME);
 			result.setData(wos);
-			result.setCount(count);
+			result.setCount(tpsize);
 			return result;
 		}
 	}
@@ -178,6 +187,9 @@ class ActionPaging extends BaseAction {
 		@FieldDescribe("会议预定状态.(allow|deny|wait)")
 		private String confirmStatus;
 		
+		@FieldDescribe("创建人员.")
+		private String applicant;
+		
 		@FieldDescribe("邀请人员,身份,组织.")
 		private String invitePersonList;
 		
@@ -240,6 +252,15 @@ class ActionPaging extends BaseAction {
 			this.completedTime = completedTime;
 		}
 
+		public String getApplicant() {
+			return applicant;
+		}
+
+		public void setApplicant(String applicant) {
+			this.applicant = applicant;
+		}
+
+		
 		public String getInvitePersonList() {
 			return invitePersonList;
 		}

+ 40 - 20
o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPagingManage.java

@@ -25,10 +25,8 @@ import com.x.base.core.project.http.ActionResult;
 import com.x.base.core.project.http.EffectivePerson;
 import com.x.base.core.project.logger.Logger;
 import com.x.base.core.project.logger.LoggerFactory;
-import com.x.base.core.project.tools.SortTools;
 import com.x.meeting.assemble.control.Business;
 import com.x.meeting.assemble.control.WrapTools;
-import com.x.meeting.assemble.control.jaxrs.meeting.ActionPaging.Wo;
 import com.x.meeting.assemble.control.wrapout.WrapOutMeeting;
 import com.x.meeting.core.entity.ConfirmStatus;
 import com.x.meeting.core.entity.Meeting;
@@ -51,11 +49,14 @@ class ActionPagingManage extends BaseAction {
 			CriteriaQuery<String> cq = cb.createQuery(String.class);
 			Root<Meeting> root = cq.from(Meeting.class);
 			
-			if (StringUtils.isBlank(wi.getDistinguishedName())) {
-				throw new ExceptionDistinguishedNameEmpty();
+			Predicate p = cb.isNotNull(root.get(Meeting_.applicant));
+			
+			if (!StringUtils.isBlank(wi.getDistinguishedName())) {
+				//throw new ExceptionDistinguishedNameEmpty();
+				p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName()));
 			}
 			
-			Predicate p = cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName());
+			//Predicate p = cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName());
 			
 			if(!StringUtils.isBlank(wi.getSubject())) {
 				p = cb.and(p, cb.like(root.get(Meeting_.subject), "%" + wi.getSubject() + "%"));
@@ -99,6 +100,10 @@ class ActionPagingManage extends BaseAction {
 		    	p = cb.and(p, cb.equal(root.get(Meeting_.confirmStatus), ConfirmStatus.valueOf(wi.getConfirmStatus().trim())));
 			}
 			
+			if(!StringUtils.isBlank(wi.getApplicant())) {
+				p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getApplicant()));
+			 }
+			
 			if(!StringUtils.isBlank(wi.getInvitePersonList())) {
 				p = cb.and(p, cb.isMember( wi.getInvitePersonList().trim(),root.get(Meeting_.invitePersonList)));
 			 }
@@ -129,31 +134,34 @@ class ActionPagingManage extends BaseAction {
 	        	   order =  cb.desc(root.get("startTime"));
 	        }
 	        
-			cq.select(root.get(Meeting_.id)).where(p).orderBy(order);
-		
+	    	 cq.select(root.get(Meeting_.id)).where(p).orderBy(order);
+			 cq.distinct(true);
+			
 			 TypedQuery<String> typedQuery = em.createQuery(cq);
 			 int pageIndex = (page-1)*size;
-			 int pageSize = page*size;
+			 int pageSize = size;
 			 typedQuery.setFirstResult(pageIndex);
 			 typedQuery.setMaxResults(pageSize);
-			    
-			 //logger.info("typedQuery="+  typedQuery.toString()); 
 			 ids =  typedQuery.getResultList();
-			
-			 CriteriaQuery<Long> cqCount = cb.createQuery(Long.class);
-			 Root<Meeting> rootCount = cqCount.from(Meeting.class);
-			 cqCount.select(cb.countDistinct(rootCount)).where(p);
-			 Long count = em.createQuery(cqCount).getSingleResult().longValue();
-			// logger.info("count="+  count); 
+			 //logger.info("pagingtypedQuery="+  typedQuery.toString()); 
+			 
+			 TypedQuery<String> tqCount = em.createQuery( cq.select(root.get(Meeting_.id)).where(p).distinct(true));
+			 List<String> allid = tqCount.getResultList();
+			 Long  tpsize =  (long) allid.size();
+			 //logger.info("ids count="+  tpsize); 
 			 
-			List<Wo> wos = Wo.copier.copy(emc.list(Meeting.class, ids));
+			 CriteriaQuery<Meeting> cqMeeting = cb.createQuery(Meeting.class);		
+			 Predicate pMeeting = cb.isMember(root.get(Meeting_.id), cb.literal(ids));
+			 Root<Meeting> rootMeeting = cqMeeting.from(Meeting.class);
+			 cqMeeting.select(rootMeeting).where(pMeeting).orderBy(order);
+		     List<Meeting> os = em.createQuery(cqMeeting).getResultList();
+		    
+			List<Wo> wos = Wo.copier.copy(os);
 			WrapTools.decorate(business, wos, effectivePerson);
 			WrapTools.setAttachment(business, wos);
-			SortTools.desc(wos, Meeting.startTime_FIELDNAME);
 			result.setData(wos);
-			result.setCount(count);
+			result.setCount(tpsize);
 			return result;
-		
 		}
 	}
 
@@ -185,6 +193,9 @@ class ActionPagingManage extends BaseAction {
 		@FieldDescribe("会议预定状态.(allow|deny|wait)")
 		private String confirmStatus;
 		
+		@FieldDescribe("创建人员.")
+		private String applicant;
+		
 		@FieldDescribe("邀请人员,身份,组织.")
 		private String invitePersonList;
 		
@@ -255,6 +266,15 @@ class ActionPagingManage extends BaseAction {
 			this.completedTime = completedTime;
 		}
 
+		public String getApplicant() {
+			return applicant;
+		}
+
+		public void setApplicant(String applicant) {
+			this.applicant = applicant;
+		}
+
+		
 		public String getInvitePersonList() {
 			return invitePersonList;
 		}

+ 29 - 0
o2server/x_message_assemble_communicate/pom.xml

@@ -28,6 +28,35 @@
 			<groupId>o2oa</groupId>
 			<artifactId>x_message_core_entity</artifactId>
 		</dependency>
+		
+		<dependency>
+		     <groupId>org.apache.kafka</groupId>
+			 <artifactId>kafka-clients</artifactId>
+			 <version>2.6.0</version>
+        </dependency>
+        <!-- 
+        <dependency>
+			  <groupId>org.apache.activemq</groupId>
+			  <artifactId>activemq-all</artifactId>
+			  <version>5.14.5</version>
+			  <exclusions>
+				     <exclusion> 
+					        <groupId>org.slf4j</groupId>
+					        <artifactId>slf4j-log4j12</artifactId>
+					  </exclusion>
+					  <exclusion> 
+					        <groupId>log4j</groupId>
+					        <artifactId>log4j</artifactId>
+					  </exclusion>
+	          </exclusions>
+        </dependency>
+        -->
+         <dependency>
+		    <groupId>org.apache.activemq</groupId>
+		    <artifactId>activemq-client</artifactId>
+		    <version>5.14.5</version>
+		</dependency>
+		
 	</dependencies>
 	<build>
 		<plugins>

+ 12 - 0
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ExceptionMQMessage.java

@@ -0,0 +1,12 @@
+package com.x.message.assemble.communicate;
+
+import com.x.base.core.project.exception.PromptException;
+
+class ExceptionMQMessage extends PromptException {
+
+	private static final long serialVersionUID = 4132300948670472899L;
+
+	ExceptionMQMessage(Integer retCode, String retMessage) {
+		super("发送消息队列失败,错误代码:{},错误消息:{}.", retCode, retMessage);
+	}
+}

+ 76 - 0
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/MQConsumeQueue.java

@@ -0,0 +1,76 @@
+package com.x.message.assemble.communicate;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.criteria.CriteriaBuilder;
+import javax.persistence.criteria.CriteriaQuery;
+import javax.persistence.criteria.Order;
+import javax.persistence.criteria.Predicate;
+import javax.persistence.criteria.Root;
+
+import com.google.gson.*;
+import com.x.base.core.container.EntityManagerContainer;
+import com.x.base.core.container.factory.EntityManagerContainerFactory;
+import com.x.base.core.project.config.Config;
+import com.x.base.core.project.logger.Logger;
+import com.x.base.core.project.logger.LoggerFactory;
+import com.x.base.core.project.message.MessageConnector;
+import com.x.base.core.project.queue.AbstractQueue;
+import com.x.message.assemble.communicate.mq.ActiveMQ;
+import com.x.message.assemble.communicate.mq.KafkaMQ;
+import com.x.message.assemble.communicate.mq.MQInterface;
+import com.x.message.core.entity.Message;
+import com.x.message.core.entity.Message_;
+
+
+public class MQConsumeQueue extends AbstractQueue<Message> {
+
+	private static Logger logger = LoggerFactory.getLogger(MQConsumeQueue.class);
+
+	protected void execute(Message message) throws Exception {
+		logger.info("MQConsumeQueue message.getTitle:"+ message.getTitle());
+		if (Config.mq().getEnable()) {
+			try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
+				Business business = new Business(emc);
+				MQInterface MQClient;
+				EntityManager em = business.entityManagerContainer().get(Message.class);
+				CriteriaBuilder cb = em.getCriteriaBuilder();
+				CriteriaQuery<Message> cq = cb.createQuery(Message.class);
+				Root<Message> root = cq.from(Message.class);
+
+				Order order = cb.desc(root.get(Message_.createTime));
+				Predicate p = cb.notEqual(root.get(Message_.consumed), true);
+				
+				p = cb.and(p, cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_MQ));
+				logger.info(p.toString());
+				List<Message> messages = em.createQuery(cq.select(root).where(p).orderBy(order)).setMaxResults(50).getResultList();
+				if(messages.size()>0) {
+					   if(Config.mq().getMq().equalsIgnoreCase("kafka")) {
+						       MQClient = KafkaMQ.getInstance();
+					      }else {
+							   MQClient = ActiveMQ.getInstance();
+						 }
+					    if(MQClient != null) {
+					    	for(Message mes : messages) {
+								 boolean res = MQClient.sendMessage(mes);
+								 if (res == false) {
+									  Gson gson = new Gson();
+							          String msg =  gson.toJson(mes);
+									  ExceptionMQMessage e = new ExceptionMQMessage(0, msg);
+									  logger.error(e);
+								 } else {
+									Message messageEntityObject = emc.find(mes.getId(), Message.class);
+									if (null != messageEntityObject) {
+										emc.beginTransaction(Message.class);
+										messageEntityObject.setConsumed(true);
+										emc.commit();
+									}
+								}
+							  }
+					    }
+					 }
+			   }
+		}
+	}
+}

+ 15 - 2
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ThisApplication.java

@@ -7,6 +7,8 @@ import com.x.base.core.project.Context;
 import com.x.base.core.project.config.Config;
 import com.x.base.core.project.logger.LoggerFactory;
 import com.x.message.assemble.communicate.schedule.Clean;
+import com.x.message.assemble.communicate.schedule.TriggerMq;
+import com.x.message.core.entity.Message;
 
 public class ThisApplication {
 
@@ -27,6 +29,8 @@ public class ThisApplication {
 	public static WeLinkConsumeQueue weLinkConsumeQueue = new WeLinkConsumeQueue();
 
 	public static PmsInnerConsumeQueue pmsInnerConsumeQueue = new PmsInnerConsumeQueue();
+	
+	public static MQConsumeQueue mqConsumeQueue = new MQConsumeQueue();
 
 	public static Context context() {
 		return context;
@@ -64,9 +68,18 @@ public class ThisApplication {
 			if (Config.weLink().getEnable() && Config.weLink().getMessageEnable()) {
 				weLinkConsumeQueue.start();
 			}
-
+			
+			if (Config.mq().getEnable()) {
+				mqConsumeQueue.start();
+			}
+			
 			MessageConnector.start(context());
-
+          
+			
+			if (BooleanUtils.isTrue(Config.communicate().cronMq().getEnable())) {
+				  context().schedule(TriggerMq.class,Config.communicate().cronMq().getCron());
+			}
+			
 		} catch (Exception e) {
 			e.printStackTrace();
 		}

+ 20 - 1
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/jaxrs/connector/ActionCreate.java

@@ -10,7 +10,6 @@ import com.x.base.core.container.EntityManagerContainer;
 import com.x.base.core.container.factory.EntityManagerContainerFactory;
 import com.x.base.core.entity.annotation.CheckPersistType;
 import com.x.base.core.project.config.Config;
-import com.x.base.core.project.gson.XGsonBuilder;
 import com.x.base.core.project.http.ActionResult;
 import com.x.base.core.project.http.EffectivePerson;
 import com.x.base.core.project.jaxrs.WrapBoolean;
@@ -112,6 +111,9 @@ class ActionCreate extends BaseAction {
 					case MessageConnector.CONSUME_WELINK:
 						message = this.weLinkMessage(effectivePerson, business, cpwi, instant);
 						break;
+					case MessageConnector.CONSUME_MQ:
+						message = this.MQMessage(effectivePerson, business, cpwi, instant);
+						break;
 					default:
 						message = this.defaultMessage(effectivePerson, business, cpwi, consumer, instant);
 						break;
@@ -173,6 +175,11 @@ class ActionCreate extends BaseAction {
 					ThisApplication.pmsInnerConsumeQueue.send(message);
 				}
 				break;
+			case MessageConnector.CONSUME_MQ:
+				if (Config.mq().getEnable()) {
+					ThisApplication.mqConsumeQueue.send(message);
+				}
+				break;
 			default:
 				break;
 			}
@@ -290,6 +297,18 @@ class ActionCreate extends BaseAction {
 		return message;
 	}
 
+	private Message MQMessage(EffectivePerson effectivePerson, Business business, Wi wi, Instant instant) {
+		Message message = new Message();
+		message.setBody(Objects.toString(wi.getBody()));
+		message.setType(wi.getType());
+		message.setPerson(wi.getPerson());
+		message.setTitle(wi.getTitle());
+		message.setConsumer(MessageConnector.CONSUME_MQ);
+		message.setConsumed(false);
+		message.setInstant(instant.getId());
+		return message;
+	}
+	
 	private Message defaultMessage(EffectivePerson effectivePerson, Business business, Wi wi, String consumer,
 			Instant instant) {
 		Message message = new Message();

+ 95 - 0
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java

@@ -0,0 +1,95 @@
+package com.x.message.assemble.communicate.mq;
+
+import java.util.Date;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import com.google.gson.Gson;
+import com.x.base.core.project.config.Config;
+import com.x.base.core.project.config.MQActive;
+import com.x.base.core.project.logger.Logger;
+import com.x.base.core.project.logger.LoggerFactory;
+import com.x.message.core.entity.Message;
+
+public class ActiveMQ implements MQInterface {
+	
+	private static Logger logger = LoggerFactory.getLogger(ActiveMQ.class);
+	private Connection connection = null;
+	private MessageProducer producer = null;
+	private Session session = null;
+			
+	private ActiveMQ() {
+		try {
+			    MQActive configMQ = Config.mq().getActiveMQ();
+			    logger.info("MqActive initialize.....");
+			    
+			    String url=configMQ.getUrl();
+			    String queueName=configMQ.getQueueName();
+	
+				ConnectionFactory factory=new ActiveMQConnectionFactory(url);
+			    this.connection= factory.createConnection();
+				this.connection.start();
+				this.session= this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+				Destination destination=session.createQueue(queueName);
+				this.producer = session.createProducer(destination);
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+		}
+		
+	}
+	
+	private static class MQHolder{
+	      private static ActiveMQ instance = new ActiveMQ();
+    }
+	
+	 
+	 public  static  ActiveMQ  getInstance(){
+		   return MQHolder.instance;
+	 }
+
+	 
+	 public static void main(String[] args) {
+		   ActiveMQ MQClient = getInstance();
+		   //System.out.println(MQClient.getTopic());
+		   Message msg = new Message();
+		   msg.setBody("body");
+		   msg.setConsumed(false);
+		   msg.setCreateTime(new Date());
+		   msg.setPerson("person");
+	 }
+
+	@Override
+	public boolean sendMessage(Message message) {
+		  try {
+			   Gson gson = new Gson();
+	           String msg =  gson.toJson(message);
+               TextMessage textMessage= this.session.createTextMessage(msg);
+			    this.producer.send(textMessage);
+	        } catch (Exception e) {
+	             e.printStackTrace();
+	             logger.error(e);
+	             return false;
+	        } finally {
+	            
+	        }
+		return true;
+	}
+
+	public void destroy() {
+		 System.out.println("MqActive destroy.....");
+		  try {
+			this.connection.close();
+		} catch (JMSException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			 logger.error(e);
+		}
+	   }
+}

+ 105 - 0
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/KafkaMQ.java

@@ -0,0 +1,105 @@
+package com.x.message.assemble.communicate.mq;
+
+import java.util.Date;
+import java.util.Properties;
+
+import com.google.gson.Gson;
+import com.x.base.core.project.config.Config;
+import com.x.base.core.project.config.MQ;
+import com.x.base.core.project.config.MQKafka;
+import com.x.base.core.project.logger.Logger;
+import com.x.base.core.project.logger.LoggerFactory;
+import com.x.message.core.entity.Message;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class KafkaMQ  implements MQInterface {
+	
+	private static Logger logger = LoggerFactory.getLogger(KafkaMQ.class);
+	
+	private  Producer<String, String> producer = null;
+	private  String topic = "";
+	
+	public Producer<String, String> getProducer() {
+		return producer;
+	}
+
+	public void setProducer(Producer<String, String> producer) {
+		this.producer = producer;
+	}
+
+	public String getTopic() {
+		return topic;
+	}
+
+	public void setTopic(String topic) {
+		this.topic = topic;
+	}
+
+	private KafkaMQ() {
+		try {
+			    MQKafka configMQ = Config.mq().getKafka();
+			    logger.info("MQ initialize.....");
+			    Properties properties = new Properties();
+		        properties.put("bootstrap.servers",  configMQ.getBootstrap_servers());
+		        properties.put("acks", configMQ.getAcks());
+				properties.put("retries", configMQ.getRetries());
+		        properties.put("batch.size", configMQ.getBatch_size());
+		        properties.put("linger.ms", configMQ.getLinger_ms());
+		        properties.put("buffer.memory", configMQ.getBuffer_memory());
+		        
+		        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		        
+		        this.producer = new KafkaProducer<String, String>(properties);
+		        
+		        this.topic = configMQ.getTopic();
+		        
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+		}
+		
+	}
+	
+	private static class MQHolder{
+	      private static KafkaMQ instance = new KafkaMQ();
+    }
+	
+	 
+	 public  static  KafkaMQ  getInstance(){
+		   return MQHolder.instance;
+	 }
+
+	 public static void main(String[] args) {
+		   KafkaMQ MQClient = getInstance();
+		   Message msg = new Message();
+		   msg.setBody("body");
+		   msg.setConsumed(false);
+		   msg.setCreateTime(new Date());
+		   msg.setPerson("person");
+		   System.out.println(MQClient.sendMessage(msg));
+	 }
+
+	@Override
+	public boolean sendMessage(Message message) {
+		  try {
+			   Gson gson = new Gson();
+	           String msg =  gson.toJson(message);
+	           this.producer.send(new ProducerRecord<String, String>(this.getTopic(), msg));
+	        } catch (Exception e) {
+	            e.printStackTrace();
+	            logger.error(e);
+	            return false;
+	        } finally {
+	           // this.producer.close();
+	        }
+		return true;
+	}
+
+	public void destroy() {
+		   this.producer.close();
+	    }
+}

+ 7 - 0
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/MQInterface.java

@@ -0,0 +1,7 @@
+package com.x.message.assemble.communicate.mq;
+
+import com.x.message.core.entity.Message;
+
+public interface MQInterface {
+	public boolean sendMessage(Message message);
+}

+ 58 - 0
o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/schedule/TriggerMq.java

@@ -0,0 +1,58 @@
+package com.x.message.assemble.communicate.schedule;
+
+import java.util.Date;
+import java.util.List;
+import javax.persistence.EntityManager;
+import javax.persistence.criteria.CriteriaBuilder;
+import javax.persistence.criteria.CriteriaQuery;
+import javax.persistence.criteria.Predicate;
+import javax.persistence.criteria.Root;
+
+import org.apache.commons.lang3.time.DateUtils;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import com.x.base.core.container.EntityManagerContainer;
+import com.x.base.core.container.factory.EntityManagerContainerFactory;
+import com.x.base.core.project.config.Config;
+import com.x.base.core.project.logger.Logger;
+import com.x.base.core.project.logger.LoggerFactory;
+import com.x.base.core.project.message.MessageConnector;
+import com.x.base.core.project.schedule.AbstractJob;
+import com.x.base.core.project.tools.ListTools;
+import com.x.base.core.project.utils.time.TimeStamp;
+import com.x.message.assemble.communicate.Business;
+import com.x.message.assemble.communicate.ThisApplication;
+import com.x.message.core.entity.Instant;
+import com.x.message.core.entity.Instant_;
+import com.x.message.core.entity.Message;
+import com.x.message.core.entity.Message_;
+
+
+public class TriggerMq extends AbstractJob {
+
+	private static Logger logger = LoggerFactory.getLogger(Clean.class);
+
+	@Override
+	public void schedule(JobExecutionContext jobExecutionContext) throws Exception {
+		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
+			
+		    if (Config.mq().getEnable()) {
+				 Message message = new Message();
+				 message.setBody("");
+				 message.setType("TriggerMq");
+				 message.setPerson("");
+				 message.setTitle("TriggerMq");
+				 message.setConsumer(MessageConnector.CONSUME_MQ);
+				 message.setConsumed(false);
+				 message.setInstant("");
+				 ThisApplication.mqConsumeQueue.send(message);
+			}
+			
+		} catch (Exception e) {
+			logger.error(e);
+			throw new JobExecutionException(e);
+		}
+	}
+
+}