Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Commit 54fff13

Browse files
committed
first version of a task scheduler close #9
1 parent 2a027bb commit 54fff13

File tree

5 files changed

+226
-0
lines changed

5 files changed

+226
-0
lines changed

function/scheduler.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package function
2+
3+
import (
4+
"context"
5+
"log"
6+
"staticbackend/db"
7+
"staticbackend/internal"
8+
"time"
9+
10+
"github.com/go-co-op/gocron"
11+
"go.mongodb.org/mongo-driver/bson"
12+
"go.mongodb.org/mongo-driver/bson/primitive"
13+
"go.mongodb.org/mongo-driver/mongo"
14+
)
15+
16+
type TaskScheduler struct {
17+
Client *mongo.Client
18+
Volatile internal.PubSuber
19+
Scheduler *gocron.Scheduler
20+
}
21+
22+
const (
23+
TaskTypeFunction = "function"
24+
TaskTypeMessage = "message"
25+
)
26+
27+
type Task struct {
28+
ID primitive.ObjectID `bson:"_id" json:"id"`
29+
Name string `bson:"name" json:"name"`
30+
Type string `bson:"type" json:"type"`
31+
Value string `bson:"value" json:"value"`
32+
Meta interface{} `bson:"meta" json:"meta"`
33+
Interval string `bson:"invertal" json:"interval"`
34+
LastRun time.Time `bson:"last" json:"last"`
35+
36+
BaseName string `bson:"-" json:"base"`
37+
}
38+
39+
type MetaMessage struct {
40+
Data string `bson:"data" json:"data"`
41+
Channel string `bson:"channel" json:"channel"`
42+
}
43+
44+
func (ts *TaskScheduler) Start() {
45+
tasks, err := ts.listTasks()
46+
if err != nil {
47+
log.Println("error loading tasks: %v", err)
48+
return
49+
}
50+
51+
ts.Scheduler = gocron.NewScheduler(time.UTC)
52+
ts.Scheduler.TagsUnique()
53+
54+
for _, task := range tasks {
55+
_, err := ts.Scheduler.Cron(task.Interval).Tag(task.ID.Hex()).Do(ts.run, task)
56+
if err != nil {
57+
log.Println("error scheduling this task: %s -> %v", task.ID.Hex(), err)
58+
}
59+
}
60+
}
61+
62+
func (ts *TaskScheduler) listTasks() ([]Task, error) {
63+
bases, err := internal.ListDatabases(ts.Client.Database("sbsys"))
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
filter := bson.M{}
69+
70+
//TODO: Might be worth doing this concurrently
71+
var results []Task
72+
73+
ctx := context.Background()
74+
75+
for _, base := range bases {
76+
db := ts.Client.Database(base.Name)
77+
cur, err := db.Collection("sb_tasks").Find(ctx, filter)
78+
if err != nil {
79+
return nil, err
80+
}
81+
defer cur.Close(ctx)
82+
83+
var tasks []Task
84+
85+
for cur.Next(ctx) {
86+
var t Task
87+
if err := cur.Decode(&t); err != nil {
88+
return nil, err
89+
}
90+
91+
t.BaseName = base.Name
92+
93+
tasks = append(tasks, t)
94+
}
95+
if err := cur.Err(); err != nil {
96+
return nil, err
97+
}
98+
99+
results = append(results, tasks...)
100+
}
101+
102+
return results, nil
103+
}
104+
105+
func (ts *TaskScheduler) run(task Task) {
106+
curDB := ts.Client.Database(task.BaseName)
107+
108+
// the task must run as the root base user
109+
var auth internal.Auth
110+
if err := ts.Volatile.GetTyped("root:"+task.BaseName, &auth); err != nil {
111+
tok, err := internal.GetRootForBase(curDB)
112+
if err != nil {
113+
log.Printf("error finding root token for base %s: %v\n", task.BaseName, err)
114+
return
115+
}
116+
117+
auth = internal.Auth{
118+
AccountID: tok.AccountID,
119+
UserID: tok.ID,
120+
Email: tok.Email,
121+
Role: tok.Role,
122+
Token: tok.Token,
123+
}
124+
125+
if err := ts.Volatile.SetTyped("root:"+task.BaseName, auth); err != nil {
126+
log.Printf("error setting auth inside TaskScheduler.run: ", err)
127+
return
128+
}
129+
}
130+
131+
switch task.Type {
132+
case TaskTypeFunction:
133+
ts.execFunction(curDB, auth, task)
134+
case TaskTypeMessage:
135+
ts.sendMessage(curDB, auth, task)
136+
}
137+
}
138+
139+
func (ts *TaskScheduler) execFunction(curDB *mongo.Database, auth internal.Auth, task Task) {
140+
141+
fn, err := GetForExecution(curDB, task.Value)
142+
if err != nil {
143+
log.Printf("cannot find function %s on task %s", task.Value, task.ID.Hex())
144+
return
145+
}
146+
147+
exe := &ExecutionEnvironment{
148+
Auth: auth,
149+
DB: curDB,
150+
Base: &db.Base{PublishDocument: ts.Volatile.PublishDocument},
151+
Volatile: ts.Volatile,
152+
Data: fn,
153+
}
154+
155+
if err := exe.Execute(); err != nil {
156+
log.Printf("error executing function %s: %v", task.Value, err)
157+
}
158+
}
159+
160+
func (ts *TaskScheduler) sendMessage(curDB *mongo.Database, auth internal.Auth, task Task) {
161+
token := auth.ReconstructToken()
162+
163+
meta, ok := task.Meta.(MetaMessage)
164+
if !ok {
165+
log.Println("unable to get meta data for type MetaMessage for task: ", task.ID.Hex())
166+
return
167+
}
168+
169+
msg := internal.Command{
170+
SID: task.ID.Hex(),
171+
Type: task.Value,
172+
Data: meta.Data,
173+
Channel: meta.Channel,
174+
Token: token,
175+
}
176+
177+
if err := ts.Volatile.Publish(msg); err != nil {
178+
log.Println("error publishing message from task", task.ID.Hex(), err)
179+
}
180+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/aws/aws-sdk-go v1.27.2
77
github.com/dop251/goja v0.0.0-20210804101310-32956a348b49
88
github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.1
9+
github.com/go-co-op/gocron v1.6.2 // indirect
910
github.com/go-redis/redis/v8 v8.4.4
1011
github.com/golang/snappy v0.0.4 // indirect
1112
github.com/google/uuid v1.1.4

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
1919
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
2020
github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.1 h1:/opyYiz6HZoBVAU8ypemFOTtzuKFE9kiKstP6RYE1Z4=
2121
github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.1/go.mod h1:JEL7eYb4ETfz9AYni+/4BV09MrMgGwju0G/k4XF8QMg=
22+
github.com/go-co-op/gocron v1.6.2 h1:x5g1tWnWcXIZesdosJJcbziRi4XG6tKB92yKLUpoBkU=
23+
github.com/go-co-op/gocron v1.6.2/go.mod h1:DbJm9kdgr1sEvWpHCA7dFFs/PGHPMil9/97EXCRPr4k=
2224
github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc=
2325
github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY=
2426
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
@@ -112,6 +114,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
112114
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
113115
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
114116
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
117+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
118+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
115119
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
116120
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
117121
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -127,6 +131,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
127131
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
128132
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
129133
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
134+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
130135
github.com/stripe/stripe-go/v71 v71.44.0 h1:kACtWvhEOQ0THj5okxVqEN+YxcUzRK3ls5vuuMxX4xA=
131136
github.com/stripe/stripe-go/v71 v71.44.0/go.mod h1:BXYwMQe+xjYomcy5/qaTGyoyVMTP3wDCHa7DVFvg8+Y=
132137
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
@@ -228,3 +233,4 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
228233
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
229234
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
230235
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
236+
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/account.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,15 @@ func FindRootToken(db *mongo.Database, id, accountID primitive.ObjectID, token s
8888
return
8989
}
9090

91+
func GetRootForBase(db *mongo.Database) (tok Token, err error) {
92+
filter := bson.M{
93+
FieldRole: 100,
94+
}
95+
sr := db.Collection("sb_tokens").FindOne(ctx, filter)
96+
err = sr.Decode(&tok)
97+
return
98+
}
99+
91100
func FindTokenByEmail(db *mongo.Database, email string) (tok Token, err error) {
92101
sr := db.Collection("sb_tokens").FindOne(ctx, bson.M{"email": email})
93102
err = sr.Decode(&tok)
@@ -128,3 +137,31 @@ func DatabaseExists(db *mongo.Database, name string) (bool, error) {
128137
}
129138
return count > 0, nil
130139
}
140+
141+
func ListDatabases(db *mongo.Database) ([]BaseConfig, error) {
142+
filter := bson.M{FieldIsActive: true}
143+
144+
ctx := context.Background()
145+
146+
cur, err := db.Collection("bases").Find(ctx, filter)
147+
if err != nil {
148+
return nil, err
149+
}
150+
defer cur.Close(ctx)
151+
152+
var results []BaseConfig
153+
154+
for cur.Next(ctx) {
155+
var bc BaseConfig
156+
if err := cur.Decode(&bc); err != nil {
157+
return nil, err
158+
}
159+
160+
results = append(results, bc)
161+
}
162+
if err := cur.Err(); err != nil {
163+
return nil, err
164+
}
165+
166+
return results, nil
167+
}

internal/data.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
FieldAccountID = "accountId"
2727
FieldOwnerID = "sb_owner"
2828
FieldToken = "token"
29+
FieldIsActive = "active"
30+
FieldRole = "role"
2931
)
3032

3133
const (

0 commit comments

Comments
 (0)