Skip to content

Commit b8939aa

Browse files
fubhyclaude
andcommitted
node, chain: Add extensible compression support for RPC requests
- Replace boolean compression_enabled with Compression enum (None, Gzip) - Support per-provider compression configuration via "compression" field - Add placeholders for future compression methods (Brotli, Deflate) - Update transport layer to handle compression enum with match statement - Add comprehensive unit tests for compression configuration parsing - Update example configuration and documentation Configuration examples: compression = "gzip" # Enable gzip compression compression = "none" # Disable compression (default) Addresses issue #5671 with future-extensible design. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent d4ddfaf commit b8939aa

File tree

8 files changed

+228
-7
lines changed

8 files changed

+228
-7
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/src/transport.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use graph::components::network_provider::ProviderName;
2-
use graph::endpoint::{EndpointMetrics, RequestLabels};
2+
use graph::endpoint::{Compression, EndpointMetrics, RequestLabels};
33
use jsonrpc_core::types::Call;
44
use jsonrpc_core::Value;
55

@@ -54,12 +54,25 @@ impl Transport {
5454
headers: graph::http::HeaderMap,
5555
metrics: Arc<EndpointMetrics>,
5656
provider: impl AsRef<str>,
57+
compression: Compression,
5758
) -> Self {
5859
// Unwrap: This only fails if something is wrong with the system's TLS config.
59-
let client = reqwest::Client::builder()
60-
.default_headers(headers)
61-
.build()
62-
.unwrap();
60+
let mut client_builder = reqwest::Client::builder().default_headers(headers);
61+
62+
match compression {
63+
Compression::Gzip => {
64+
// Enable gzip compression/decompression for requests and responses
65+
client_builder = client_builder.gzip(true);
66+
}
67+
Compression::None => {
68+
// No compression
69+
} // Future compression methods can be handled here:
70+
// Compression::Brotli => {
71+
// client_builder = client_builder.brotli(true);
72+
// }
73+
}
74+
75+
let client = client_builder.build().unwrap();
6376

6477
Transport::RPC {
6578
client: http::Http::with_client(client, rpc),

graph/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ chrono = "0.4.41"
2626
envconfig = "0.11.0"
2727
Inflector = "0.11.3"
2828
atty = "0.2"
29-
reqwest = { version = "0.12.15", features = ["json", "stream", "multipart"] }
29+
reqwest = { version = "0.12.15", features = ["json", "stream", "multipart", "gzip"] }
3030
ethabi = "17.2"
3131
hex = "0.4.3"
3232
http0 = { version = "0", package = "http" }

graph/src/endpoint.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
};
88

99
use prometheus::IntCounterVec;
10+
use serde::{Deserialize, Serialize};
1011
use slog::{warn, Logger};
1112

1213
use crate::components::network_provider::ProviderName;
@@ -17,6 +18,26 @@ use crate::{components::metrics::MetricsRegistry, data::value::Word};
1718
/// avoid locking since we don't need to modify the entire struture.
1819
type ProviderCount = Arc<HashMap<ProviderName, AtomicU64>>;
1920

21+
/// Compression methods for RPC transports
22+
#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)]
23+
pub enum Compression {
24+
#[serde(rename = "none")]
25+
None,
26+
#[serde(rename = "gzip")]
27+
Gzip,
28+
// Future compression methods can be added here:
29+
// #[serde(rename = "brotli")]
30+
// Brotli,
31+
// #[serde(rename = "deflate")]
32+
// Deflate,
33+
}
34+
35+
impl Default for Compression {
36+
fn default() -> Self {
37+
Compression::None
38+
}
39+
}
40+
2041
/// This struct represents all the current labels except for the result
2142
/// which is added separately. If any new labels are necessary they should
2243
/// remain in the same order as added in [`EndpointMetrics::new`]
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# Plan: Implement Extensible Compression for RPC Requests
2+
3+
## Overview
4+
Add extensible compression support for Graph Node's outgoing RPC requests to upstream providers, configurable on a per-provider basis with future compression methods in mind.
5+
6+
## Implementation Steps (COMPLETED)
7+
8+
### 1. ✅ Create Compression Enum (`node/src/config.rs`)
9+
- Added `Compression` enum with `None` and `Gzip` variants
10+
- Commented placeholders for future compression methods (Brotli, Deflate)
11+
- Default implementation returns `Compression::None`
12+
13+
### 2. ✅ Update Configuration Structure (`node/src/config.rs`)
14+
- Replaced `compression_enabled: bool` with `compression: Compression` field in `Web3Provider` struct
15+
- Updated all existing code to use new enum
16+
- Added unit tests for both "gzip" and "none" compression options
17+
18+
### 3. ✅ Modify HTTP Transport (`chain/ethereum/src/transport.rs`)
19+
- Updated `Transport::new_rpc()` to accept `Compression` enum parameter
20+
- Implemented match statement for different compression types
21+
- Added comments showing where future compression methods can be added
22+
- Uses reqwest's `.gzip(true)` for automatic compression/decompression
23+
24+
### 4. ✅ Update Transport Creation (`node/src/chain.rs`)
25+
- Pass compression enum from config to transport
26+
- Updated logging to show compression method using debug format
27+
28+
### 5. ✅ Update Dependencies (`graph/Cargo.toml`)
29+
- Added "gzip" feature to reqwest dependency
30+
31+
### 6. ✅ Update Test Configuration
32+
- Updated `full_config.toml` example to use new enum format
33+
- Added comprehensive unit tests for compression parsing
34+
35+
## Configuration Examples
36+
37+
### Gzip Compression
38+
```toml
39+
[chains.mainnet]
40+
provider = [
41+
{
42+
label = "mainnet-rpc",
43+
details = {
44+
type = "web3",
45+
url = "http://rpc.example.com",
46+
features = ["archive"],
47+
compression = "gzip"
48+
}
49+
}
50+
]
51+
```
52+
53+
### No Compression (Default)
54+
```toml
55+
[chains.mainnet]
56+
provider = [
57+
{
58+
label = "mainnet-rpc",
59+
details = {
60+
type = "web3",
61+
url = "http://rpc.example.com",
62+
features = ["archive"],
63+
compression = "none" # or omit entirely
64+
}
65+
}
66+
]
67+
```
68+
69+
### Future Extension Example
70+
```rust
71+
// Future compression methods can be easily added:
72+
#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)]
73+
pub enum Compression {
74+
#[serde(rename = "none")]
75+
None,
76+
#[serde(rename = "gzip")]
77+
Gzip,
78+
#[serde(rename = "brotli")]
79+
Brotli,
80+
#[serde(rename = "deflate")]
81+
Deflate,
82+
}
83+
84+
// And handled in transport:
85+
match compression {
86+
Compression::Gzip => client_builder = client_builder.gzip(true),
87+
Compression::Brotli => client_builder = client_builder.brotli(true),
88+
Compression::Deflate => client_builder = client_builder.deflate(true),
89+
Compression::None => {} // No compression
90+
}
91+
```
92+
93+
## Benefits of This Implementation
94+
- **Extensible**: Easy to add new compression methods without breaking changes
95+
- **Backward Compatible**: Defaults to no compression, existing configs work unchanged
96+
- **Type Safe**: Enum prevents invalid compression method strings
97+
- **Future Proof**: Clear pattern for adding Brotli, Deflate, etc.
98+
- **Per-Provider**: Each RPC provider can have different compression settings

node/resources/tests/full_config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ shard = "primary"
4848
provider = [
4949
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] },
5050
{ label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }},
51+
{ label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive"], compression = "gzip" }},
5152
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }},
5253
{ label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }},
5354
]

