eventify.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. 'use strict'
  2. const check = require('check-types')
  3. const EventEmitter = require('events').EventEmitter
  4. const events = require('./events')
  5. const promise = require('./promise')
  6. const invalidTypes = {
  7. undefined: true, // eslint-disable-line no-undefined
  8. function: true,
  9. symbol: true
  10. }
  11. module.exports = eventify
  12. /**
  13. * Public function `eventify`.
  14. *
  15. * Returns an event emitter and asynchronously traverses a data structure
  16. * (depth-first), emitting events as it encounters items. Sanely handles
  17. * promises, buffers, maps and other iterables. The event emitter is
  18. * decorated with a `pause` method that can be called to pause processing.
  19. *
  20. * @param data: The data structure to traverse.
  21. *
  22. * @option promises: 'resolve' or 'ignore', default is 'resolve'.
  23. *
  24. * @option buffers: 'toString' or 'ignore', default is 'toString'.
  25. *
  26. * @option maps: 'object' or 'ignore', default is 'object'.
  27. *
  28. * @option iterables: 'array' or 'ignore', default is 'array'.
  29. *
  30. * @option circular: 'error' or 'ignore', default is 'error'.
  31. *
  32. * @option yieldRate: The number of data items to process per timeslice,
  33. * default is 16384.
  34. *
  35. * @option Promise: The promise constructor to use, defaults to bluebird.
  36. **/
  37. function eventify (data, options) {
  38. options = options || {}
  39. const coercions = {}
  40. const emitter = new EventEmitter()
  41. const Promise = promise(options)
  42. const references = new Map()
  43. let count = 0
  44. let disableCoercions = false
  45. let ignoreCircularReferences
  46. let ignoreItems
  47. let pause
  48. let yieldRate
  49. emitter.pause = () => {
  50. let resolve
  51. pause = new Promise(res => resolve = res)
  52. return () => {
  53. pause = null
  54. count = 0
  55. resolve()
  56. }
  57. }
  58. parseOptions()
  59. setImmediate(begin)
  60. return emitter
  61. function parseOptions () {
  62. parseCoercionOption('promises')
  63. parseCoercionOption('buffers')
  64. parseCoercionOption('maps')
  65. parseCoercionOption('iterables')
  66. if (Object.keys(coercions).length === 0) {
  67. disableCoercions = true
  68. }
  69. if (options.circular === 'ignore') {
  70. ignoreCircularReferences = true
  71. }
  72. check.assert.maybe.positive(options.yieldRate)
  73. yieldRate = options.yieldRate || 16384
  74. }
  75. function parseCoercionOption (key) {
  76. if (options[key] !== 'ignore') {
  77. coercions[key] = true
  78. }
  79. }
  80. function begin () {
  81. return proceed(data)
  82. .catch(error => emit(events.error, error))
  83. .then(() => emit(events.end))
  84. }
  85. function proceed (datum) {
  86. if (++count % yieldRate !== 0) {
  87. return coerce(datum).then(after)
  88. }
  89. return new Promise((resolve, reject) => {
  90. setImmediate(() => {
  91. coerce(datum)
  92. .then(after)
  93. .then(resolve)
  94. .catch(reject)
  95. })
  96. })
  97. function after (coerced) {
  98. if (isInvalidType(coerced)) {
  99. return
  100. }
  101. if (coerced === false || coerced === true || coerced === null) {
  102. return literal(coerced)
  103. }
  104. if (Array.isArray(coerced)) {
  105. return array(coerced)
  106. }
  107. const type = typeof coerced
  108. switch (type) {
  109. case 'number':
  110. return value(coerced, type)
  111. case 'string':
  112. return value(escapeString(coerced), type)
  113. default:
  114. return object(coerced)
  115. }
  116. }
  117. }
  118. function coerce (datum) {
  119. if (disableCoercions || check.primitive(datum)) {
  120. return Promise.resolve(datum)
  121. }
  122. if (check.instanceStrict(datum, Promise)) {
  123. return coerceThing(datum, 'promises', coercePromise).then(coerce)
  124. }
  125. if (check.instanceStrict(datum, Buffer)) {
  126. return coerceThing(datum, 'buffers', coerceBuffer)
  127. }
  128. if (check.instanceStrict(datum, Map)) {
  129. return coerceThing(datum, 'maps', coerceMap)
  130. }
  131. if (
  132. check.iterable(datum) &&
  133. check.not.string(datum) &&
  134. check.not.array(datum)
  135. ) {
  136. return coerceThing(datum, 'iterables', coerceIterable)
  137. }
  138. if (check.function(datum.toJSON)) {
  139. return Promise.resolve(datum.toJSON())
  140. }
  141. return Promise.resolve(datum)
  142. }
  143. function coerceThing (datum, thing, fn) {
  144. if (coercions[thing]) {
  145. return fn(datum)
  146. }
  147. return Promise.resolve()
  148. }
  149. function coercePromise (p) {
  150. return p
  151. }
  152. function coerceBuffer (buffer) {
  153. return Promise.resolve(buffer.toString())
  154. }
  155. function coerceMap (map) {
  156. const result = {}
  157. return coerceCollection(map, result, (item, key) => {
  158. result[key] = item
  159. })
  160. }
  161. function coerceCollection (coll, target, push) {
  162. coll.forEach(push)
  163. return Promise.resolve(target)
  164. }
  165. function coerceIterable (iterable) {
  166. const result = []
  167. return coerceCollection(iterable, result, item => {
  168. result.push(item)
  169. })
  170. }
  171. function isInvalidType (datum) {
  172. return !! invalidTypes[typeof datum]
  173. }
  174. function literal (datum) {
  175. return value(datum, 'literal')
  176. }
  177. function value (datum, type) {
  178. return emit(events[type], datum)
  179. }
  180. function emit (event, eventData) {
  181. return (pause || Promise.resolve())
  182. .then(() => emitter.emit(event, eventData))
  183. .catch(err => {
  184. try {
  185. emitter.emit(events.error, err)
  186. } catch (_) {
  187. // When calling user code, anything is possible
  188. }
  189. })
  190. }
  191. function array (datum) {
  192. // For an array, collection:object and collection:array are the same.
  193. return collection(datum, datum, 'array', item => {
  194. if (isInvalidType(item)) {
  195. return proceed(null)
  196. }
  197. return proceed(item)
  198. })
  199. }
  200. function collection (obj, arr, type, action) {
  201. let ignoreThisItem
  202. return Promise.resolve()
  203. .then(() => {
  204. if (references.has(obj)) {
  205. ignoreThisItem = ignoreItems = true
  206. if (! ignoreCircularReferences) {
  207. return emit(events.error, new Error('Circular reference.'))
  208. }
  209. } else {
  210. references.set(obj, true)
  211. }
  212. })
  213. .then(() => emit(events[type]))
  214. .then(() => item(0))
  215. function item (index) {
  216. if (index >= arr.length) {
  217. if (ignoreThisItem) {
  218. ignoreItems = false
  219. }
  220. if (ignoreItems) {
  221. return Promise.resolve()
  222. }
  223. return emit(events.endPrefix + events[type])
  224. .then(() => references.delete(obj))
  225. }
  226. if (ignoreItems) {
  227. return item(index + 1)
  228. }
  229. return action(arr[index])
  230. .then(() => item(index + 1))
  231. }
  232. }
  233. function object (datum) {
  234. // For an object, collection:object and collection:array are different.
  235. return collection(datum, Object.keys(datum), 'object', key => {
  236. const item = datum[key]
  237. if (isInvalidType(item)) {
  238. return Promise.resolve()
  239. }
  240. return emit(events.property, key)
  241. .then(() => proceed(item))
  242. })
  243. }
  244. function escapeString (string) {
  245. string = JSON.stringify(string)
  246. return string.substring(1, string.length - 1)
  247. }
  248. }