Skip to content

Commit 0754134

Browse files
mtolmanmatthewtolman
authored and committed
bug fix, code clean-up
Fixed bug in stream parser
1 parent e08aefc commit 0754134

File tree

4 files changed

+107
-24
lines changed

4 files changed

+107
-24
lines changed

src/column.zig

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ pub fn Parser(comptime Reader: type) type {
277277
/// Returns the next row in the CSV file
278278
/// Heap memory is owned by the Row, so call Row.deinit()
279279
pub fn next(self: *@This()) ?Row {
280+
if (self._done) {
281+
return null;
282+
}
280283
return self.nextImpl() catch |err| {
281284
switch (err) {
282285
Error.EndOfInput => {
@@ -299,16 +302,22 @@ pub fn Parser(comptime Reader: type) type {
299302
}
300303

301304
if (self._buffer.done()) {
302-
return null;
305+
return Error.EndOfInput;
303306
}
304307

308+
std.debug.assert(!self._done);
309+
305310
var row = Row{
306311
._fields = std.ArrayList(RowField).init(self._allocator),
307312
._bytes = std.ArrayList(u8).init(self._allocator),
308313
};
309314

310-
// Doing this defer since we don't want to access a deinitialized
311-
// row when there's an error
315+
// Doing this defer so we can track our max field count for
316+
// fewer allocations in the future
317+
// Not worried about excess memory as much since rows should be
318+
// short-lived
319+
// Rows that are long-lived should have a `clone()` called which
320+
// will trim excess memory on copy
312321
defer {
313322
self._row_field_count = @max(self._row_field_count, row._fields.items.len);
314323
self._row_byte_count = @max(self._row_byte_count, row._bytes.items.len);

src/common.zig

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ const std = @import("std");
2828
/// Unquotes a quoted CSV field
2929
/// Assumes the field is a valid CSV field
3030
pub fn unquoteQuoted(data: []const u8) []const u8 {
31-
if (data.len > 1 and data[0] == '"') {
31+
if (data.len > 2 and data[0] == '"') {
32+
// If we get here, we should be working on quoted CSV data
33+
std.debug.assert(data[data.len - 1] == '"');
3234
return data[1..(data.len - 1)];
3335
} else {
36+
// If we get here, we should be working on unquoted CSV data
37+
std.debug.assert(std.mem.indexOf(u8, data, "\"") == null);
3438
return data;
3539
}
3640
}

src/stream.zig

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ pub fn Parser(
7979

8080
/// Consume a character and move forward our reader
8181
fn consume(self: *@This()) ReadError!void {
82+
// Even though consuming past the end would behave correctly,
83+
// we shouldn't be consuming once we're done
84+
std.debug.assert(!self.done());
8285
self._cur = self._next;
8386

8487
// This checks to see if we already hit the end of the stream
@@ -128,31 +131,40 @@ pub fn Parser(
128131
return;
129132
}
130133

134+
// When we start `next` we are always at the start or middle of a row
131135
self._flags._row_end = false;
132136

133137
// We won't technically hit an infinite loop,
134138
// but we practically will since this is a lot
135139
const MAX_ITER = self._opts.max_iter;
136140
var index: usize = 0;
137-
for (0..(MAX_ITER + 1)) |i| {
138-
index = i;
139-
141+
while (index < MAX_ITER) : (index += 1) {
140142
if (self._flags._in_quote) {
141143
// Handle quoted strings
142144
if (self.current()) |cur| {
143145
switch (cur) {
144146
'"' => {
145-
if (self.peek() == ',' or self.peek() == '\n' or self.peek() == '\r') {
147+
if (self.peek()) |p| {
148+
switch (p) {
149+
',', '\r', '\n' => {
150+
self._flags._in_quote = false;
151+
try self.consume();
152+
continue;
153+
},
154+
'"' => {
155+
try self.consume();
156+
try self.consume();
157+
try writer.writeByte('"');
158+
},
159+
else => {
160+
return CsvReadError.QuotePrematurelyTerminated;
161+
},
162+
}
163+
} else {
146164
self._flags._in_quote = false;
147165
try self.consume();
148166
continue;
149167
}
150-
if (self.peek() != '"') {
151-
return CsvReadError.QuotePrematurelyTerminated;
152-
}
153-
try self.consume();
154-
try self.consume();
155-
try writer.writeByte('"');
156168
},
157169
else => |c| {
158170
try writer.writeByte(c);
@@ -380,3 +392,33 @@ test "crlf,\" at 63" {
380392

381393
try testing.expectEqual(fieldCount, cnt);
382394
}
395+
396+
test "End with quote" {
397+
const testing = @import("std").testing;
398+
var buff = std.ArrayList(u8).init(std.testing.allocator);
399+
defer buff.deinit();
400+
401+
var input = std.io.fixedBufferStream("\"hello, world\"");
402+
403+
const fieldCount = 1;
404+
405+
const reader = input.reader();
406+
var stream = init(
407+
@TypeOf(reader),
408+
@TypeOf(buff.writer()),
409+
reader,
410+
.{},
411+
.{},
412+
);
413+
414+
var cnt: usize = 0;
415+
while (!stream.done()) {
416+
try stream.next(buff.writer());
417+
defer {
418+
buff.clearRetainingCapacity();
419+
}
420+
cnt += 1;
421+
}
422+
423+
try testing.expectEqual(fieldCount, cnt);
424+
}

src/stream_fast.zig

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ pub fn Parser(comptime Reader: type, comptime Writer: type) type {
4545
}
4646

4747
pub fn atRowEnd(self: *const @This()) bool {
48-
const res = self.done() or self._row_end;
49-
return res;
48+
return self.done() or self._row_end;
5049
}
5150

5251
/// Returns if a parser is done
@@ -90,12 +89,18 @@ pub fn Parser(comptime Reader: type, comptime Writer: type) type {
9089
return;
9190
}
9291

92+
// Don't read from a reader when we're done
93+
std.debug.assert(!self.done());
9394
const amt = try self._reader.readAll(&self._state.next_bytes);
9495
self._state.next_bytes_len = amt;
9596
}
9697

9798
/// Gets the next CSV field
9899
pub fn next(self: *@This(), writer: Writer) Error!void {
100+
self._state.field_start = false;
101+
if (self.done()) {
102+
return;
103+
}
99104
self.nextImpl(writer) catch |err| {
100105
self._state._erred = true;
101106
return err;
@@ -104,24 +109,20 @@ pub fn Parser(comptime Reader: type, comptime Writer: type) type {
104109

105110
/// Gets the next CSV field
106111
fn nextImpl(self: *@This(), writer: Writer) !void {
107-
self._state.field_start = false;
112+
std.debug.assert(!self.done());
108113
// lazy init our parser
109114
if (self._state._need_init) {
110115
// Consume twice to populate our cur and next registers
111116
try self.nextChunk();
112117
self._state.cur_bytes_pos = 64;
113118
std.debug.assert(!self._state._need_init);
114-
} else if (self.done()) {
115-
return;
116119
}
117120

118121
self._row_end = false;
119122

120123
const MAX_ITER = self._opts.max_iter;
121124
var index: usize = 0;
122-
for (0..(MAX_ITER + 1)) |i| {
123-
index = i;
124-
125+
while (index < MAX_ITER) : (index += 1) {
125126
// If we have a field to print, print it
126127
if (self._state.field_separators != 0) {
127128
const chunk_end = @ctz(self._state.field_separators);
@@ -267,10 +268,9 @@ pub fn Parser(comptime Reader: type, comptime Writer: type) type {
267268
fn quotedRegions(m: u64) u64 {
268269
var x: u64 = m;
269270
var res: u64 = x;
270-
while (x != 0) {
271+
while (x != 0) : (x = x & x - 1) {
271272
const x1: u64 = @bitCast(-%@as(i64, @bitCast(x)));
272273
res = res ^ (x1 ^ x);
273-
x = x & x - 1;
274274
}
275275
return res;
276276
}
@@ -613,3 +613,31 @@ test "crlfa, at 63" {
613613

614614
try testing.expectEqual(fieldCount, cnt);
615615
}
616+
617+
test "End with quote" {
618+
const testing = @import("std").testing;
619+
var buff = std.ArrayList(u8).init(std.testing.allocator);
620+
defer buff.deinit();
621+
622+
var input = std.io.fixedBufferStream("\"hello, world\"");
623+
624+
const fieldCount = 1;
625+
626+
const reader = input.reader();
627+
var stream = init(
628+
reader,
629+
@TypeOf(buff.writer()),
630+
.{},
631+
);
632+
633+
var cnt: usize = 0;
634+
while (!stream.done()) {
635+
try stream.next(buff.writer());
636+
defer {
637+
buff.clearRetainingCapacity();
638+
}
639+
cnt += 1;
640+
}
641+
642+
try testing.expectEqual(fieldCount, cnt);
643+
}

0 commit comments

Comments
 (0)