Skip to content

Commit 8eca12e

Browse files
authored
[Issue #1024] implement S3QS storage for intermediate data passing (#1060)
1 parent 048d2c2 commit 8eca12e

File tree

33 files changed

+1072
-664
lines changed

33 files changed

+1072
-664
lines changed

pixels-common/src/main/java/io/pixelsdb/pixels/common/physical/ObjectPath.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ else if (path.startsWith("/"))
7272
}
7373
this.valid = true;
7474
}
75-
else if (path.length() > 0)
75+
else if (!path.isEmpty())
7676
{
7777
// this is a bucket.
7878
this.bucket = path;

pixels-common/src/main/java/io/pixelsdb/pixels/common/physical/PhysicalWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ public interface PhysicalWriter extends Closeable
4646
*/
4747
long append(ByteBuffer buffer) throws IOException;
4848

49+
/**
50+
* Append content to the file.
51+
* @param buffer content buffer
52+
* @return start offset of content in the file.
53+
* @throws IOException
54+
*/
55+
default long append(byte[] buffer) throws IOException
56+
{
57+
return append(buffer, 0, buffer.length);
58+
}
59+
4960
/**
5061
* Append content to the file.
5162
*

pixels-common/src/main/java/io/pixelsdb/pixels/common/physical/Storage.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ enum Scheme
4747
redis, // Redis
4848
gcs, // google cloud storage
4949
httpstream, // http (netty) based stream storage
50-
sqsstream, // AWS SQS+S3 based stream storage
50+
s3qs, // AWS SQS+S3 based storage for intermediate data shuffle
5151
mock; // mock
5252

5353
/**
@@ -160,7 +160,7 @@ public boolean equals(Scheme other)
160160
* By default, this method returns false.
161161
*
162162
* <p><b>Note:</b> if this method returns true, then getLocations
163-
* and getHosts must be override to return the physical locality
163+
* and getHosts must be overridden to return the physical locality
164164
* information of the given path.</p>
165165
* @return
166166
*/
@@ -228,8 +228,7 @@ default String[] getHosts(String path) throws IOException
228228
* @return
229229
* @throws IOException if path is a directory.
230230
*/
231-
DataOutputStream create(String path, boolean overwrite,
232-
int bufferSize) throws IOException;
231+
DataOutputStream create(String path, boolean overwrite, int bufferSize) throws IOException;
233232

234233
/**
235234
* For local fs, path is considered as local.
@@ -241,7 +240,7 @@ DataOutputStream create(String path, boolean overwrite,
241240
* @throws IOException if path is a directory.
242241
*/
243242
default DataOutputStream create(String path, boolean overwrite,
244-
int bufferSize, short replication) throws IOException
243+
int bufferSize, short replication) throws IOException
245244
{
246245
return create(path, overwrite, bufferSize);
247246
}

pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/ConfigFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
*/
2020
package io.pixelsdb.pixels.common.utils;
2121

22-
import java.io.FileInputStream;
2322
import java.io.IOException;
2423
import java.io.InputStream;
2524
import java.net.URL;
25+
import java.nio.file.Files;
26+
import java.nio.file.Paths;
2627
import java.util.HashMap;
2728
import java.util.Map;
2829
import java.util.Properties;
@@ -63,7 +64,7 @@ public interface UpdateCallback
6364
void update(String value);
6465
}
6566

66-
private Map<String, UpdateCallback> callbacks;
67+
private final Map<String, UpdateCallback> callbacks;
6768

6869
private ConfigFactory()
6970
{
@@ -111,7 +112,7 @@ private ConfigFactory()
111112
}
112113
else
113114
{
114-
in = new FileInputStream(pixelsConfig);
115+
in = Files.newInputStream(Paths.get(pixelsConfig));
115116
}
116117
} catch (IOException e)
117118
{
@@ -153,7 +154,7 @@ public synchronized void loadProperties(String propFilePath) throws IOException
153154
}
154155
else
155156
{
156-
in = new FileInputStream(propFilePath);
157+
in = Files.newInputStream(Paths.get(propFilePath));
157158
}
158159
this.prop.load(in);
159160
for (Map.Entry<String, UpdateCallback> entry : this.callbacks.entrySet())

pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public final class Constants
3232
public static final int S3_BUFFER_SIZE = 8 * 1024 * 1024;
3333
public static final int REDIS_BUFFER_SIZE = 8 * 1024 * 1024;
3434
public static final int GCS_BUFFER_SIZE = 8 * 1024 * 1024;
35+
public static final int S3QS_BUFFER_SIZE = 8 * 1024 * 1024;
3536
public static final int HTTP_STREAM_BUFFER_SIZE = 8 * 1024 * 1024;
36-
public static final int SQS_STREAM_BUFFER_SIZE = 8 * 1024 * 1024;
3737
public static final int STREAM_READER_RG_BUFFER_SIZE = 1024 * 1024;
3838
public static final int STREAM_READER_RG_FOOTER_BUFFER_SIZE = 1024;
3939

