Просмотр исходного кода

增加系统日志获取接口,支持集群日志合并展现

o2sword 5 лет назад
Родитель
Сommit
0f20114680

+ 18 - 13
o2server/x_console/src/main/java/com/x/server/console/NodeAgent.java

@@ -4,7 +4,6 @@ import java.io.*;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -12,7 +11,6 @@ import com.x.base.core.project.tools.*;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.BooleanUtils;
@@ -48,7 +46,7 @@ public class NodeAgent extends Thread {
 
 	public static final Pattern read_log_pattern = Pattern.compile("^readLog:(.+)$", Pattern.CASE_INSENSITIVE);
 
-	public static final int LOG_MAX_READ_SIZE = 10 * 1024;
+	public static final int LOG_MAX_READ_SIZE = 6 * 1024;
 
 	@Override
 	public void run() {
@@ -59,7 +57,7 @@ public class NodeAgent extends Thread {
 					try (DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
 						 DataInputStream dis = new DataInputStream(socket.getInputStream())) {
 						String json = dis.readUTF();
-						logger.print("receive socket json={}",json);
+						logger.info("receive socket json={}",json);
 						CommandObject commandObject = XGsonBuilder.instance().fromJson(json, CommandObject.class);
 						if (BooleanUtils.isTrue(Config.currentNode().nodeAgentEncrypt())) {
 							String decrypt = Crypto.rsaDecrypt(commandObject.getCredential(), Config.privateKey());
@@ -95,7 +93,7 @@ public class NodeAgent extends Thread {
 								}
 								bytes = bos.toByteArray();
 							}
-							logger.print("receive resource bytes {}", bytes.length);
+							logger.info("receive resource bytes {}", bytes.length);
 							String result = this.uploadResource(commandObject.getParam(), bytes);
 							dos.writeUTF(result);
 							dos.flush();
@@ -130,7 +128,7 @@ public class NodeAgent extends Thread {
 		try {
 			File logFile = new File(Config.base(), "logs/" + DateTools.format(new Date(), "yyyy_MM_dd") + ".out.log");
 			if(logFile.exists()){
-				Map<String, String> map = new HashMap<>();
+				List<Map<String, String>> list = new ArrayList<>();
 				try(RandomAccessFile randomFile = new RandomAccessFile(logFile,"r")) {
 					long curFileSize = randomFile.length();
 					if (lastTimeFileSize <= 0 || lastTimeFileSize > curFileSize) {
@@ -159,29 +157,36 @@ public class NodeAgent extends Thread {
 							}
 						} else {
 							if (StringUtils.isEmpty(curTime)) {
-								time = "2020-01-01 00:00:01.001";
+								continue;
 							} else {
 								time = curTime;
 							}
 						}
-						map.put(time+"#"+Config.node(), lineStr);
+						Map<String, String> map = new HashMap<>();
+						map.put("logTime",time+"#"+Config.node());
+						map.put("lineLog", lineStr);
+						list.add(map);
 						if (curReadSize > LOG_MAX_READ_SIZE){
 							break;
 						}
 					}
-					lastTimeFileSize = curReadSize - 1;
+					if(curReadSize>0) {
+						lastTimeFileSize = lastTimeFileSize + curReadSize - 1;
+					}
 				}
-				dos.writeLong(lastTimeFileSize);
+				dos.writeUTF(XGsonBuilder.toJson(list));
 				dos.flush();
 
-				dos.writeUTF(XGsonBuilder.toJson(map));
+				dos.writeLong(lastTimeFileSize);
 				dos.flush();
+
+				return;
 			}
 		} catch (Exception e) {
 			logger.print("readLog error:{}", e.getMessage());
-			dos.writeUTF("failure");
-			dos.flush();
 		}
+		dos.writeUTF("failure");
+		dos.flush();
 	}
 
 	private String uploadResource(Map<String,Object> param, byte[] bytes){

+ 141 - 0
o2server/x_program_center/src/main/java/com/x/program/center/jaxrs/warnlog/ActionGetSystemLog.java

@@ -0,0 +1,141 @@
+package com.x.program.center.jaxrs.warnlog;
+
+import com.google.gson.reflect.TypeToken;
+import com.x.base.core.project.bean.NameValuePair;
+import com.x.base.core.project.cache.ApplicationCache;
+import com.x.base.core.project.config.Config;
+import com.x.base.core.project.config.Nodes;
+import com.x.base.core.project.connection.ConnectionAction;
+import com.x.base.core.project.gson.XGsonBuilder;
+import com.x.base.core.project.http.ActionResult;
+import com.x.base.core.project.http.EffectivePerson;
+import com.x.base.core.project.http.HttpToken;
+import com.x.base.core.project.jaxrs.WrapStringList;
+import com.x.base.core.project.logger.Logger;
+import com.x.base.core.project.logger.LoggerFactory;
+import com.x.base.core.project.tools.Crypto;
+import com.x.base.core.project.tools.ListTools;
+import net.sf.ehcache.Element;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class ActionGetSystemLog extends BaseAction {
+
+	private static Logger logger = LoggerFactory.getLogger(ActionGetSystemLog.class);
+
+	ActionResult<Wo> execute(EffectivePerson effectivePerson, String tag) throws Exception {
+		ActionResult<Wo> result = new ActionResult<>();
+
+		Wo wo = new Wo();
+		String key = effectivePerson.getDistinguishedName();
+		if(key.indexOf("@") > -1){
+			key = key.split("@")[1] + tag;
+		}
+
+		if(Config.node().equals(Config.resource_node_centersPirmaryNode())){
+			wo.setValueList(getSystemLog(key));
+		}else{
+			List<NameValuePair> headers = ListTools.toList(new NameValuePair(HttpToken.X_Token, effectivePerson.getToken()));
+			wo = ConnectionAction.get(Config.url_x_program_center_jaxrs("warnlog", "view", "system", "log", "tag", tag), headers).getData(Wo.class);
+		}
+
+		result.setData(wo);
+		return result;
+	}
+
+	synchronized private List<String> getSystemLog(String key) throws Exception{
+		Nodes nodes = Config.nodes();
+		List<SystemLog> allLogs = new ArrayList<>();
+		for (String node : nodes.keySet()){
+			if(nodes.get(node).getApplication().getEnable() || nodes.get(node).getCenter().getEnable()){
+				try (Socket socket = new Socket(node, nodes.get(node).nodeAgentPort())) {
+					socket.setKeepAlive(true);
+					socket.setSoTimeout(5000);
+					try (DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
+						 DataInputStream dis = new DataInputStream(socket.getInputStream())){
+						Map<String, Object> commandObject = new HashMap<>();
+						commandObject.put("command", "readLog:readLog");
+						commandObject.put("credential", Crypto.rsaEncrypt("o2@", Config.publicKey()));
+
+						dos.writeUTF(XGsonBuilder.toJson(commandObject));
+						dos.flush();
+
+						long lastPoint = 0;
+						String cacheKey = ApplicationCache.concreteCacheKey(key, node.toLowerCase());
+						Element element = cacheLog.get(cacheKey);
+						CacheLogObject clo = null;
+						if ((null != element) && (null != element.getObjectValue())) {
+							clo = (CacheLogObject) element.getObjectValue();
+							lastPoint = clo.getLastPoint();
+						}
+						dos.writeLong(lastPoint);
+						dos.flush();
+
+						logger.info("socket dispatch getSystemLog to {}:{} lastPoint={}", node, nodes.get(node).nodeAgentPort(), lastPoint);
+
+						String result = dis.readUTF();
+						if(StringUtils.isNotEmpty(result) && result.startsWith("[")){
+							List<SystemLog> list = gson.fromJson(result, new TypeToken<List<SystemLog>>(){}.getType());
+							allLogs.addAll(list);
+							long returnLastPoint = dis.readLong();
+							logger.info("用户的cacheKey={},最后日志标志:{}", cacheKey, returnLastPoint);
+							if(clo==null){
+								clo = new CacheLogObject();
+								clo.setUserToken(key);
+								clo.setNode(node);
+								clo.setLastPoint(returnLastPoint);
+							}else{
+								clo.setLastPoint(returnLastPoint);
+							}
+							cacheLog.put(new Element(cacheKey, clo));
+						}
+					}
+
+				} catch (Exception ex) {
+					logger.warn("socket dispatch getSystemLog to {}:{} error={}", node, nodes.get(node).nodeAgentPort(), ex.getMessage());
+				}
+			}
+		}
+		List<String> list = new ArrayList<>();
+		allLogs.stream().sorted((o1, o2) -> {
+			return o1.logTime.compareTo(o2.logTime);
+		}).forEach(o -> {
+			list.add(o.getLineLog());
+		});
+		return list;
+	}
+
+	public static class Wo extends WrapStringList {
+
+	}
+
+	public static class SystemLog {
+		private String logTime;
+
+		private String lineLog;
+
+		public String getLogTime() {
+			return logTime;
+		}
+
+		public void setLogTime(String logTime) {
+			this.logTime = logTime;
+		}
+
+		public String getLineLog() {
+			return lineLog;
+		}
+
+		public void setLineLog(String lineLog) {
+			this.lineLog = lineLog;
+		}
+	}
+
+}

+ 39 - 0
o2server/x_program_center/src/main/java/com/x/program/center/jaxrs/warnlog/BaseAction.java

@@ -1,6 +1,45 @@
 package com.x.program.center.jaxrs.warnlog;
 
+import com.x.base.core.project.cache.ApplicationCache;
+import com.x.base.core.project.gson.GsonPropertyObject;
 import com.x.base.core.project.jaxrs.StandardJaxrsAction;
+import net.sf.ehcache.Ehcache;
 
 abstract class BaseAction extends StandardJaxrsAction {
+
+    public static Ehcache cacheLog = ApplicationCache.instance().getCache(CacheLogObject.class);
+
+    public static class CacheLogObject extends GsonPropertyObject {
+
+        private String userToken;
+
+        private String node;
+
+        private long lastPoint;
+
+        public long getLastPoint() {
+            return lastPoint;
+        }
+
+        public void setLastPoint(long lastPoint) {
+            this.lastPoint = lastPoint;
+        }
+
+        public String getNode() {
+            return node;
+        }
+
+        public void setNode(String node) {
+            this.node = node;
+        }
+
+        public String getUserToken() {
+            return userToken;
+        }
+
+        public void setUserToken(String userToken) {
+            this.userToken = userToken;
+        }
+    }
+
 }

+ 18 - 0
o2server/x_program_center/src/main/java/com/x/program/center/jaxrs/warnlog/WarnLogAction.java

@@ -140,4 +140,22 @@ public class WarnLogAction extends StandardJaxrsAction {
 		asyncResponse.resume(ResponseFactory.getEntityTagActionResultResponse(request, result));
 	}
 
+	@JaxrsMethodDescribe(value = "获取系统日志.", action = ActionGetSystemLog.class)
+	@GET
+	@Path("view/system/log/tag/{tag}")
+	@Produces(HttpMediaType.APPLICATION_JSON_UTF_8)
+	@Consumes(MediaType.APPLICATION_JSON)
+	public void getSystemLog(@Suspended final AsyncResponse asyncResponse, @Context HttpServletRequest request,
+							 @JaxrsParameterDescribe("日志标识") @PathParam("tag") String tag) {
+		EffectivePerson effectivePerson = this.effectivePerson(request);
+		ActionResult<ActionGetSystemLog.Wo> result = new ActionResult<>();
+		try {
+			result = new ActionGetSystemLog().execute(effectivePerson, tag);
+		} catch (Exception e) {
+			e.printStackTrace();
+			result.error(e);
+		}
+		asyncResponse.resume(ResponseFactory.getEntityTagActionResultResponse(request, result));
+	}
+
 }