Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save pending beacon attestation without using aggregate form #14462

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion beacon-chain/rpc/core/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,10 @@ func (s *Service) SubmitSignedAggregateSelectionProof(
if agg == nil {
return &RpcError{Err: errors.New("signed aggregate request can't be nil"), Reason: BadRequest}
}
attAndProof := agg.AggregateAttestationAndProof()
attAndProof, err := agg.AggregateAttestationAndProof()
if err != nil {
return &RpcError{Err: err, Reason: BadRequest}
}
if attAndProof == nil {
return &RpcError{Err: errors.New("signed aggregate request can't be nil"), Reason: BadRequest}
}
Expand Down
35 changes: 28 additions & 7 deletions beacon-chain/sync/pending_attestations_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func (s *Service) processPendingAtts(ctx context.Context) error {

func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.SignedAggregateAttAndProof) {
for _, signedAtt := range attestations {
aggregate := signedAtt.AggregateAttestationAndProof().AggregateVal()
a, err := signedAtt.AggregateAttestationAndProof()
if err != nil {
continue
}
aggregate := a.AggregateVal()
data := aggregate.GetData()
// The pending attestations can arrive in both aggregated and unaggregated forms,
// each from has distinct validation steps.
Expand All @@ -107,7 +111,7 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
log.WithError(err).Debug("Could not save aggregate attestation")
continue
}
s.setAggregatorIndexEpochSeen(data.Target.Epoch, signedAtt.AggregateAttestationAndProof().GetAggregatorIndex())
s.setAggregatorIndexEpochSeen(data.Target.Epoch, a.GetAggregatorIndex())

// Broadcasting the signed attestation again once a node is able to process it.
if err := s.cfg.p2p.Broadcast(ctx, signedAtt); err != nil {
Expand Down Expand Up @@ -163,7 +167,12 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
// that voted for that block root. The caller of this function is responsible
// for not sending repeated attestations to the pending queue.
func (s *Service) savePendingAtt(att ethpb.SignedAggregateAttAndProof) {
root := bytesutil.ToBytes32(att.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot)
a, err := att.AggregateAttestationAndProof()
if err != nil {
log.WithError(err).Debug("Could not get aggregate attestation and proof")
return
}
root := bytesutil.ToBytes32(a.AggregateVal().GetData().BeaconBlockRoot)

s.pendingAttsLock.Lock()
defer s.pendingAttsLock.Unlock()
Expand Down Expand Up @@ -195,15 +204,23 @@ func (s *Service) savePendingAtt(att ethpb.SignedAggregateAttAndProof) {
}

func attsAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool {
aAtt, err := a.AggregateAttestationAndProof()
if err != nil {
return false
}
bAtt, err := b.AggregateAttestationAndProof()
if err != nil {
return false
}
if a.GetSignature() != nil {
return b.GetSignature() != nil && a.AggregateAttestationAndProof().GetAggregatorIndex() == b.AggregateAttestationAndProof().GetAggregatorIndex()
return b.GetSignature() != nil && aAtt.GetAggregatorIndex() == bAtt.GetAggregatorIndex()
}
if b.GetSignature() != nil {
return false
}

aAggregate := a.AggregateAttestationAndProof().AggregateVal()
bAggregate := b.AggregateAttestationAndProof().AggregateVal()
aAggregate := aAtt.AggregateVal()
bAggregate := bAtt.AggregateVal()
aData := aAggregate.GetData()
bData := bAggregate.GetData()

Expand Down Expand Up @@ -235,7 +252,11 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot)

for bRoot, atts := range s.blkRootToPendingAtts {
for i := len(atts) - 1; i >= 0; i-- {
if slot >= atts[i].AggregateAttestationAndProof().AggregateVal().GetData().Slot+params.BeaconConfig().SlotsPerEpoch {
att, err := atts[i].AggregateAttestationAndProof()
if err != nil {
continue
}
if slot >= att.AggregateVal().GetData().Slot+params.BeaconConfig().SlotsPerEpoch {
// Remove the pending attestation from the list in place.
atts = append(atts[:i], atts[i+1:]...)
}
Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/sync/subscriber_beacon_aggregate_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Me
return fmt.Errorf("message was not type ethpb.SignedAggregateAttAndProof, type=%T", msg)
}

aggregate := a.AggregateAttestationAndProof().AggregateVal()
agg, err := a.AggregateAttestationAndProof()
if err != nil {
return err
}
aggregate := agg.AggregateVal()

if aggregate == nil || aggregate.GetData() == nil {
return errors.New("nil aggregate")
Expand Down
28 changes: 21 additions & 7 deletions beacon-chain/sync/validate_aggregate_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if !ok {
return pubsub.ValidationReject, errors.Errorf("invalid message type: %T", raw)
}
if m.AggregateAttestationAndProof() == nil {
agg, err := m.AggregateAttestationAndProof()
if err != nil {
return pubsub.ValidationIgnore, err
}
if agg == nil {
return pubsub.ValidationReject, errNilMessage
}

aggregate := m.AggregateAttestationAndProof().AggregateVal()
aggregate := agg.AggregateVal()
data := aggregate.GetData()

if err := helpers.ValidateNilAttestation(aggregate); err != nil {
Expand Down Expand Up @@ -98,7 +102,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
}

// Verify this is the first aggregate received from the aggregator with index and slot.
if s.hasSeenAggregatorIndexEpoch(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()) {
if s.hasSeenAggregatorIndexEpoch(data.Target.Epoch, agg.GetAggregatorIndex()) {
return pubsub.ValidationIgnore, nil
}
// Check that the block being voted on isn't invalid.
Expand Down Expand Up @@ -127,7 +131,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
return validationRes, err
}

s.setAggregatorIndexEpochSeen(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex())
s.setAggregatorIndexEpochSeen(data.Target.Epoch, agg.GetAggregatorIndex())

msg.ValidatorData = m

Expand All @@ -140,7 +144,10 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed
ctx, span := trace.StartSpan(ctx, "sync.validateAggregatedAtt")
defer span.End()

aggregateAndProof := signed.AggregateAttestationAndProof()
aggregateAndProof, err := signed.AggregateAttestationAndProof()
if err != nil {
return pubsub.ValidationIgnore, err
}
aggregatorIndex := aggregateAndProof.GetAggregatorIndex()
aggregate := aggregateAndProof.AggregateVal()
data := aggregate.GetData()
Expand Down Expand Up @@ -225,7 +232,11 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed

func (s *Service) validateBlockInAttestation(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool {
// Verify the block being voted and the processed state is in beaconDB. The block should have passed validation if it's in the beaconDB.
blockRoot := bytesutil.ToBytes32(satt.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot)
a, err := satt.AggregateAttestationAndProof()
if err != nil {
return false
}
blockRoot := bytesutil.ToBytes32(a.AggregateVal().GetData().BeaconBlockRoot)
if !s.hasBlockAndState(ctx, blockRoot) {
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
s.savePendingAtt(satt)
Expand Down Expand Up @@ -333,7 +344,10 @@ func validateSelectionIndex(

// This returns aggregator signature set which can be used to batch verify.
func aggSigSet(s state.ReadOnlyBeaconState, a ethpb.SignedAggregateAttAndProof) (*bls.SignatureBatch, error) {
aggregateAndProof := a.AggregateAttestationAndProof()
aggregateAndProof, err := a.AggregateAttestationAndProof()
if err != nil {
return nil, err
}

v, err := s.ValidatorAtIndex(aggregateAndProof.GetAggregatorIndex())
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/validate_beacon_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.AttestationElectra{}, att)
}
s.savePendingAtt(&eth.SignedAggregateAttestationAndProofElectra{Message: &eth.AggregateAttestationAndProofElectra{Aggregate: a}})
s.savePendingAtt(a)
} else {
a, ok := att.(*eth.Attestation)
// This will never fail in practice because we asserted the version
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.Attestation{}, att)
}
s.savePendingAtt(&eth.SignedAggregateAttestationAndProof{Message: &eth.AggregateAttestationAndProof{Aggregate: a}})
s.savePendingAtt(a)
}
return pubsub.ValidationIgnore, nil
}
Expand Down
20 changes: 15 additions & 5 deletions proto/prysm/v1alpha1/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type SignedAggregateAttAndProof interface {
ssz.Unmarshaler
ssz.HashRoot
Version() int
AggregateAttestationAndProof() AggregateAttAndProof
AggregateAttestationAndProof() (AggregateAttAndProof, error)
GetSignature() []byte
}

Expand Down Expand Up @@ -135,6 +135,11 @@ func (a *Attestation) GetCommitteeIndex() (primitives.CommitteeIndex, error) {
return a.Data.CommitteeIndex, nil
}

func (a *Attestation) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
//TODO implement me
panic("implement me")
}

// Version --
func (a *PendingAttestation) Version() int {
return version.Phase0
Expand Down Expand Up @@ -222,6 +227,11 @@ func (a *AttestationElectra) GetCommitteeIndex() (primitives.CommitteeIndex, err
return primitives.CommitteeIndex(uint64(indices[0])), nil
}

func (a *AttestationElectra) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
//TODO implement me
panic("implement me")
}

// Version --
func (a *IndexedAttestation) Version() int {
return version.Phase0
Expand Down Expand Up @@ -341,8 +351,8 @@ func (a *SignedAggregateAttestationAndProof) Version() int {
}

// AggregateAttestationAndProof --
func (a *SignedAggregateAttestationAndProof) AggregateAttestationAndProof() AggregateAttAndProof {
return a.Message
func (a *SignedAggregateAttestationAndProof) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
return a.Message, nil
}

// Version --
Expand All @@ -351,6 +361,6 @@ func (a *SignedAggregateAttestationAndProofElectra) Version() int {
}

// AggregateAttestationAndProof --
func (a *SignedAggregateAttestationAndProofElectra) AggregateAttestationAndProof() AggregateAttAndProof {
return a.Message
func (a *SignedAggregateAttestationAndProofElectra) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
return a.Message, nil
}
Loading