Procházet zdrojové kódy

Merge branch 'fix/优化流程数据映射的执行过程' into 'wrdp'

[流程平台]优化流程数据映射的执行过程

See merge request o2oa/o2oa!2580
胡起 před 5 roky
rodič
revize
d787ecfd38

+ 119 - 62
o2server/x_processplatform_assemble_designer/src/main/java/com/x/processplatform/assemble/designer/ProjectionExecuteQueue.java

@@ -1,7 +1,11 @@
 package com.x.processplatform.assemble.designer;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
+import com.x.base.core.project.tools.ListTools;
+import org.apache.commons.collections4.list.TreeList;
 import org.apache.commons.lang3.BooleanUtils;
 
 import com.google.gson.JsonElement;
@@ -37,96 +41,150 @@ public class ProjectionExecuteQueue extends AbstractQueue<String> {
 
 	@Override
 	protected void execute(String id) throws Exception {
+		logger.print("开始执行流程数据映射process:{}", id);
+		Process process = null;
 		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
-			Business business = new Business(emc);
-			Process process = emc.find(id, Process.class);
+			process = emc.find(id, Process.class);
 			if (null == process) {
 				throw new ExceptionEntityNotExist(id, Process.class);
 			}
-			if (XGsonBuilder.isJsonArray(process.getProjection())) {
-				List<Projection> projections = XGsonBuilder.instance().fromJson(process.getProjection(),
-						new TypeToken<List<Projection>>() {
-						}.getType());
-				this.work(business, process, projections);
-				this.workCompleted(business, process, projections);
+		} catch (Exception e) {
+			logger.error(e);
+		}
+		try {
+			if(process!=null) {
+				if (XGsonBuilder.isJsonArray(process.getProjection())) {
+					List<Projection> projections = XGsonBuilder.instance().fromJson(process.getProjection(),
+							new TypeToken<List<Projection>>() {
+							}.getType());
+					logger.print("开始执行流转中工作数据映射process:{}", id);
+					this.work(process, projections);
+					logger.print("开始执行已完成工作数据映射process:{}", id);
+					this.workCompleted(process, projections);
+				}
 			}
 		} catch (Exception e) {
 			logger.error(e);
 		}
+
+		logger.print("完成流程数据映射process:{}", id);
 	}
 
-	private void work(Business business, Process process, List<Projection> projections) throws Exception {
-		String sequence = "";
-		List<Work> os;
-		do {
-			os = business.entityManagerContainer().listEqualAndSequenceAfter(Work.class, Work.process_FIELDNAME,
-					process.getId(), 100, sequence);
-			if (!os.isEmpty()) {
-				sequence = workProjection(business, projections, sequence, os);
-			}
-		} while (!os.isEmpty());
+	private void work(Process process, final List<Projection> projections) throws Exception {
+		List<String> jobList;
+		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
+			Business business = new Business(emc);
+			jobList = business.work().listJobWithProcess(process.getId());
+		}
+		if(ListTools.isNotEmpty(jobList)){
+			logger.print("流转中工作需要执行数据映射个数:{}",jobList.size());
+			for (List<String> partJobs : ListTools.batch(jobList, 10)){
+				List<CompletableFuture<Void>> futures = new TreeList<>();
+				for (String job : partJobs){
+					CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+						try {
+							this.workProjection(job, process.getId(), projections);
+						} catch (Exception e) {
+							logger.warn("流程{}的工作job={}数据映射异常:{}",process.getId(),job,e.getMessage());
+							logger.error(e);
+						}
+					});
+					futures.add(future);
+				}
+				for (CompletableFuture<Void> future : futures) {
+					try {
+						future.get(300, TimeUnit.SECONDS);
+					} catch (Exception e) {
+						logger.warn("允许流程数据映射任务异常:{}",e.getMessage());
+					}
+				}
+				futures.clear();
+			}
+		}
 	}
 
-	private String workProjection(Business business, List<Projection> projections, String sequence, List<Work> os)
-			throws Exception {
-		business.entityManagerContainer().beginTransaction(Work.class);
-		business.entityManagerContainer().beginTransaction(Task.class);
-		business.entityManagerContainer().beginTransaction(TaskCompleted.class);
-		business.entityManagerContainer().beginTransaction(Read.class);
-		business.entityManagerContainer().beginTransaction(ReadCompleted.class);
-		business.entityManagerContainer().beginTransaction(Review.class);
-		for (Work o : os) {
-			sequence = o.getSequence();
-			Data data = this.data(business, o);
-			ProjectionFactory.projectionWork(projections, data, o);
+	private void workProjection(String job, String process, List<Projection> projections) throws Exception{
+		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
+			Business business = new Business(emc);
+			Data data = this.data(business, job);
+			emc.beginTransaction(Work.class);
+			emc.beginTransaction(Task.class);
+			emc.beginTransaction(TaskCompleted.class);
+			emc.beginTransaction(Read.class);
+			emc.beginTransaction(ReadCompleted.class);
+			emc.beginTransaction(Review.class);
+			for (Work work : business.entityManagerContainer().listEqualAndEqual(Work.class, Work.job_FIELDNAME,
+					job, Work.process_FIELDNAME, process)) {
+				ProjectionFactory.projectionWork(projections, data, work);
+			}
 			for (Task task : business.entityManagerContainer().listEqualAndEqual(Task.class, Task.job_FIELDNAME,
-					o.getJob(), Task.process_FIELDNAME, o.getProcess())) {
+					job, Task.process_FIELDNAME, process)) {
 				ProjectionFactory.projectionTask(projections, data, task);
 			}
 			for (TaskCompleted taskCompleted : business.entityManagerContainer().listEqualAndEqual(TaskCompleted.class,
-					TaskCompleted.job_FIELDNAME, o.getJob(), TaskCompleted.process_FIELDNAME, o.getProcess())) {
+					TaskCompleted.job_FIELDNAME, job, TaskCompleted.process_FIELDNAME, process)) {
 				ProjectionFactory.projectionTaskCompleted(projections, data, taskCompleted);
 			}
 			for (Read read : business.entityManagerContainer().listEqualAndEqual(Read.class, Read.job_FIELDNAME,
-					o.getJob(), Read.process_FIELDNAME, o.getProcess())) {
+					job, Read.process_FIELDNAME, process)) {
 				ProjectionFactory.projectionRead(projections, data, read);
 			}
 			for (ReadCompleted readCompleted : business.entityManagerContainer().listEqualAndEqual(ReadCompleted.class,
-					ReadCompleted.job_FIELDNAME, o.getJob(), ReadCompleted.process_FIELDNAME, o.getProcess())) {
+					ReadCompleted.job_FIELDNAME, job, ReadCompleted.process_FIELDNAME, process)) {
 				ProjectionFactory.projectionReadCompleted(projections, data, readCompleted);
 			}
 			for (Review review : business.entityManagerContainer().listEqualAndEqual(Review.class, Review.job_FIELDNAME,
-					o.getJob(), Review.process_FIELDNAME, o.getProcess())) {
+					job, Review.process_FIELDNAME, process)) {
 				ProjectionFactory.projectionReview(projections, data, review);
 			}
