Conversation

@MitchTurner
Member

@MitchTurner MitchTurner commented Dec 16, 2025

Description

This adds support for accessing the new block aggregation RPC, and the subsequent S3 calls, from the Fuel Client.

@MitchTurner MitchTurner self-assigned this Dec 16, 2025
@MitchTurner MitchTurner added the no changelog Skip the CI check of the changelog modification label Dec 16, 2025
@MitchTurner MitchTurner marked this pull request as ready for review December 17, 2025 01:20
@MitchTurner MitchTurner requested review from a team, Dentosal, rymnc and xgreenx as code owners December 17, 2025 01:20
@cursor

cursor bot commented Dec 17, 2025

PR Summary

Adds RPC support to FuelClient for fetching/streaming blocks via block-aggregator and S3, and exposes rpc_address on FuelService; tests updated to use new client APIs.

  • Client (crates/client):
    • Feature flag: New rpc feature with deps (fuel-core-block-aggregator-api, prost, tonic, aws-sdk-s3, aws-config, flate2).
    • RPC integration:
      • New constructor FuelClient::new_with_rpc(...) and optional fields rpc_client/aws_client.
      • New APIs: get_block_range(...), new_block_subscription(...), get_aggregated_height().
      • Handles block responses from bytes or remote S3; downloads via AWS S3 client and gzip-decompresses; converts protobuf blocks to internal types.
  • Service (crates/fuel-core):
    • Exposes FuelService::rpc_address (when rpc feature enabled).
  • Tests (tests/):
    • Update RPC/S3 tests to use FuelClient RPC methods and new streaming APIs.
    • Enable fuel-core-client/rpc in tests/Cargo.toml feature set.

Written by Cursor Bugbot for commit 54f45d6. This will update automatically on new commits.

@@ -1,3 +1,5 @@
#![allow(unused_imports)]
Member


?

require_height: ConsistencyPolicy,
chain_state_info: ChainStateInfo,
#[cfg(feature = "rpc")]
rpc_client: Option<ProtoBlockAggregatorClient<Channel>>,
Member


probably a good chance to create a FuelClientBuilder now, all the constructors are duped code right now

Member Author

@MitchTurner MitchTurner Dec 17, 2025


I don't think it's quite there yet. I'm a big fan of builders, but essentially it's just one optional param.

I've deduped the code (and one of them I didn't end up using so I removed): 5681c3e


#[cfg(feature = "rpc")]
impl FuelClient {
fn rpc_client(&self) -> io::Result<ProtoBlockAggregatorClient<Channel>> {
Member


io::Result ?

Member Author


I know. It's what all the other responses from the client give. I'm not sure why.

Comment on lines 1861 to 1865
fn unzip_bytes(bytes: &[u8]) -> Vec<u8> {
let mut decoder = GzDecoder::new(bytes);
let mut output = Vec::new();
decoder.read_to_end(&mut output).unwrap();
output
Member


not guaranteed that we used GzEncoder to push to s3, i think this part needs some reworking to make it a bit stricter. you can check the header to see if it can be gzip decompressed ~ unwrap probably not a good idea here
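A minimal sketch of the magic-byte check suggested here: every gzip stream starts with the two-byte header `0x1f 0x8b`, so the client can refuse to gunzip anything else and return an error instead of unwrapping. The function name is hypothetical.

```rust
/// Returns true if `bytes` starts with the gzip magic header (0x1f, 0x8b).
/// Checking this before handing the buffer to GzDecoder lets the caller
/// return an error for non-gzip payloads instead of panicking on unwrap.
fn looks_like_gzip(bytes: &[u8]) -> bool {
    bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b
}
```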

Member Author


We are making 2 assumptions here:

  • the bytes are the serialized ProtoBlock
  • the bytes have been gzipped

Would you prefer the protobuf message to include compression and serialization enums to allow expansion of this in the future? Or we could add more clarification to the Location type:

/// Nested message and enum types in `RemoteBlockResponse`.
pub mod remote_block_response {
    #[derive(serde::Serialize, serde::Deserialize)]
    #[allow(clippy::large_enum_variant)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Location {
        #[prost(message, tag = "1")]
        Http(super::RemoteHttpEndpoint),
        #[prost(message, tag = "2")]
        S3(super::RemoteS3Bucket),
    }
}

Maybe change S3 to S3ProtoGZip lol. Then we could just add new variants if they ever happen.

I understand the concern, but I'm also OK with the current design.

Member


I think we should parse the HTTP response headers and check if the Content-Encoding header has been set. Then, if it's set to gzip, we ungzip it, otherwise we just assume it's not encoded.

This is similar to how reqwest works - https://github.com/seanmonstar/reqwest/blob/master/src/async_impl/client.rs#L1062
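A sketch of the header-driven gate described above, assuming the response's Content-Encoding value is surfaced as an `Option<&str>` (the field name on the real S3 `GetObjectOutput` is an assumption here):

```rust
/// Decide whether the payload should be gunzipped, based on the response's
/// Content-Encoding header. Handles comma-separated encoding lists and is
/// case-insensitive; an absent header means "not encoded".
fn should_gunzip(content_encoding: Option<&str>) -> bool {
    content_encoding
        .map(|value| {
            value
                .split(',')
                .any(|token| token.trim().eq_ignore_ascii_case("gzip"))
        })
        .unwrap_or(false)
}
```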

let mut url = reqwest::Url::parse(&raw_url)
.map_err(anyhow::Error::msg)
.with_context(|| format!("Invalid Fuel GraphQL URL: {raw_url}"))?;
url.set_path("/v1/graphql");
Member


This is not a valid assumption, other RPC providers like Quicknode and Ankr add prefixes onto the URL and don't support accessing the graphql endpoint from the root.

The URLs for both RPC & GraphQL should be explicitly passed in here, I don't think there is a way we can auto-configure this.

Member Author


This is the existing behavior. I'm not changing anything to do with the GraphQL server in this code. I think it would be a breaking change to change this here.

I did have a couple of bugs in the way the rpc url was being set though, and I've fixed those.

..
} = s3;
let s3_url = endpoint
.unwrap_or_else(|| "https://s3.amazonaws.com".to_string());
Member


You should not pass in manual endpoints if one is not provided. If something is not provided, let the S3 SDK figure it out based on the bucket name.

Member Author


async fn aws_client(url: &str) -> Client {
let credentials = DefaultCredentialsChain::builder().build().await;
let _aws_region =
std::env::var("AWS_REGION").expect("AWS_REGION env var must be set");
Member


This is not a valid assumption on the client side. Even when it's required, the AWS SDK itself will throw an error when it doesn't have the right data.

Member Author


Thanks. Nice catch. I just moved a bunch of stuff over from the test code but didn't fix all of the differences.

Member Author


let builder = aws_sdk_s3::config::Builder::from(&sdk_config);
let config = builder.force_path_style(true).build();
Client::from_conf(config)
}
Member


Should create a global client that gets reused after the first init. Need to verify, but I'm pretty sure everything is thread safe in the AWS SDK, so cross-thread access shouldn't be a problem
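A sketch of the reuse pattern this suggests, using a stub `Client` and `std::sync::OnceLock`. Since the real `aws_sdk_s3` client is built asynchronously, the actual code would likely use something like `tokio::sync::OnceCell` instead; the SDK's `Client` is `Clone` (it wraps an `Arc` internally), so handing out clones of the cached instance is cheap. All names below are illustrative.

```rust
use std::sync::OnceLock;

// Stub standing in for aws_sdk_s3::Client, which is Clone and internally
// reference-counted, so cloning the cached instance is cheap.
#[derive(Clone)]
struct Client {
    id: u64,
}

static SHARED_CLIENT: OnceLock<Client> = OnceLock::new();

/// Build the client on first call, then hand out clones of the same instance.
fn shared_client() -> Client {
    SHARED_CLIENT
        .get_or_init(|| Client { id: 42 }) // real code: construct from SdkConfig
        .clone()
}
```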

Member


tbh i don't think it's worth adding these dependencies if we can fetch using http


Member Author


Copying from my convo with Cursor below:

We handle each response in a stream and we'd have to clone the shared client into each thread of that stream:

            .then(|res| async move {
                let resp =
                    res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
                Self::convert_block_response(resp).await
            });

We could wrap it in an Arc and clone and move it into each thread, but idk if we are getting much at that point. I don't think the aws client is particularly expensive to construct.

Member Author


tbh i don't think it's worth adding these dependencies if we can fetch using http

It sounds like we have a whole stack of AWS credential management setup and we will be reusing that with this new code. That's more complicated than just simple HTTP requests.

Member


We need to include a basic S3 client (not just HTTP) since we want to support requester pays and potentially other authenticated calls. We don't need a ton of logic, but we do need basic AWS SDK support for authenticated calls.

Member Author


I added a shared client. It looks like it's the intended use of the aws client, as it has an Arc inside and is Clone.

};
use aws_sdk_s3::Client;
use flate2::read::GzDecoder;
// use flate2::read::GzDecoder;


Bug: Commented-out import left in test file

A commented-out import // use flate2::read::GzDecoder; appears to be leftover from the refactoring where the unzip_bytes function was moved from the test file into the FuelClient. This dead code was accidentally left in during the changes.


.config
.rpc_config
.clone()
.map(|config| config.addr);


Bug: RPC address uses config value not actual bound address

The rpc_address field is populated from config.rpc_config.addr (the configured address) rather than the actual bound address. In contrast, bound_address for GraphQL is obtained from runner.shared.graph_ql.bound_address (the actual bound address). If the RPC server is configured with port 0 (OS auto-assigns), rpc_address would contain port 0 instead of the actual assigned port. Tests pass because free_local_addr() returns a real port, but production use with port 0 would break client connection attempts using rpc_address.
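The fix the bot describes is the usual bind-then-report pattern: bind to port 0 and read the kernel-assigned address back from the listener, the same way `graph_ql.bound_address` is obtained. A std-only sketch (the function name is hypothetical; the real RPC server would need to be handed the pre-bound listener):

```rust
use std::net::{SocketAddr, TcpListener};

/// Bind to an OS-assigned port and report the address actually bound.
/// local_addr() carries the real port the OS picked, not the 0 we asked for,
/// so this is the value that should be exposed as rpc_address.
fn bind_and_report() -> std::io::Result<SocketAddr> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.local_addr()
}
```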


Err(io::Error::other("Literal payloads are not supported yet"))
}
Payload::Bytes(bytes) => {
let proto_block = ProtoBlock::decode(bytes.as_slice()).unwrap();


Bug: Inconsistent error handling causes panic on malformed data

The Payload::Bytes branch in convert_block_response uses .unwrap() on ProtoBlock::decode(), which will panic if the RPC server sends malformed protobuf bytes. This is inconsistent with the Payload::Remote branch at lines 1764-1767, which properly handles the decode error using map_err() and returns it. Since the function signature returns io::Result, malformed data from the bytes payload path causes a panic instead of returning a proper error.
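A sketch of aligning the `Payload::Bytes` branch with the `Remote` branch's error handling; `ProtoBlock` below is a stub standing in for the prost-generated type, whose `decode` likewise returns a `Result`:

```rust
use std::io;

// Stub standing in for the prost-generated ProtoBlock; the real decode
// returns Result<Self, prost::DecodeError>.
struct ProtoBlock {
    height: u32,
}

impl ProtoBlock {
    fn decode(bytes: &[u8]) -> Result<Self, String> {
        let raw: [u8; 4] = bytes
            .try_into()
            .map_err(|_| "buffer is not 4 bytes".to_string())?;
        Ok(ProtoBlock { height: u32::from_le_bytes(raw) })
    }
}

/// Map the decode error into io::Error instead of unwrapping, matching the
/// Payload::Remote branch, so malformed bytes surface as Err, not a panic.
fn convert_bytes_payload(bytes: &[u8]) -> io::Result<ProtoBlock> {
    ProtoBlock::decode(bytes)
        .map_err(|e| io::Error::other(format!("failed to decode protobuf block: {e}")))
}
```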


.config
.rpc_config
.clone()
.map(|config| config.addr);


Bug: RPC address reads from config instead of bound address

The rpc_address field is populated from the configuration (config.rpc_config.addr) rather than the actual bound address from the RPC server. This is inconsistent with how bound_address is obtained for GraphQL (line 169), which correctly reads from runner.shared.graph_ql.bound_address. While the current test helpers use free_local_addr() which pre-allocates a specific port, this creates a time-of-check-time-of-use race condition where another process could claim the port between allocation and binding. The RPC service should expose its actual bound address similar to the GraphQL pattern.


Member Author


That's fine. We have two addresses here.

"Failed to convert RPC block to internal block: {e:?}"
))
},
)


Bug: Block reconstruction uses empty message IDs and default root

The convert_block_response function calls fuel_block_from_protobuf with hardcoded empty msg_ids (&[]) and Bytes32::default() for event_inbox_root. The existing roundtrip test in the codebase demonstrates these parameters are required for correct block reconstruction - it passes actual values and asserts block == deserialized_block. For blocks containing L1 relay messages, passing empty values causes the reconstructed block's computed roots to differ from the original, resulting in an invalid block that won't match the original.


} else {
s3_client.ok_or(io::Error::other("No AWS client configured"))?
};
tracing::info!("getting block from bucket: {} with key {}", bucket, key);
Member


I'd put this at debug - it's going to be very noisy if it's info

s3_client.ok_or(io::Error::other("No AWS client configured"))?
};
tracing::debug!("getting block from bucket: {} with key {}", bucket, key);
let req = client.get_object().bucket(bucket).key(key);
Member


Need to add a check for requester pays here & add support. Something like:

let req = if requester_pays {
    req.request_payer(RequestPayer::Requester)
} else {
    req
};

"Failed to convert RPC block to internal block: {e:?}"
))
},
)


Bug: Hardcoded empty values cause incorrect block reconstruction

The convert_block_response function always passes empty msg_ids (&[]) and Bytes32::default() for event_inbox_root to fuel_block_from_protobuf. These parameters are used to regenerate the block header's message_outbox_root and event_inbox_root fields. For any block that originally had outbox messages or a non-default event_inbox_root, the reconstructed block will have an incorrect header with wrong merkle roots, resulting in a different block ID than the original. The protobuf format stores these computed values but the deserialization function ignores them and regenerates from the caller-provided parameters.