pixels-common/src/main/resources/pixels.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ minio.path.prefix=s3://pixels/retina/
136136
redis.endpoint=localhost:6379
137137
redis.access.key=redis-user-dummy
138138
redis.secret.key=redis-password-dummy
139+
# the number of messages that the receiver tries to poll each time, it should be >= 1 and <= 10
140+
s3qs.poll.batch.size=3
139141

140142
###### query engine settings ######
141143

pixels-storage/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Queries will load and call the providers to get access to the underlying storage
1111
- `pixels-storage-mock` provides a mock storage system for debugging.
1212
- `pixels-storage-redis` provides redis key-value storage.
1313
- `pixels-storage-http` provides a streaming storage based on http (netty).
14-
- `pixels-storage-sqs` provides a streaming storage based on AWS SQS and S3.
14+
- `pixels-storage-s3qs` provides a storage based on AWS SQS and S3 for intermediate data shuffle.
1515

1616
## Usage
1717
Storage provider can be used in either of the following ways:

pixels-storage/pixels-storage-http/src/main/java/io/pixelsdb/pixels/storage/http/PhysicalHttpStreamWriter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.io.DataOutputStream;
2727
import java.io.IOException;
28+
import java.nio.Buffer;
2829
import java.nio.ByteBuffer;
2930

3031
/**
@@ -71,7 +72,14 @@ public long prepare(int length) throws IOException
7172
@Override
7273
public long append(ByteBuffer buffer) throws IOException
7374
{
74-
buffer.flip();
75+
/**
76+
* Issue #217:
77+
* For compatibility reasons if this code is compiled by jdk>=9 but executed in jvm8.
78+
*
79+
* In jdk8, ByteBuffer.flip() is extended from Buffer.flip(), but in jdk11, different kind of ByteBuffer
80+
* has its own flip implementation and may lead to errors.
81+
*/
82+
((Buffer)buffer).flip();
7583
int length = buffer.remaining();
7684
return append(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
7785
}

pixels-storage/pixels-storage-s3/src/main/java/io/pixelsdb/pixels/storage/s3/AbstractS3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ public boolean isDirectory(String path) throws IOException
477477
return new ObjectPath(path).isFolder;
478478
}
479479

480-
public S3Client getClient()
480+
public S3Client getS3Client()
481481
{
482482
return s3;
483483
}

pixels-storage/pixels-storage-s3/src/main/java/io/pixelsdb/pixels/storage/s3/AbstractS3Reader.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public AbstractS3Reader(Storage storage, String path) throws IOException
108108
this.length = this.s3.getStatus(path).getLength();
109109
this.numRequests = 1;
110110
this.position = 0L;
111-
this.client = this.s3.getClient();
111+
this.client = this.s3.getS3Client();
112112
this.enableAsync = Boolean.parseBoolean(ConfigFactory.Instance()
113113
.getProperty("s3.enable.async"));
114114
this.useAsyncClient = Boolean.parseBoolean(ConfigFactory.Instance()
@@ -137,8 +137,7 @@ public void seek(long desired) throws IOException
137137
position = desired;
138138
return;
139139
}
140-
throw new IOException("Desired offset " + desired +
141-
" is out of bound (" + 0 + "," + length + ")");
140+
throw new IOException("Desired offset " + desired + " is out of bound (" + 0 + "," + length + ")");
142141
}
143142

144143
@Override
@@ -158,7 +157,7 @@ public ByteBuffer readFully(int len) throws IOException
158157
client.getObject(request, ResponseTransformer.toBytes());
159158
this.numRequests++;
160159
this.position += len;
161-
return ByteBuffer.wrap(response.asByteArrayUnsafe());
160+
return response.asByteBuffer();
162161
} catch (Exception e)
163162
{
164163
throw new IOException("Failed to read object.", e);
@@ -168,15 +167,15 @@ public ByteBuffer readFully(int len) throws IOException
168167
@Override
169168
public void readFully(byte[] buffer) throws IOException
170169
{
171-
ByteBuffer byteBuffer = readFully(buffer.length);
172-
System.arraycopy(byteBuffer.array(), 0, buffer, 0, buffer.length);
170+
readFully(buffer, 0, buffer.length);
173171
}
174172

175173
@Override
176174
public void readFully(byte[] buffer, int off, int len) throws IOException
177175
{
178176
ByteBuffer byteBuffer = readFully(len);
179-
System.arraycopy(byteBuffer.array(), 0, buffer, off, len);
177+
// This is more efficient than byteBuffer.put(buffer, off, len).
178+
System.arraycopy(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), buffer, off, len);
180179
}
181180

182181
/**

0 commit comments

Comments
 (0)