Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Commit 2a027bb

Browse files
committed
initial proposal for pubsub/event message close #8
1 parent 97b9a85 commit 2a027bb

File tree

12 files changed

+291
-38
lines changed

12 files changed

+291
-38
lines changed

cache/cache.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,23 @@ func (c *Cache) Set(key string, value string) error {
4242
return nil
4343
}
4444

45+
func (c *Cache) GetTyped(key string, v interface{}) error {
46+
s, err := c.Get(key)
47+
if err != nil {
48+
return err
49+
}
50+
51+
return json.Unmarshal([]byte(s), v)
52+
}
53+
54+
func (c *Cache) SetTyped(key string, v interface{}) error {
55+
b, err := json.Marshal(v)
56+
if err != nil {
57+
return err
58+
}
59+
return c.Set(key, string(b))
60+
}
61+
4562
func (c *Cache) Inc(key string, by int64) (int64, error) {
4663
return c.Rdb.IncrBy(c.Ctx, key, by).Result()
4764
}
@@ -70,14 +87,16 @@ func (c *Cache) Subscribe(send chan internal.Command, token, channel string, clo
7087
return
7188
}
7289

73-
// for non DB events we change the type to MsgTypeChanOut
74-
if !msg.IsDBEvent() {
90+
// TODO: this will need more thinking
91+
if msg.Type == internal.MsgTypeChanIn {
7592
msg.Type = internal.MsgTypeChanOut
76-
} else if c.HasPermission(token, channel, msg.Data) == false {
93+
} else if msg.IsSystemEvent {
94+
95+
} else if msg.IsDBEvent() && c.HasPermission(token, channel, msg.Data) == false {
7796
continue
7897
}
7998
send <- msg
80-
case _ = <-close:
99+
case <-close:
81100
_ = pubsub.Close()
82101
return
83102
}
@@ -93,6 +112,22 @@ func (c *Cache) Publish(msg internal.Command) error {
93112
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
94113
defer cancel()
95114

115+
// Publish the event to system so server-side function can trigger
116+
go func(sysmsg internal.Command) {
117+
sysmsg.IsSystemEvent = true
118+
b, err := json.Marshal(sysmsg)
119+
if err != nil {
120+
log.Println("error marshaling the system msg: ", err)
121+
return
122+
}
123+
124+
sysctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
125+
defer cancel()
126+
if err := c.Rdb.Publish(sysctx, "sbsys", string(b)).Err(); err != nil {
127+
log.Println("error publishing to system channel: ", err)
128+
}
129+
}(msg)
130+
96131
return c.Rdb.Publish(ctx, msg.Channel, string(b)).Err()
97132
}
98133

@@ -129,8 +164,8 @@ func (c *Cache) PublishDocument(channel, typ string, v interface{}) {
129164
}
130165

