Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Commit f13699f

Browse files
committed
added function execution history and fixed bugs
1 parent a390814 commit f13699f

File tree

8 files changed

+161
-16
lines changed

8 files changed

+161
-16
lines changed

function/management.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func Add(db *mongo.Database, data ExecData) (string, error) {
3838
data.ID = primitive.NewObjectID()
3939
data.Version = 1
4040
data.LastUpdated = time.Now()
41+
data.History = make([]ExecHistory, 0)
4142

4243
ctx := context.Background()
4344
res, err := db.Collection("sb_functions").InsertOne(ctx, data)
@@ -190,3 +191,17 @@ func Delete(db *mongo.Database, name string) error {
190191

191192
return nil
192193
}
194+
195+
func Ran(db *mongo.Database, id primitive.ObjectID, rh ExecHistory) error {
196+
filter := bson.M{internal.FieldID: id}
197+
update := bson.M{
198+
"$set": bson.M{"lr": time.Now()},
199+
"$push": bson.M{"h": rh},
200+
}
201+
ctx := context.Background()
202+
res := db.Collection("sb_functions").FindOneAndUpdate(ctx, filter, update)
203+
if err := res.Err(); err != nil {
204+
return err
205+
}
206+
return nil
207+
}

function/runtime.go

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package function
22

33
import (
44
"encoding/json"
5+
"errors"
56
"fmt"
7+
"log"
8+
"net/http"
69
"staticbackend/db"
710
"staticbackend/internal"
11+
"time"
812

913
"github.com/dop251/goja"
1014
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -17,41 +21,89 @@ type ExecutionEnvironment struct {
1721
Base *db.Base
1822
Volatile internal.PubSuber
1923
Data ExecData
24+
25+
CurrentRun ExecHistory
2026
}
2127

2228
type Result struct {
2329
OK bool `json:"ok"`
2430
Content interface{} `json:"content"`
2531
}
2632

27-
func (env *ExecutionEnvironment) Execute() error {
33+
func (env *ExecutionEnvironment) Execute(data interface{}) error {
2834
vm := goja.New()
2935
vm.SetFieldNameMapper(goja.TagFieldNameMapper("json", true))
3036

3137
env.addHelpers(vm)
3238
env.addDatabaseFunctions(vm)
3339
env.addVolatileFunctions(vm)
3440

35-
result, err := vm.RunString(env.Data.Code)
36-
if err != nil {
41+
if _, err := vm.RunString(env.Data.Code); err != nil {
3742
return err
3843
}
3944

4045
handler, ok := goja.AssertFunction(vm.Get("handle"))
4146
if !ok {
42-
return fmt.Errorf(`unable to find function "handle": %v`, err)
47+
return errors.New(`unable to find function "handle"`)
48+
}
49+
50+
args, err := env.prepareArguments(vm, data)
51+
if err != nil {
52+
return fmt.Errorf("error preparing argument: %v", err)
53+
}
54+
55+
env.CurrentRun = ExecHistory{
56+
ID: primitive.NewObjectID().Hex(),
57+
Version: env.Data.Version,
58+
Started: time.Now(),
59+
Output: make([]string, 0),
4360
}
4461

45-
resp, err := handler(goja.Undefined())
62+
env.CurrentRun.Output = append(env.CurrentRun.Output, "Function started")
63+
64+
_, err = handler(goja.Undefined(), args...)
65+
go env.complete(err)
4666
if err != nil {
4767
return fmt.Errorf("error executing your function: %v", err)
4868
}
4969

50-
fmt.Println("resp", resp)
51-
fmt.Println("result", result)
5270
return nil
5371
}
5472

73+
func (env *ExecutionEnvironment) prepareArguments(vm *goja.Runtime, data interface{}) ([]goja.Value, error) {
74+
var args []goja.Value
75+
76+
// for "web" trigger we prepare the body, query string and headers
77+
r, ok := data.(*http.Request)
78+
if ok {
79+
defer r.Body.Close()
80+
81+
// let's ready the HTTP body
82+
if r.Header.Get("Content-Type") == "application/json" {
83+
var v interface{}
84+
if err := json.NewDecoder(r.Body).Decode(&v); err != nil {
85+
return nil, err
86+
}
87+
88+
args = append(args, vm.ToValue(v))
89+
} else if r.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
90+
if err := r.ParseForm(); err != nil {
91+
return nil, err
92+
}
93+
args = append(args, vm.ToValue(r.Form))
94+
}
95+
96+
args = append(args, vm.ToValue(r.URL.Query()))
97+
args = append(args, vm.ToValue(r.Header))
98+
99+
return args, nil
100+
}
101+
102+
// system or custom event/topic, we send only the 1st argument (body)
103+
args = append(args, vm.ToValue(data))
104+
return args, nil
105+
}
106+
55107
func (env *ExecutionEnvironment) addHelpers(vm *goja.Runtime) {
56108
vm.Set("log", func(call goja.FunctionCall) goja.Value {
57109
if len(call.Arguments) == 0 {
@@ -62,7 +114,7 @@ func (env *ExecutionEnvironment) addHelpers(vm *goja.Runtime) {
62114
for _, v := range call.Arguments {
63115
params = append(params, v.Export())
64116
}
65-
fmt.Println(params...)
117+
env.CurrentRun.Output = append(env.CurrentRun.Output, fmt.Sprint(params...))
66118
return goja.Undefined()
67119
})
68120
}
@@ -292,3 +344,21 @@ func (env *ExecutionEnvironment) addVolatileFunctions(vm *goja.Runtime) {
292344
return vm.ToValue(Result{OK: true})
293345
})
294346
}
347+
348+
func (env *ExecutionEnvironment) complete(err error) {
349+
env.CurrentRun.Completed = time.Now()
350+
env.CurrentRun.Success = err == mongo.ErrNilCursor
351+
352+
env.CurrentRun.Output = append(env.CurrentRun.Output, "Function completed")
353+
354+
// add the error in the last output entry
355+
if err != nil {
356+
env.CurrentRun.Output = append(env.CurrentRun.Output, err.Error())
357+
}
358+
359+
//TODO: this needs to be regrouped and ran un batch
360+
if err := Ran(env.DB, env.Data.ID, env.CurrentRun); err != nil {
361+
//TODO: do something with those error
362+
log.Println("error logging function complete: ", err)
363+
}
364+
}

function/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (ts *TaskScheduler) execFunction(curDB *mongo.Database, auth internal.Auth,
152152
Data: fn,
153153
}
154154

155-
if err := exe.Execute(); err != nil {
155+
if err := exe.Execute(task.Name); err != nil {
156156
log.Printf("error executing function %s: %v", task.Value, err)
157157
}
158158
}

function/subscriber.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ func (sub *Subscriber) process(msg internal.Command) {
4646
func (sub *Subscriber) handleRealtimeEvents(msg internal.Command) {
4747
exe, err := sub.GetExecEnv(msg.Token)
4848
if err != nil {
49-
log.Println("cannot retrieve base from token")
49+
log.Println("cannot retrieve base from token", msg.Token)
50+
log.Println(err)
5051
return
5152
}
5253

@@ -81,7 +82,7 @@ func (sub *Subscriber) handleRealtimeEvents(msg internal.Command) {
8182

8283
exe.Data = fn
8384
go func(ex ExecutionEnvironment) {
84-
if err := ex.Execute(); err != nil {
85+
if err := ex.Execute(msg); err != nil {
8586
log.Printf(`executing "%s" function failed: %v"`, ex.Data.FunctionName, err)
8687
}
8788
}(exe)

functions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (f *functions) exec(w http.ResponseWriter, r *http.Request) {
111111
Base: f.base,
112112
Data: fn,
113113
}
114-
if err := env.Execute(); err != nil {
114+
if err := env.Execute(r); err != nil {
115115
http.Error(w, err.Error(), http.StatusInternalServerError)
116116
return
117117
}

membership.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,17 @@ func login(w http.ResponseWriter, r *http.Request) {
8787
Role: tok.Role,
8888
Token: tok.Token,
8989
}
90+
91+
//TODO: find a good way to find all occurences of those two
92+
// and make them easily callable via a shared function
9093
if err := volatile.SetTyped(token, auth); err != nil {
9194
http.Error(w, err.Error(), http.StatusInternalServerError)
9295
return
9396
}
97+
if err := volatile.SetTyped("base:"+token, conf); err != nil {
98+
http.Error(w, err.Error(), http.StatusInternalServerError)
99+
return
100+
}
94101

95102
respond(w, http.StatusOK, string(jwtBytes))
96103
}
@@ -141,12 +148,32 @@ func register(w http.ResponseWriter, r *http.Request) {
141148
return
142149
}
143150

144-
jwtBytes, _, err := createAccountAndUser(db, l.Email, l.Password, 0)
151+
jwtBytes, tok, err := createAccountAndUser(db, l.Email, l.Password, 0)
145152
if err != nil {
146153
http.Error(w, err.Error(), http.StatusInternalServerError)
147154
return
148155
}
149-
respond(w, http.StatusOK, string(jwtBytes))
156+
157+
token := string(jwtBytes)
158+
159+
auth := internal.Auth{
160+
AccountID: tok.AccountID,
161+
UserID: tok.ID,
162+
Email: tok.Email,
163+
Role: tok.Role,
164+
Token: tok.Token,
165+
}
166+
167+
if err := volatile.SetTyped(token, auth); err != nil {
168+
http.Error(w, err.Error(), http.StatusInternalServerError)
169+
return
170+
}
171+
if err := volatile.SetTyped("base:"+token, conf); err != nil {
172+
http.Error(w, err.Error(), http.StatusInternalServerError)
173+
return
174+
}
175+
176+
respond(w, http.StatusOK, token)
150177
}
151178

152179
func createAccountAndUser(db *mongo.Database, email, password string, role int) ([]byte, internal.Token, error) {

realtime/broker.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"staticbackend/internal"
1010
"strings"
11+
"time"
1112

1213
"github.com/google/uuid"
1314
)
@@ -192,7 +193,18 @@ func (b *Broker) getTargets(msg internal.Command) (sockets []chan internal.Comma
192193

193194
go b.pubsub.Subscribe(sender, msg.Token, msg.Data, closesub)
194195

195-
payload = internal.Command{Type: internal.MsgTypeJoined, Data: msg.Data}
196+
joinedMsg := internal.Command{
197+
Type: internal.MsgTypeJoined,
198+
Data: msg.SID,
199+
Channel: msg.Data,
200+
}
201+
// make sure the subscription had time to kick-off
202+
go func(m internal.Command) {
203+
time.Sleep(250 * time.Millisecond)
204+
b.pubsub.Publish(joinedMsg)
205+
}(joinedMsg)
206+
207+
payload = internal.Command{Type: internal.MsgTypeOk, Data: msg.Data}
196208
case internal.MsgTypePresence:
197209
v, err := b.pubsub.Get(msg.Data)
198210
if err != nil {

server.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package staticbackend
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"log"
89
"net/http"
@@ -72,9 +73,25 @@ func Start(dbHost, port string) {
7273
return key, nil
7374
}
7475

75-
if _, err := middleware.ValidateAuthKey(client, volatile, ctx, key); err != nil {
76+
auth, err := middleware.ValidateAuthKey(client, volatile, ctx, key)
77+
if err != nil {
7678
return "", err
7779
}
80+
81+
// set base:token useful when executing pubsub event message / function
82+
conf, ok := ctx.Value(middleware.ContextBase).(internal.BaseConfig)
83+
if !ok {
84+
return "", errors.New("could not find base config")
85+
}
86+
87+
//TODO: Lots of repetition of this, needs to be refactor
88+
if err := volatile.SetTyped(key, auth); err != nil {
89+
return "", err
90+
}
91+
if err := volatile.SetTyped("base:"+key, conf); err != nil {
92+
return "", err
93+
}
94+
7895
return key, nil
7996
}, volatile)
8097

@@ -162,6 +179,7 @@ func Start(dbHost, port string) {
162179
http.Handle("/fn/add", middleware.Chain(http.HandlerFunc(f.add), stdRoot...))
163180
http.Handle("/fn/update", middleware.Chain(http.HandlerFunc(f.update), stdRoot...))
164181
http.Handle("/fn/delete/", middleware.Chain(http.HandlerFunc(f.del), stdRoot...))
182+
http.Handle("/fn/del/", middleware.Chain(http.HandlerFunc(f.del), stdRoot...))
165183
http.Handle("/fn/info/", middleware.Chain(http.HandlerFunc(f.info), stdRoot...))
166184
http.Handle("/fn/exec", middleware.Chain(http.HandlerFunc(f.exec), stdAuth...))
167185
http.Handle("/fn", middleware.Chain(http.HandlerFunc(f.list), stdRoot...))
@@ -213,11 +231,13 @@ func initServices(dbHost string) {
213231

214232
var conf internal.BaseConfig
215233
if err := volatile.GetTyped("base:"+token, &conf); err != nil {
234+
log.Println("cannot find base")
216235
return exe, err
217236
}
218237

219238
var auth internal.Auth
220239
if err := volatile.GetTyped(token, &auth); err != nil {
240+
log.Println("cannot find auth")
221241
return exe, err
222242
}
223243

0 commit comments

Comments
 (0)