node/src/chain.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ pub async fn create_ethereum_networks_for_chain(
282282
logger,
283283
"Creating transport";
284284
"url" => &web3.url,
285-
"capabilities" => capabilities
285+
"capabilities" => capabilities,
286+
"compression" => ?web3.compression
286287
);
287288

288289
use crate::config::Transport::*;
@@ -293,6 +294,7 @@ pub async fn create_ethereum_networks_for_chain(
293294
web3.headers.clone(),
294295
endpoint_metrics.cheap_clone(),
295296
&provider.label,
297+
web3.compression,
296298
),
297299
Ipc => Transport::new_ipc(&web3.url).await,
298300
Ws => Transport::new_ws(&web3.url).await,

node/src/config.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use graph::{
22
anyhow::Error,
33
blockchain::BlockchainKind,
44
components::network_provider::ChainName,
5+
endpoint::Compression,
56
env::ENV_VARS,
67
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN},
78
itertools::Itertools,
@@ -502,6 +503,7 @@ impl ChainSection {
502503
features,
503504
headers: Default::default(),
504505
rules: vec![],
506+
compression: Compression::None,
505507
}),
506508
};
507509
let entry = chains.entry(name.to_string()).or_insert_with(|| Chain {
@@ -705,6 +707,10 @@ pub struct Web3Provider {
705707

706708
#[serde(default, rename = "match")]
707709
rules: Vec<Web3Rule>,
710+
711+
/// Compression method for RPC requests and responses
712+
#[serde(default)]
713+
pub compression: Compression,
708714
}
709715

710716
impl Web3Provider {
@@ -901,6 +907,7 @@ impl<'de> Deserialize<'de> for Provider {
901907
.ok_or_else(|| serde::de::Error::missing_field("features"))?,
902908
headers: headers.unwrap_or_else(HeaderMap::new),
903909
rules: nodes,
910+
compression: Compression::None,
904911
}),
905912
};
906913

@@ -1307,6 +1314,7 @@ mod tests {
13071314
features: BTreeSet::new(),
13081315
headers: HeaderMap::new(),
13091316
rules: Vec::new(),
1317+
compression: Compression::None,
13101318
}),
13111319
},
13121320
actual
@@ -1333,6 +1341,7 @@ mod tests {
13331341
features: BTreeSet::new(),
13341342
headers: HeaderMap::new(),
13351343
rules: Vec::new(),
1344+
compression: Compression::None,
13361345
}),
13371346
},
13381347
actual
@@ -1440,6 +1449,7 @@ mod tests {
14401449
features,
14411450
headers,
14421451
rules: Vec::new(),
1452+
compression: Compression::None,
14431453
}),
14441454
},
14451455
actual
@@ -1465,6 +1475,7 @@ mod tests {
14651475
features: BTreeSet::new(),
14661476
headers: HeaderMap::new(),
14671477
rules: Vec::new(),
1478+
compression: Compression::None,
14681479
}),
14691480
},
14701481
actual
@@ -1834,6 +1845,7 @@ mod tests {
18341845
features: BTreeSet::new(),
18351846
headers: HeaderMap::new(),
18361847
rules: Vec::new(),
1848+
compression: Compression::None,
18371849
}),
18381850
},
18391851
actual
@@ -1846,6 +1858,66 @@ mod tests {
18461858
assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled);
18471859
}
18481860

