test.js 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. var tape = require('tape')
  2. var through = require('through2')
  3. var pumpify = require('./')
  4. var stream = require('stream')
  5. tape('basic', function(t) {
  6. t.plan(3)
  7. var pipeline = pumpify(
  8. through(function(data, enc, cb) {
  9. t.same(data.toString(), 'hello')
  10. cb(null, data.toString().toUpperCase())
  11. }),
  12. through(function(data, enc, cb) {
  13. t.same(data.toString(), 'HELLO')
  14. cb(null, data.toString().toLowerCase())
  15. })
  16. )
  17. pipeline.write('hello')
  18. pipeline.on('data', function(data) {
  19. t.same(data.toString(), 'hello')
  20. t.end()
  21. })
  22. })
  23. tape('3 times', function(t) {
  24. t.plan(4)
  25. var pipeline = pumpify(
  26. through(function(data, enc, cb) {
  27. t.same(data.toString(), 'hello')
  28. cb(null, data.toString().toUpperCase())
  29. }),
  30. through(function(data, enc, cb) {
  31. t.same(data.toString(), 'HELLO')
  32. cb(null, data.toString().toLowerCase())
  33. }),
  34. through(function(data, enc, cb) {
  35. t.same(data.toString(), 'hello')
  36. cb(null, data.toString().toUpperCase())
  37. })
  38. )
  39. pipeline.write('hello')
  40. pipeline.on('data', function(data) {
  41. t.same(data.toString(), 'HELLO')
  42. t.end()
  43. })
  44. })
  45. tape('destroy', function(t) {
  46. var test = through()
  47. test.destroy = function() {
  48. t.ok(true)
  49. t.end()
  50. }
  51. var pipeline = pumpify(through(), test)
  52. pipeline.destroy()
  53. })
  54. tape('close', function(t) {
  55. var test = through()
  56. var pipeline = pumpify(through(), test)
  57. pipeline.on('error', function(err) {
  58. t.same(err.message, 'lol')
  59. t.end()
  60. })
  61. test.emit('error', new Error('lol'))
  62. })
  63. tape('end waits for last one', function(t) {
  64. var ran = false
  65. var a = through()
  66. var b = through()
  67. var c = through(function(data, enc, cb) {
  68. setTimeout(function() {
  69. ran = true
  70. cb()
  71. }, 100)
  72. })
  73. var pipeline = pumpify(a, b, c)
  74. pipeline.write('foo')
  75. pipeline.end(function() {
  76. t.ok(ran)
  77. t.end()
  78. })
  79. t.ok(!ran)
  80. })
  81. tape('always wait for finish', function(t) {
  82. var a = new stream.Readable()
  83. a._read = function() {}
  84. a.push('hello')
  85. var pipeline = pumpify(a, through(), through())
  86. var ran = false
  87. pipeline.on('finish', function() {
  88. t.ok(ran)
  89. t.end()
  90. })
  91. setTimeout(function() {
  92. ran = true
  93. a.push(null)
  94. }, 100)
  95. })
  96. tape('async', function(t) {
  97. var pipeline = pumpify()
  98. t.plan(4)
  99. pipeline.write('hello')
  100. pipeline.on('data', function(data) {
  101. t.same(data.toString(), 'HELLO')
  102. t.end()
  103. })
  104. setTimeout(function() {
  105. pipeline.setPipeline(
  106. through(function(data, enc, cb) {
  107. t.same(data.toString(), 'hello')
  108. cb(null, data.toString().toUpperCase())
  109. }),
  110. through(function(data, enc, cb) {
  111. t.same(data.toString(), 'HELLO')
  112. cb(null, data.toString().toLowerCase())
  113. }),
  114. through(function(data, enc, cb) {
  115. t.same(data.toString(), 'hello')
  116. cb(null, data.toString().toUpperCase())
  117. })
  118. )
  119. }, 100)
  120. })
  121. tape('early destroy', function(t) {
  122. var a = through()
  123. var b = through()
  124. var c = through()
  125. b.destroy = function() {
  126. t.ok(true)
  127. t.end()
  128. }
  129. var pipeline = pumpify()
  130. pipeline.destroy()
  131. setTimeout(function() {
  132. pipeline.setPipeline(a, b, c)
  133. }, 100)
  134. })
  135. tape('preserves error', function (t) {
  136. var a = through()
  137. var b = through(function (data, enc, cb) {
  138. cb(new Error('stop'))
  139. })
  140. var c = through()
  141. var s = pumpify()
  142. s.on('error', function (err) {
  143. t.same(err.message, 'stop')
  144. t.end()
  145. })
  146. s.setPipeline(a, b, c)
  147. s.resume()
  148. s.write('hi')
  149. })
  150. tape('preserves error again', function (t) {
  151. var ws = new stream.Writable()
  152. var rs = new stream.Readable()
  153. ws._write = function (data, enc, cb) {
  154. cb(null)
  155. }
  156. rs._read = function () {
  157. this.push(Buffer.alloc(16 * 1024))
  158. }
  159. var pumpifyErr = pumpify(
  160. through(),
  161. through(function(chunk, _, cb) {
  162. cb(new Error('test'))
  163. }),
  164. ws
  165. )
  166. rs.pipe(pumpifyErr)
  167. .on('error', function (err) {
  168. t.ok(err)
  169. t.ok(err.message !== 'premature close', 'does not close with premature close')
  170. t.end()
  171. })
  172. })