Skip to content

Commit cbb01d6

Browse files
committed
wip
1 parent 8647d85 commit cbb01d6

File tree

3 files changed

+102
-1
lines changed

3 files changed

+102
-1
lines changed

app/common/ConfigAPI.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class ConfigAPI extends BaseAPI {
2626
}
2727

2828
public async healthcheck(): Promise<void> {
29-
const resp = await this.request(`${this._homeUrl}/status?ready=1`);
29+
const resp = await this.request(`${this._homeUrl}/status?allInstancesReady=1`);
3030
if (!resp.ok) {
3131
throw new Error(await resp.text());
3232
}

app/server/lib/FlexServer.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ import {AddressInfo} from 'net';
101101
import fetch from 'node-fetch';
102102
import * as path from 'path';
103103
import * as serveStatic from 'serve-static';
104+
import {v4 as uuidv4} from 'uuid';
104105

105106
// Health checks are a little noisy in the logs, so we don't show them all.
106107
// We show the first N health checks:
@@ -127,6 +128,29 @@ export interface FlexServerOptions {
127128
settings?: IGristCoreConfig;
128129
}
129130

131+
// Not to be confused with health checks from the frontend, these
132+
// request/response pairs are internal checks between Grist instances
133+
// in multi-server environments.
134+
interface ServerHealthcheckRequest {
135+
id: string;
136+
checkReady: boolean;
137+
}
138+
interface ServerHealthcheckResponse {
139+
instanceId: string;
140+
requestId: string;
141+
healthy: boolean;
142+
}
143+
144+
// For keeping track of pending health checks for all other servers
145+
// for each request that was broadcast to all of them.
146+
interface PendingServerHealthCheck {
147+
expectedCount: number;
148+
responses: Record<string, boolean>;
149+
resolve: (res: boolean) => void;
150+
reject: (err: Error) => void;
151+
timeout: NodeJS.Timeout;
152+
}
153+
130154
const noop: express.RequestHandler = (req, res, next) => next();
131155

132156
export class FlexServer implements GristServer {
@@ -213,6 +237,8 @@ export class FlexServer implements GristServer {
213237
private _emitNotifier = new EmitNotifier();
214238
private _testPendingNotifications: number = 0;
215239
private _latestVersionAvailable?: LatestVersionAvailable;
240+
private _instanceId: string;
241+
private _pendingServerHealthChecks: Map<string, PendingServerHealthCheck>;
216242

217243
constructor(public port: number, public name: string = 'flexServer',
218244
public readonly options: FlexServerOptions = {}) {
@@ -274,6 +300,9 @@ export class FlexServer implements GristServer {
274300
this.setLatestVersionAvailable(latestVersionAvailable);
275301
});
276302

303+
this._pendingServerHealthChecks = new Map<string, PendingServerHealthCheck>();
304+
this._registerGristInstance();
305+
277306
// The electron build is not supported at this time, but this stub
278307
// implementation of electronServerMethods is present to allow kicking
279308
// its tires.
@@ -600,6 +629,34 @@ export class FlexServer implements GristServer {
600629
if (isParameterOn(req.query.ready)) {
601630
checks.set('ready', this._isReady);
602631
}
632+
if (isParameterOn(req.query.allInstancesReady)) {
633+
const requestId = uuidv4();
634+
const client = this._pubSubManager.getClient();
635+
636+
// If there is no Redis, then our current instance is the only instance
637+
const allInstances = await client?.smembers('grist-instances') || [this._instanceId];
638+
639+
const allInstancesPromise: Promise<boolean> = new Promise((resolve, reject) => {
640+
const allInstancesTimeout = setTimeout(() => {
641+
reject(new Error('Timeout waiting for health responses'));
642+
this._pendingServerHealthChecks.delete(requestId);
643+
}, timeout);
644+
645+
this._pendingServerHealthChecks.set(requestId, {
646+
responses: {},
647+
expectedCount: allInstances.length,
648+
resolve,
649+
reject,
650+
timeout: allInstancesTimeout,
651+
});
652+
});
653+
const request: ServerHealthcheckRequest = {
654+
id: requestId,
655+
checkReady: true
656+
};
657+
await this._pubSubManager.publish('healthcheck:requests', JSON.stringify(request));
658+
checks.set('allInstancesReady', allInstancesPromise);
659+
}
603660
let extra = '';
604661
let ok = true;
605662
// If we had any extra check, collect their status to report them.
@@ -1061,6 +1118,7 @@ export class FlexServer implements GristServer {
10611118
if (this.httpsServer) { this.httpsServer.close(); }
10621119
if (this.housekeeper) { await this.housekeeper.stop(); }
10631120
if (this._jobs) { await this._jobs.stop(); }
1121+
await this._pubSubManager.getClient()?.srem('grist-instances', [this._instanceId]);
10641122
await this._shutdown();
10651123
if (this._accessTokens) { await this._accessTokens.close(); }
10661124
// Do this after _shutdown, since DocWorkerMap is used during shutdown.
@@ -2693,6 +2751,40 @@ export class FlexServer implements GristServer {
26932751
},
26942752
});
26952753
}
2754+
2755+
private _registerGristInstance() {
2756+
this._instanceId = process.env.GRIST_INSTANCE_ID || `testInsanceId_${this.port}`;
2757+
this._pubSubManager.getClient()?.sadd('grist-instances', this._instanceId).catch((err) => {
2758+
log.error('Failed to contact redis', err);
2759+
});
2760+
this._pubSubManager.subscribe('healthcheck:requests', async (message) => {
2761+
const request: ServerHealthcheckRequest = JSON.parse(message);
2762+
const response: ServerHealthcheckResponse = {
2763+
instanceId: this._instanceId || '',
2764+
requestId: request.id,
2765+
healthy: !request.checkReady || this._isReady,
2766+
};
2767+
log.debug(`Healthcheck request`, response);
2768+
await this._pubSubManager.publish('healthcheck:responses', JSON.stringify(response));
2769+
});
2770+
2771+
this._pubSubManager.subscribe('healthcheck:responses', (message) => {
2772+
const response: ServerHealthcheckResponse = JSON.parse(message);
2773+
const pending = this._pendingServerHealthChecks.get(response.requestId);
2774+
if (!pending) {
2775+
return;
2776+
}
2777+
2778+
pending.responses[response.instanceId] = response.healthy;
2779+
2780+
if (Object.keys(pending.responses).length === pending.expectedCount) {
2781+
// All servers have replied. Make it known and clean up.
2782+
clearTimeout(pending.timeout);
2783+
pending.resolve(Object.values(pending.responses).every(e => e));
2784+
this._pendingServerHealthChecks.delete(response.requestId);
2785+
}
2786+
});
2787+
}
26962788
}
26972789

26982790
/**

app/server/lib/PubSubManager.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export interface IPubSubManager {
2929
close(): Promise<void>;
3030
subscribe(channel: string, callback: Callback): UnsubscribeCallback;
3131
publish(channel: string, message: string): Promise<void>;
32+
getClient(): IORedis|undefined;
3233
}
3334

3435
export type Callback = (message: string) => void;
@@ -72,6 +73,7 @@ abstract class PubSubManagerBase implements IPubSubManager {
7273
* - In Redis, the channel gets prefixed with getPubSubPrefix() to scope it to the current Redis DB.
7374
*/
7475
public abstract publish(channel: string, message: string): Promise<void>;
76+
public abstract getClient(): IORedis|undefined;
7577

7678
protected abstract _redisSubscribe(channel: string): Promise<void>;
7779
protected abstract _redisUnsubscribe(channel: string): Promise<void>;
@@ -96,6 +98,7 @@ abstract class PubSubManagerBase implements IPubSubManager {
9698

9799
class PubSubManagerNoRedis extends PubSubManagerBase {
98100
public async publish(channel: string, message: string) { this._deliverMessage(channel, message); }
101+
public getClient(): IORedis|undefined { return; }
99102
protected async _redisSubscribe(channel: string): Promise<void> {}
100103
protected async _redisUnsubscribe(channel: string): Promise<void> {}
101104
}
@@ -131,6 +134,12 @@ class PubSubManagerRedis extends PubSubManagerBase {
131134
await this._redisPub.publish(this._prefixChannel(channel), message);
132135
}
133136

137+
public getClient(): IORedis|undefined {
138+
// The redisSub client is already tied up listening to a channel, but
139+
// the redisPub is "free" for the client to mess around with.
140+
return this._redisPub;
141+
}
142+
134143
protected async _redisSubscribe(channel: string): Promise<void> {
135144
await this._redisSub.subscribe(this._prefixChannel(channel));
136145
}

0 commit comments

Comments
 (0)