index.js 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306
  1. // @ts-check
  2. const os = require('os');
  3. const throng = require('throng');
  4. const dotenv = require('dotenv');
  5. const express = require('express');
  6. const http = require('http');
  7. const redis = require('redis');
  8. const pg = require('pg');
  9. const log = require('npmlog');
  10. const url = require('url');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const WebSocket = require('ws');
  14. const { JSDOM } = require('jsdom');
  const env = process.env.NODE_ENV || 'development';

  // Load the environment file before reading any other setting, so that values
  // defined only in `.env` / `.env.production` are visible below.
  dotenv.config({
    path: env === 'production' ? '.env.production' : '.env',
  });

  // Must be evaluated after dotenv.config(): these flags are commonly set in the
  // environment file rather than exported by the process manager.
  const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';

  log.level = process.env.LOG_LEVEL || 'verbose';
  /**
   * Translate a database connection URL into a config object using the keys
   * `user`, `password`, `host`, `port`, `database` and `ssl`. Only the parts
   * present in the URL are set, so the result can be layered over defaults.
   * @param {string} dbUrl
   * @return {Object.<string, any>} empty object when no URL is given
   */
  const dbUrlToConfig = (dbUrl) => {
    if (!dbUrl) {
      return {};
    }

    const params = url.parse(dbUrl, true);
    const config = {};

    if (params.auth) {
      // Split on the first colon only: everything after it is the password,
      // which may legitimately contain further colons.
      const [user, ...passwordParts] = params.auth.split(':');

      config.user = user;
      config.password = passwordParts.length > 0 ? passwordParts.join(':') : undefined;
    }

    if (params.hostname) {
      config.host = params.hostname;
    }

    if (params.port) {
      config.port = params.port;
    }

    if (params.pathname) {
      config.database = params.pathname.split('/')[1];
    }

    const ssl = params.query && params.query.ssl;

    if (ssl === 'true' || ssl === '1') {
      config.ssl = true;
    }

    return config;
  };
  /**
   * Create a Redis client from default options plus an optional URL, register
   * an error logger on it and wait for it to connect.
   *
   * The defaults object is copied, never modified, so the same defaults can be
   * reused for several clients.
   * @param {Object.<string, any>} defaultConfig
   * @param {string} redisUrl `unix://<path>` selects a socket path; any other
   *   non-empty value is passed through as `url`
   * @return {Promise.<any>} the connected client
   */
  const redisUrlToClient = async (defaultConfig, redisUrl) => {
    let overrides = {};

    if (redisUrl && redisUrl.startsWith('unix://')) {
      overrides = {
        socket: {
          path: redisUrl.slice(7),
        },
      };
    } else if (redisUrl) {
      overrides = {
        url: redisUrl,
      };
    }

    const client = redis.createClient(Object.assign({}, defaultConfig, overrides));

    client.on('error', (err) => log.error('Redis Client Error!', err));
    await client.connect();

    return client;
  };
  // Worker count: STREAMING_CLUSTER_NUM when it is a non-zero number, otherwise
  // a single worker in development and (CPU count - 1, at least 1) elsewhere.
  const numWorkers = Number(process.env.STREAMING_CLUSTER_NUM)
    || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
  /**
   * Parse a JSON string, logging instead of throwing when it is malformed.
   * @param {string} json
   * @param {any} req request the message belongs to; only read for log context
   * @return {Object.<string, any>|null} the parsed value, or null on a parse error
   */
  const parseJSON = (json, req) => {
    let parsed = null;

    try {
      parsed = JSON.parse(json);
    } catch (err) {
      // Requests tied to an account are logged at `warn`, others at `silly`
      const hasAccount = Boolean(req.accountId);

      if (hasAccount) {
        log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
      } else {
        log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
      }
    }

    return parsed;
  };
  // Log start-up information for the master process, including a migration hint
  // when PORT holds a non-numeric value and SOCKET is not set.
  const startMaster = () => {
    const { SOCKET, PORT } = process.env;
    const usesPortHack = !SOCKET && PORT && isNaN(+PORT);

    if (usesPortHack) {
      log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
    }

    log.warn(`Starting streaming API server master with ${numWorkers} workers`);
  };
  97. const startWorker = async (workerId) => {
  98. log.warn(`Starting worker ${workerId}`);
  // Per-environment PostgreSQL defaults; each value can be overridden through
  // the corresponding DB_* environment variable.
  const pgConfigs = {
    development: {
      user: process.env.DB_USER || pg.defaults.user,
      password: process.env.DB_PASS || pg.defaults.password,
      database: process.env.DB_NAME || 'mastodon_development',
      host: process.env.DB_HOST || pg.defaults.host,
      port: process.env.DB_PORT || pg.defaults.port,
      max: 10,
    },

    production: {
      user: process.env.DB_USER || 'mastodon',
      password: process.env.DB_PASS || '',
      database: process.env.DB_NAME || 'mastodon_production',
      host: process.env.DB_HOST || 'localhost',
      port: process.env.DB_PORT || 5432,
      max: 10,
    },
  };

  // Any DB_SSLMODE other than empty or 'disable' turns SSL on for both configs
  if (process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
    Object.values(pgConfigs).forEach((pgConfig) => {
      pgConfig.ssl = true;
    });
  }
  const app = express();

  // TRUSTED_PROXY_IP may list several entries separated by commas and/or
  // whitespace; without it a fixed 'loopback,uniquelocal' value is used.
  app.set(
    'trust proxy',
    process.env.TRUSTED_PROXY_IP
      ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/)
      : 'loopback,uniquelocal',
  );

  // Settings parsed from DATABASE_URL take precedence over the per-environment defaults
  const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  const server = http.createServer(app);

  const redisNamespace = process.env.REDIS_NAMESPACE || null;
  const redisParams = {
    socket: {
      host: process.env.REDIS_HOST || '127.0.0.1',
      port: process.env.REDIS_PORT || 6379,
    },
    database: process.env.REDIS_DB || 0,
    password: process.env.REDIS_PASSWORD || undefined,
  };

  if (redisNamespace) {
    redisParams.namespace = redisNamespace;
  }

  // Prefix put in front of every channel and key name when a namespace is configured
  const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';

  /**
   * Listener callbacks registered per channel name
   * @type {Object.<string, Array.<function(string): void>>}
   */
  const subs = {};

  // Two connections are opened: one is used for subscriptions, the other for regular commands
  const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  /**
   * Mark the given channels as subscribed in Redis, immediately and then every
   * 6 minutes, by writing `subscribed:<channel>` keys that expire after three
   * intervals.
   * @param {string[]} channels
   * @return {function(): void} call it to stop refreshing the keys
   */
  const subscriptionHeartbeat = channels => {
    const interval = 6 * 60;

    const tellSubscribed = () => {
      channels.forEach(channel => {
        // The promise-based client takes the expiry as an options object; a
        // positional 'EX', ttl pair would leave the key without any expiry.
        redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', { EX: interval * 3 })
          .catch(err => log.error(`Error refreshing subscription marker for ${channel}: ${err}`));
      });
    };

    tellSubscribed();

    const heartbeat = setInterval(tellSubscribed, interval * 1000);

    return () => {
      clearInterval(heartbeat);
    };
  };
  /**
   * Hand a message received on a channel to every listener registered for it.
   * @param {string} message
   * @param {string} channel
   */
  const onRedisMessage = (message, channel) => {
    const listeners = subs[channel];

    log.silly(`New message on channel ${channel}`);

    if (listeners) {
      listeners.forEach(notify => notify(message));
    }
  };
  /**
   * Register a listener for a channel. The Redis subscription itself is only
   * issued when the channel has no listeners yet.
   * @param {string} channel
   * @param {function(string): void} callback
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    if (!subs[channel]) {
      subs[channel] = [];
    }

    const listeners = subs[channel];
    const isFirstListener = listeners.length === 0;

    if (isFirstListener) {
      log.verbose(`Subscribe ${channel}`);
      redisSubscribeClient.subscribe(channel, onRedisMessage);
    }

    listeners.push(callback);
  };
  /**
   * Remove a listener from a channel; once no listener is left the Redis
   * subscription is dropped and the channel entry is deleted.
   * @param {string} channel
   * @param {function(string): void} callback the listener to remove
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    const listeners = subs[channel];

    if (!listeners) {
      return;
    }

    const remaining = listeners.filter(item => item !== callback);

    if (remaining.length > 0) {
      subs[channel] = remaining;
      return;
    }

    log.verbose(`Unsubscribe ${channel}`);
    redisSubscribeClient.unsubscribe(channel);
    delete subs[channel];
  };
  // Values that count as "false" in addition to JavaScript's own falsy values
  const FALSE_VALUES = [
    false,
    0,
    '0',
    'f',
    'F',
    'false',
    'FALSE',
    'off',
    'OFF',
  ];

  /**
   * Interpret a loosely typed flag (such as a query-string parameter) as a boolean.
   * @param {any} value
   * @return {boolean} false for falsy values and members of FALSE_VALUES, true otherwise
   */
  const isTruthy = value =>
    // Coerce explicitly so that the documented boolean is returned even for
    // undefined, null and the empty string.
    Boolean(value) && !FALSE_VALUES.includes(value);
  /**
   * Middleware that adds the CORS response headers to every response.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const allowCrossDomain = (req, res, next) => {
    const corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'],
      ['Access-Control-Allow-Methods', 'GET, OPTIONS'],
    ];

    corsHeaders.forEach(([name, value]) => {
      res.header(name, value);
    });

    next();
  };
  /**
   * Middleware that tags the request with a fresh UUID and echoes it back in
   * the `X-Request-Id` response header.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRequestId = (req, res, next) => {
    const requestId = uuid.v4();

    req.requestId = requestId;
    res.header('X-Request-Id', req.requestId);

    next();
  };
  /**
   * Middleware that copies the connection's remote address onto the request.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRemoteAddress = (req, res, next) => {
    const { remoteAddress } = req.connection;

    req.remoteAddress = remoteAddress;

    next();
  };
  /**
   * Tell whether the request's token carries at least one of the given scopes.
   * @param {any} req
   * @param {string[]} necessaryScopes
   * @return {boolean}
   */
  const isInScope = (req, necessaryScopes) => {
    for (const grantedScope of req.scopes) {
      if (necessaryScopes.includes(grantedScope)) {
        return true;
      }
    }

    return false;
  };
  /**
   * Look up an OAuth access token in the database and copy what is found
   * (token id, scopes, account id, chosen languages, device id) onto the request.
   * @param {string} token
   * @param {any} req
   * @return {Promise.<void>} rejects with a 401-status error when no unrevoked token matches
   */
  const accountFromToken = (token, req) => new Promise((resolve, reject) => {
    pgPool.connect((connectError, client, release) => {
      if (connectError) {
        reject(connectError);
        return;
      }

      client.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (queryError, result) => {
        // The connection is handed back as soon as the query has finished
        release();

        if (queryError) {
          reject(queryError);
          return;
        }

        if (result.rows.length === 0) {
          const invalidToken = new Error('Invalid access token');
          invalidToken.status = 401;
          reject(invalidToken);
          return;
        }

        const row = result.rows[0];

        req.accessTokenId = row.id;
        req.scopes = row.scopes.split(' ');
        req.accountId = row.account_id;
        req.chosenLanguages = row.chosen_languages;
        req.deviceId = row.device_id;

        resolve();
      });
    });
  });
  /**
   * Authenticate a request from its `Authorization` header, its `access_token`
   * query parameter or its `Sec-WebSocket-Protocol` header.
   * @param {any} req
   * @param {boolean=} required whether a request without any token is an error
   * @return {Promise.<void>} rejects with a 401-status error when a required token is missing
   */
  const accountFromRequest = async (req, required = true) => {
    const authorization = req.headers.authorization;
    const { query } = url.parse(req.url, true);
    const accessToken = query.access_token || req.headers['sec-websocket-protocol'];
    const hasCredentials = Boolean(authorization || accessToken);

    if (!hasCredentials) {
      if (!required) {
        return undefined;
      }

      const missingToken = new Error('Missing access token');
      missingToken.status = 401;
      throw missingToken;
    }

    // The Authorization header wins over the other two sources
    const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;

    return accountFromToken(token, req);
  };
  /**
   * Map a request path to its channel name; the public timelines switch to
   * their `:media` variant when the `only_media` query parameter is truthy.
   * @param {any} req
   * @return {string} undefined when the path is not a known streaming endpoint
   */
  const channelNameFromPath = req => {
    const { path, query } = req;
    const onlyMedia = isTruthy(query.only_media);

    const channelByPath = new Map([
      ['/api/v1/streaming/user', 'user'],
      ['/api/v1/streaming/user/notification', 'user:notification'],
      ['/api/v1/streaming/public', onlyMedia ? 'public:media' : 'public'],
      ['/api/v1/streaming/public/local', onlyMedia ? 'public:local:media' : 'public:local'],
      ['/api/v1/streaming/public/remote', onlyMedia ? 'public:remote:media' : 'public:remote'],
      ['/api/v1/streaming/hashtag', 'hashtag'],
      ['/api/v1/streaming/hashtag/local', 'hashtag:local'],
      ['/api/v1/streaming/direct', 'direct'],
      ['/api/v1/streaming/list', 'list'],
    ]);

    return channelByPath.get(path);
  };
  // Channels that can be accessed without any OAuth scope
  const PUBLIC_CHANNELS = [
    'public',
    'public:media',
    'public:local',
    'public:local:media',
    'public:remote',
    'public:remote:media',
    'hashtag',
    'hashtag:local',
  ];

  /**
   * Verify that the request's OAuth scopes allow subscribing to a channel.
   * @param {any} req
   * @param {string} channelName
   * @return {Promise.<void>} rejects with a 401-status error when the scopes are insufficient
   */
  const checkScopes = async (req, channelName) => {
    log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);

    if (PUBLIC_CHANNELS.includes(channelName)) {
      return;
    }

    // `read` grants access to every stream. Besides it, the notifications
    // stream accepts `read:notifications` and every other stream accepts
    // `read:statuses`. (The user stream only carries notifications when the
    // token also has `read` or `read:notifications`; that is handled elsewhere.)
    const specificScope = channelName === 'user:notification' ? 'read:notifications' : 'read:statuses';
    const requiredScopes = ['read', specificScope];

    const isCovered = req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope));

    if (isCovered) {
      return;
    }

    const err = new Error('Access token does not cover required scopes');
    err.status = 401;
    throw err;
  };
  /**
   * `verifyClient` hook for the WebSocket server.
   *
   * OAuth scopes are deliberately not checked here any more; that happens when
   * a specific stream is subscribed to. The connection is only refused when
   * authentication fails, which includes a missing token whenever the
   * environment requires authentication.
   * @param {any} info
   * @param {function(boolean, number, string): void} callback
   */
  const wsVerifyClient = (info, callback) => {
    const accept = () => {
      callback(true, undefined, undefined);
    };

    const refuse = err => {
      log.error(info.req.requestId, err.toString());
      callback(false, 401, 'Unauthorized');
    };

    accountFromRequest(info.req, alwaysRequireAuth).then(accept).catch(refuse);
  };
  /**
   * @typedef SystemMessageHandlers
   * @property {function(): void} onKill
   */

  /**
   * Build a listener for system messages: `kill` events invoke the `onKill`
   * handler, `filters_changed` events clear the filters cached on the request.
   * @param {any} req
   * @param {SystemMessageHandlers} eventHandlers
   * @return {function(string): void}
   */
  const createSystemMessageListener = (req, eventHandlers) => message => {
    const json = parseJSON(message, req);

    if (!json) {
      return;
    }

    const { event } = json;

    log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);

    switch (event) {
    case 'kill':
      log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`);
      eventHandlers.onKill();
      break;
    case 'filters_changed':
      log.verbose(req.requestId, `Invalidating filters cache for ${req.accountId}`);
      req.cachedFilters = null;
      break;
    default:
      break;
    }
  };
  /**
   * Subscribe an HTTP connection to its access-token channel and its account
   * system channel; a `kill` message ends the response, and both subscriptions
   * are removed when the response closes.
   * @param {any} req
   * @param {any} res
   */
  const subscribeHttpToSystemChannel = (req, res) => {
    const systemChannels = [
      `${redisPrefix}timeline:access_token:${req.accessTokenId}`,
      `${redisPrefix}timeline:system:${req.accountId}`,
    ];

    const listener = createSystemMessageListener(req, {
      onKill() {
        res.end();
      },
    });

    res.on('close', () => {
      systemChannels.forEach(channel => {
        unsubscribe(channel, listener);
      });
    });

    systemChannels.forEach(channel => {
      subscribe(channel, listener);
    });
  };
  /**
   * Middleware that authenticates the request, checks its OAuth scopes against
   * the requested channel and subscribes it to the system channels. `OPTIONS`
   * requests pass straight through; any failure is forwarded to `next`.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    const verifyScopes = () => checkScopes(req, channelNameFromPath(req));

    const attachSystemChannel = () => {
      subscribeHttpToSystemChannel(req, res);
    };

    accountFromRequest(req, alwaysRequireAuth)
      .then(verifyScopes)
      .then(attachSystemChannel)
      .then(() => {
        next();
      })
      .catch(next);
  };
  /**
   * Error-handling middleware: logs the error and answers with a JSON body.
   * The error text is only exposed when the error carries a `status`; anything
   * else becomes a generic 500 response.
   * @param {Error} err
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    // A response that has already started cannot be rewritten; pass the error on
    if (res.headersSent) {
      next(err);
      return;
    }

    const statusCode = err.status || 500;
    const message = err.status ? err.toString() : 'An unexpected error occurred';

    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: message }));
  };
  /**
   * Build a comma-separated list of positional SQL placeholders (`$1, $2, …`),
   * one per array element.
   * @param {array} arr
   * @param {number=} shift amount added to every placeholder number
   * @return {string}
   */
  const placeholders = (arr, shift = 0) => {
    const numbered = arr.map((_, position) => `$${position + 1 + shift}`);

    return numbered.join(', ');
  };
  /**
   * Check that a list exists and belongs to the account making the request.
   * @param {string} listId
   * @param {any} req
   * @return {Promise.<void>} rejects with an Error when the database fails, or
   *   when the list is missing or owned by another account
   */
  const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
    const { accountId } = req;

    pgPool.connect((err, client, done) => {
      if (err) {
        // Pass the cause on instead of rejecting with nothing
        reject(err);
        return;
      }

      client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (err, result) => {
        done();

        if (err) {
          reject(err);
          return;
        }

        if (result.rows.length === 0 || result.rows[0].account_id !== accountId) {
          reject(new Error('Not authorized to stream this list'));
          return;
        }

        resolve();
      });
    });
  });
  /**
   * Subscribe to the given channels and forward their messages to `output`.
   *
   * When `needsFiltering` is set, `update` events are additionally checked
   * against the viewer's chosen languages, blocks, mutes and domain blocks,
   * and matching custom filters are attached to the payload as `filtered`.
   * @param {string[]} ids channel ids without the Redis prefix
   * @param {any} req
   * @param {function(string, string): void} output called with the event name and the encoded payload
   * @param {function(string[], function(string): void): void} attachCloseHandler
   *   receives the prefixed channel ids and the listener so both can be cleaned up later
   * @param {boolean=} needsFiltering
   * @return {function(string): void} the listener that was subscribed
   */
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
    const accountId = req.accountId || req.remoteAddress;

    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);

    const listener = message => {
      const json = parseJSON(message, req);

      if (!json) return;

      const { event, payload, queued_at } = json;

      const transmit = () => {
        const now = new Date().getTime();
        const delta = now - queued_at;
        const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;

        log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
        output(event, encodedPayload);
      };

      // Only send local-only statuses to logged-in users
      if (payload.local_only && !req.accountId) {
        log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`);
        return;
      }

      // Only messages that may require filtering are statuses, since notifications
      // are already personalized and deletes do not matter
      if (!needsFiltering || event !== 'update') {
        transmit();
        return;
      }

      const unpackedPayload = payload;
      const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
      const accountDomain = unpackedPayload.account.acct.split('@')[1];

      if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
        log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
        return;
      }

      // When the account is not logged in, it is not necessary to confirm the block or mute
      if (!req.accountId) {
        transmit();
        return;
      }

      pgPool.connect((err, client, done) => {
        if (err) {
          log.error(err);
          return;
        }

        const queries = [
          client.query(`SELECT 1
                        FROM blocks
                        WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
                           OR (account_id = $2 AND target_account_id = $1)
                        UNION
                        SELECT 1
                        FROM mutes
                        WHERE account_id = $1
                          AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
        ];

        if (accountDomain) {
          queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
        }

        if (!unpackedPayload.filtered && !req.cachedFilters) {
          queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
        }

        Promise.all(queries).finally(() => {
          // Release the client exactly once, whether the queries succeeded or
          // failed; an exception in the processing below must not release it again.
          done();
        }).then(values => {
          // Blocked or muted in either direction, or from a blocked domain: drop silently
          if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
            return;
          }

          if (!unpackedPayload.filtered && !req.cachedFilters) {
            // The filter query is last; its index depends on whether the
            // domain-block query was issued
            const filterRows = values[accountDomain ? 2 : 1].rows;

            // Group the keyword rows by filter id
            req.cachedFilters = filterRows.reduce((cache, row) => {
              if (cache[row.id]) {
                cache[row.id].keywords.push([row.keyword, row.whole_word]);
              } else {
                cache[row.id] = {
                  keywords: [[row.keyword, row.whole_word]],
                  expires_at: row.expires_at,
                  repr: {
                    id: row.id,
                    title: row.title,
                    context: row.context,
                    expires_at: row.expires_at,
                    filter_action: ['warn', 'hide'][row.filter_action],
                  },
                };
              }

              return cache;
            }, {});

            // Compile one case-insensitive regexp per filter out of its escaped keywords
            Object.keys(req.cachedFilters).forEach((key) => {
              req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
                let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');

                if (whole_word) {
                  if (/^[\w]/.test(expr)) {
                    expr = `\\b${expr}`;
                  }

                  if (/[\w]$/.test(expr)) {
                    expr = `${expr}\\b`;
                  }
                }

                return expr;
              }).join('|'), 'i');
            });
          }

          // Check filters
          if (req.cachedFilters && !unpackedPayload.filtered) {
            const status = unpackedPayload;
            const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
            const searchIndex = JSDOM.fragment(searchContent).textContent;

            const now = new Date();
            payload.filtered = [];

            Object.values(req.cachedFilters).forEach((cachedFilter) => {
              if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
                const keyword_matches = searchIndex.match(cachedFilter.regexp);

                if (keyword_matches) {
                  payload.filtered.push({
                    filter: cachedFilter.repr,
                    keyword_matches,
                  });
                }
              }
            });
          }

          transmit();
        }).catch(err => {
          log.error(err);
        });
      });
    };

    ids.forEach(id => {
      subscribe(`${redisPrefix}${id}`, listener);
    });

    if (attachCloseHandler) {
      attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
    }

    return listener;
  };
  /**
   * Prepare an HTTP response for server-sent events, keep it alive with a
   * comment line every 15 seconds, and return the function that writes events.
   * @param {any} req
   * @param {any} res
   * @return {function(string, string): void} writes one event name / payload pair
   */
  const streamToHttp = (req, res) => {
    const accountId = req.accountId || req.remoteAddress;

    const streamHeaders = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-store'],
      ['Transfer-Encoding', 'chunked'],
    ];

    streamHeaders.forEach(([name, value]) => {
      res.setHeader(name, value);
    });

    res.write(':)\n');

    const heartbeat = setInterval(() => res.write(':thump\n'), 15000);

    const stopHeartbeat = () => {
      log.verbose(req.requestId, `Ending stream for ${accountId}`);
      clearInterval(heartbeat);
    };

    req.on('close', stopHeartbeat);

    return (event, payload) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };
  };
  /**
   * Build the close handler attached to an HTTP stream: when the request
   * closes, the stream's listener is removed from every channel and the
   * optional extra handler runs.
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @return {function(string[], function(string): void): void} takes the channel
   *   ids and the listener that was subscribed to them
   */
  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
    req.on('close', () => {
      ids.forEach(id => {
        // The listener has to be passed along: without it nothing is removed
        // and the channel stays subscribed after the client is gone
        unsubscribe(id, listener);
      });

      if (closeHandler) {
        closeHandler();
      }
    });
  };
  /**
   * Build the function that sends stream events over a WebSocket as JSON
   * messages of the form `{ stream, event, payload }`.
   * @param {any} req
   * @param {any} ws
   * @param {string[]} streamName
   * @return {function(string, string): void}
   */
  const streamToWs = (req, ws, streamName) => (event, payload) => {
    const isOpen = ws.readyState === ws.OPEN;

    if (!isOpen) {
      log.error(req.requestId, 'Tried writing to closed socket');
      return;
    }

    const frame = { stream: streamName, event, payload };

    ws.send(JSON.stringify(frame));
  };
  /**
   * Answer with a JSON-encoded 404 response.
   * @param {any} res
   */
  const httpNotFound = res => {
    const body = JSON.stringify({ error: 'Not found' });

    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(body);
  };
  // Middleware that applies to every request, including the health check
  [setRequestId, setRemoteAddress, allowCrossDomain].forEach(middleware => {
    app.use(middleware);
  });

  // Registered ahead of the authentication middleware, so no token is needed
  app.get('/api/v1/streaming/health', (req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('OK');
  });

  app.use(authenticationMiddleware);
  app.use(errorMiddleware);

  app.get('/api/v1/streaming/*', (req, res) => {
    const startStreaming = ({ channelIds, options }) => {
      const onSend = streamToHttp(req, res);
      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));

      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
    };

    const refuseSubscription = err => {
      log.verbose(req.requestId, 'Subscription error:', err.toString());
      httpNotFound(res);
    };

    channelNameToIds(req, channelNameFromPath(req), req.query).then(startStreaming).catch(refuseSubscription);
  });

  // WebSocket connections share the HTTP server and are vetted by wsVerifyClient
  const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
  720. /**
  721. * @typedef StreamParams
  722. * @property {string} [tag]
  723. * @property {string} [list]
  724. * @property {string} [only_media]
  725. */
  /**
   * List the channels that make up the `user` stream: the account timeline,
   * plus the device channel for `crypto`-scoped tokens that have a device id,
   * plus the notifications channel for tokens allowed to read notifications.
   * @param {any} req
   * @return {string[]}
   */
  const channelsForUserStream = req => {
    const { accountId } = req;
    const channels = [`timeline:${accountId}`];

    if (isInScope(req, ['crypto']) && req.deviceId) {
      channels.push(`timeline:${accountId}:${req.deviceId}`);
    }

    if (isInScope(req, ['read', 'read:notifications'])) {
      channels.push(`timeline:${accountId}:notifications`);
    }

    return channels;
  };
  /**
   * See app/lib/ascii_folder.rb for the canon definitions
   * of these constants.
   *
   * The two strings are parallel: the character at position i of
   * NON_ASCII_CHARS folds to the character at position i of
   * EQUIVALENT_ASCII_CHARS, so both must have the same length and every entry
   * of the first must be a single character. The characters that had been
   * damaged by a bad re-encoding are written as escapes.
   */
  const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂ\u0103\u0104\u0105ÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇň\u0149ŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž';
  const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz';

  // Built once instead of on every call: the lookup table and a character
  // class that matches any single foldable character
  const ASCII_FOLD_TABLE = new Map(Array.from(NON_ASCII_CHARS, (char, index) => [char, EQUIVALENT_ASCII_CHARS[index]]));
  const ASCII_FOLD_REGEX = new RegExp(`[${NON_ASCII_CHARS}]`, 'g');

  /**
   * Replace every character listed in NON_ASCII_CHARS with its ASCII
   * equivalent; all other characters are left untouched.
   * @param {string} str
   * @return {string}
   */
  const foldToASCII = str => str.replace(ASCII_FOLD_REGEX, match => ASCII_FOLD_TABLE.get(match));
  /**
   * Produces the normalized form of a hashtag used in channel ids: the input
   * is NFKC-normalized, lower-cased and passed through foldToASCII, then
   * every character that is not a letter, a number, an underscore, U+00B7 or
   * U+200C is removed.
   *
   * @param {string} str
   * @return {string}
   */
  const normalizeHashtag = str => {
    const folded = foldToASCII(str.normalize('NFKC').toLowerCase());

    return folded.replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
  };
  /**
   * Translates a public stream name (plus its parameters) into the internal
   * channel ids to subscribe to, together with a flag saying whether payloads
   * on those channels need filtering.
   *
   * The returned promise rejects with a plain string message when the stream
   * name is unknown, when a hashtag stream is requested without a tag, or when
   * authorizeListAccess rejects for a list stream.
   *
   * @param {any} req
   * @param {string} name
   * @param {StreamParams} params
   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
   */
  const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
    // Resolves the promise with the result shape shared by every stream type.
    const settle = (channelIds, needsFiltering) => {
      resolve({ channelIds, options: { needsFiltering } });
    };

    // Public timelines map one-to-one onto a `timeline:`-prefixed channel id
    // and are always filtered.
    const publicTimelines = [
      'public',
      'public:local',
      'public:remote',
      'public:media',
      'public:local:media',
      'public:remote:media',
    ];

    if (publicTimelines.includes(name)) {
      settle([`timeline:${name}`], true);
      return;
    }

    if (name === 'hashtag' || name === 'hashtag:local') {
      if (!params.tag || params.tag.length === 0) {
        reject('No tag for stream provided');
        return;
      }

      const suffix = name === 'hashtag:local' ? ':local' : '';
      settle([`timeline:hashtag:${normalizeHashtag(params.tag)}${suffix}`], true);
      return;
    }

    switch (name) {
    case 'user':
      settle(channelsForUserStream(req), false);
      break;
    case 'user:notification':
      settle([`timeline:${req.accountId}:notifications`], false);
      break;
    case 'direct':
      settle([`timeline:direct:${req.accountId}`], false);
      break;
    case 'list':
      // The list channel is only handed out once access has been authorized.
      authorizeListAccess(params.list, req)
        .then(() => {
          settle([`timeline:list:${params.list}`], false);
        })
        .catch(() => {
          reject('Not authorized to stream this list');
        });
      break;
    default:
      reject('Unknown stream type');
    }
  });
  /**
   * Builds the stream name for a channel: the channel name itself, followed by
   * the list id for list streams or the tag for hashtag streams.
   *
   * @param {string} channelName
   * @param {StreamParams} params
   * @return {string[]}
   */
  const streamNameFromChannelName = (channelName, params) => {
    switch (channelName) {
    case 'list':
      return [channelName, params.list];
    case 'hashtag':
    case 'hashtag:local':
      return [channelName, params.tag];
    default:
      return [channelName];
    }
  };
  874. /**
  875. * @typedef WebSocketSession
  876. * @property {any} socket
  877. * @property {any} request
  878. * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
  879. */
  /**
   * Subscribes a WebSocket session to a named channel.
   *
   * Scopes are checked first, then the channel name is resolved to channel
   * ids. If the session already holds a subscription for the same set of ids
   * nothing happens; otherwise streaming is started and the listener and
   * heartbeat stopper are recorded on the session under the ids joined by ';'.
   * Any failure is logged and sent to the socket as a JSON `{ error }` message.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
    checkScopes(request, channelName)
      .then(() => channelNameToIds(request, channelName, params))
      .then(({ channelIds, options }) => {
        const subscriptionKey = channelIds.join(';');

        // Already subscribed to exactly this set of channels.
        if (subscriptions[subscriptionKey]) {
          return;
        }

        const streamName = streamNameFromChannelName(channelName, params);
        const onSend = streamToWs(request, socket, streamName);
        const stopHeartbeat = subscriptionHeartbeat(channelIds);
        const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);

        subscriptions[subscriptionKey] = { listener, stopHeartbeat };
      })
      .catch((error) => {
        log.verbose(request.requestId, 'Subscription error:', error.toString());
        socket.send(JSON.stringify({ error: error.toString() }));
      });
  /**
   * Ends a WebSocket session's subscription to a named channel.
   *
   * The channel name is resolved to channel ids again; if the session holds a
   * subscription under those ids, its listener is unsubscribed from every
   * channel, its heartbeat is stopped and the entry is removed from the
   * session. Failures are logged and sent to the socket as a JSON `{ error }`
   * message.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) =>
    channelNameToIds(request, channelName, params)
      .then(({ channelIds }) => {
        log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

        const subscriptionKey = channelIds.join(';');
        const subscription = subscriptions[subscriptionKey];

        // Nothing to tear down if this session never subscribed.
        if (!subscription) {
          return;
        }

        for (const channelId of channelIds) {
          unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
        }

        subscription.stopHeartbeat();
        delete subscriptions[subscriptionKey];
      })
      .catch((error) => {
        log.verbose(request.requestId, 'Unsubscription error:', error);
        socket.send(JSON.stringify({ error: error.toString() }));
      });
  /**
   * Subscribes a WebSocket session to its two system channels: one keyed by
   * the request's access token id and one keyed by its account id. Both share
   * a single system-message listener whose kill callback closes the socket.
   * The subscriptions are recorded on the session with a no-op heartbeat
   * stopper.
   *
   * @param {WebSocketSession} session
   */
  const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => {
    const channelIds = [
      `timeline:access_token:${request.accessTokenId}`,
      `timeline:system:${request.accountId}`,
    ];

    const listener = createSystemMessageListener(request, {
      onKill: () => {
        socket.close();
      },
    });

    // Subscribe to both channels first, then record them, in that order.
    channelIds.forEach(channelId => {
      subscribe(`${redisPrefix}${channelId}`, listener);
    });

    channelIds.forEach(channelId => {
      subscriptions[channelId] = {
        listener,
        // System channels have no heartbeat, so there is nothing to stop.
        stopHeartbeat: () => {},
      };
    });
  };
  /**
   * Returns the first element when given an array, or the value itself when
   * given anything else.
   *
   * @param {string|string[]} arrayOrString
   * @return {string}
   */
  const firstParam = arrayOrString => (
    Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString
  );
  // Per-connection setup for WebSocket clients.
  wss.on('connection', (ws, req) => {
    const { query } = url.parse(req.url, true);

    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    // Liveness flag: set again on every pong received from the client.
    ws.isAlive = true;
    ws.on('pong', () => {
      ws.isAlive = true;
    });

    /**
     * @type {WebSocketSession}
     */
    const session = {
      socket: ws,
      request: req,
      subscriptions: {},
    };

    // Tears down every subscription held by this session. Keys are channel
    // ids joined by ';', so each key is split back into individual channels.
    const onEnd = () => {
      for (const key of Object.keys(session.subscriptions)) {
        const { listener, stopHeartbeat } = session.subscriptions[key];

        for (const channelId of key.split(';')) {
          unsubscribe(`${redisPrefix}${channelId}`, listener);
        }

        stopHeartbeat();
      }
    };

    ws.on('close', onEnd);
    ws.on('error', onEnd);

    ws.on('message', data => {
      const json = parseJSON(data, session.request);

      if (!json) {
        return;
      }

      const { type, stream, ...params } = json;

      switch (type) {
      case 'subscribe':
        subscribeWebsocketToChannel(session, firstParam(stream), params);
        break;
      case 'unsubscribe':
        unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
        break;
      default:
        // Unknown action type
        break;
      }
    });

    subscribeWebsocketToSystemChannel(session);

    // A stream named in the connection URL is subscribed to immediately,
    // with the whole query object passed along as its parameters.
    if (query.stream) {
      subscribeWebsocketToChannel(session, firstParam(query.stream), query);
    }
  });
  // Every 30 seconds: terminate clients whose liveness flag is still false
  // from the previous round, then clear the flag on the rest and ping them.
  // The flag is set back to true by the client's 'pong' handler.
  setInterval(() => {
    for (const client of wss.clients) {
      if (client.isAlive === false) {
        client.terminate();
        continue;
      }

      client.isAlive = false;
      client.ping('', false);
    }
  }, 30000);
  attachServerWithConfig(server, (address) => {
    log.warn(`Worker ${workerId} now listening on ${address}`);
  });

  // Shutdown path shared by SIGINT, SIGTERM and 'exit': log, stop accepting
  // connections and exit with status 0.
  const onExit = () => {
    log.warn(`Worker ${workerId} exiting`);
    server.close();
    process.exit(0);
  };

  // Uncaught exceptions are logged and then follow the same shutdown steps.
  // NOTE(review): this also exits with status 0 — confirm that is intended.
  const onError = (error) => {
    log.error(error);
    server.close();
    process.exit(0);
  };

  ['SIGINT', 'SIGTERM', 'exit'].forEach((eventName) => {
    process.on(eventName, onExit);
  });
  process.on('uncaughtException', onError);
  1033. };
  /**
   * Starts `server` listening according to the environment.
   *
   * When SOCKET is set, or PORT is set to something that is not a number, the
   * server listens on that value directly; in that case, if a callback was
   * given, the listening address is chmod-ed to 0o666 before being reported.
   * Otherwise the server listens on the numeric PORT (default 4000) bound to
   * BIND (default '127.0.0.1') and reports "address:port".
   *
   * @param {any} server
   * @param {function(string): void} [onSuccess]
   */
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT } = process.env;
    const listensOnPath = SOCKET || (PORT && Number.isNaN(Number(PORT)));

    if (!listensOnPath) {
      const port = Number(PORT) || 4000;
      const host = process.env.BIND || '127.0.0.1';

      server.listen(port, host, () => {
        if (!onSuccess) {
          return;
        }

        const bound = server.address();
        onSuccess(`${bound.address}:${bound.port}`);
      });
      return;
    }

    server.listen(SOCKET || PORT, () => {
      if (!onSuccess) {
        return;
      }

      fs.chmodSync(server.address(), 0o666);
      onSuccess(server.address());
    });
  };
  /**
   * Checks whether the configured port or socket can be bound by briefly
   * listening on it with a throwaway HTTP server.
   *
   * The callback receives the error if listening fails, and is called with no
   * arguments once the throwaway server has listened and closed again.
   *
   * @param {function(Error=): void} onSuccess
   */
  const onPortAvailable = onSuccess => {
    const probe = http.createServer();

    probe.once('error', (error) => {
      onSuccess(error);
    });

    probe.once('listening', () => {
      // Report success only after the probe has released the address.
      probe.once('close', () => {
        onSuccess();
      });
      probe.close();
    });

    attachServerWithConfig(probe);
  };
  // Entry point: start the worker pool only once the port/socket check passes.
  onPortAvailable((error) => {
    if (!error) {
      throng({
        workers: numWorkers,
        lifetime: Infinity,
        start: startWorker,
        master: startMaster,
      });
      return;
    }

    log.error('Could not start server, the port or socket is in use');
  });