Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use engine api get-blobs for block subscriber #14513

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- HTTP endpoint for PublishBlobs
- GetBlockV2, GetBlindedBlock, ProduceBlockV2, ProduceBlockV3: add Electra case.
- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
- Use engine api get-blobs for block subscriber

### Changed

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/core/helpers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_test(
"validators_test.go",
"weak_subjectivity_test.go",
],
data = glob(["testdata/**"]),
embed = [":go_default_library"],
shard_count = 2,
tags = ["CI_race_detection"],
Expand Down
76 changes: 74 additions & 2 deletions beacon-chain/execution/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -85,6 +86,8 @@ const (
GetPayloadBodiesByRangeV2 = "engine_getPayloadBodiesByRangeV2"
// ExchangeCapabilities request string for JSON-RPC.
ExchangeCapabilities = "engine_exchangeCapabilities"
// GetBlobsV1 request string for JSON-RPC.
GetBlobsV1 = "engine_getBlobsV1"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
Expand All @@ -99,16 +102,17 @@ type ForkchoiceUpdatedResponse struct {
ValidationError string `json:"validationError"`
}

// PayloadReconstructor defines a service that can reconstruct a full beacon
// Reconstructor defines a service that can reconstruct a full beacon
// block with an execution payload from a signed beacon block and a connection
// to an execution client's engine API.
type PayloadReconstructor interface {
type Reconstructor interface {
ReconstructFullBlock(
ctx context.Context, blindedBlock interfaces.ReadOnlySignedBeaconBlock,
) (interfaces.SignedBeaconBlock, error)
ReconstructFullBellatrixBlockBatch(
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedROBlob, error)
}

// EngineCaller defines a client that can interact with an Ethereum
Expand Down Expand Up @@ -483,6 +487,16 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H
return hdr, err
}

// GetBlobs returns the blob and proof from the execution engine for the given versioned hashes.
func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs")
defer span.End()

result := make([]*pb.BlobAndProof, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the json methodset defined on this type, you probably need to define unmarshaling for the encoding to come through corrrectly.

return result, handleRPCError(err)
}

// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
Expand Down Expand Up @@ -511,6 +525,64 @@ func (s *Service) ReconstructFullBellatrixBlockBatch(
return unb, nil
}

// ReconstructBlobSidecars reconstructs the blob sidecars for a given beacon block.
// It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs,
// and constructs the corresponding verified read-only blob sidecars.
func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedROBlob, error) {
blockBody := block.Block().Body()

// Get KZG commitments from the block body
kzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "could not get blob KZG commitments")
}

// Initialize KZG hashes and retrieve blobs
kzgHashes := make([]common.Hash, len(kzgCommitments))
blobs, err := s.GetBlobs(ctx, kzgHashes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check if the EL supports this API (cached exchange capabilities result?) before attempting to call it? Otherwise the logs could be noisy for anyone who upgrades to this version with an EL that does not support getBlobs.

if err != nil {
return nil, errors.Wrap(err, "could not get blobs")
}

// Get the block header and its hash tree root
header, err := block.Header()
if err != nil {
return nil, errors.Wrap(err, "could not get header")
}
// Loop through the blobs and reconstruct blob sidecars
verifiedBlobs := make([]blocks.VerifiedROBlob, 0, len(blobs))
for index, blob := range blobs {
if blob == nil {
continue
}

// Get the Merkle proof for the KZG commitment
proof, err := blocks.MerkleProofKZGCommitment(blockBody, index)
if err != nil {
return nil, errors.Wrap(err, "could not get Merkle proof KZG commitment")
}

// Create the BlobSidecar object
sidecar := &ethpb.BlobSidecar{
Index: uint64(index),
Blob: blob.Blob,
KzgCommitment: kzgCommitments[index],
KzgProof: blob.KzgProof,
SignedBlockHeader: header,
CommitmentInclusionProof: proof,
}

// Create a read-only blob with the header root
roBlob, err := blocks.NewROBlobWithRoot(sidecar, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not create RO blob with root")
}
verifiedBlobs = append(verifiedBlobs, blocks.NewVerifiedROBlob(roBlob))
Copy link
Contributor

@kasey kasey Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're calling NewVerifiedROBlob and minting VerifiedROBlobs here, rather than getting those values from a verifier. That is risky. Would it be possible to define a verifier for this? Maybe something like the initial sync verifier that can deal with a batch.

On that note, I don't see you verifying the commitment inclusion proof anywhere in this PR.

Terence reminded me we are constructing the proof so we don't have to verify it, derp.

}

return verifiedBlobs, nil
}

