diff --git a/spec/ParseLiveQuery.spec.js b/spec/ParseLiveQuery.spec.js index 7ce757a17b..039f079920 100644 --- a/spec/ParseLiveQuery.spec.js +++ b/spec/ParseLiveQuery.spec.js @@ -1308,4 +1308,32 @@ describe('ParseLiveQuery', function () { await new Promise(resolve => setTimeout(resolve, 100)); expect(createSpy).toHaveBeenCalledTimes(1); }); + + it_id('a1b7fa01-877e-46e2-9601-d312ebb9b33a')(fit)('handles query include', async done => { + await reconfigureServer({ + liveQuery: { + classNames: ['TestObject'], + }, + startLiveQueryServer: true, + verbose: false, + silent: true, + }); + + const user = new Parse.User(); + user.setUsername('user'); + user.setPassword('pass'); + await user.signUp(); + + const query = new Parse.Query('TestObject'); + query.include('user'); + const subscription = await query.subscribe(); + subscription.on('create', obj => { + expect(obj.get('user').get('username')).toBe('user'); + done(); + }); + + const obj = new Parse.Object('TestObject'); + obj.set('user', user); + await obj.save(); + }); }); diff --git a/src/LiveQuery/ParseLiveQueryServer.ts b/src/LiveQuery/ParseLiveQueryServer.ts index 3e6048c345..33fe1e7e79 100644 --- a/src/LiveQuery/ParseLiveQueryServer.ts +++ b/src/LiveQuery/ParseLiveQueryServer.ts @@ -1,30 +1,31 @@ -import tv4 from 'tv4'; import Parse from 'parse/node'; -import { Subscription } from './Subscription'; +import tv4 from 'tv4'; import { Client } from './Client'; import { ParseWebSocketServer } from './ParseWebSocketServer'; +import { Subscription } from './Subscription'; // @ts-ignore -import logger from '../logger'; -import RequestSchema from './RequestSchema'; -import { matchesQuery, queryHash } from './QueryTools'; -import { ParsePubSub } from './ParsePubSub'; -import SchemaController from '../Controllers/SchemaController'; +import deepcopy from 'deepcopy'; import _ from 'lodash'; +import { LRUCache as LRU } from 'lru-cache'; +import { isDeepStrictEqual } from 'util'; import { v4 as uuidv4 } from 'uuid'; +import { Auth, getAuthForSessionToken, master as masterAuth } from '../Auth'; +import { getCacheController, getDatabaseController } from '../Controllers'; +import DatabaseController from '../Controllers/DatabaseController'; +import SchemaController from '../Controllers/SchemaController'; +import logger from '../logger'; +import RestQuery from '../RestQuery'; +import UserRouter from '../Routers/UsersRouter'; import { - runLiveQueryEventHandlers, getTrigger, - runTrigger, resolveError, + runLiveQueryEventHandlers, + runTrigger, toJSONwithObjects, } from '../triggers'; -import { getAuthForSessionToken, Auth } from '../Auth'; -import { getCacheController, getDatabaseController } from '../Controllers'; -import { LRUCache as LRU } from 'lru-cache'; -import UserRouter from '../Routers/UsersRouter'; -import DatabaseController from '../Controllers/DatabaseController'; -import { isDeepStrictEqual } from 'util'; -import deepcopy from 'deepcopy'; +import { ParsePubSub } from './ParsePubSub'; +import { matchesQuery, queryHash } from './QueryTools'; +import RequestSchema from './RequestSchema'; class ParseLiveQueryServer { server: any; @@ -241,6 +242,7 @@ class ParseLiveQueryServer { } if (res.object && typeof res.object.toJSON === 'function') { deletedParseObject = toJSONwithObjects(res.object, res.object.className || className); + deletedParseObject = await this._applyInclude(client, requestId, deletedParseObject); } await this._filterSensitiveData( classLevelPermissions, @@ -391,14 +393,17 @@ class ParseLiveQueryServer { if (!res.sendEvent) { return; } - if (res.object && typeof res.object.toJSON === 'function') { - currentParseObject = toJSONwithObjects(res.object, res.object.className || className); + if (res.object) { + if (typeof res.object.toJSON === 'function') { + currentParseObject = toJSONwithObjects(res.object, res.object.className || className); + } + currentParseObject = await this._applyInclude(client, requestId, currentParseObject); } - if (res.original && typeof res.original.toJSON === 'function') { - originalParseObject = toJSONwithObjects( - res.original, - res.original.className || className - ); + if (res.original) { + if (typeof res.original.toJSON === 'function') { + originalParseObject = toJSONwithObjects(res.original, res.original.className || className); + } + originalParseObject = await this._applyInclude(client, requestId, originalParseObject); } await this._filterSensitiveData( classLevelPermissions, @@ -553,7 +558,7 @@ class ParseLiveQueryServer { } } - getAuthForSessionToken(sessionToken?: string): Promise<{ auth?: Auth, userId?: string }> { + getAuthForSessionToken(sessionToken?: string): Promise<{ auth?: Auth; userId?: string }> { if (!sessionToken) { return Promise.resolve({}); } @@ -674,6 +679,24 @@ class ParseLiveQueryServer { res.original = filter(res.original); } + async _applyInclude(client: any, requestId: number, object: any) { + const subscriptionInfo = client.getSubscriptionInfo(requestId); + if (!object || !subscriptionInfo) { + return object; + } + const include = subscriptionInfo.include; + if (!include || include.length === 0) { + return object; + } + const restOptions: any = {}; + if (subscriptionInfo.keys) { + restOptions.keys = Array.isArray(subscriptionInfo.keys) + ? subscriptionInfo.keys.join(',') + : subscriptionInfo.keys; + } + return this.includeObject(this.config, object, include, {}, restOptions, masterAuth(this.config)); + } + _getCLPOperation(query: any) { return typeof query === 'object' && Object.keys(query).length == 1 && @@ -933,6 +956,11 @@ class ParseLiveQueryServer { ? request.query.keys : request.query.keys.split(','); } + if (request.query.include) { + subscriptionInfo.include = Array.isArray(request.query.include) + ? request.query.include + : request.query.include.split(','); + } if (request.query.watch) { subscriptionInfo.watch = request.query.watch; } @@ -1056,6 +1084,192 @@ class ParseLiveQueryServer { `Delete client: ${parseWebsocket.clientId} | subscription: ${request.requestId}` ); } + + async includePath( + config: any, + auth: any, + response: any, + path: Array, + context: any, + restOptions: any = {}, + ) { + const pointers = this.findPointers(response.results, path); + if (pointers.length === 0) { + return response; + } + const pointersHash: any = {}; + for (const pointer of pointers) { + if (!pointer) { + continue; + } + const className = pointer.className; + if (className) { + pointersHash[className] = pointersHash[className] || new Set(); + pointersHash[className].add(pointer.objectId); + } + } + const includeRestOptions: any = {}; + if (restOptions.keys) { + const keys = new Set(restOptions.keys.split(',')); + const keySet = Array.from(keys).reduce((set, key) => { + const keyPath = key.split('.'); + let i = 0; + for (; i < path.length; i++) { + if (path[i] != keyPath[i]) { + return set; + } + } + if (i < keyPath.length) { + set.add(keyPath[i]); + } + return set; + }, new Set()); + if (keySet.size > 0) { + includeRestOptions.keys = Array.from(keySet).join(','); + } + } + + if (restOptions.excludeKeys) { + const excludeKeys = new Set(restOptions.excludeKeys.split(',')); + const excludeKeySet = Array.from(excludeKeys).reduce((set, key) => { + const keyPath = key.split('.'); + let i = 0; + for (; i < path.length; i++) { + if (path[i] != keyPath[i]) { + return set; + } + } + if (i == keyPath.length - 1) { + set.add(keyPath[i]); + } + return set; + }, new Set()); + if (excludeKeySet.size > 0) { + includeRestOptions.excludeKeys = Array.from(excludeKeySet).join(','); + } + } + + if (restOptions.includeReadPreference) { + includeRestOptions.readPreference = restOptions.includeReadPreference; + includeRestOptions.includeReadPreference = restOptions.includeReadPreference; + } else if (restOptions.readPreference) { + includeRestOptions.readPreference = restOptions.readPreference; + } + + const queryPromises = Object.keys(pointersHash).map(async className => { + const objectIds = Array.from(pointersHash[className]); + let where; + if (objectIds.length === 1) { + where = { objectId: objectIds[0] }; + } else { + where = { objectId: { $in: objectIds } }; + } + const query = await RestQuery({ + method: objectIds.length === 1 ? RestQuery.Method.get : RestQuery.Method.find, + config, + auth, + className, + restWhere: where, + restOptions: includeRestOptions, + context: context, + }); + return query.execute({ op: 'get' }).then(results => { + results.className = className; + return Promise.resolve(results); + }); + }); + + const responses = await Promise.all(queryPromises); + const replace = responses.reduce((acc, includeResponse) => { + for (const obj of includeResponse.results) { + obj.__type = 'Object'; + obj.className = includeResponse.className; + if (obj.className === '_User' && !auth.isMaster) { + delete obj.sessionToken; + delete obj.authData; + } + acc[obj.objectId] = obj; + } + return acc; + }, {} as any); + + const resp: any = { + results: this.replacePointers(response.results, path, replace), + }; + if (response.count) { + resp.count = response.count; + } + return resp; + } + + findPointers(object: any, path: Array): any[] { + if (object instanceof Array) { + return object.map(x => this.findPointers(x, path)).flat(); + } + if (typeof object !== 'object' || !object) { + return []; + } + if (path.length === 0) { + if (object === null || object.__type === 'Pointer') { + return [object]; + } + return []; + } + const subObject = object[path[0]]; + if (!subObject) { + return []; + } + return this.findPointers(subObject, path.slice(1)); + } + + replacePointers(object: any, path: Array, replace: any): any { + if (object instanceof Array) { + return object + .map(obj => this.replacePointers(obj, path, replace)) + .filter(obj => typeof obj !== 'undefined'); + } + if (typeof object !== 'object' || !object) { + return object; + } + if (path.length === 0) { + if (object && object.__type === 'Pointer') { + return replace[object.objectId]; + } + return object; + } + const subObject = object[path[0]]; + if (!subObject) { + return object; + } + const newSub = this.replacePointers(subObject, path.slice(1), replace); + const answer: any = {}; + for (const key in object) { + if (key === path[0]) { + answer[key] = newSub; + } else { + answer[key] = object[key]; + } + } + return answer; + } + + async includeObject( + config: any, + object: any, + include: Array, + context: any, + restOptions: any, + auth: any + ) { + if (!include || include.length === 0) { + return object; + } + let response = { results: [object] } as any; + for (const path of include) { + response = await this.includePath(config, auth, response, path.split('.'), context, restOptions); + } + return response.results[0]; + } } export { ParseLiveQueryServer }; diff --git a/src/LiveQuery/RequestSchema.js b/src/LiveQuery/RequestSchema.js index 6e0a0566b2..3f1aec59e2 100644 --- a/src/LiveQuery/RequestSchema.js +++ b/src/LiveQuery/RequestSchema.js @@ -78,6 +78,14 @@ const subscribe = { minItems: 1, uniqueItems: true, }, + include: { + type: 'array', + items: { + type: 'string' + }, + minItems: 1, + uniqueItems: true + }, }, required: ['where', 'className'], additionalProperties: false, @@ -124,6 +132,14 @@ const update = { minItems: 1, uniqueItems: true, }, + include: { + type: 'array', + items: { + type: 'string' + }, + minItems: 1, + uniqueItems: true + }, }, required: ['where', 'className'], additionalProperties: false, diff --git a/types/LiveQuery/ParseLiveQueryServer.d.ts b/types/LiveQuery/ParseLiveQueryServer.d.ts index d5966144b5..2b351d95db 100644 --- a/types/LiveQuery/ParseLiveQueryServer.d.ts +++ b/types/LiveQuery/ParseLiveQueryServer.d.ts @@ -25,6 +25,7 @@ declare class ParseLiveQueryServer { }>; _matchesCLP(classLevelPermissions?: any, object?: any, client?: any, requestId?: number, op?: string): Promise; _filterSensitiveData(classLevelPermissions?: any, res?: any, client?: any, requestId?: number, op?: string, query?: any): Promise; + _applyInclude(client: any, requestId: number, object: any): Promise; _getCLPOperation(query: any): "get" | "find"; _verifyACL(acl: any, token: string): Promise; getAuthFromClient(client: any, requestId: number, sessionToken?: string): Promise; @@ -36,5 +37,9 @@ declare class ParseLiveQueryServer { _handleSubscribe(parseWebsocket: any, request: any): Promise; _handleUpdateSubscription(parseWebsocket: any, request: any): any; _handleUnsubscribe(parseWebsocket: any, request: any, notifyClient?: boolean): any; + includePath(config: any, auth: any, response: any, path: Array, context: any, restOptions?: any): Promise; + findPointers(object: any, path: Array): any[]; + replacePointers(object: any, path: Array, replace: any): any; + includeObject(config: any, object: any, include: Array, context: any, restOptions: any, auth: any): Promise; } export { ParseLiveQueryServer };