FileUploadDownloadService.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. const fs = require('fs');
  2. const path = require('path');
  3. const app = require('electron').app || require('electron').remote.app;
  4. const { UploadStatus, DownloadStatus } = require('../Constants');
  5. const OSS = require('ali-oss');
  6. const readChunk = require('read-chunk');
  7. const fileType = require('file-type');
  8. const ImageUtils = require('./ImageUtils').default;
  9. const { groupBy } = require('lodash');
  10. const archiver = require('archiver');
  11. const uuidv1 = require('uuid/v1');
  12. const eventBus = require('./EventBus').default;
  13. const queue = require('block-queue');
  14. const Sequelize = require('sequelize');
  15. const Op = Sequelize.Op;
  16. const { UploadJob } = require('./UploadJob');
  17. const { DownloadJob } = require('./DownloadJob');
  18. const { Order, SubOrder, File, Download } = require('./model');
  19. const rimraf = require('rimraf');
  20. const fsUtils = require('nodejs-fs-utils');
  21. if (!fs.existsSync(path.resolve(app.getPath('userData'), 'db'))) {
  22. fs.mkdirSync(path.resolve(app.getPath('userData'), 'db'));
  23. }
  24. let dbDir = path.resolve(app.getPath('userData'), 'db');
  25. let archiveDir = path.resolve(app.getPath('userData'), 'archive');
  26. let thumbDir = path.resolve(app.getPath('userData'), 'thumb');
  27. console.log(dbDir);
  28. if (!fs.existsSync(archiveDir)) {
  29. fs.mkdirSync(archiveDir);
  30. }
  31. const sequelize = new Sequelize({
  32. dialect: 'sqlite',
  33. storage: path.resolve(app.getPath('userData'), 'db', 'application.db'),
  34. logging: false,
  35. });
  36. const Model = Sequelize.Model;
  37. Order.init(sequelize);
  38. SubOrder.init(sequelize);
  39. File.init(sequelize);
  40. Download.init(sequelize);
  41. let db = {
  42. Order,
  43. SubOrder,
  44. File,
  45. Download,
  46. };
  47. window.db = db;
  48. sequelize.sync().then(() => {
  49. checkFalse();
  50. init();
  51. });
  52. let uploadQueue = queue(3, function(job, done) {
  53. job.start(done);
  54. });
  55. //下载队列
  56. let downloadQueue = queue(3, function(job, done) {
  57. job.start(done);
  58. });
  59. //检查错误下载或上传
  60. async function checkFalse() {
  61. //下载,下载队列为空,把所有状态为downloading的状态改为ERROR
  62. let downloadList = (await db.Download.findAll({
  63. where: { status: DownloadStatus.DOWNLOADING, status: DownloadStatus.WAITING },
  64. })).map(i => i.get());
  65. downloadList.forEach(async i => {
  66. await db.Download.update(
  67. {
  68. status: DownloadStatus.PAUSE,
  69. },
  70. {
  71. where: {
  72. id: i.id,
  73. },
  74. },
  75. );
  76. });
  77. //上传
  78. let fileList = (await db.File.findAll({ where: { status: UploadStatus.UPLOADING } })).map(i =>
  79. i.get(),
  80. );
  81. fileList.forEach(async i => {
  82. await db.File.update(
  83. {
  84. status: UploadStatus.PAUSE,
  85. },
  86. {
  87. where: {
  88. id: i.id,
  89. },
  90. },
  91. );
  92. });
  93. }
  94. function sleep(ms) {
  95. return new Promise(resolve => {
  96. setTimeout(resolve, ms);
  97. });
  98. }
  99. let initiated = false;
  100. const init = async () => {
  101. if (initiated) {
  102. return;
  103. }
  104. resume();
  105. initiated = true;
  106. };
  107. const PromiseQueue = arr => {
  108. var sequence = Promise.resolve();
  109. arr.forEach(function(item) {
  110. sequence = sequence.then(item).then(() => {
  111. return res;
  112. });
  113. });
  114. return sequence;
  115. };
  116. const pause = async arg => {
  117. if (arg && arg.orderId) {
  118. await db.Order.update(
  119. {
  120. pause: 1,
  121. },
  122. {
  123. where: {
  124. id: arg.orderId,
  125. },
  126. },
  127. );
  128. await db.SubOrder.update(
  129. { pause: 1 },
  130. {
  131. where: {
  132. orderId: arg.orderId,
  133. },
  134. },
  135. );
  136. }
  137. if (arg && arg.subOrderId) {
  138. await db.SubOrder.update(
  139. { pause: 1 },
  140. {
  141. where: {
  142. id: arg.subOrderId,
  143. },
  144. },
  145. );
  146. }
  147. eventBus.publish('pause', arg);
  148. };
  149. const resume = async arg => {
  150. let records = [];
  151. if (arg && arg.orderId) {
  152. records = (await db.File.findAll({
  153. where: {
  154. orderId: arg.orderId,
  155. status: {
  156. [Op.not]: UploadStatus.FINISHED,
  157. },
  158. },
  159. })).map(i => i.get());
  160. } else if (arg && arg.subOrderId) {
  161. records = (await db.File.findAll({
  162. where: {
  163. subOrderId: arg.subOrderId,
  164. status: {
  165. [Op.not]: UploadStatus.FINISHED,
  166. },
  167. },
  168. })).map(i => i.get());
  169. } else if (arg && arg.fileId) {
  170. records = (await db.File.findAll({
  171. where: {
  172. id: arg.fileId,
  173. status: {
  174. [Op.not]: UploadStatus.FINISHED,
  175. },
  176. },
  177. })).map(i => i.get());
  178. } else {
  179. records = (await db.File.findAll({
  180. where: {
  181. status: {
  182. [Op.not]: UploadStatus.FINISHED,
  183. },
  184. },
  185. })).map(i => i.get());
  186. }
  187. let orderIds = [];
  188. let subOrderIds = [];
  189. records.forEach(i => {
  190. if (orderIds.indexOf(i.orderId) === -1) {
  191. orderIds.push(i.orderId);
  192. }
  193. if (subOrderIds.indexOf(i.subOrderId) === -1) {
  194. subOrderIds.push(i.subOrderId);
  195. }
  196. });
  197. await db.Order.update(
  198. {
  199. pause: 0,
  200. },
  201. {
  202. where: {
  203. id: orderIds,
  204. },
  205. },
  206. );
  207. await db.SubOrder.update(
  208. { pause: 0 },
  209. {
  210. where: {
  211. id: subOrderIds,
  212. },
  213. },
  214. );
  215. records.forEach(i => {
  216. let job = new UploadJob(i, db);
  217. uploadQueue.push(job);
  218. });
  219. };
  220. const queryUnfinishedOrder = async userId => {
  221. // let sql = `
  222. // SELECT
  223. // o.*,
  224. // fileNum,
  225. // totalSize,
  226. // uploadedSize
  227. // FROM
  228. // Orders o
  229. // JOIN ( SELECT SUM( size ) AS totalSize, SUM( progress * size ) AS uploadedSize, COUNT( * ) AS fileNum, orderId FROM Files GROUP BY orderId ) t ON o.id = t.orderId
  230. // WHERE
  231. // o.id IN (
  232. // SELECT
  233. // orderId
  234. // FROM
  235. // Files
  236. // WHERE
  237. // status != ${UploadStatus.FINISHED}
  238. // GROUP BY
  239. // orderId)
  240. // and o.userId = ${userId}`;
  241. let sql = `
  242. SELECT
  243. Orders.* ,
  244. ( SELECT SUM( size ) FROM SubOrders WHERE SubOrders.orderId = Orders.id ) AS totalSize,
  245. ( SELECT SUM( f0.progress * f0.size ) FROM Files f0 WHERE f0.orderId = Orders.id ) AS uploadedSize,
  246. ( SELECT COUNT( f1.id ) FROM Files f1 WHERE f1.orderId = Orders.id ) AS fileNum
  247. FROM
  248. Orders
  249. JOIN Files ON Files.orderId = Orders.id
  250. WHERE
  251. Files.status != ${UploadStatus.FINISHED}
  252. AND Orders.userId = ${userId}
  253. GROUP BY
  254. Orders.id
  255. `;
  256. let res = await sequelize.query(sql, { type: sequelize.QueryTypes.SELECT });
  257. return res;
  258. };
  259. const queryFinishedOrder = async userId => {
  260. let res = await sequelize.query(
  261. `SELECT
  262. *,
  263. ( SELECT COUNT( * ) FROM Files WHERE Files.orderId = Orders.id ) AS totalNum,
  264. ( SELECT COUNT( * ) FROM Files WHERE Files.orderId = Orders.id AND status = ${
  265. UploadStatus.FINISHED
  266. } ) AS finishedNum,
  267. ( SELECT SUM( size ) FROM Files WHERE Files.orderId = Orders.id ) AS totalSize
  268. FROM
  269. Orders
  270. WHERE
  271. totalNum = finishedNum AND userId= ${userId}
  272. order by updatedAt DESC`,
  273. { type: sequelize.QueryTypes.SELECT },
  274. );
  275. return res;
  276. };
  277. const queryDetail = async orderId => {
  278. let res = await sequelize.query(
  279. `SELECT
  280. s.id,
  281. s.orderId,
  282. s.parentId,
  283. s.path,
  284. s.name,
  285. s.size AS totalSize,
  286. s.pause,
  287. s.createdAt,
  288. s.updatedAt,
  289. COUNT( * ) AS fileNum,
  290. ( SELECT COUNT( * ) FROM Files WHERE src LIKE '%.zip' AND subOrderId = s.id ) > 0 AS hasZip,
  291. ( SELECT COUNT( * ) FROM Files WHERE status = - 1 AND subOrderId = s.id ) > 0 AS packing,
  292. ( SUM( progress * f.size ) / s.size) AS progress,
  293. (
  294. SELECT
  295. a = b
  296. FROM
  297. (
  298. ( SELECT COUNT( * ) AS a FROM Files ta WHERE ta.subOrderId = f.subOrderId )
  299. JOIN ( SELECT COUNT( * ) AS b FROM Files tb WHERE tb.subOrderId = f.subOrderId AND (tb.status = ${
  300. UploadStatus.PAUSE
  301. } OR tb.status = ${UploadStatus.FINISHED}) )
  302. )
  303. ) AS paused
  304. FROM
  305. Files f
  306. JOIN SubOrders s ON f.subOrderId = s.id
  307. WHERE
  308. f.orderId = ${orderId}
  309. GROUP BY
  310. f.subOrderId`,
  311. { type: sequelize.QueryTypes.SELECT },
  312. );
  313. return res;
  314. };
  315. const transferFilename = filename => {
  316. return filename.replace(/\.[0-9a-z]+$/i, '') + '.jpg';
  317. };
  318. const checkClientOrder = async clientOrderId => {
  319. let res = await sequelize.query(
  320. `SELECT
  321. *
  322. FROM
  323. Orders
  324. WHERE
  325. clientOrderId = ${clientOrderId} `,
  326. { type: sequelize.QueryTypes.SELECT },
  327. );
  328. return res;
  329. };
  330. const uploadOrder = async (topDir, prefixes, userId, clientOrderId) => {
  331. console.log(topDir, JSON.stringify(prefixes), userId, clientOrderId);
  332. if (!prefixes instanceof Object) {
  333. throw 'prefixes not found';
  334. }
  335. let name = path.basename(topDir);
  336. let mainOrder = (await db.Order.create({
  337. path: topDir,
  338. name: name,
  339. userId: userId,
  340. clientOrderId: clientOrderId,
  341. pause: 0,
  342. })).get();
  343. let orderId = mainOrder.id;
  344. let hasFile = false;
  345. let hasSubDir = false;
  346. fs.readdirSync(topDir).forEach(child => {
  347. if (!child.startsWith('.')) {
  348. let info = fs.statSync(path.resolve(topDir, child));
  349. if (info.isDirectory()) {
  350. hasSubDir = true;
  351. } else {
  352. hasFile = true;
  353. }
  354. }
  355. });
  356. if (hasFile && hasSubDir) {
  357. throw '不能同时包含文件夹和文件';
  358. }
  359. if (!hasFile && !hasSubDir) {
  360. throw '空文件夹';
  361. }
  362. if (!hasSubDir) {
  363. if (prefixes[0]) {
  364. throw 'prefixes not found';
  365. }
  366. let prefix = prefixes[path.basename(topDir)];
  367. let subOrder = (await db.SubOrder.create({
  368. orderId,
  369. parentId: mainOrder.id,
  370. path: topDir,
  371. name,
  372. size: fsUtils.fsizeSync(topDir),
  373. pause: 0,
  374. })).get();
  375. let outPath = path.resolve(archiveDir, uuidv1() + '.zip');
  376. let fileinfo = (await db.File.create({
  377. orderId,
  378. subOrderId: subOrder.id,
  379. src: outPath,
  380. // dst: orderId + "/" + path.basename(outPath),
  381. dst: prefix + '/' + path.basename(topDir) + '_original.zip',
  382. size: 1,
  383. uploaded: 0,
  384. progress: 0,
  385. status: UploadStatus.PREPARING,
  386. })).get();
  387. await readDir(topDir, orderId, subOrder.id, null, prefix);
  388. let archiveFile = await archive(topDir, outPath);
  389. let info = fs.statSync(archiveFile);
  390. await sequelize.query(`UPDATE SubOrders
  391. SET size = ( SELECT SUM( size ) FROM Files WHERE subOrderId = ${subOrder.id} )
  392. WHERE id = ${subOrder.id}`);
  393. await db.File.update(
  394. {
  395. status: UploadStatus.WAITING,
  396. size: info.size,
  397. },
  398. { where: { id: fileinfo.id } },
  399. );
  400. uploadQueue.push(new UploadJob(fileinfo, db));
  401. } else {
  402. fs.readdirSync(topDir).forEach(async child => {
  403. if (!child.startsWith('.')) {
  404. let prefix = prefixes[child];
  405. let info = fs.statSync(path.resolve(topDir, child));
  406. if (info.isDirectory()) {
  407. let subOrder = (await db.SubOrder.create({
  408. orderId,
  409. parentId: mainOrder.id,
  410. path: path.resolve(topDir, child),
  411. name: child,
  412. size: fsUtils.fsizeSync(path.resolve(topDir, child)),
  413. pause: 0,
  414. })).get();
  415. let outPath = path.resolve(archiveDir, uuidv1() + '.zip');
  416. let fileinfo = (await db.File.create({
  417. orderId,
  418. subOrderId: subOrder.id,
  419. src: outPath,
  420. // dst: orderId + "/" + path.basename(outPath),
  421. dst: prefix + '/' + child + '_original.zip',
  422. size: 1,
  423. uploaded: 0,
  424. progress: 0,
  425. status: UploadStatus.PREPARING,
  426. })).get();
  427. await readDir(path.resolve(topDir, child), orderId, subOrder.id, child, prefix);
  428. let archiveFile = await archive(path.resolve(topDir, child), outPath);
  429. let info = fs.statSync(archiveFile);
  430. await db.File.update(
  431. {
  432. status: UploadStatus.WAITING,
  433. size: info.size,
  434. },
  435. { where: { id: fileinfo.id } },
  436. );
  437. await sequelize.query(`
  438. UPDATE SubOrders SET size = ( SELECT SUM( size ) FROM Files WHERE subOrderId = ${
  439. subOrder.id
  440. } ) WHERE id = ${subOrder.id}`);
  441. uploadQueue.push(new UploadJob(fileinfo, db));
  442. }
  443. }
  444. });
  445. }
  446. async function readDir(dir, orderId, subOrderId, relative, prefix) {
  447. for await (const child of fs.readdirSync(dir)) {
  448. if (!child.startsWith('.')) {
  449. let currRelative = relative ? relative + '/' + child : child;
  450. let info = fs.statSync(path.resolve(dir, child));
  451. if (info.isDirectory()) {
  452. await readDir(
  453. path.resolve(dir, child),
  454. orderId,
  455. subOrderId,
  456. currRelative,
  457. prefix,
  458. );
  459. } else {
  460. let filepath = path.resolve(dir, child);
  461. const buffer = readChunk.sync(filepath, 0, fileType.minimumBytes);
  462. let type = fileType(buffer);
  463. let src = filepath;
  464. let dst = currRelative;
  465. if (type && type.mime.startsWith('image')) {
  466. src = await ImageUtils.makeThumbnail(filepath);
  467. // dst = orderId + "/" + relative + '/' + transferFilename(path.basename(currRelative))
  468. dst =
  469. prefix +
  470. '/original/min/' +
  471. transferFilename(path.basename(currRelative));
  472. info = fs.statSync(src);
  473. console.log('thumbnail:' + src);
  474. let fileinfo = (await db.File.create({
  475. orderId,
  476. subOrderId,
  477. src,
  478. dst,
  479. size: info.size,
  480. uploaded: 0,
  481. progress: 0,
  482. status: UploadStatus.WAITING,
  483. })).get();
  484. console.log(fileinfo);
  485. uploadQueue.push(new UploadJob(fileinfo, db));
  486. }
  487. }
  488. }
  489. }
  490. }
  491. };
  492. const archive = async (dir, outPath) => {
  493. return new Promise((resolve, reject) => {
  494. // create a file to stream archive data to.
  495. let output = fs.createWriteStream(outPath);
  496. let archive = archiver('zip', {
  497. zlib: { level: 9 }, // Sets the compression level.
  498. });
  499. // listen for all archive data to be written
  500. // 'close' event is fired only when a file descriptor is involved
  501. output.on('close', function() {
  502. console.log(archive.pointer() + ' total bytes');
  503. console.log('archiver has been finalized and the output file descriptor has closed.');
  504. resolve(outPath);
  505. });
  506. // This event is fired when the data source is drained no matter what was the data source.
  507. // It is not part of this library but rather from the NodeJS Stream API.
  508. // @see: https://nodejs.org/api/stream.html#stream_event_end
  509. output.on('end', function() {
  510. console.log('Data has been drained');
  511. });
  512. // good practice to catch warnings (ie stat failures and other non-blocking errors)
  513. archive.on('warning', function(err) {
  514. if (err.code === 'ENOENT') {
  515. // log warning
  516. } else {
  517. // throw error
  518. reject(err);
  519. }
  520. });
  521. // good practice to catch this error explicitly
  522. archive.on('error', function(err) {
  523. reject(err);
  524. });
  525. // pipe archive data to the file
  526. archive.pipe(output);
  527. archive.directory(dir, path.basename(dir));
  528. archive.finalize();
  529. });
  530. };
  531. const clearDB = async () => {
  532. await db.Order.destroy({ where: {} });
  533. await db.SubOrder.destroy({ where: {} });
  534. await db.File.destroy({ where: {} });
  535. require('child_process').execSync('rm -rf *', {
  536. cwd: path.resolve(app.getPath('userData'), 'thumb'),
  537. });
  538. require('child_process').execSync('rm -rf *', {
  539. cwd: path.resolve(app.getPath('userData'), 'archive'),
  540. });
  541. console.log('done');
  542. };
  543. //下载方法
  544. const downloadPause = async arg => {
  545. console.log('暂停');
  546. eventBus.publish('downpause', arg);
  547. };
  548. const downloadResume = async arg => {
  549. if (arg.id) {
  550. await db.Download.update(
  551. { status: DownloadStatus.WAITING },
  552. {
  553. where: {
  554. id: arg.id,
  555. },
  556. },
  557. );
  558. var record = await db.Download.findOne({ where: { id: arg.id } });
  559. console.log(record);
  560. let job = new DownloadJob(record, db);
  561. downloadQueue.push(job);
  562. }
  563. };
  564. const queryUnfinishedDownloadOrder = async userId => {
  565. let res = await sequelize.query(
  566. `SELECT
  567. *
  568. FROM
  569. Downloads
  570. WHERE
  571. userId = ${userId} AND status != ${DownloadStatus.FINISHED}`,
  572. { type: sequelize.QueryTypes.SELECT },
  573. );
  574. return res;
  575. };
  576. const queryfinishedDownloadOrder = async userId => {
  577. let res = await sequelize.query(
  578. `SELECT
  579. *
  580. FROM
  581. Downloads
  582. WHERE
  583. userId = ${userId} AND status = ${DownloadStatus.FINISHED}
  584. order by updatedAt DESC`,
  585. { type: sequelize.QueryTypes.SELECT },
  586. );
  587. return res;
  588. };
  589. async function getFilePath(pathName, i) {
  590. var _path = path.resolve(window.localStorage['downLoadingPath2'], pathName);
  591. var index = i ? i : 0;
  592. if (index) {
  593. _path = path.resolve(
  594. window.localStorage['downLoadingPath2'],
  595. path.basename(pathName, path.extname(pathName)) +
  596. '(' +
  597. i +
  598. ')' +
  599. path.extname(pathName),
  600. );
  601. }
  602. var exist = await fs.existsSync(_path);
  603. if (exist) {
  604. _path = await getFilePath(pathName, index + 1);
  605. }
  606. return _path;
  607. }
  608. const downloadFile = async (objectName, orderName, userId, quantity, userOrderId) => {
  609. let pathName = await getFilePath(userOrderId + '_' + orderName, 0);
  610. let download = (await db.Download.create({
  611. dst: objectName,
  612. size: 0,
  613. path: pathName,
  614. name: path.basename(pathName),
  615. progress: 0,
  616. checkPoint: 0,
  617. status: DownloadStatus.WAITING,
  618. userId: userId,
  619. finishedNum: quantity,
  620. userOrderId: userOrderId,
  621. })).get();
  622. downloadQueue.push(new DownloadJob(download, db));
  623. };
  624. const clearCache = async () => {
  625. rmDir = function(dirPath) {
  626. try {
  627. var files = fs.readdirSync(dirPath);
  628. } catch (e) {
  629. return;
  630. }
  631. for (var i = 0; i < files.length; i++) {
  632. var filePath = path.resolve(dirPath, files[i]);
  633. console.log(filePath);
  634. if (fs.statSync(filePath).isFile()) fs.unlinkSync(filePath);
  635. else rmDir(filePath);
  636. }
  637. };
  638. rmDir(path.resolve(app.getPath('userData'), 'archive'));
  639. rmdir(path.resolve(app.getPath('userData'), 'thumb'));
  640. };
  641. window.clearCache = clearCache;
  642. const deleteUploadedFile = async () => {
  643. let files = await db.File.findAll({
  644. where: {
  645. status: UploadStatus.FINISHED,
  646. },
  647. }).map(i => i.get());
  648. files.forEach(i => {
  649. fs.unlink(i.src, err => {
  650. if (!err) {
  651. console.log(`${i.src} deleted`);
  652. }
  653. });
  654. });
  655. };
  656. export default {
  657. init,
  658. pause,
  659. uploadOrder,
  660. queryUnfinishedOrder,
  661. clearDB,
  662. archive,
  663. queryDetail,
  664. resume,
  665. queryFinishedOrder,
  666. downloadFile,
  667. downloadPause,
  668. downloadResume,
  669. queryUnfinishedDownloadOrder,
  670. queryfinishedDownloadOrder,
  671. checkClientOrder,
  672. deleteUploadedFile,
  673. };