Skip to content

Commit 6911c03

Browse files
committed
wip
1 parent 8647d85 commit 6911c03

File tree

11 files changed

+308
-20
lines changed

11 files changed

+308
-20
lines changed

app/common/ConfigAPI.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class ConfigAPI extends BaseAPI {
2626
}
2727

2828
public async healthcheck(): Promise<void> {
29-
const resp = await this.request(`${this._homeUrl}/status?ready=1`);
29+
const resp = await this.request(`${this._homeUrl}/status?allInstancesReady=1`);
3030
if (!resp.ok) {
3131
throw new Error(await resp.text());
3232
}

app/server/MergedServer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ export class MergedServer {
206206
await this.flexServer.finalizePlugins(this.hasComponent("home") ? checkUserContentPort() : null);
207207
this.flexServer.checkOptionCombinations();
208208
this.flexServer.summary();
209-
this.flexServer.setReady(true);
209+
this.flexServer.ready = true;
210210

211211
if (this._options.extraWorkers) {
212212
if (!process.env.REDIS_URL) {

app/server/lib/FlexServer.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ import {AddressInfo} from 'net';
101101
import fetch from 'node-fetch';
102102
import * as path from 'path';
103103
import * as serveStatic from 'serve-static';
104+
import { HealthChecker } from './HealthChecker';
104105

105106
// Health checks are a little noisy in the logs, so we don't show them all.
106107
// We show the first N health checks:
@@ -213,6 +214,7 @@ export class FlexServer implements GristServer {
213214
private _emitNotifier = new EmitNotifier();
214215
private _testPendingNotifications: number = 0;
215216
private _latestVersionAvailable?: LatestVersionAvailable;
217+
private _healthChecker: HealthChecker;
216218

217219
constructor(public port: number, public name: string = 'flexServer',
218220
public readonly options: FlexServerOptions = {}) {
@@ -274,6 +276,8 @@ export class FlexServer implements GristServer {
274276
this.setLatestVersionAvailable(latestVersionAvailable);
275277
});
276278

279+
this._healthChecker = new HealthChecker(this);
280+
277281
// The electron build is not supported at this time, but this stub
278282
// implementation of electronServerMethods is present to allow kicking
279283
// its tires.
@@ -600,6 +604,11 @@ export class FlexServer implements GristServer {
600604
if (isParameterOn(req.query.ready)) {
601605
checks.set('ready', this._isReady);
602606
}
607+
if (isParameterOn(req.query.allInstancesReady)) {
608+
checks.set('allInstancesReady',
609+
this._healthChecker.allServersOkay(timeout).then(() => true).catch(() => false)
610+
);
611+
}
603612
let extra = '';
604613
let ok = true;
605614
// If we had any extra check, collect their status to report them.
@@ -1061,6 +1070,7 @@ export class FlexServer implements GristServer {
10611070
if (this.httpsServer) { this.httpsServer.close(); }
10621071
if (this.housekeeper) { await this.housekeeper.stop(); }
10631072
if (this._jobs) { await this._jobs.stop(); }
1073+
await this._healthChecker.close();
10641074
await this._shutdown();
10651075
if (this._accessTokens) { await this._accessTokens.close(); }
10661076
// Do this after _shutdown, since DocWorkerMap is used during shutdown.
@@ -1892,7 +1902,7 @@ export class FlexServer implements GristServer {
18921902
}
18931903
}
18941904

1895-
public setReady(value: boolean) {
1905+
public set ready(value: boolean) {
18961906
if(value) {
18971907
log.debug('FlexServer is ready');
18981908
} else {
@@ -1901,6 +1911,10 @@ export class FlexServer implements GristServer {
19011911
this._isReady = value;
19021912
}
19031913

1914+
public get ready() {
1915+
return this._isReady;
1916+
}
1917+
19041918
public checkOptionCombinations() {
19051919
// Check for some bad combinations we should warn about.
19061920
const allowedWebhookDomains = appSettings.section('integrations').flag('allowedWebhookDomains').readString({

app/server/lib/GristServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ export interface StorageCoordinator {
5252
export interface GristServer extends StorageCoordinator {
5353
readonly create: ICreate;
5454
readonly testPending: boolean;
55+
ready: boolean;
5556
settings?: IGristCoreConfig;
5657
getHost(): string;
5758
getHomeUrl(req: express.Request, relPath?: string): string;
@@ -103,7 +104,6 @@ export interface GristServer extends StorageCoordinator {
103104
isRestrictedMode(): boolean;
104105
onUserChange(callback: (change: UserChange) => Promise<void>): void;
105106
onStreamingDestinationsChange(callback: (orgId?: number) => Promise<void>): void;
106-
setReady(value: boolean): void;
107107
}
108108

109109
export interface GristLoginSystem {
@@ -163,6 +163,7 @@ export function createDummyGristServer(): GristServer {
163163
return {
164164
create,
165165
testPending: false,
166+
ready: true,
166167
settings: loadGristCoreConfig(),
167168
getHost() { return 'localhost:4242'; },
168169
getHomeUrl() { return 'http://localhost:4242'; },
@@ -214,7 +215,6 @@ export function createDummyGristServer(): GristServer {
214215
onUserChange() { /* do nothing */ },
215216
onStreamingDestinationsChange() { /* do nothing */ },
216217
hardDeleteDoc() { return Promise.resolve(); },
217-
setReady() { /* do nothing */ },
218218
};
219219
}
220220

app/server/lib/HealthChecker.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import {GristServer} from 'app/server/lib/GristServer';
2+
import log from 'app/server/lib/log';
3+
import {createPubSubManager, IPubSubManager} from 'app/server/lib/PubSubManager';
4+
import * as shutdown from 'app/server/lib/shutdown';
5+
6+
import {v4 as uuidv4} from 'uuid';
7+
8+
// Not to be confused with health checks from the frontend, these
9+
// request/response pairs are internal checks between Grist instances
10+
// in multi-server environments
11+
interface ServerHealthcheckRequest {
12+
id: string;
13+
checkReady: boolean;
14+
}
15+
interface ServerHealthcheckResponse {
16+
instanceId: string;
17+
requestId: string;
18+
healthy: boolean;
19+
}
20+
21+
// For keeping track of pending health checks for all other servers
22+
// for each request that was broadcast to all of them.
23+
interface PendingServerHealthCheck {
24+
expectedCount: number;
25+
responses: Record<string, boolean>;
26+
resolve: (res: boolean) => void;
27+
reject: (err: Error) => void;
28+
timeout: NodeJS.Timeout;
29+
}
30+
31+
export class HealthChecker {
32+
private _pendingServerHealthChecks: Map<string, PendingServerHealthCheck>;
33+
private _serverInstanceID: string;
34+
private _pubSubManager: IPubSubManager;
35+
36+
constructor(
37+
private _server: GristServer
38+
) {
39+
this._pubSubManager = createPubSubManager(process.env.REDIS_URL);
40+
this._pendingServerHealthChecks = new Map<string, PendingServerHealthCheck>();
41+
this._serverInstanceID = process.env.GRIST_INSTANCE_ID || `testInsanceId_${this._server.getHost()}`;
42+
this._pubSubManager.getClient()?.sadd('grist-instances', this._serverInstanceID).catch((err) => {
43+
log.error('Failed to contact redis', err);
44+
});
45+
this._subscribeToChannels();
46+
47+
// Make sure we clean up our Redis mess, if any, even if we exit
48+
// by signal.
49+
shutdown.addCleanupHandler(null, () => this.close());
50+
}
51+
52+
public async allServersOkay(timeout: number) {
53+
const requestId = uuidv4();
54+
const client = this._pubSubManager.getClient();
55+
56+
// If there is no Redis, then our current instance is the only instance
57+
const allInstances = await client?.smembers('grist-instances') || [this._serverInstanceID];
58+
59+
const allInstancesPromise: Promise<boolean> = new Promise((resolve, reject) => {
60+
const allInstancesTimeout = setTimeout(() => {
61+
log.warn('allServersOkay: timeout waiting for responses');
62+
reject(new Error('Timeout waiting for health responses'));
63+
this._pendingServerHealthChecks.delete(requestId);
64+
}, timeout);
65+
66+
this._pendingServerHealthChecks.set(requestId, {
67+
responses: {},
68+
expectedCount: allInstances.length,
69+
resolve,
70+
reject,
71+
timeout: allInstancesTimeout,
72+
});
73+
});
74+
const request: ServerHealthcheckRequest = {
75+
id: requestId,
76+
checkReady: true
77+
};
78+
await this._pubSubManager.publish('healthcheck:requests', JSON.stringify(request));
79+
return allInstancesPromise;
80+
}
81+
82+
public async close() {
83+
await this._pubSubManager.getClient()?.srem('grist-instances', [this._serverInstanceID]);
84+
await this._pubSubManager.close();
85+
}
86+
87+
private _subscribeToChannels() {
88+
this._pubSubManager.subscribe('healthcheck:requests', async (message) => {
89+
const request: ServerHealthcheckRequest = JSON.parse(message);
90+
const response: ServerHealthcheckResponse = {
91+
instanceId: this._serverInstanceID|| '',
92+
requestId: request.id,
93+
healthy: !request.checkReady || this._server.ready,
94+
};
95+
log.debug('allServersOkay request', response);
96+
await this._pubSubManager.publish('healthcheck:responses', JSON.stringify(response));
97+
});
98+
99+
this._pubSubManager.subscribe('healthcheck:responses', (message) => {
100+
const response: ServerHealthcheckResponse = JSON.parse(message);
101+
const pending = this._pendingServerHealthChecks.get(response.requestId);
102+
if (!pending) {
103+
// This instance didn't broadcast a health check request with
104+
// this requestId, so nothing to do.
105+
return;
106+
}
107+
108+
pending.responses[response.instanceId] = response.healthy;
109+
log.debug(
110+
`allServersOkay cleared pending response on ${this._serverInstanceID} for ${response.instanceId}`
111+
);
112+
113+
if (Object.keys(pending.responses).length === pending.expectedCount) {
114+
// All servers have replied. Make it known and clean up.
115+
clearTimeout(pending.timeout);
116+
pending.resolve(Object.values(pending.responses).every(e => e));
117+
this._pendingServerHealthChecks.delete(response.requestId);
118+
}
119+
});
120+
}
121+
}

app/server/lib/PubSubManager.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export interface IPubSubManager {
2929
close(): Promise<void>;
3030
subscribe(channel: string, callback: Callback): UnsubscribeCallback;
3131
publish(channel: string, message: string): Promise<void>;
32+
getClient(): IORedis|undefined;
3233
}
3334

3435
export type Callback = (message: string) => void;
@@ -72,6 +73,7 @@ abstract class PubSubManagerBase implements IPubSubManager {
7273
* - In Redis, the channel gets prefixed with getPubSubPrefix() to scope it to the current Redis DB.
7374
*/
7475
public abstract publish(channel: string, message: string): Promise<void>;
76+
public abstract getClient(): IORedis|undefined;
7577

7678
protected abstract _redisSubscribe(channel: string): Promise<void>;
7779
protected abstract _redisUnsubscribe(channel: string): Promise<void>;
@@ -96,6 +98,7 @@ abstract class PubSubManagerBase implements IPubSubManager {
9698

9799
class PubSubManagerNoRedis extends PubSubManagerBase {
98100
public async publish(channel: string, message: string) { this._deliverMessage(channel, message); }
101+
public getClient(): IORedis|undefined { return; }
99102
protected async _redisSubscribe(channel: string): Promise<void> {}
100103
protected async _redisUnsubscribe(channel: string): Promise<void> {}
101104
}
@@ -131,6 +134,12 @@ class PubSubManagerRedis extends PubSubManagerBase {
131134
await this._redisPub.publish(this._prefixChannel(channel), message);
132135
}
133136

137+
public getClient(): IORedis|undefined {
138+
// The redisSub client is already tied listening to a channel, but
139+
// the redisPub is "free" for the client to mess around with.
140+
return this._redisPub;
141+
}
142+
134143
protected async _redisSubscribe(channel: string): Promise<void> {
135144
await this._redisSub.subscribe(this._prefixChannel(channel));
136145
}

app/server/lib/attachEarlyEndpoints.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ export function attachEarlyEndpoints(options: AttachOptions) {
113113
});
114114
}
115115
// We're going down, so we're no longer ready to serve requests.
116-
gristServer.setReady(false);
116+
gristServer.ready = false;
117117
return res.status(200).send({ msg: "ok" });
118118
})
119119
);

test/gen-server/lib/HealthCheck.ts

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { assert } from 'chai';
22
import fetch from 'node-fetch';
33
import { TestServer } from 'test/gen-server/apiUtils';
4-
import { TcpForwarder } from 'test/server/tcpForwarder';
4+
import { RedisForwarder } from 'test/server/tcpForwarder';
55
import * as testUtils from 'test/server/testUtils';
66
import { waitForIt } from 'test/server/wait';
77

@@ -12,22 +12,14 @@ describe('HealthCheck', function() {
1212
describe(serverType, function() {
1313
let server: TestServer;
1414
let oldEnv: testUtils.EnvironmentSnapshot;
15-
let redisForwarder: TcpForwarder;
15+
let redisForwarder: RedisForwarder;
1616

1717
before(async function() {
1818
oldEnv = new testUtils.EnvironmentSnapshot();
1919

20-
// We set up Redis via a TcpForwarder, so that we can simulate disconnects.
21-
if (!process.env.TEST_REDIS_URL) {
22-
throw new Error("TEST_REDIS_URL is expected");
23-
}
24-
const redisUrl = new URL(process.env.TEST_REDIS_URL);
25-
const redisPort = parseInt(redisUrl.port, 10) || 6379;
26-
redisForwarder = new TcpForwarder(redisPort, redisUrl.host);
27-
const forwarderPort = await redisForwarder.pickForwarderPort();
28-
await redisForwarder.connect();
29-
30-
process.env.REDIS_URL = `redis://localhost:${forwarderPort}`;
20+
// We set up Redis via a forwarder, so that we can simulate disconnects.
21+
redisForwarder = await RedisForwarder.create();
22+
process.env.REDIS_URL = `redis://localhost:${redisForwarder.port}`;
3123
server = new TestServer(this);
3224
await server.start([serverType]);
3325
});

test/server/lib/Authorizer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ async function activateServer(home: FlexServer, docManager: DocManager) {
4343
home.addApiErrorHandlers();
4444
home.finalizeEndpoints();
4545
await home.finalizePlugins(null);
46-
home.setReady(true);
46+
home.ready = true;
4747
serverUrl = home.getOwnUrl();
4848
}
4949

0 commit comments

Comments
 (0)