subscription.js 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. /*
  2. This file is part of web3.js.
  3. web3.js is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Lesser General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. web3.js is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Lesser General Public License for more details.
  11. You should have received a copy of the GNU Lesser General Public License
  12. along with web3.js. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /**
  15. * @file subscription.js
  16. * @author Fabian Vogelsteller <fabian@ethereum.org>
  17. * @date 2017
  18. */
  19. "use strict";
  20. var errors = require('web3-core-helpers').errors;
  21. var EventEmitter = require('eventemitter3');
  22. var formatters = require('web3-core-helpers').formatters;
  23. function identity(value) {
  24. return value;
  25. }
  26. function Subscription(options) {
  27. EventEmitter.call(this);
  28. this.id = null;
  29. this.callback = identity;
  30. this.arguments = null;
  31. this.lastBlock = null; // "from" block tracker for backfilling events on reconnection
  32. this.options = {
  33. subscription: options.subscription,
  34. type: options.type,
  35. requestManager: options.requestManager
  36. };
  37. }
  38. // INHERIT
  39. Subscription.prototype = Object.create(EventEmitter.prototype);
  40. Subscription.prototype.constructor = Subscription;
  41. /**
  42. * Should be used to extract callback from array of arguments. Modifies input param
  43. *
  44. * @method extractCallback
  45. * @param {Array} arguments
  46. * @return {Function|Null} callback, if exists
  47. */
  48. Subscription.prototype._extractCallback = function (args) {
  49. if (typeof args[args.length - 1] === 'function') {
  50. return args.pop(); // modify the args array!
  51. }
  52. };
  53. /**
  54. * Should be called to check if the number of arguments is correct
  55. *
  56. * @method validateArgs
  57. * @param {Array} arguments
  58. * @throws {Error} if it is not
  59. */
  60. Subscription.prototype._validateArgs = function (args) {
  61. var subscription = this.options.subscription;
  62. if (!subscription)
  63. subscription = {};
  64. if (!subscription.params)
  65. subscription.params = 0;
  66. if (args.length !== subscription.params) {
  67. throw errors.InvalidNumberOfParams(args.length, subscription.params, subscription.subscriptionName);
  68. }
  69. };
  70. /**
  71. * Should be called to format input args of method
  72. *
  73. * @method formatInput
  74. * @param {Array}
  75. * @return {Array}
  76. */
  77. Subscription.prototype._formatInput = function (args) {
  78. var subscription = this.options.subscription;
  79. if (!subscription) {
  80. return args;
  81. }
  82. if (!subscription.inputFormatter) {
  83. return args;
  84. }
  85. var formattedArgs = subscription.inputFormatter.map(function (formatter, index) {
  86. return formatter ? formatter(args[index]) : args[index];
  87. });
  88. return formattedArgs;
  89. };
  90. /**
  91. * Should be called to format output(result) of method
  92. *
  93. * @method formatOutput
  94. * @param result {Object}
  95. * @return {Object}
  96. */
  97. Subscription.prototype._formatOutput = function (result) {
  98. var subscription = this.options.subscription;
  99. return (subscription && subscription.outputFormatter && result) ? subscription.outputFormatter(result) : result;
  100. };
  101. /**
  102. * Should create payload from given input args
  103. *
  104. * @method toPayload
  105. * @param {Array} args
  106. * @return {Object}
  107. */
  108. Subscription.prototype._toPayload = function (args) {
  109. var params = [];
  110. this.callback = this._extractCallback(args) || identity;
  111. if (!this.subscriptionMethod) {
  112. this.subscriptionMethod = args.shift();
  113. // replace subscription with given name
  114. if (this.options.subscription.subscriptionName) {
  115. this.subscriptionMethod = this.options.subscription.subscriptionName;
  116. }
  117. }
  118. if (!this.arguments) {
  119. this.arguments = this._formatInput(args);
  120. this._validateArgs(this.arguments);
  121. args = []; // make empty after validation
  122. }
  123. // re-add subscriptionName
  124. params.push(this.subscriptionMethod);
  125. params = params.concat(this.arguments);
  126. if (args.length) {
  127. throw new Error('Only a callback is allowed as parameter on an already instantiated subscription.');
  128. }
  129. return {
  130. method: this.options.type + '_subscribe',
  131. params: params
  132. };
  133. };
  134. /**
  135. * Unsubscribes and clears callbacks
  136. *
  137. * @method unsubscribe
  138. * @return {Object}
  139. */
  140. Subscription.prototype.unsubscribe = function (callback) {
  141. this.options.requestManager.removeSubscription(this.id, callback);
  142. this.id = null;
  143. this.lastBlock = null;
  144. this.removeAllListeners();
  145. };
  146. /**
  147. * Subscribes and watches for changes
  148. *
  149. * @method subscribe
  150. * @param {String} subscription the subscription
  151. * @param {Object} options the options object with address topics and fromBlock
  152. * @return {Object}
  153. */
  154. Subscription.prototype.subscribe = function () {
  155. var _this = this;
  156. var args = Array.prototype.slice.call(arguments);
  157. var payload = this._toPayload(args);
  158. if (!payload) {
  159. return this;
  160. }
  161. // throw error, if provider is not set
  162. if (!this.options.requestManager.provider) {
  163. setTimeout(function () {
  164. var err1 = new Error('No provider set.');
  165. _this.callback(err1, null, _this);
  166. _this.emit('error', err1);
  167. }, 0);
  168. return this;
  169. }
  170. // throw error, if provider doesnt support subscriptions
  171. if (!this.options.requestManager.provider.on) {
  172. setTimeout(function () {
  173. var err2 = new Error('The current provider doesn\'t support subscriptions: ' +
  174. _this.options.requestManager.provider.constructor.name);
  175. _this.callback(err2, null, _this);
  176. _this.emit('error', err2);
  177. }, 0);
  178. return this;
  179. }
  180. // Re-subscription only: continue fetching from the last block we received.
  181. // a dropped connection may have resulted in gaps in the logs...
  182. if (this.lastBlock && !!this.options.params && typeof this.options.params === 'object') {
  183. payload.params[1] = this.options.params;
  184. payload.params[1].fromBlock = formatters.inputBlockNumberFormatter(this.lastBlock + 1);
  185. }
  186. // if id is there unsubscribe first
  187. if (this.id) {
  188. this.unsubscribe();
  189. }
  190. // store the params in the options object
  191. this.options.params = payload.params[1];
  192. // get past logs, if fromBlock is available
  193. if (payload.params[0] === 'logs' && !!payload.params[1] && typeof payload.params[1] === 'object' && payload.params[1].hasOwnProperty('fromBlock') && isFinite(payload.params[1].fromBlock)) {
  194. // send the subscription request
  195. // copy the params to avoid race-condition with deletion below this block
  196. var blockParams = Object.assign({}, payload.params[1]);
  197. this.options.requestManager.send({
  198. method: 'eth_getLogs',
  199. params: [blockParams]
  200. }, function (err, logs) {
  201. if (!err) {
  202. logs.forEach(function (log) {
  203. var output = _this._formatOutput(log);
  204. _this.callback(null, output, _this);
  205. _this.emit('data', output);
  206. });
  207. // TODO subscribe here? after the past logs?
  208. }
  209. else {
  210. setTimeout(function () {
  211. _this.callback(err, null, _this);
  212. _this.emit('error', err);
  213. }, 0);
  214. }
  215. });
  216. }
  217. // create subscription
  218. // TODO move to separate function? so that past logs can go first?
  219. if (typeof payload.params[1] === 'object')
  220. delete payload.params[1].fromBlock;
  221. this.options.requestManager.send(payload, function (err, result) {
  222. if (!err && result) {
  223. _this.id = result;
  224. _this.method = payload.params[0];
  225. // call callback on notifications
  226. _this.options.requestManager.addSubscription(_this, function (error, result) {
  227. if (!error) {
  228. if (!Array.isArray(result)) {
  229. result = [result];
  230. }
  231. result.forEach(function (resultItem) {
  232. var output = _this._formatOutput(resultItem);
  233. // Track current block (for gaps introduced by dropped connections)
  234. _this.lastBlock = !!output && typeof output === 'object' ? output.blockNumber : null;
  235. if (typeof _this.options.subscription.subscriptionHandler === 'function') {
  236. return _this.options.subscription.subscriptionHandler.call(_this, output);
  237. }
  238. else {
  239. _this.emit('data', output);
  240. }
  241. // call the callback, last so that unsubscribe there won't affect the emit above
  242. _this.callback(null, output, _this);
  243. });
  244. }
  245. else {
  246. _this.callback(error, false, _this);
  247. _this.emit('error', error);
  248. }
  249. });
  250. _this.emit('connected', result);
  251. }
  252. else {
  253. setTimeout(function () {
  254. _this.callback(err, false, _this);
  255. _this.emit('error', err);
  256. }, 0);
  257. }
  258. });
  259. // return an object to cancel the subscription
  260. return this;
  261. };
  262. /**
  263. * Resubscribe
  264. *
  265. * @method resubscribe
  266. *
  267. * @returns {void}
  268. */
  269. Subscription.prototype.resubscribe = function () {
  270. this.options.requestManager.removeSubscription(this.id); // unsubscribe
  271. this.id = null;
  272. this.subscribe(this.callback);
  273. };
  274. module.exports = Subscription;