+			emc.commit();
 		}
-		business.entityManagerContainer().commit();
-		return sequence;
 	}
 
-	private void workCompleted(Business business, Process process, List<Projection> projections) throws Exception {
-		String sequence = "";
-		List<WorkCompleted> os;
-		do {
-			os = business.entityManagerContainer().listEqualAndSequenceAfter(WorkCompleted.class,
-					WorkCompleted.process_FIELDNAME, process.getId(), 100, sequence);
-			if (!os.isEmpty()) {
-				sequence = workCompletedProjection(business, projections, sequence, os);
-			}
-		} while (!os.isEmpty());
+	private void workCompleted(Process process, final List<Projection> projections) throws Exception {
+		List<String> workComList;
+		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
+			Business business = new Business(emc);
+			workComList = business.workCompleted().listWithProcess(process.getId());
+		}
+		if(ListTools.isNotEmpty(workComList)){
+			logger.print("已完成工作需要执行数据映射个数:{}",workComList.size());
+			for (List<String> partWorkComList : ListTools.batch(workComList, 10)){
+				List<CompletableFuture<Void>> futures = new TreeList<>();
+				for (String workCompletedId : partWorkComList){
+					CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+						try {
+							this.workCompletedProjection(workCompletedId, projections);
+						} catch (Exception e) {
+							logger.warn("流程{}的工作workCompletedId={}数据映射异常:{}",process.getId(),workCompletedId,e.getMessage());
+							logger.error(e);
+						}
+					});
+					futures.add(future);
+				}
+				for (CompletableFuture<Void> future : futures) {
+					try {
+						future.get(300, TimeUnit.SECONDS);
+					} catch (Exception e) {
+						logger.warn("允许流程数据映射任务异常:{}",e.getMessage());
+					}
+				}
+				futures.clear();
+			}
+		}
 	}
 
-	private String workCompletedProjection(Business business, List<Projection> projections, String sequence,
-			List<WorkCompleted> os) throws Exception {
-		business.entityManagerContainer().beginTransaction(WorkCompleted.class);
-		business.entityManagerContainer().beginTransaction(Task.class);
-		business.entityManagerContainer().beginTransaction(TaskCompleted.class);
-		business.entityManagerContainer().beginTransaction(Read.class);
-		business.entityManagerContainer().beginTransaction(ReadCompleted.class);
-		business.entityManagerContainer().beginTransaction(Review.class);
-		for (WorkCompleted o : os) {
-			sequence = o.getSequence();
+	private void workCompletedProjection(String workCompletedId, List<Projection> projections) throws Exception{
+		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
+			Business business = new Business(emc);
+			WorkCompleted o = emc.find(workCompletedId, WorkCompleted.class);
 			Data data = this.data(business, o);
+			emc.beginTransaction(WorkCompleted.class);
+			emc.beginTransaction(Task.class);
+			emc.beginTransaction(TaskCompleted.class);
+			emc.beginTransaction(Read.class);
+			emc.beginTransaction(ReadCompleted.class);
+			emc.beginTransaction(Review.class);
 			ProjectionFactory.projectionWorkCompleted(projections, data, o);
 			for (Task task : business.entityManagerContainer().listEqualAndEqual(Task.class, Task.job_FIELDNAME,
 					o.getJob(), Task.process_FIELDNAME, o.getProcess())) {
@@ -148,14 +206,13 @@ public class ProjectionExecuteQueue extends AbstractQueue<String> {
 					o.getJob(), Review.process_FIELDNAME, o.getProcess())) {
 				ProjectionFactory.projectionReview(projections, data, review);
 			}
+			emc.commit();
 		}
-		business.entityManagerContainer().commit();
-		return sequence;
 	}
 
-	private Data data(Business business, Work work) throws Exception {
+	private Data data(Business business, String job) throws Exception {
 		List<Item> items = business.entityManagerContainer().listEqualAndEqual(Item.class, DataItem.bundle_FIELDNAME,
-				work.getJob(), DataItem.itemCategory_FIELDNAME, ItemCategory.pp);
+				job, DataItem.itemCategory_FIELDNAME, ItemCategory.pp);
 		if (items.isEmpty()) {
 			return new Data();
 		} else {
@@ -187,4 +244,4 @@ public class ProjectionExecuteQueue extends AbstractQueue<String> {
 			}
 		}
 	}
-}
+}