add proto and s3 to client #3168
base: chore/use-new-proto-types-2
Conversation
PR Summary: Adds RPC support to FuelClient for fetching/streaming blocks via block-aggregator and S3, and exposes rpc_address on FuelService; tests updated to use the new client APIs.
Written by Cursor Bugbot for commit 54f45d6.
crates/client/src/client.rs
Outdated
@@ -1,3 +1,5 @@
#![allow(unused_imports)]
?
require_height: ConsistencyPolicy,
chain_state_info: ChainStateInfo,
#[cfg(feature = "rpc")]
rpc_client: Option<ProtoBlockAggregatorClient<Channel>>,
probably a good chance to create a FuelClientBuilder now, all the constructors are duped code right now
I don't think it's quite there yet. I'm a big fan of builders, but essentially it's just one optional param.
I've deduped the code (and one of the constructors I didn't end up using, so I removed it): 5681c3e
#[cfg(feature = "rpc")]
impl FuelClient {
    fn rpc_client(&self) -> io::Result<ProtoBlockAggregatorClient<Channel>> {
io::Result ?
I know. It's what all the other responses from the client give. I'm not sure why.
crates/client/src/client.rs
Outdated
fn unzip_bytes(bytes: &[u8]) -> Vec<u8> {
    let mut decoder = GzDecoder::new(bytes);
    let mut output = Vec::new();
    decoder.read_to_end(&mut output).unwrap();
    output
not guaranteed that we used GzEncoder to push to s3; i think this part needs some reworking to make it a bit stricter. you can check the header to see if it can be gzip decompressed ~ unwrap probably not a good idea here
We are making 2 assumptions here:
- the bytes are the serialized ProtoBlock
- the bytes have been gzipped
Would you prefer the protobuf message to include compression and serialization enums to allow expansion of this in the future? Or we could add more clarification to the Location type:
/// Nested message and enum types in `RemoteBlockResponse`.
pub mod remote_block_response {
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::large_enum_variant)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Location {
#[prost(message, tag = "1")]
Http(super::RemoteHttpEndpoint),
#[prost(message, tag = "2")]
S3(super::RemoteS3Bucket),
}
}
Maybe change S3 to S3ProtoGZip lol. Then we could just add new variants if they ever happen.
I understand the concern, but I'm also OK with the current design.
I think we should parse the HTTP response headers and check if the Content-Encoding header has been set. Then, if it's set to gzip, we ungzip it, otherwise we just assume it's not encoded.
This is similar to how reqwest works - https://github.com/seanmonstar/reqwest/blob/master/src/async_impl/client.rs#L1062
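A cheap complement to checking Content-Encoding is to sniff the payload itself: every gzip stream starts with the two-byte magic number 0x1f 0x8b (RFC 1952). A minimal std-only sketch (the function name is hypothetical, not part of this PR):

```rust
/// Returns true if `bytes` starts with the gzip magic number
/// (0x1f 0x8b) defined in RFC 1952. This only checks the header;
/// the body may still be corrupt, so decompression must still
/// handle errors rather than unwrap.
fn looks_like_gzip(bytes: &[u8]) -> bool {
    bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
}

fn main() {
    // First bytes of any gzip stream.
    assert!(looks_like_gzip(&[0x1f, 0x8b, 0x08, 0x00]));
    // Raw protobuf bytes will essentially never start with the magic number.
    assert!(!looks_like_gzip(b"\x0a\x04test"));
    assert!(!looks_like_gzip(&[]));
    println!("ok");
}
```

If the header check fails, the bytes can be passed through undecoded instead of panicking inside GzDecoder.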
crates/client/src/client.rs
Outdated
let mut url = reqwest::Url::parse(&raw_url)
    .map_err(anyhow::Error::msg)
    .with_context(|| format!("Invalid Fuel GraphQL URL: {raw_url}"))?;
url.set_path("/v1/graphql");
This is not a valid assumption, other RPC providers like Quicknode and Ankr add prefixes onto the URL and don't support accessing the graphql endpoint from the root.
The URLs for both RPC & GraphQL should be explicitly passed in here, I don't think there is a way we can auto-configure this.
This is the existing behavior. I'm not changing anything to do with the GraphQL server in this code. I think it would be a breaking change to change this here.
I did have a couple bugs in the way the rpc url was being set though, and I've fixed those.
crates/client/src/client.rs
Outdated
..
} = s3;
let s3_url = endpoint
    .unwrap_or_else(|| "https://s3.amazonaws.com".to_string());
You should not pass in manual endpoints if one is not provided. If something is not provided, let the S3 SDK figure it out based on the bucket name.
crates/client/src/client.rs
Outdated
async fn aws_client(url: &str) -> Client {
    let credentials = DefaultCredentialsChain::builder().build().await;
    let _aws_region =
        std::env::var("AWS_REGION").expect("AWS_REGION env var must be set");
This is not a valid assumption on the client side. Even when it's required, the AWS SDK itself will throw an error when it doesn't have the right data.
Thanks. Nice catch. I just moved a bunch of stuff over from the test code but didn't fix all of the differences.
let builder = aws_sdk_s3::config::Builder::from(&sdk_config);
let config = builder.force_path_style(true).build();
Client::from_conf(config)
}
Should create a global client that gets reused after the first init. Need to verify, but I'm pretty sure everything is thread safe in the AWS SDK, so cross-thread access shouldn't be a problem
tbh i don't think it's worth adding these dependencies if we can fetch using http
Copying from my convo with Cursor below:
We handle each response in a stream and we'd have to clone the shared client into each thread of that stream:
.then(|res| async move {
    let resp =
        res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
    Self::convert_block_response(resp).await
});
We could wrap it in an Arc and clone and move it into each thread, but idk if we're getting much at that point. I don't think the AWS client is particularly expensive to construct.
tbh i don't think it's worth adding these dependencies if we can fetch using http
It sounds like we have a whole stack of AWS credential management setup and we will be reusing that with this new code. That's more complicated than just simple HTTP requests.
We need to include a basic S3 client (not just HTTP) since we want to support requestor pays and potentially other authenticated calls. We don't need a ton of logic, but we do need basic AWS SDK support for authenticated calls.
I added a shared client. It looks like that's the intended use of the AWS client, as it has an Arc inside and is Clone.
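The "initialize once, hand out cheap clones" shape can be sketched with std::sync::OnceLock. This is a hedged illustration only: S3Client here is a placeholder struct standing in for aws_sdk_s3::Client (which is Clone and internally Arc'd), and in the real code the config load is async, so a tokio OnceCell would likely be used instead:

```rust
use std::sync::OnceLock;

// Placeholder for aws_sdk_s3::Client; the real client is Clone and
// wraps an Arc, so clones are cheap and thread-safe to share.
#[derive(Clone, Debug, PartialEq)]
struct S3Client {
    endpoint: String,
}

// Global slot, built at most once across all threads.
static CLIENT: OnceLock<S3Client> = OnceLock::new();

/// Build the client on first use, then hand out cheap clones.
fn shared_client() -> S3Client {
    CLIENT
        .get_or_init(|| S3Client {
            // Hypothetical default endpoint, for illustration only.
            endpoint: "https://s3.amazonaws.com".to_string(),
        })
        .clone()
}

fn main() {
    let a = shared_client();
    let b = shared_client();
    // Both calls see the same one-time-initialized config.
    assert_eq!(a, b);
    println!("ok");
}
```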
tests/tests/rpc_s3.rs
Outdated
};
use aws_sdk_s3::Client;
use flate2::read::GzDecoder;
// use flate2::read::GzDecoder;
.config
.rpc_config
.clone()
.map(|config| config.addr);
Bug: RPC address uses config value not actual bound address
The rpc_address field is populated from config.rpc_config.addr (the configured address) rather than the actual bound address. In contrast, bound_address for GraphQL is obtained from runner.shared.graph_ql.bound_address (the actual bound address). If the RPC server is configured with port 0 (OS auto-assigns), rpc_address would contain port 0 instead of the actual assigned port. Tests pass because free_local_addr() returns a real port, but production use with port 0 would break client connection attempts using rpc_address.
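The configured-vs-bound distinction is easy to demonstrate with std networking: binding to port 0 asks the OS for a free port, and only local_addr() on the live listener reveals what was actually assigned, which is the value a client needs:

```rust
use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    // Configured address: port 0 means "let the OS pick".
    let configured = "127.0.0.1:0";
    let listener = TcpListener::bind(configured)?;

    // The actual bound address carries the real port. Reporting this,
    // rather than the configured value, is the pattern the GraphQL
    // bound_address already follows.
    let bound = listener.local_addr()?;
    assert_ne!(bound.port(), 0);
    println!("bound to {bound}");
    Ok(())
}
```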
crates/client/src/client.rs
Outdated
Err(io::Error::other("Literal payloads are not supported yet"))
}
Payload::Bytes(bytes) => {
    let proto_block = ProtoBlock::decode(bytes.as_slice()).unwrap();
Bug: Inconsistent error handling causes panic on malformed data
The Payload::Bytes branch in convert_block_response uses .unwrap() on ProtoBlock::decode(), which will panic if the RPC server sends malformed protobuf bytes. This is inconsistent with the Payload::Remote branch at lines 1764-1767, which properly handles the decode error using map_err() and returns it. Since the function signature returns io::Result, malformed data from the bytes payload path causes a panic instead of returning a proper error.
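The fix mirrors what the Remote branch already does: map the decode failure into an io::Error and propagate it with `?` instead of unwrapping. A std-only sketch of the pattern, where decode is a hypothetical stand-in for ProtoBlock::decode:

```rust
use std::io;

// Hypothetical stand-in for prost's ProtoBlock::decode: fails on
// input it cannot parse instead of panicking.
fn decode(bytes: &[u8]) -> Result<Vec<u8>, String> {
    if bytes.is_empty() {
        Err("unexpected end of buffer".to_string())
    } else {
        Ok(bytes.to_vec())
    }
}

fn convert_bytes_payload(bytes: &[u8]) -> io::Result<Vec<u8>> {
    // map_err + `?` turns malformed input into a proper io::Error,
    // matching the function's io::Result signature, where `.unwrap()`
    // would panic the whole task.
    let block = decode(bytes)
        .map_err(|e| io::Error::other(format!("Failed to decode block: {e:?}")))?;
    Ok(block)
}

fn main() {
    assert!(convert_bytes_payload(&[1, 2, 3]).is_ok());
    assert!(convert_bytes_payload(&[]).is_err()); // error, not a panic
    println!("ok");
}
```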
.config
.rpc_config
.clone()
.map(|config| config.addr);
Bug: RPC address reads from config instead of bound address
The rpc_address field is populated from the configuration (config.rpc_config.addr) rather than the actual bound address from the RPC server. This is inconsistent with how bound_address is obtained for GraphQL (line 169), which correctly reads from runner.shared.graph_ql.bound_address. While the current test helpers use free_local_addr() which pre-allocates a specific port, this creates a time-of-check-time-of-use race condition where another process could claim the port between allocation and binding. The RPC service should expose its actual bound address similar to the GraphQL pattern.
That's fine. We have two addresses here.
    "Failed to convert RPC block to internal block: {e:?}"
))
},
)
Bug: Block reconstruction uses empty message IDs and default root
The convert_block_response function calls fuel_block_from_protobuf with hardcoded empty msg_ids (&[]) and Bytes32::default() for event_inbox_root. The existing roundtrip test in the codebase demonstrates these parameters are required for correct block reconstruction - it passes actual values and asserts block == deserialized_block. For blocks containing L1 relay messages, passing empty values causes the reconstructed block's computed roots to differ from the original, resulting in an invalid block that won't match the original.
Additional Locations (1)
crates/client/src/client.rs
Outdated
} else {
    s3_client.ok_or(io::Error::other("No AWS client configured"))?
};
tracing::info!("getting block from bucket: {} with key {}", bucket, key);
I'd put this at debug - it's going to be very noisy if it's info
crates/client/src/client.rs
Outdated
s3_client.ok_or(io::Error::other("No AWS client configured"))?
};
tracing::debug!("getting block from bucket: {} with key {}", bucket, key);
let req = client.get_object().bucket(bucket).key(key);
Need to add a check for requester pays here & add support. Something like:
let req = if requester_pays {
    req.request_payer(RequestPayer::Requester)
} else {
    req
};
    "Failed to convert RPC block to internal block: {e:?}"
))
},
)
Bug: Hardcoded empty values cause incorrect block reconstruction
The convert_block_response function always passes empty msg_ids (&[]) and Bytes32::default() for event_inbox_root to fuel_block_from_protobuf. These parameters are used to regenerate the block header's message_outbox_root and event_inbox_root fields. For any block that originally had outbox messages or a non-default event_inbox_root, the reconstructed block will have an incorrect header with wrong merkle roots, resulting in a different block ID than the original. The protobuf format stores these computed values but the deserialization function ignores them and regenerates from the caller-provided parameters.
Description
This adds support for accessing the new block aggregation RPC and subsequent S3 calls using the Fuel Client.