diff --git a/beacon-chain/rpc/core/validator.go b/beacon-chain/rpc/core/validator.go index 472eb8265293..56c2b8461d59 100644 --- a/beacon-chain/rpc/core/validator.go +++ b/beacon-chain/rpc/core/validator.go @@ -384,7 +384,10 @@ func (s *Service) SubmitSignedAggregateSelectionProof( if agg == nil { return &RpcError{Err: errors.New("signed aggregate request can't be nil"), Reason: BadRequest} } - attAndProof := agg.AggregateAttestationAndProof() + attAndProof, err := agg.AggregateAttestationAndProof() + if err != nil { + return &RpcError{Err: err, Reason: BadRequest} + } if attAndProof == nil { return &RpcError{Err: errors.New("signed aggregate request can't be nil"), Reason: BadRequest} } diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 448875c203c2..c020cb78480f 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -90,7 +90,11 @@ func (s *Service) processPendingAtts(ctx context.Context) error { func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.SignedAggregateAttAndProof) { for _, signedAtt := range attestations { - aggregate := signedAtt.AggregateAttestationAndProof().AggregateVal() + a, err := signedAtt.AggregateAttestationAndProof() + if err != nil { + continue + } + aggregate := a.AggregateVal() data := aggregate.GetData() // The pending attestations can arrive in both aggregated and unaggregated forms, // each from has distinct validation steps. @@ -107,7 +111,7 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb. log.WithError(err).Debug("Could not save aggregate attestation") continue } - s.setAggregatorIndexEpochSeen(data.Target.Epoch, signedAtt.AggregateAttestationAndProof().GetAggregatorIndex()) + s.setAggregatorIndexEpochSeen(data.Target.Epoch, a.GetAggregatorIndex()) // Broadcasting the signed attestation again once a node is able to process it. if err := s.cfg.p2p.Broadcast(ctx, signedAtt); err != nil { @@ -163,7 +167,12 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb. // that voted for that block root. The caller of this function is responsible // for not sending repeated attestations to the pending queue. func (s *Service) savePendingAtt(att ethpb.SignedAggregateAttAndProof) { - root := bytesutil.ToBytes32(att.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot) + a, err := att.AggregateAttestationAndProof() + if err != nil { + log.WithError(err).Debug("Could not get aggregate attestation and proof") + return + } + root := bytesutil.ToBytes32(a.AggregateVal().GetData().BeaconBlockRoot) s.pendingAttsLock.Lock() defer s.pendingAttsLock.Unlock() @@ -195,15 +204,23 @@ func (s *Service) savePendingAtt(att ethpb.SignedAggregateAttAndProof) { } func attsAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool { + aAtt, err := a.AggregateAttestationAndProof() + if err != nil { + return false + } + bAtt, err := b.AggregateAttestationAndProof() + if err != nil { + return false + } if a.GetSignature() != nil { - return b.GetSignature() != nil && a.AggregateAttestationAndProof().GetAggregatorIndex() == b.AggregateAttestationAndProof().GetAggregatorIndex() + return b.GetSignature() != nil && aAtt.GetAggregatorIndex() == bAtt.GetAggregatorIndex() } if b.GetSignature() != nil { return false } - aAggregate := a.AggregateAttestationAndProof().AggregateVal() - bAggregate := b.AggregateAttestationAndProof().AggregateVal() + aAggregate := aAtt.AggregateVal() + bAggregate := bAtt.AggregateVal() aData := aAggregate.GetData() bData := bAggregate.GetData() @@ -235,7 +252,11 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot) for bRoot, atts := range s.blkRootToPendingAtts { for i := len(atts) - 1; i >= 0; i-- { - if slot >= atts[i].AggregateAttestationAndProof().AggregateVal().GetData().Slot+params.BeaconConfig().SlotsPerEpoch { + att, err := atts[i].AggregateAttestationAndProof() + if err != nil { + continue + } + if slot >= att.AggregateVal().GetData().Slot+params.BeaconConfig().SlotsPerEpoch { // Remove the pending attestation from the list in place. atts = append(atts[:i], atts[i+1:]...) } diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 06380ec948f2..9b7235a606d3 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -18,7 +18,11 @@ func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Me return fmt.Errorf("message was not type ethpb.SignedAggregateAttAndProof, type=%T", msg) } - aggregate := a.AggregateAttestationAndProof().AggregateVal() + agg, err := a.AggregateAttestationAndProof() + if err != nil { + return err + } + aggregate := agg.AggregateVal() if aggregate == nil || aggregate.GetData() == nil { return errors.New("nil aggregate") diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index ba32cbe11bc8..c0e90dd90fdb 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -52,11 +52,15 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms if !ok { return pubsub.ValidationReject, errors.Errorf("invalid message type: %T", raw) } - if m.AggregateAttestationAndProof() == nil { + agg, err := m.AggregateAttestationAndProof() + if err != nil { + return pubsub.ValidationIgnore, err + } + if agg == nil { return pubsub.ValidationReject, errNilMessage } - aggregate := m.AggregateAttestationAndProof().AggregateVal() + aggregate := agg.AggregateVal() data := aggregate.GetData() if err := helpers.ValidateNilAttestation(aggregate); err != nil { @@ -98,7 +102,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms } // Verify this is the first aggregate received from the aggregator with index and slot. - if s.hasSeenAggregatorIndexEpoch(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()) { + if s.hasSeenAggregatorIndexEpoch(data.Target.Epoch, agg.GetAggregatorIndex()) { return pubsub.ValidationIgnore, nil } // Check that the block being voted on isn't invalid. @@ -127,7 +131,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms return validationRes, err } - s.setAggregatorIndexEpochSeen(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()) + s.setAggregatorIndexEpochSeen(data.Target.Epoch, agg.GetAggregatorIndex()) msg.ValidatorData = m @@ -140,7 +144,10 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed ctx, span := trace.StartSpan(ctx, "sync.validateAggregatedAtt") defer span.End() - aggregateAndProof := signed.AggregateAttestationAndProof() + aggregateAndProof, err := signed.AggregateAttestationAndProof() + if err != nil { + return pubsub.ValidationIgnore, err + } aggregatorIndex := aggregateAndProof.GetAggregatorIndex() aggregate := aggregateAndProof.AggregateVal() data := aggregate.GetData() @@ -225,7 +232,11 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed func (s *Service) validateBlockInAttestation(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool { // Verify the block being voted and the processed state is in beaconDB. The block should have passed validation if it's in the beaconDB. - blockRoot := bytesutil.ToBytes32(satt.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot) + a, err := satt.AggregateAttestationAndProof() + if err != nil { + return false + } + blockRoot := bytesutil.ToBytes32(a.AggregateVal().GetData().BeaconBlockRoot) if !s.hasBlockAndState(ctx, blockRoot) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. s.savePendingAtt(satt) @@ -333,7 +344,10 @@ func validateSelectionIndex( // This returns aggregator signature set which can be used to batch verify. func aggSigSet(s state.ReadOnlyBeaconState, a ethpb.SignedAggregateAttAndProof) (*bls.SignatureBatch, error) { - aggregateAndProof := a.AggregateAttestationAndProof() + aggregateAndProof, err := a.AggregateAttestationAndProof() + if err != nil { + return nil, err + } v, err := s.ValidatorAtIndex(aggregateAndProof.GetAggregatorIndex()) if err != nil { diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index 135812bb8454..059a53317f98 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -126,14 +126,14 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p if !ok { return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.AttestationElectra{}, att) } - s.savePendingAtt(ð.SignedAggregateAttestationAndProofElectra{Message: ð.AggregateAttestationAndProofElectra{Aggregate: a}}) + s.savePendingAtt(a) } else { a, ok := att.(*eth.Attestation) // This will never fail in practice because we asserted the version if !ok { return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.Attestation{}, att) } - s.savePendingAtt(ð.SignedAggregateAttestationAndProof{Message: ð.AggregateAttestationAndProof{Aggregate: a}}) + s.savePendingAtt(a) } return pubsub.ValidationIgnore, nil } diff --git a/proto/prysm/v1alpha1/attestation.go b/proto/prysm/v1alpha1/attestation.go index 81d37f3fbb7f..64cabfe9e03f 100644 --- a/proto/prysm/v1alpha1/attestation.go +++ b/proto/prysm/v1alpha1/attestation.go @@ -46,7 +46,7 @@ type SignedAggregateAttAndProof interface { ssz.Unmarshaler ssz.HashRoot Version() int - AggregateAttestationAndProof() AggregateAttAndProof + AggregateAttestationAndProof() (AggregateAttAndProof, error) GetSignature() []byte } @@ -135,6 +135,11 @@ func (a *Attestation) GetCommitteeIndex() (primitives.CommitteeIndex, error) { return a.Data.CommitteeIndex, nil } +func (a *Attestation) AggregateAttestationAndProof() (AggregateAttAndProof, error) { + //TODO implement me + panic("implement me") +} + // Version -- func (a *PendingAttestation) Version() int { return version.Phase0 @@ -222,6 +227,11 @@ func (a *AttestationElectra) GetCommitteeIndex() (primitives.CommitteeIndex, err return primitives.CommitteeIndex(uint64(indices[0])), nil } +func (a *AttestationElectra) AggregateAttestationAndProof() (AggregateAttAndProof, error) { + //TODO implement me + panic("implement me") +} + // Version -- func (a *IndexedAttestation) Version() int { return version.Phase0 @@ -341,8 +351,8 @@ func (a *SignedAggregateAttestationAndProof) Version() int { } // AggregateAttestationAndProof -- -func (a *SignedAggregateAttestationAndProof) AggregateAttestationAndProof() AggregateAttAndProof { - return a.Message +func (a *SignedAggregateAttestationAndProof) AggregateAttestationAndProof() (AggregateAttAndProof, error) { + return a.Message, nil } // Version -- @@ -351,6 +361,6 @@ func (a *SignedAggregateAttestationAndProofElectra) Version() int { } // AggregateAttestationAndProof -- -func (a *SignedAggregateAttestationAndProofElectra) AggregateAttestationAndProof() AggregateAttAndProof { - return a.Message +func (a *SignedAggregateAttestationAndProofElectra) AggregateAttestationAndProof() (AggregateAttAndProof, error) { + return a.Message, nil }