farm.js 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. 'use strict'
  2. const DEFAULT_OPTIONS = {
  3. workerOptions : {}
  4. , maxCallsPerWorker : Infinity
  5. , maxConcurrentWorkers : (require('os').cpus() || { length: 1 }).length
  6. , maxConcurrentCallsPerWorker : 10
  7. , maxConcurrentCalls : Infinity
  8. , maxCallTime : Infinity // exceed this and the whole worker is terminated
  9. , maxRetries : Infinity
  10. , forcedKillTime : 100
  11. , autoStart : false
  12. }
  13. const fork = require('./fork')
  14. , TimeoutError = require('errno').create('TimeoutError')
  15. , ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
  16. , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
  17. function Farm (options, path) {
  18. this.options = Object.assign({}, DEFAULT_OPTIONS, options)
  19. this.path = path
  20. this.activeCalls = 0
  21. }
  22. // make a handle to pass back in the form of an external API
  23. Farm.prototype.mkhandle = function (method) {
  24. return function () {
  25. let args = Array.prototype.slice.call(arguments)
  26. if (this.activeCalls >= this.options.maxConcurrentCalls) {
  27. let err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')')
  28. if (typeof args[args.length - 1] == 'function')
  29. return process.nextTick(args[args.length - 1].bind(null, err))
  30. throw err
  31. }
  32. this.addCall({
  33. method : method
  34. , callback : args.pop()
  35. , args : args
  36. , retries : 0
  37. })
  38. }.bind(this)
  39. }
  40. // a constructor of sorts
  41. Farm.prototype.setup = function (methods) {
  42. let iface
  43. if (!methods) { // single-function export
  44. iface = this.mkhandle()
  45. } else { // multiple functions on the export
  46. iface = {}
  47. methods.forEach(function (m) {
  48. iface[m] = this.mkhandle(m)
  49. }.bind(this))
  50. }
  51. this.searchStart = -1
  52. this.childId = -1
  53. this.children = {}
  54. this.activeChildren = 0
  55. this.callQueue = []
  56. if (this.options.autoStart) {
  57. while (this.activeChildren < this.options.maxConcurrentWorkers)
  58. this.startChild()
  59. }
  60. return iface
  61. }
  62. // when a child exits, check if there are any outstanding jobs and requeue them
  63. Farm.prototype.onExit = function (childId) {
  64. // delay this to give any sends a chance to finish
  65. setTimeout(function () {
  66. let doQueue = false
  67. if (this.children[childId] && this.children[childId].activeCalls) {
  68. this.children[childId].calls.forEach(function (call, i) {
  69. if (!call) return
  70. else if (call.retries >= this.options.maxRetries) {
  71. this.receive({
  72. idx : i
  73. , child : childId
  74. , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
  75. })
  76. } else {
  77. call.retries++
  78. this.callQueue.unshift(call)
  79. doQueue = true
  80. }
  81. }.bind(this))
  82. }
  83. this.stopChild(childId)
  84. doQueue && this.processQueue()
  85. }.bind(this), 10)
  86. }
  87. // start a new worker
  88. Farm.prototype.startChild = function () {
  89. this.childId++
  90. let forked = fork(this.path, this.options.workerOptions)
  91. , id = this.childId
  92. , c = {
  93. send : forked.send
  94. , child : forked.child
  95. , calls : []
  96. , activeCalls : 0
  97. , exitCode : null
  98. }
  99. forked.child.on('message', this.receive.bind(this))
  100. forked.child.once('exit', function (code) {
  101. c.exitCode = code
  102. this.onExit(id)
  103. }.bind(this))
  104. this.activeChildren++
  105. this.children[id] = c
  106. }
  107. // stop a worker, identified by id
  108. Farm.prototype.stopChild = function (childId) {
  109. let child = this.children[childId]
  110. if (child) {
  111. child.send('die')
  112. setTimeout(function () {
  113. if (child.exitCode === null)
  114. child.child.kill('SIGKILL')
  115. }, this.options.forcedKillTime).unref()
  116. ;delete this.children[childId]
  117. this.activeChildren--
  118. }
  119. }
  120. // called from a child process, the data contains information needed to
  121. // look up the child and the original call so we can invoke the callback
  122. Farm.prototype.receive = function (data) {
  123. let idx = data.idx
  124. , childId = data.child
  125. , args = data.args
  126. , child = this.children[childId]
  127. , call
  128. if (!child) {
  129. return console.error(
  130. 'Worker Farm: Received message for unknown child. '
  131. + 'This is likely as a result of premature child death, '
  132. + 'the operation will have been re-queued.'
  133. )
  134. }
  135. call = child.calls[idx]
  136. if (!call) {
  137. return console.error(
  138. 'Worker Farm: Received message for unknown index for existing child. '
  139. + 'This should not happen!'
  140. )
  141. }
  142. if (this.options.maxCallTime !== Infinity)
  143. clearTimeout(call.timer)
  144. if (args[0] && args[0].$error == '$error') {
  145. let e = args[0]
  146. switch (e.type) {
  147. case 'TypeError': args[0] = new TypeError(e.message); break
  148. case 'RangeError': args[0] = new RangeError(e.message); break
  149. case 'EvalError': args[0] = new EvalError(e.message); break
  150. case 'ReferenceError': args[0] = new ReferenceError(e.message); break
  151. case 'SyntaxError': args[0] = new SyntaxError(e.message); break
  152. case 'URIError': args[0] = new URIError(e.message); break
  153. default: args[0] = new Error(e.message)
  154. }
  155. args[0].type = e.type
  156. args[0].stack = e.stack
  157. // Copy any custom properties to pass it on.
  158. Object.keys(e).forEach(function(key) {
  159. args[0][key] = e[key];
  160. });
  161. }
  162. process.nextTick(function () {
  163. call.callback.apply(null, args)
  164. })
  165. ;delete child.calls[idx]
  166. child.activeCalls--
  167. this.activeCalls--
  168. if (child.calls.length >= this.options.maxCallsPerWorker
  169. && !Object.keys(child.calls).length) {
  170. // this child has finished its run, kill it
  171. this.stopChild(childId)
  172. }
  173. // allow any outstanding calls to be processed
  174. this.processQueue()
  175. }
  176. Farm.prototype.childTimeout = function (childId) {
  177. let child = this.children[childId]
  178. , i
  179. if (!child)
  180. return
  181. for (i in child.calls) {
  182. this.receive({
  183. idx : i
  184. , child : childId
  185. , args : [ new TimeoutError('worker call timed out!') ]
  186. })
  187. }
  188. this.stopChild(childId)
  189. }
  190. // send a call to a worker, identified by id
  191. Farm.prototype.send = function (childId, call) {
  192. let child = this.children[childId]
  193. , idx = child.calls.length
  194. child.calls.push(call)
  195. child.activeCalls++
  196. this.activeCalls++
  197. child.send({
  198. idx : idx
  199. , child : childId
  200. , method : call.method
  201. , args : call.args
  202. })
  203. if (this.options.maxCallTime !== Infinity) {
  204. call.timer =
  205. setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime)
  206. }
  207. }
  208. // a list of active worker ids, in order, but the starting offset is
  209. // shifted each time this method is called, so we work our way through
  210. // all workers when handing out jobs
  211. Farm.prototype.childKeys = function () {
  212. let cka = Object.keys(this.children)
  213. , cks
  214. if (this.searchStart >= cka.length - 1)
  215. this.searchStart = 0
  216. else
  217. this.searchStart++
  218. cks = cka.splice(0, this.searchStart)
  219. return cka.concat(cks)
  220. }
  221. // Calls are added to a queue, this processes the queue and is called
  222. // whenever there might be a chance to send more calls to the workers.
  223. // The various options all impact on when we're able to send calls,
  224. // they may need to be kept in a queue until a worker is ready.
  225. Farm.prototype.processQueue = function () {
  226. let cka, i = 0, childId
  227. if (!this.callQueue.length)
  228. return this.ending && this.end()
  229. if (this.activeChildren < this.options.maxConcurrentWorkers)
  230. this.startChild()
  231. for (cka = this.childKeys(); i < cka.length; i++) {
  232. childId = +cka[i]
  233. if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
  234. && this.children[childId].calls.length < this.options.maxCallsPerWorker) {
  235. this.send(childId, this.callQueue.shift())
  236. if (!this.callQueue.length)
  237. return this.ending && this.end()
  238. } /*else {
  239. console.log(
  240. , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
  241. , this.children[childId].calls.length < this.options.maxCallsPerWorker
  242. , this.children[childId].calls.length , this.options.maxCallsPerWorker)
  243. }*/
  244. }
  245. if (this.ending)
  246. this.end()
  247. }
  248. // add a new call to the call queue, then trigger a process of the queue
  249. Farm.prototype.addCall = function (call) {
  250. if (this.ending)
  251. return this.end() // don't add anything new to the queue
  252. this.callQueue.push(call)
  253. this.processQueue()
  254. }
  255. // kills child workers when they're all done
  256. Farm.prototype.end = function (callback) {
  257. let complete = true
  258. if (this.ending === false)
  259. return
  260. if (callback)
  261. this.ending = callback
  262. else if (this.ending == null)
  263. this.ending = true
  264. Object.keys(this.children).forEach(function (child) {
  265. if (!this.children[child])
  266. return
  267. if (!this.children[child].activeCalls)
  268. this.stopChild(child)
  269. else
  270. complete = false
  271. }.bind(this))
  272. if (complete && typeof this.ending == 'function') {
  273. process.nextTick(function () {
  274. this.ending()
  275. this.ending = false
  276. }.bind(this))
  277. }
  278. }
  279. module.exports = Farm
  280. module.exports.TimeoutError = TimeoutError