Skip to content

Commit

Permalink
Fix Commitments Check (#14493)
Browse files Browse the repository at this point in the history
* Fix Commitments Check

* `highestFinalizedEpoch`: Refactor (no functional change).

* `retrieveMissingDataColumnsFromPeers`: Fix logs.

* `VerifyDataColumnSidecarKZGProofs`: Optimise with capacity.

* Save data columns when initial syncing.

* `dataColumnSidecarsByRangeRPCHandler`: Add logs when a request enters.

* Improve logging.

* Improve logging.

* `peersWithDataColumns: Do not filter any more on peer head slot.

* Fix Nishant's comment.

---------

Co-authored-by: Manu NALEPA <[email protected]>
  • Loading branch information
nisdas and nalepae committed Oct 7, 2024
1 parent aa8dc8d commit 1488044
Show file tree
Hide file tree
Showing 25 changed files with 400 additions and 97 deletions.
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return err
}
}
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil {

nodeID := s.cfg.P2P.NodeID()
if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), b); err != nil {
return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot())
}
args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(),
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ func (s *Service) handleDA(
if err != nil {
return 0, err
}
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), rob); err != nil {

nodeID := s.cfg.P2P.NodeID()
if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), rob); err != nil {
return 0, errors.Wrap(err, "could not validate blob data availability (AvailabilityStore.IsDataAvailable)")
}
} else {
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
WithStateGen(stateGen),
WithPayloadIDCache(cache.NewPayloadIDCache()),
WithClockSynchronizer(startup.NewClockSynchronizer()),
WithP2PBroadcaster(&mockAccesser{}),
}

chainService, err := NewService(ctx, opts...)
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithSyncChecker(mock.MockChecker{}),
WithExecutionEngineCaller(&mockExecution.EngineClient{}),
WithP2PBroadcaster(&mockAccesser{}),
}
// append the variadic opts so they override the defaults by being processed afterwards
opts = append(defOpts, opts...)
Expand Down
18 changes: 12 additions & 6 deletions beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func CustodyColumnSubnets(nodeId enode.ID, custodySubnetCount uint64) (map[uint6
func CustodyColumns(nodeId enode.ID, custodySubnetCount uint64) (map[uint64]bool, error) {
dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount

// Compute the custodied subnets.
// Compute the custody subnets.
subnetIds, err := CustodyColumnSubnets(nodeId, custodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody subnets")
Expand Down Expand Up @@ -408,17 +408,23 @@ func DataColumnSidecarsForReconstruct(
// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
// data column.
func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
if sc.ColumnIndex >= params.BeaconConfig().NumberOfColumns {
numberOfColumns := params.BeaconConfig().NumberOfColumns

if sc.ColumnIndex >= numberOfColumns {
return false, errIndexTooLarge
}

if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) {
return false, errMismatchLength
}

var commitments []kzg.Bytes48
var indices []uint64
var cells []kzg.Cell
var proofs []kzg.Bytes48
count := len(sc.DataColumn)

commitments := make([]kzg.Bytes48, 0, count)
indices := make([]uint64, 0, count)
cells := make([]kzg.Cell, 0, count)
proofs := make([]kzg.Bytes48, 0, count)

for i := range sc.DataColumn {
commitments = append(commitments, kzg.Bytes48(sc.KzgCommitments[i]))
indices = append(indices, sc.ColumnIndex)
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/das/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/das",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
Expand All @@ -30,13 +31,15 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"availability_columns_test.go",
"availability_test.go",
"cache_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand All @@ -45,6 +48,7 @@ go_test(
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)
3 changes: 2 additions & 1 deletion beacon-chain/das/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *LazilyPersistentStore) Persist(current primitives.Slot, sc ...blocks.RO

// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, _ enode.ID, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := commitmentsToCheck(b, current)
if err != nil {
return errors.Wrapf(err, "could check data availability for block %#x", b.Root())
Expand Down
105 changes: 77 additions & 28 deletions beacon-chain/das/availability_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
Expand Down Expand Up @@ -75,39 +76,58 @@ func (s *LazilyPersistentStoreColumn) PersistColumns(current primitives.Slot, sc

// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := fullCommitmentsToCheck(b, current)
func (s *LazilyPersistentStoreColumn) IsDataAvailable(
ctx context.Context,
nodeID enode.ID,
currentSlot primitives.Slot,
block blocks.ROBlock,
) error {
blockCommitments, err := fullCommitmentsToCheck(nodeID, block, currentSlot)
if err != nil {
return errors.Wrapf(err, "could check data availability for block %#x", b.Root())
return errors.Wrapf(err, "full commitments to check with block root `%#x` and current slot `%d`", block.Root(), currentSlot)
}
// Return early for blocks that are pre-deneb or which do not have any commitments.

