Skip to content
This repository was archived by the owner on Mar 5, 2024. It is now read-only.

Commit 9cf5aff

Browse files
committed
Merge pull request #7 from stripe/manifests
Add the ability to save and read manifests
2 parents 4d0648a + 813bcf8 commit 9cf5aff

File tree

7 files changed

+216
-33
lines changed

7 files changed

+216
-33
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
sequins
22
testdata
3+
.index
4+
.manifest

hdfs_sequins_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package main
33
import (
44
"encoding/json"
55
"github.com/colinmarc/hdfs"
6-
"github.com/stretchr/testify/require"
76
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
88
"github.com/stripe/sequins/backend"
99
"io/ioutil"
1010
"net/http"

index/index.go

Lines changed: 109 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,96 @@ func New(path string) *Index {
4343
return &index
4444
}
4545

46-
// BuildIndex reads each of the files and adds a key -> (file, offset) pair
47-
// to the master index for every key in every file.
48-
func (index *Index) BuildIndex() error {
46+
// Load reads each of the files and adds a key -> (file, offset) pair to the
47+
// master index for every key in every file. If a manifest file and index
48+
// already exist in the directory, it'll use that.
49+
func (index *Index) Load() error {
4950
err := index.buildFileList()
5051
if err != nil {
5152
return err
5253
}
5354

55+
// Try loading and checking the manifest first.
56+
manifestPath := filepath.Join(index.Path, ".manifest")
57+
manifest, err := readManifest(manifestPath)
58+
if err == nil {
59+
log.Println("Loading index from existing manifest at", manifestPath)
60+
err = index.loadIndexFromManifest(manifest)
61+
if err != nil {
62+
log.Println("Failed to load existing manifest with error:", err)
63+
} else {
64+
index.Ready = true
65+
return nil
66+
}
67+
}
68+
69+
err = index.buildNewIndex()
70+
if err != nil {
71+
return err
72+
}
73+
74+
index.Ready = true
75+
return nil
76+
}
77+
78+
func (index *Index) buildFileList() error {
79+
infos, err := ioutil.ReadDir(index.Path)
80+
if err != nil {
81+
return err
82+
}
83+
84+
index.files = make([]indexFile, 0, len(infos))
85+
index.readLocks = make([]sync.Mutex, len(infos))
86+
87+
for _, info := range infos {
88+
if !info.IsDir() && !strings.HasPrefix(info.Name(), "_") && !strings.HasPrefix(info.Name(), ".") {
89+
err := index.addFile(info.Name())
90+
if err != nil {
91+
return err
92+
}
93+
}
94+
}
95+
96+
return nil
97+
}
98+
99+
func (index *Index) loadIndexFromManifest(m manifest) error {
100+
for i, entry := range m.Files {
101+
indexFile := index.files[i]
102+
baseName := filepath.Base(indexFile.file.Name())
103+
if baseName != filepath.Base(entry.Name) {
104+
return fmt.Errorf("unmatched file: %s", entry.Name)
105+
}
106+
107+
crc, err := fileCrc(indexFile.file.Name())
108+
if err != nil {
109+
return err
110+
}
111+
112+
if crc != entry.CRC {
113+
return fmt.Errorf("local file %s has an invalid CRC, according to the manifest", baseName)
114+
}
115+
}
116+
54117
indexPath := filepath.Join(index.Path, ".index")
55-
err = os.RemoveAll(indexPath)
118+
info, err := os.Stat(indexPath)
119+
if err != nil || !info.IsDir() {
120+
return fmt.Errorf("missing or invalid ldb index at %s", indexPath)
121+
}
122+
123+
ldb, err := leveldb.OpenFile(indexPath, nil)
124+
if err != nil {
125+
return err
126+
}
127+
128+
index.ldb = ldb
129+
index.count = m.Count
130+
return nil
131+
}
132+
133+
func (index *Index) buildNewIndex() error {
134+
indexPath := filepath.Join(index.Path, ".index")
135+
err := os.RemoveAll(indexPath)
56136
if err != nil {
57137
return err
58138
}
@@ -76,29 +156,40 @@ func (index *Index) BuildIndex() error {
76156
log.Println("Finished indexing", path)
77157
}
78158

79-
index.Ready = true
159+
manifest, err := index.buildManifest()
160+
if err != nil {
161+
return fmt.Errorf("error building manifest: %s", err)
162+
}
163+
164+
manifestPath := filepath.Join(index.Path, ".manifest")
165+
log.Println("Writing manifest file to", manifestPath)
166+
err = writeManifest(manifestPath, manifest)
167+
if err != nil {
168+
return fmt.Errorf("error writing manifest: %s", err)
169+
}
170+
80171
return nil
81172
}
82173

83-
func (index *Index) buildFileList() error {
84-
infos, err := ioutil.ReadDir(index.Path)
85-
if err != nil {
86-
return err
174+
func (index *Index) buildManifest() (manifest, error) {
175+
m := manifest{
176+
Files: make([]manifestEntry, len(index.files)),
177+
Count: index.count,
87178
}
88179

89-
index.files = make([]indexFile, 0, len(infos))
90-
index.readLocks = make([]sync.Mutex, len(infos))
180+
for i, f := range index.files {
181+
crc, err := fileCrc(f.file.Name())
182+
if err != nil {
183+
return m, err
184+
}
91185

92-
for _, info := range infos {
93-
if !info.IsDir() && !strings.HasPrefix(info.Name(), "_") {
94-
err := index.addFile(info.Name())
95-
if err != nil {
96-
return err
97-
}
186+
m.Files[i] = manifestEntry{
187+
Name: filepath.Base(f.file.Name()),
188+
CRC: crc,
98189
}
99190
}
100191

101-
return nil
192+
return m, nil
102193
}
103194

104195
func (index *Index) addFile(subPath string) error {

index/index_test.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,49 @@ package index
33
import (
44
"github.com/stretchr/testify/assert"
55
"github.com/stretchr/testify/require"
6+
"os"
67
"testing"
78
)
89

9-
func TestDB(t *testing.T) {
10+
func TestIndex(t *testing.T) {
11+
os.Remove("../test_data/0/.manifest")
1012
index := New("../test_data/0")
11-
err := index.BuildIndex()
12-
require.Nil(t, err)
13-
if err != nil {
14-
t.FailNow()
15-
}
13+
err := index.Load()
14+
require.NoError(t, err)
1615

1716
assert.Equal(t, index.Path, "../test_data/0")
1817
assert.Equal(t, len(index.files), 2)
1918
assert.Equal(t, index.files[0].file.Name(), "../test_data/0/part-00000")
2019
assert.Equal(t, index.files[1].file.Name(), "../test_data/0/part-00001")
2120

2221
val, err := index.Get("Alice")
23-
require.Nil(t, err)
22+
require.NoError(t, err)
2423
assert.Equal(t, string(val), "Practice")
2524

2625
val, err = index.Get("foo")
2726
assert.Equal(t, ErrNotFound, err)
2827

2928
count, err := index.Count()
30-
require.Nil(t, err)
29+
require.NoError(t, err)
3130
assert.Equal(t, 3, count)
3231
}
32+
33+
func TestIndexManifest(t *testing.T) {
34+
os.Remove("../test_data/0/.manifest")
35+
index := New("../test_data/0")
36+
err := index.Load()
37+
require.NoError(t, err)
38+
39+
count, err := index.Count()
40+
require.NoError(t, err)
41+
assert.Equal(t, 3, count)
42+
43+
index.Close()
44+
index = New("../test_data/0")
45+
err = index.Load()
46+
require.NoError(t, err)
47+
48+
newCount, err := index.Count()
49+
require.NoError(t, err)
50+
assert.Equal(t, count, newCount)
51+
}

index/manifest.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package index
2+
3+
import (
4+
"encoding/json"
5+
"hash/crc32"
6+
"io"
7+
"io/ioutil"
8+
"os"
9+
)
10+
11+
type manifest struct {
12+
Files []manifestEntry `json:"files"`
13+
Count int `json:"count"`
14+
}
15+
16+
type manifestEntry struct {
17+
Name string `json:"name"`
18+
CRC uint32 `json:"crc"`
19+
}
20+
21+
func readManifest(path string) (manifest, error) {
22+
m := manifest{}
23+
24+
reader, err := os.Open(path)
25+
if err != nil {
26+
return m, err
27+
}
28+
29+
defer reader.Close()
30+
bytes, err := ioutil.ReadAll(reader)
31+
if err != nil {
32+
return m, err
33+
}
34+
35+
err = json.Unmarshal(bytes, &m)
36+
return m, err
37+
}
38+
39+
func writeManifest(path string, m manifest) error {
40+
bytes, err := json.Marshal(m)
41+
if err != nil {
42+
return err
43+
}
44+
45+
writer, err := os.Create(path)
46+
if err != nil {
47+
return err
48+
}
49+
50+
defer writer.Close()
51+
_, err = writer.Write(bytes)
52+
return err
53+
}
54+
55+
func fileCrc(path string) (uint32, error) {
56+
file, err := os.Open(path)
57+
if err != nil {
58+
return 0, err
59+
}
60+
61+
hash := crc32.NewIEEE()
62+
_, err = io.Copy(hash, file)
63+
if err != nil {
64+
return 0, err
65+
}
66+
67+
return hash.Sum32(), nil
68+
}

sequins.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,23 +91,23 @@ func (s *sequins) refresh() error {
9191
}
9292

9393
if os.IsExist(err) {
94-
log.Printf("Version %s is already downloaded.", version)
94+
log.Printf("Version %s is already downloaded", version)
9595
} else {
96-
log.Printf("Downloading version %s from %s.", version, s.backend.DisplayPath(version))
96+
log.Printf("Downloading version %s from %s", version, s.backend.DisplayPath(version))
9797
err = s.backend.Download(version, path)
9898
if err != nil {
9999
return err
100100
}
101101
}
102102

103-
log.Printf("Building index over version %s at %s.", version, path)
103+
log.Printf("Preparing version %s at %s", version, path)
104104
index := index.New(path)
105-
err = index.BuildIndex()
105+
err = index.Load()
106106
if err != nil {
107107
return fmt.Errorf("Error while indexing: %s", err)
108108
}
109109

110-
log.Println("Switching to new version!")
110+
log.Printf("Switching to version %s!", version)
111111
oldIndex := s.index
112112
s.currentVersion = version
113113
s.index = index

sequins_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ import (
77
"github.com/stripe/sequins/backend"
88
"net/http"
99
"net/http/httptest"
10+
"os"
1011
"testing"
1112
"time"
1213
)
1314

1415
func getSequins(t *testing.T, opts sequinsOptions) *sequins {
16+
os.RemoveAll("test_data/0/.manifest")
17+
os.RemoveAll("test_data/1/.manifest")
1518
backend := backend.NewLocalBackend("test_data")
1619
s := newSequins(backend, opts)
1720

@@ -51,7 +54,7 @@ func TestSequins(t *testing.T) {
5154
status := &status{}
5255
err := json.Unmarshal(w.Body.Bytes(), status)
5356
require.NoError(t, err)
54-
assert.Equal(t, 200, w.Code, 200)
57+
assert.Equal(t, 200, w.Code)
5558
assert.Equal(t, "test_data/1", status.Path)
5659
assert.True(t, status.Started >= now)
5760
assert.Equal(t, 3, status.Count)

0 commit comments

Comments
 (0)