Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Commit fd98914

Browse files
committed
updated realtime broker to handle distributed deployment
1 parent 577e0d9 commit fd98914

File tree

6 files changed

+74
-32
lines changed

6 files changed

+74
-32
lines changed

cache/cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ func (c *Cache) Set(key string, value string) error {
4242
return nil
4343
}
4444

45+
func (c *Cache) Inc(key string, by int64) (int64, error) {
46+
return c.Rdb.IncrBy(c.Ctx, key, by).Result()
47+
}
48+
49+
func (c *Cache) Dec(key string, by int64) (int64, error) {
50+
return c.Rdb.DecrBy(c.Ctx, key, by).Result()
51+
}
52+
4553
func (c *Cache) Subscribe(send chan internal.Command, token, channel string, close chan bool) {
4654
pubsub := c.Rdb.Subscribe(c.Ctx, channel)
4755

internal/data.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ const (
3939
MsgTypeToken = "token"
4040
MsgTypeJoin = "join"
4141
MsgTypeJoined = "joined"
42+
MsgTypePresence = "presence"
4243
MsgTypeChanIn = "chan_in"
4344
MsgTypeChanOut = "chan_out"
4445
MsgTypeDBCreated = "db_created"

internal/pubsuber.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package internal
2+
3+
// PubSuber contains functions to make realtime communication distributed
4+
type PubSuber interface {
5+
Get(key string) (string, error)
6+
Set(key string, value string) error
7+
Inc(key string, by int64) (int64, error)
8+
Dec(key string, by int64) (int64, error)
9+
Subscribe(send chan Command, token, channel string, close chan bool)
10+
Publish(msg Command) error
11+
PublishDocument(channel, typ string, v interface{})
12+
}

main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func deleteAndSetupTestAccount() {
9595
SBID: acctID,
9696
Name: dbName,
9797
Whitelist: []string{"localhost"},
98-
Valid: true,
98+
IsActive: true,
9999
}
100100

101101
if _, err := sysDB.Collection("bases").InsertOne(ctx, base); err != nil {

realtime/broker.go

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,23 @@ type Broker struct {
2626
clients map[chan internal.Command]string
2727
ids map[string]chan internal.Command
2828
conf map[string]context.Context
29-
subscriptions map[string][]string
29+
subscriptions map[string][]chan bool
3030
validateAuth Validator
31+
32+
pubsub internal.PubSuber
3133
}
3234

33-
func NewBroker(v Validator) *Broker {
35+
func NewBroker(v Validator, pubsub internal.PubSuber) *Broker {
3436
b := &Broker{
3537
Broadcast: make(chan internal.Command, 1),
3638
newConnections: make(chan ConnectionData),
3739
closingConnections: make(chan chan internal.Command),
3840
clients: make(map[chan internal.Command]string),
3941
ids: make(map[string]chan internal.Command),
4042
conf: make(map[string]context.Context),
41-
subscriptions: make(map[string][]string),
43+
subscriptions: make(map[string][]chan bool),
4244
validateAuth: v,
45+
pubsub: pubsub,
4346
}
4447

4548
go b.start()
@@ -85,6 +88,13 @@ func (b *Broker) unsub(c chan internal.Command) {
8588
fmt.Println("cannot find connection id")
8689
}
8790

91+
subs, ok := b.subscriptions[id]
92+
if ok {
93+
for _, ch := range subs {
94+
ch <- true
95+
}
96+
}
97+
8898
delete(b.ids, id)
8999
}
90100

@@ -140,12 +150,15 @@ func (b *Broker) Accept(w http.ResponseWriter, r *http.Request) {
140150
}
141151

142152
func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Command, payload internal.Command) {
153+
var sender chan internal.Command
154+
143155
if msg.SID != internal.SystemID {
144-
sender, ok := b.ids[msg.SID]
156+
s, ok := b.ids[msg.SID]
145157
if !ok {
146158
fmt.Println("cannot find sender socket", msg.SID)
147159
return
148160
}
161+
sender = s
149162
sockets = append(sockets, sender)
150163
}
151164

@@ -167,15 +180,27 @@ func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Comma
167180

168181
payload = internal.Command{Type: internal.MsgTypeToken, Data: msg.Data}
169182
case internal.MsgTypeJoin:
170-
members, ok := b.subscriptions[msg.Data]
183+
subs, ok := b.subscriptions[msg.SID]
171184
if !ok {
172-
members = make([]string, 0)
185+
subs = make([]chan bool, 0)
173186
}
174187

175-
members = append(members, msg.SID)
176-
b.subscriptions[msg.Data] = members
188+
closesub := make(chan bool)
189+
190+
subs = append(subs, closesub)
191+
b.subscriptions[msg.SID] = subs
192+
193+
go b.pubsub.Subscribe(sender, msg.Token, msg.Data, closesub)
177194

178195
payload = internal.Command{Type: internal.MsgTypeJoined, Data: msg.Data}
196+
case internal.MsgTypePresence:
197+
v, err := b.pubsub.Get(msg.Data)
198+
if err != nil {
199+
//TODO: Make sure it's because the channel key does not exists
200+
v = "0"
201+
}
202+
203+
payload = internal.Command{Type: internal.MsgTypePresence, Data: v}
179204
case internal.MsgTypeChanIn:
180205
if len(msg.Channel) == 0 {
181206
payload = internal.Command{Type: internal.MsgTypeError, Data: "no channel was specified"}
@@ -188,7 +213,8 @@ func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Comma
188213
return
189214
}
190215

191-
go b.Publish(msg, msg.Channel)
216+
go b.pubsub.Publish(msg)
217+
//go b.Publish(msg, msg.Channel)
192218

193219
payload = internal.Command{Type: internal.MsgTypeOk}
194220
default:
@@ -198,24 +224,3 @@ func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Comma
198224

199225
return
200226
}
201-
202-
// Publish sends a message to all socket in that channel
203-
func (b *Broker) Publish(msg internal.Command, channel string) {
204-
if msg.Type == internal.MsgTypeChanIn {
205-
msg.Type = internal.MsgTypeChanOut
206-
}
207-
208-
members, ok := b.subscriptions[channel]
209-
if !ok {
210-
return
211-
}
212-
213-
for _, sid := range members {
214-
c, ok := b.ids[sid]
215-
if !ok {
216-
continue
217-
}
218-
219-
c <- msg
220-
}
221-
}

server.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"staticbackend/cache"
2020

2121
"github.com/stripe/stripe-go/v71"
22+
"go.mongodb.org/mongo-driver/bson/primitive"
2223
"go.mongodb.org/mongo-driver/mongo"
2324
"go.mongodb.org/mongo-driver/mongo/options"
2425
"go.mongodb.org/mongo-driver/mongo/readpref"
@@ -53,11 +54,26 @@ func Start(dbHost, port string) {
5354

5455
// Server Send Event, alternative to websocket
5556
b := realtime.NewBroker(func(ctx context.Context, key string) (string, error) {
57+
//TODO: Experimental, let un-authenticated user connect
58+
// useful for an Intercom-like SaaS I'm building.
59+
if strings.HasPrefix(key, "__tmp__experimental_public_19378246_") {
60+
// let's create the most minimal authentication possible
61+
a := internal.Auth{
62+
AccountID: primitive.NewObjectID(),
63+
UserID: primitive.NewObjectID(),
64+
Email: "exp@tmp.com",
65+
Role: 0,
66+
}
67+
68+
internal.Tokens[key] = a
69+
return key, nil
70+
}
71+
5672
if _, err := middleware.ValidateAuthKey(client, ctx, key); err != nil {
5773
return "", err
5874
}
5975
return key, nil
60-
})
76+
}, volatile)
6177

6278
database := &Database{
6379
client: client,

0 commit comments

Comments
 (0)