// Return early for blocks that do not have any commitments.
if blockCommitments.count() == 0 {
return nil
}

key := keyFromBlock(b)
// Build the cache key for the block.
key := keyFromBlock(block)

// Retrieve the cache entry for the block, or create an empty one if it doesn't exist.
entry := s.cache.ensure(key)

// Delete the cache entry for the block at the end.
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)

// Get the root of the block.
blockRoot := block.Root()

// Wait for the summarizer to be ready before proceeding.
summarizer, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
log.
WithField("root", fmt.Sprintf("%#x", blockRoot)).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
// Get the summary for the block, and set it in the cache entry.
summary := summarizer.Summary(blockRoot)
entry.setDiskSummary(summary)
}

// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
sidecars, err := entry.filterColumns(root, &blockCommitments)
sidecars, err := entry.filterColumns(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
}
// Do thorough verifications of each BlobSidecar for the block.
// Same as above, we don't save BlobSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, b, sidecars)

// Do thorough verifications of each RODataColumns for the block.
// Same as above, we don't save DataColumnsSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, block, sidecars)
if err != nil {
var me verification.VerificationMultiError
ok := errors.As(err, &me)
Expand All @@ -120,33 +140,62 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, curre
log.WithFields(lf).
Debug("invalid ColumnSidecars received")
}
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", root)
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", blockRoot)
}

// Ensure that each column sidecar is written to disk.
for i := range vscs {
if err := s.store.SaveDataColumn(vscs[i]); err != nil {
return errors.Wrapf(err, "failed to save ColumnSidecar index %d for block %#x", vscs[i].ColumnIndex, root)
return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", vscs[i].ColumnIndex, blockRoot)
}
}
// All ColumnSidecars are persisted - da check succeeds.

// All ColumnSidecars are persisted - data availability check succeeds.
return nil
}

func fullCommitmentsToCheck(b blocks.ROBlock, current primitives.Slot) (safeCommitmentsArray, error) {
var ar safeCommitmentsArray
if b.Version() < version.Deneb {
return ar, nil
// fullCommitmentsToCheck returns the commitments to check for a given block.
func fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot primitives.Slot) (*safeCommitmentsArray, error) {
// Return early for blocks that are pre-deneb.
if block.Version() < version.Deneb {
return &safeCommitmentsArray{}, nil
}
// We are only required to check within MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return ar, nil

// Compute the block epoch.
blockSlot := block.Block().Slot()
blockEpoch := slots.ToEpoch(blockSlot)

// Compute the current spoch.
currentEpoch := slots.ToEpoch(currentSlot)

// Return early if the request is out of the MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS window.
if !params.WithinDAPeriod(blockEpoch, currentEpoch) {
return &safeCommitmentsArray{}, nil
}
kc, err := b.Block().Body().BlobKzgCommitments()

// Retrieve the KZG commitments for the block.
kzgCommitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return ar, err
return nil, errors.Wrap(err, "blob KZG commitments")
}
for i := range ar {
copy(ar[i], kc)

// Return early if there are no commitments in the block.
if len(kzgCommitments) == 0 {
return &safeCommitmentsArray{}, nil
}
return ar, nil

// Retrieve the custody columns.
custodySubnetCount := peerdas.CustodySubnetCount()
custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}

// Create a safe commitments array for the custody columns.
commitmentsArray := &safeCommitmentsArray{}
for column := range custodyColumns {
commitmentsArray[column] = kzgCommitments
}

return commitmentsArray, nil
}
94 changes: 94 additions & 0 deletions beacon-chain/das/availability_columns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package das

import (
"testing"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)

func TestFullCommitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
require.NoError(t, err)
commits := [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
cases := []struct {
name string
commits [][]byte
block func(*testing.T) blocks.ROBlock
slot primitives.Slot
err error
}{
{
name: "pre deneb",
block: func(t *testing.T) blocks.ROBlock {
bb := util.NewBeaconBlockBellatrix()
sb, err := blocks.NewSignedBeaconBlock(bb)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
},
{
name: "commitments within da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
d.Block.Body.BlobKzgCommitments = commits
d.Block.Slot = 100
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
commits: commits,
slot: 100,
},
{
name: "commitments outside da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
// block is from slot 0, "current slot" is window size +1 (so outside the window)
d.Block.Body.BlobKzgCommitments = commits
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
slot: windowSlots + 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeToAllSubnets = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

b := c.block(t)
co, err := fullCommitmentsToCheck(enode.ID{}, b, c.slot)
if c.err != nil {
require.ErrorIs(t, err, c.err)
} else {
require.NoError(t, err)
}
for i := 0; i < len(co); i++ {
require.DeepEqual(t, c.commits, co[i])
}
})
}
}
Loading

0 comments on commit 1488044

Please sign in to comment.