1861+
#[test]
1862+
fn it_parses_web3_provider_with_compression() {
1863+
let actual = toml::from_str(
1864+
r#"
1865+
label = "compressed"
1866+
details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "gzip" }
1867+
"#,
1868+
)
1869+
.unwrap();
1870+
1871+
assert_eq!(
1872+
Provider {
1873+
label: "compressed".to_owned(),
1874+
details: ProviderDetails::Web3(Web3Provider {
1875+
transport: Transport::Rpc,
1876+
url: "http://localhost:8545".to_owned(),
1877+
features: {
1878+
let mut features = BTreeSet::new();
1879+
features.insert("archive".to_string());
1880+
features
1881+
},
1882+
headers: HeaderMap::new(),
1883+
rules: Vec::new(),
1884+
compression: Compression::Gzip,
1885+
}),
1886+
},
1887+
actual
1888+
);
1889+
}
1890+
1891+
#[test]
1892+
fn it_parses_web3_provider_with_no_compression() {
1893+
let actual = toml::from_str(
1894+
r#"
1895+
label = "uncompressed"
1896+
details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "none" }
1897+
"#,
1898+
)
1899+
.unwrap();
1900+
1901+
assert_eq!(
1902+
Provider {
1903+
label: "uncompressed".to_owned(),
1904+
details: ProviderDetails::Web3(Web3Provider {
1905+
transport: Transport::Rpc,
1906+
url: "http://localhost:8545".to_owned(),
1907+
features: {
1908+
let mut features = BTreeSet::new();
1909+
features.insert("archive".to_string());
1910+
features
1911+
},
1912+
headers: HeaderMap::new(),
1913+
rules: Vec::new(),
1914+
compression: Compression::None,
1915+
}),
1916+
},
1917+
actual
1918+
);
1919+
}
1920+
18491921
#[test]
18501922
fn duplicated_labels_are_not_allowed_within_chain() {
18511923
let mut actual = toml::from_str::<ChainSection>(

0 commit comments

Comments
 (0)