func fullPayloadFromPayloadBody(
header interfaces.ExecutionData, body *pb.ExecutionPayloadBody, bVersion int,
) (interfaces.ExecutionData, error) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/execution/engine_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import (
)

var (
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&Service{})
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&mocks.EngineClient{})
)

Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/execution/testing/mock_engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (e *EngineClient) ReconstructFullBellatrixBlockBatch(
return fullBlocks, nil
}

func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [32]byte) ([]blocks.VerifiedROBlob, error) {
return nil, nil
}

// GetTerminalBlockHash --
func (e *EngineClient) GetTerminalBlockHash(ctx context.Context, transitionTime uint64) ([]byte, bool, error) {
ttd := new(big.Int)
Expand Down
100 changes: 50 additions & 50 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithStateGen(b.stateGen),
regularsync.WithSlasherAttestationsFeed(b.slasherAttestationsFeed),
regularsync.WithSlasherBlockHeadersFeed(b.slasherBlockHeadersFeed),
regularsync.WithPayloadReconstructor(web3Service),
regularsync.WithReconstructor(web3Service),
regularsync.WithClockWaiter(b.clockWaiter),
regularsync.WithInitialSyncComplete(initialSyncComplete),
regularsync.WithStateNotifier(b),
Expand Down Expand Up @@ -953,55 +953,55 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {

p2pService := b.fetchP2P()
rpcService := rpc.NewService(b.ctx, &rpc.Config{
ExecutionEngineCaller: web3Service,
ExecutionPayloadReconstructor: web3Service,
Host: host,
Port: port,
BeaconMonitoringHost: beaconMonitoringHost,
BeaconMonitoringPort: beaconMonitoringPort,
CertFlag: cert,
KeyFlag: key,
BeaconDB: b.db,
Broadcaster: p2pService,
PeersFetcher: p2pService,
PeerManager: p2pService,
MetadataProvider: p2pService,
ChainInfoFetcher: chainService,
HeadFetcher: chainService,
CanonicalFetcher: chainService,
ForkFetcher: chainService,
ForkchoiceFetcher: chainService,
FinalizationFetcher: chainService,
BlockReceiver: chainService,
BlobReceiver: chainService,
AttestationReceiver: chainService,
GenesisTimeFetcher: chainService,
GenesisFetcher: chainService,
OptimisticModeFetcher: chainService,
AttestationsPool: b.attestationPool,
ExitPool: b.exitPool,
SlashingsPool: b.slashingsPool,
BLSChangesPool: b.blsToExecPool,
SyncCommitteeObjectPool: b.syncCommitteePool,
ExecutionChainService: web3Service,
ExecutionChainInfoFetcher: web3Service,
ChainStartFetcher: chainStartFetcher,
MockEth1Votes: mockEth1DataVotes,
SyncService: syncService,
DepositFetcher: depositFetcher,
PendingDepositFetcher: b.depositCache,
BlockNotifier: b,
StateNotifier: b,
OperationNotifier: b,
StateGen: b.stateGen,
EnableDebugRPCEndpoints: enableDebugRPCEndpoints,
MaxMsgSize: maxMsgSize,
BlockBuilder: b.fetchBuilderService(),
Router: router,
ClockWaiter: b.clockWaiter,
BlobStorage: b.BlobStorage,
TrackedValidatorsCache: b.trackedValidatorsCache,
PayloadIDCache: b.payloadIDCache,
ExecutionEngineCaller: web3Service,
ExecutionReconstructor: web3Service,
Host: host,
Port: port,
BeaconMonitoringHost: beaconMonitoringHost,
BeaconMonitoringPort: beaconMonitoringPort,
CertFlag: cert,
KeyFlag: key,
BeaconDB: b.db,
Broadcaster: p2pService,
PeersFetcher: p2pService,
PeerManager: p2pService,
MetadataProvider: p2pService,
ChainInfoFetcher: chainService,
HeadFetcher: chainService,
CanonicalFetcher: chainService,
ForkFetcher: chainService,
ForkchoiceFetcher: chainService,
FinalizationFetcher: chainService,
BlockReceiver: chainService,
BlobReceiver: chainService,
AttestationReceiver: chainService,
GenesisTimeFetcher: chainService,
GenesisFetcher: chainService,
OptimisticModeFetcher: chainService,
AttestationsPool: b.attestationPool,
ExitPool: b.exitPool,
SlashingsPool: b.slashingsPool,
BLSChangesPool: b.blsToExecPool,
SyncCommitteeObjectPool: b.syncCommitteePool,
ExecutionChainService: web3Service,
ExecutionChainInfoFetcher: web3Service,
ChainStartFetcher: chainStartFetcher,
MockEth1Votes: mockEth1DataVotes,
SyncService: syncService,
DepositFetcher: depositFetcher,
PendingDepositFetcher: b.depositCache,
BlockNotifier: b,
StateNotifier: b,
OperationNotifier: b,
StateGen: b.stateGen,
EnableDebugRPCEndpoints: enableDebugRPCEndpoints,
MaxMsgSize: maxMsgSize,
BlockBuilder: b.fetchBuilderService(),
Router: router,
ClockWaiter: b.clockWaiter,
BlobStorage: b.BlobStorage,
TrackedValidatorsCache: b.trackedValidatorsCache,
PayloadIDCache: b.payloadIDCache,
})

return b.services.RegisterService(rpcService)
Expand Down
48 changes: 24 additions & 24 deletions beacon-chain/rpc/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,30 +454,30 @@ func (s *Service) beaconEndpoints(
coreService *core.Service,
) []endpoint {
server := &beacon.Server{
CanonicalHistory: ch,
BeaconDB: s.cfg.BeaconDB,
AttestationsPool: s.cfg.AttestationsPool,
SlashingsPool: s.cfg.SlashingsPool,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BlockNotifier: s.cfg.BlockNotifier,
OperationNotifier: s.cfg.OperationNotifier,
Broadcaster: s.cfg.Broadcaster,
BlockReceiver: s.cfg.BlockReceiver,
StateGenService: s.cfg.StateGen,
Stater: stater,
Blocker: blocker,
OptimisticModeFetcher: s.cfg.OptimisticModeFetcher,
HeadFetcher: s.cfg.HeadFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,
VoluntaryExitsPool: s.cfg.ExitPool,
V1Alpha1ValidatorServer: validatorServer,
SyncChecker: s.cfg.SyncService,
ExecutionPayloadReconstructor: s.cfg.ExecutionPayloadReconstructor,
BLSChangesPool: s.cfg.BLSChangesPool,
FinalizationFetcher: s.cfg.FinalizationFetcher,
ForkchoiceFetcher: s.cfg.ForkchoiceFetcher,
CoreService: coreService,
CanonicalHistory: ch,
BeaconDB: s.cfg.BeaconDB,
AttestationsPool: s.cfg.AttestationsPool,
SlashingsPool: s.cfg.SlashingsPool,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BlockNotifier: s.cfg.BlockNotifier,
OperationNotifier: s.cfg.OperationNotifier,
Broadcaster: s.cfg.Broadcaster,
BlockReceiver: s.cfg.BlockReceiver,
StateGenService: s.cfg.StateGen,
Stater: stater,
Blocker: blocker,
OptimisticModeFetcher: s.cfg.OptimisticModeFetcher,
HeadFetcher: s.cfg.HeadFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,
VoluntaryExitsPool: s.cfg.ExitPool,
V1Alpha1ValidatorServer: validatorServer,
SyncChecker: s.cfg.SyncService,
ExecutionReconstructor: s.cfg.ExecutionReconstructor,
BLSChangesPool: s.cfg.BLSChangesPool,
FinalizationFetcher: s.cfg.FinalizationFetcher,
ForkchoiceFetcher: s.cfg.ForkchoiceFetcher,
CoreService: coreService,
}

const namespace = "beacon"
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/rpc/eth/beacon/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *Server) GetBlockV2(w http.ResponseWriter, r *http.Request) {

// Deal with block unblinding.
if blk.Version() >= version.Bellatrix && blk.IsBlinded() {
blk, err = s.ExecutionPayloadReconstructor.ReconstructFullBlock(ctx, blk)
blk, err = s.ExecutionReconstructor.ReconstructFullBlock(ctx, blk)
if err != nil {
httputil.HandleError(w, errors.Wrapf(err, "could not reconstruct full execution payload to create signed beacon block").Error(), http.StatusBadRequest)
return
Expand Down
48 changes: 24 additions & 24 deletions beacon-chain/rpc/eth/beacon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,28 @@ import (
// Server defines a server implementation of the gRPC Beacon Chain service,
// providing RPC endpoints to access data relevant to the Ethereum Beacon Chain.
type Server struct {
BeaconDB db.ReadOnlyDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
BlockReceiver blockchain.BlockReceiver
BlockNotifier blockfeed.Notifier
OperationNotifier operation.Notifier
Broadcaster p2p.Broadcaster
AttestationsPool attestations.Pool
SlashingsPool slashings.PoolManager
VoluntaryExitsPool voluntaryexits.PoolManager
StateGenService stategen.StateManager
Stater lookup.Stater
Blocker lookup.Blocker
HeadFetcher blockchain.HeadFetcher
TimeFetcher blockchain.TimeFetcher
OptimisticModeFetcher blockchain.OptimisticModeFetcher
V1Alpha1ValidatorServer eth.BeaconNodeValidatorServer
SyncChecker sync.Checker
CanonicalHistory *stategen.CanonicalHistory
ExecutionPayloadReconstructor execution.PayloadReconstructor
FinalizationFetcher blockchain.FinalizationFetcher
BLSChangesPool blstoexec.PoolManager
ForkchoiceFetcher blockchain.ForkchoiceFetcher
CoreService *core.Service
BeaconDB db.ReadOnlyDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
BlockReceiver blockchain.BlockReceiver
BlockNotifier blockfeed.Notifier
OperationNotifier operation.Notifier
Broadcaster p2p.Broadcaster
AttestationsPool attestations.Pool
SlashingsPool slashings.PoolManager
VoluntaryExitsPool voluntaryexits.PoolManager
StateGenService stategen.StateManager
Stater lookup.Stater
Blocker lookup.Blocker
HeadFetcher blockchain.HeadFetcher
TimeFetcher blockchain.TimeFetcher
OptimisticModeFetcher blockchain.OptimisticModeFetcher
V1Alpha1ValidatorServer eth.BeaconNodeValidatorServer
SyncChecker sync.Checker
CanonicalHistory *stategen.CanonicalHistory
ExecutionReconstructor execution.Reconstructor
FinalizationFetcher blockchain.FinalizationFetcher
BLSChangesPool blstoexec.PoolManager
ForkchoiceFetcher blockchain.ForkchoiceFetcher
CoreService *core.Service
}
Loading
Loading