131166
func (c *Cache) HasPermission(token, repo, payload string) bool {
132-
me, ok := internal.Tokens[token]
133-
if !ok {
167+
var me internal.Auth
168+
if err := c.GetTyped(token, &me); err != nil {
134169
return false
135170
}
136171

function/management.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,32 @@ func List(db *mongo.Database) ([]ExecData, error) {
138138
return results, nil
139139
}
140140

141+
func ListByTrigger(db *mongo.Database, trigger string) ([]ExecData, error) {
142+
opt := &options.FindOptions{}
143+
opt.SetProjection(bson.M{"h": 0})
144+
145+
filter := bson.M{"tr": trigger}
146+
ctx := context.Background()
147+
cur, err := db.Collection("sb_functions").Find(ctx, filter, opt)
148+
if err != nil {
149+
return nil, err
150+
}
151+
defer cur.Close(ctx)
152+
153+
var results []ExecData
154+
for cur.Next(ctx) {
155+
var ed ExecData
156+
err := cur.Decode(&ed)
157+
if err != nil {
158+
return nil, err
159+
}
160+
161+
results = append(results, ed)
162+
}
163+
164+
return results, nil
165+
}
166+
141167
func Delete(db *mongo.Database, name string) error {
142168
filter := bson.M{"name": name}
143169

function/runtime.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package function
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"staticbackend/db"
67
"staticbackend/internal"
@@ -11,10 +12,11 @@ import (
1112
)
1213

1314
type ExecutionEnvironment struct {
14-
Auth internal.Auth
15-
DB *mongo.Database
16-
Base *db.Base
17-
Data ExecData
15+
Auth internal.Auth
16+
DB *mongo.Database
17+
Base *db.Base
18+
Volatile internal.PubSuber
19+
Data ExecData
1820
}
1921

2022
type Result struct {
@@ -28,6 +30,7 @@ func (env *ExecutionEnvironment) Execute() error {
2830

2931
env.addHelpers(vm)
3032
env.addDatabaseFunctions(vm)
33+
env.addVolatileFunctions(vm)
3134

3235
result, err := vm.RunString(env.Data.Code)
3336
if err != nil {
@@ -255,3 +258,37 @@ func (*ExecutionEnvironment) clean(doc map[string]interface{}) error {
255258

256259
return nil
257260
}
261+
262+
func (env *ExecutionEnvironment) addVolatileFunctions(vm *goja.Runtime) {
263+
vm.Set("send", func(call goja.FunctionCall) goja.Value {
264+
if len(call.Arguments) != 3 {
265+
return vm.ToValue(Result{Content: "argument missmatch: you need 3 arguments for send(type, data, channel)"})
266+
}
267+
268+
var typ, channel string
269+
if err := vm.ExportTo(call.Argument(0), &typ); err != nil {
270+
return vm.ToValue(Result{Content: "the first argument should be a string"})
271+
} else if err := vm.ExportTo(call.Argument(2), &channel); err != nil {
272+
return vm.ToValue(Result{Content: "the third argument should be a string"})
273+
}
274+
275+
b, err := json.Marshal(call.Argument(1).Export())
276+
if err != nil {
277+
return vm.ToValue(Result{Content: fmt.Sprintf("error converting your data: %v", err)})
278+
}
279+
280+
msg := internal.Command{
281+
SID: env.Data.ID.Hex(),
282+
Type: typ,
283+
Data: string(b),
284+
Channel: channel,
285+
Token: env.Auth.ReconstructToken(),
286+
}
287+
288+
if err := env.Volatile.Publish(msg); err != nil {
289+
return vm.ToValue(Result{Content: fmt.Sprintf("error publishing your message: %v", err)})
290+
}
291+
292+
return vm.ToValue(Result{OK: true})
293+
})
294+
}

function/subscriber.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package function
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"staticbackend/internal"
7+
8+
"go.mongodb.org/mongo-driver/bson/primitive"
9+
)
10+
11+
type Subscriber struct {
12+
PubSub internal.PubSuber
13+
GetExecEnv func(token string) (ExecutionEnvironment, error)
14+
}
15+
16+
// Start starts the system event subscription.
17+
// This channel is responsible of executing functions that match the
18+
// topic/trigger
19+
func (sub *Subscriber) Start() {
20+
receiver := make(chan internal.Command)
21+
close := make(chan bool)
22+
23+
go sub.PubSub.Subscribe(receiver, "", "sbsys", close)
24+
25+
for {
26+
select {
27+
case msg := <-receiver:
28+
go sub.process(msg)
29+
case _ = <-close:
30+
log.Println("system event channel closed?!?")
31+
break
32+
}
33+
}
34+
}
35+
36+
func (sub *Subscriber) process(msg internal.Command) {
37+
switch msg.Type {
38+
case internal.MsgTypeChanOut,
39+
internal.MsgTypeDBCreated,
40+
internal.MsgTypeDBUpdated,
41+
internal.MsgTypeDBDeleted:
42+
sub.handleRealtimeEvents(msg)
43+
}
44+
}
45+
46+
func (sub *Subscriber) handleRealtimeEvents(msg internal.Command) {
47+
exe, err := sub.GetExecEnv(msg.Token)
48+
if err != nil {
49+
log.Println("cannot retrieve base from token")
50+
return
51+
}
52+
53+
var ids []primitive.ObjectID
54+
55+
key := fmt.Sprintf("%s:%s", exe.DB.Name(), msg.Type)
56+
if err := sub.PubSub.GetTyped(key, &ids); err != nil {
57+
funcs, err := ListByTrigger(exe.DB, msg.Type)
58+
if err != nil {
59+
log.Println("error getting functions by trigger: ", err)
60+
return
61+
}
62+
63+
for _, fn := range funcs {
64+
if err := sub.PubSub.SetTyped("fn_"+fn.ID.Hex(), fn); err != nil {
65+
log.Println("error adding function to cache: ", err)
66+
return
67+
}
68+
69+
ids = append(ids, fn.ID)
70+
}
71+
72+
sub.PubSub.SetTyped(key, ids)
73+
}
74+
75+
for _, id := range ids {
76+
var fn ExecData
77+
if err := sub.PubSub.GetTyped("fn_"+id.Hex(), &fn); err != nil {
78+
log.Println("error getting function out of cache: ", err)
79+
return
80+
}
81+
82+
exe.Data = fn
83+
go func(ex ExecutionEnvironment) {
84+
if err := ex.Execute(); err != nil {
85+
log.Printf(`executing "%s" function failed: %v"`, ex.Data.FunctionName, err)
86+
}
87+
}(exe)
88+
}
89+
}

hub.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ func (h *Hub) getTargets(msg internal.Command) (sockets []*Socket, payload inter
106106
return
107107
}
108108

109-
_, ok := internal.Tokens[pl.Token]
110-
if !ok {
109+
var a internal.Auth
110+
if err := volatile.GetTyped(pl.Token, &a); err != nil {
111111
payload = internal.Command{Type: internal.MsgTypeError, Data: "invalid token"}
112112
} else {
113113
payload = internal.Command{Type: internal.MsgTypeToken, Data: pl.Token}

internal/account.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package internal
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
"github.com/gbrlsnchs/jwt/v3"
@@ -16,6 +17,11 @@ type Auth struct {
1617
UserID primitive.ObjectID
1718
Email string
1819
Role int
20+
Token string
21+
}
22+
23+
func (auth Auth) ReconstructToken() string {
24+
return fmt.Sprintf("%s|%s", auth.UserID.Hex(), auth.Token)
1925
}
2026

2127
// JWTPayload contains the current user token

internal/data.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ type BaseConfig struct {
1616
}
1717

1818
var (
19-
Tokens map[string]Auth = make(map[string]Auth)
20-
Bases map[string]BaseConfig = make(map[string]BaseConfig)
21-
HashSecret = jwt.NewHS256([]byte(os.Getenv("JWT_SECRET")))
19+
//Tokens map[string]Auth = make(map[string]Auth)
20+
//Bases map[string]BaseConfig = make(map[string]BaseConfig)
21+
HashSecret = jwt.NewHS256([]byte(os.Getenv("JWT_SECRET")))
2222
)
2323

2424
const (
@@ -48,11 +48,12 @@ const (
4848
)
4949

5050
type Command struct {
51-
SID string `json:"sid"`
52-
Type string `json:"type"`
53-
Data string `json:"data"`
54-
Channel string `json:"channel"`
55-
Token string `json:"token"`
51+
SID string `json:"sid"`
52+
Type string `json:"type"`
53+
Data string `json:"data"`
54+
Channel string `json:"channel"`
55+
Token string `json:"token"`
56+
IsSystemEvent bool `json:"-"`
5657
}
5758

5859
func (msg Command) IsDBEvent() bool {

internal/pubsuber.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ package internal
44
type PubSuber interface {
55
Get(key string) (string, error)
66
Set(key string, value string) error
7+
GetTyped(key string, v interface{}) error
8+
SetTyped(key string, v interface{}) error
79
Inc(key string, by int64) (int64, error)
810
Dec(key string, by int64) (int64, error)
911
Subscribe(send chan Command, token, channel string, close chan bool)

membership.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,16 @@ func login(w http.ResponseWriter, r *http.Request) {
8080
return
8181
}
8282

83-
internal.Tokens[token] = internal.Auth{
83+
auth := internal.Auth{
8484
AccountID: tok.AccountID,
8585
UserID: tok.ID,
8686
Email: tok.Email,
8787
Role: tok.Role,
88+
Token: tok.Token,
89+
}
90+
if err := volatile.SetTyped(token, auth); err != nil {
91+
http.Error(w, err.Error(), http.StatusInternalServerError)
92+
return
8893
}
8994

9095
respond(w, http.StatusOK, string(jwtBytes))
@@ -195,11 +200,15 @@ func createUser(db *mongo.Database, accountID primitive.ObjectID, email, passwor
195200
return nil, tok, err
196201
}
197202

198-
internal.Tokens[token] = internal.Auth{
203+
auth := internal.Auth{
199204
AccountID: tok.AccountID,
200205
UserID: tok.ID,
201206
Email: tok.Email,
202207
Role: role,
208+
Token: tok.Token,
209+
}
210+
if err := volatile.SetTyped(token, auth); err != nil {
211+
return nil, tok, err
203212
}
204213

205214
return jwtBytes, tok, nil
@@ -445,11 +454,16 @@ func sudoGetTokenFromAccountID(w http.ResponseWriter, r *http.Request) {
445454
return
446455
}
447456

448-
internal.Tokens[token] = internal.Auth{
457+
auth := internal.Auth{
449458
AccountID: tok.AccountID,
450459
UserID: tok.ID,
451460
Email: tok.Email,
452461
Role: tok.Role,
462+
Token: tok.Token,
463+
}
464+
if err := volatile.SetTyped(token, auth); err != nil {
465+
http.Error(w, err.Error(), http.StatusInternalServerError)
466+
return
453467
}
454468

455469
respond(w, http.StatusOK, string(jwtBytes))

0 commit comments

Comments
 (0)