Fix words #60784

Merged · 2 commits · Jan 15, 2024
10 changes: 5 additions & 5 deletions python/paddle/distributed/auto_parallel/api.py
@@ -271,7 +271,7 @@ def dtensor_from_fn(fn, mesh, placements, *args, **kwargs):
*args (tuple): A tuple of arguments to be passed to the ``fn`` function.
**kwargs (dict): A dict of arguments to be passed to the ``fn`` function.

- Retruns:
+ Returns:
Tensor: A Tensor constructed from ``fn`` with distributed attributes.

Examples:
@@ -304,7 +304,7 @@ def reshard(dist_tensor, mesh, placements):
be Shard, Replicate and Partial.

Returns:
- Tensor: A Distributed Tensor reshared with distributed attributes.
+ Tensor: A Distributed Tensor resharded with distributed attributes.

Examples:
.. code-block:: python
@@ -465,7 +465,7 @@ def output_fn(outputs, process_mesh) -> list(paddle.Tensor)
>>> layer = dist.shard_layer(layer, mesh, shard_fn)
>>> print(layer)

- >>> # This case need to be excuted in multi-card environment
+ >>> # This case need to be executed in multi-card environment
>>> # export CUDA_VISIBLE_DEVICES=0,1
>>> # python -m paddle.distributed.launch {test_case}.py
"""
@@ -642,7 +642,7 @@ def step(self):

def state_dict(self):
"""
- Create and shard the optimizer states e.g., acumulators and master_weights before load_state_dict.
+ Create and shard the optimizer states e.g., accumulators and master_weights before load_state_dict.
If training has already started or the optimizer states are already created and sharded, do nothing.
"""
state_dict = self._inner_opt.state_dict()
@@ -1552,7 +1552,7 @@ def unshard_dtensor(dist_tensor):
dist_tensor
)
# in static mode, 'distributed tensor' and 'dense tensor' are all
- # Varialble type, the distributed attribute is a property of the Varibale.
+ # Variable type, the distributed attribute is a property of the Variable.
# So, it's no need to convert the distributed tensor to a dense tensor.
# We only need to modify its distributed attribute.
empty_dist_attr = (
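Note (not part of the diff): a minimal usage sketch of the two APIs whose docstrings are corrected above, assuming a two-card mesh and the public paddle.distributed entry points; treat the exact import paths as an assumption rather than something this PR establishes.

import paddle
import paddle.distributed as dist

# Assumed setup: a 1-D process mesh over two cards.
mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

# dtensor_from_fn builds a distributed tensor by calling an ordinary creation
# function (here paddle.ones) and attaching the given mesh and placements.
w = dist.dtensor_from_fn(paddle.ones, mesh, [dist.Replicate()], shape=[4, 8])

# reshard converts the tensor to a new set of placements, e.g. from replicated
# to sharded along dimension 0 of the mesh.
w_sharded = dist.reshard(w, mesh, [dist.Shard(0)])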
6 changes: 3 additions & 3 deletions python/paddle/distributed/auto_parallel/placement_type.py
@@ -52,11 +52,11 @@ def to_placements(dim_map, mesh, partial_idx=[]):

def check_placements_equal(this, that):
assert isinstance(this, list) and isinstance(that, list)
- small_placemets = this if len(this) < len(that) else that
+ small_placements = this if len(this) < len(that) else that
large_placements = that if len(this) < len(that) else this
for i in range(len(large_placements)):
- if i < len(small_placemets):
-     if small_placemets[i] != large_placements[i]:
+ if i < len(small_placements):
+     if small_placements[i] != large_placements[i]:
return False
else:
if large_placements[i] != Replicate():
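Note (not part of the diff): the renamed variable belongs to a length-tolerant equality check. A self-contained sketch of the rule, using plain strings in place of paddle's Placement objects purely for illustration:

def placements_equal(this, that):
    # The shorter list is compared element-wise against the longer one; any
    # trailing entries of the longer list must be "Replicate" for the two
    # placements lists to still count as equal.
    small, large = (this, that) if len(this) < len(that) else (that, this)
    for i, placement in enumerate(large):
        if i < len(small):
            if small[i] != placement:
                return False
        elif placement != "Replicate":
            return False
    return True

print(placements_equal(["Shard(0)"], ["Shard(0)", "Replicate"]))  # True
print(placements_equal(["Shard(0)"], ["Shard(0)", "Shard(1)"]))   # False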
2 changes: 1 addition & 1 deletion python/paddle/distributed/auto_parallel/process_mesh.py
@@ -56,7 +56,7 @@ def get_unique_id_for_process_mesh(shape, process_ids):
return unique_id


- def retrive_unique_id_for_process_mesh(shape, process_ids):
+ def retrieve_unique_id_for_process_mesh(shape, process_ids):
key = f"shape {shape}, process_ids {process_ids}"
global _g_unique_process_mesh_map
assert key in _g_unique_process_mesh_map
6 changes: 3 additions & 3 deletions python/paddle/distributed/auto_parallel/random.py
@@ -17,7 +17,7 @@
import paddle

from ..utils.log_utils import get_logger
- from .process_mesh import retrive_unique_id_for_process_mesh
+ from .process_mesh import retrieve_unique_id_for_process_mesh
from .static.utils import _get_idx_in_axis

_logger = get_logger(logging.INFO)
@@ -57,7 +57,7 @@ def parallel_manual_seed(seed, name=""):

This function should be called only once before auto parallel compiles the computation graph (e.g. auto_parallel.engine.prepare() or fit()).

- This seed only affects how randomness-relative **operators** (dropout, fuse op with dropout inside, etc) are execute amonge mesh, and would NOT affect other process like Parameter initialization.
+ This seed only affects how randomness-relative **operators** (dropout, fuse op with dropout inside, etc) are execute among mesh, and would NOT affect other process like Parameter initialization.

Examples:
# seed relative to training step
@@ -102,7 +102,7 @@ def determinate_rng(

# FIXME
# unique_id = process_mesh.unique_id
- unique_id = retrive_unique_id_for_process_mesh(
+ unique_id = retrieve_unique_id_for_process_mesh(
process_mesh.shape, process_mesh.process_ids
)
sharding_expr = name_ + f'mesh:{unique_id}'
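Note (not part of the diff): a sketch of the call order described by the docstring above, importing the function from the module touched in this file; the surrounding Engine workflow is only indicated in comments and is an assumption, not something this PR changes.

from paddle.distributed.auto_parallel.random import parallel_manual_seed

# Called once, before auto parallel compiles the computation graph. It only
# influences randomness-related operators (e.g. dropout) across the mesh and
# does not affect parameter initialization.
parallel_manual_seed(2024)

# ... build model, loss and optimizer, then call engine.prepare() or
# engine.fit() as usual.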
6 changes: 3 additions & 3 deletions python/paddle/distributed/auto_parallel/static/converter.py
@@ -291,7 +291,7 @@ def merge_with_dist_attr(tensor_list, dist_attr):
)
# merge the tensor with dist_attr
partition_tensor_list = []
- merged_partiton = []
+ merged_partition = []
for process in process_group:
partition_index = Resharder.compute_partition_index(
process,
@@ -301,8 +301,8 @@ def merge_with_dist_attr(tensor_list, dist_attr):
process_group,
)
index = process_group.index(process)
- if partition_index not in merged_partiton:
-     merged_partiton.append(partition_index)
+ if partition_index not in merged_partition:
+     merged_partition.append(partition_index)
Converter.merge(
partition_tensor_list,
tensor_list[index],
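Note (not part of the diff): the renamed merged_partition list implements a small de-duplication step: a partition index is computed for every process, but each distinct partition is merged only once even when several ranks hold identical replicas. A self-contained illustration with hypothetical partition indices:

# Hypothetical per-process partition indices; replicas share the same index.
partition_indices = [[[0, 2]], [[0, 2]], [[2, 4]], [[2, 4]]]

merged_partition = []
for partition_index in partition_indices:
    if partition_index not in merged_partition:
        merged_partition.append(partition_index)

print(merged_partition)  # [[[0, 2]], [[2, 4]]]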
Changes to an additional file (name not shown in this view)
@@ -161,7 +161,7 @@ def partition_startup_program(
)
target_block._sync_with_cpp()

- # set distribute atrribute
+ # set distribute attribute
new_op = target_block.ops[-1]
assert new_op.type == new_op_desc.type()
assert new_op.desc == new_op_desc
20 changes: 10 additions & 10 deletions python/paddle/distributed/auto_parallel/static/reshard.py
@@ -1658,8 +1658,8 @@ def find_op_desc_seq(
# TODO(zhaoyingli): Remove the method to a pass.
# Current method to get all pp_ranks' relationship must rely on reshard.
# When reshard insert send/recv pair, the process_group has the pp relationship.
- # But the mothod to obtain pp_ranks' relationship is only supported in 'reshard_input',
- # casue 'reshard_output' only has current process_group view instead of global view.
+ # But the method to obtain pp_ranks' relationship is only supported in 'reshard_input',
+ # cause 'reshard_output' only has current process_group view instead of global view.
op_role = dist_attr[-1]
if int(op_role) == int(OpRole.Forward):
self.dist_context.up_down_streams.add_pair_stream(
@@ -1695,7 +1695,7 @@ def find_op_desc_seq(
)
)

- # In the same process group, it will use allgahther and slice op.
+ # In the same process group, it will use allgather and slice op.
else:
# NOTE: It just supports even partition scene.
partition_index_list = []
@@ -1868,7 +1868,7 @@ def parse_op_desc(
"""

# Parse all communicator groups for all ranks
- # Ensure every rank has a global view of communicator groups for entire cluters.
+ # Ensure every rank has a global view of communicator groups for entire cluster.
# When initialize communicators for pipeline parallel, every rank could
# conduct a correct global synchronization.
for rank_id in op_desc_seq:
@@ -2449,7 +2449,7 @@ def get_op_input_attrs(self, op, var_name):
op_input_attrs = self._get_subblock_input_attrs(op, var_name)
if not op_input_attrs:
# NOTE: [hack method]
- # Adapt to quantization pass, which presist_vars, including inputs and outputs, all are in global_block.
+ # Adapt to quantization pass, which persist_vars, including inputs and outputs, all are in global_block.
# Therefore, the while_op's inputs will contain the all persist_vars, which will be inputs or output of the quantization op in subblock.
op_input_attrs = self._get_subblock_output_attrs(op, var_name)
else:
@@ -2927,7 +2927,7 @@ def _is_special_op(op):
dist_tensor.dist_attr,
)
else:
- # Ensure every rank has a global view of communicator groups for entire cluters.
+ # Ensure every rank has a global view of communicator groups for entire cluster.
# When initialize communicators for pipeline parallel, every rank could
# conduct a correct global synchronization.
new_process_group(
@@ -2971,7 +2971,7 @@ def _is_special_op(op):
dist_tensor.dist_attr,
)
else:
- # Ensure every rank has a global view of communicator groups for entire cluters.
+ # Ensure every rank has a global view of communicator groups for entire cluster.
# When initialize communicators for pipeline parallel, every rank could
# conduct a correct global synchronization.
new_process_group(
@@ -3010,7 +3010,7 @@ def reshard(self):
self.dist_params_grads,
)

- # remove no need vars and ops in the startip program
+ # remove no need vars and ops in the startup program
Remover.remove_no_need_in_startup(
self.auto_parallel_main_prog, self.auto_parallel_startup_prog
)
@@ -3249,10 +3249,10 @@ def _get_idx(comm_ranks, group_ranks):
)
elif isinstance(op_desc, ConcatOpDesc):
partition_index_list = op_desc._partition_index_list
- for idx, partion_idex in enumerate(partition_index_list):
+ for idx, partition_idex in enumerate(partition_index_list):
self._concat_partitions_for_cost(
partition_tensor_list,
- partion_idex,
+ partition_idex,
dtype,
key,
local_rank_comp_cost,
Changes to an additional file (name not shown in this view)
@@ -66,7 +66,7 @@ def __init__(
self._seed,
"mode",
self._mode,
"num_machies",
"num_machines",
self._num_machines,
"num_devices_per_machine",
self._num_devices_per_machine,
Changes to an additional file (name not shown in this view)
@@ -67,7 +67,7 @@ def register():
pattern = cls()
_PATTERNS[pattern.name] = pattern
# sort patterns according to the number of sharded tensors
- # set its dist attr by the fisrt one when a tensor can be matched by multiple patterns.
+ # set its dist attr by the first one when a tensor can be matched by multiple patterns.
_PATTERNS = dict(
sorted(
_PATTERNS.items(), key=lambda x: -x[1].attrs["sharded_tensors"]
@@ -201,7 +201,7 @@ def build(self):
# define reshape
reshape = self.add_node(1, **{"type": "reshape2"})

- # define reshape input egde
+ # define reshape input edge
x_edge = self.add_edge(input.id, reshape.id, **{"input_name": "X"})

# define reshape out
@@ -991,14 +991,14 @@ def partition_cluster(
device_mesh.append([1, partition[1]])
device_meshes.append(device_mesh)
else:
- incerement = 1 if partition_result[-1] == [1] else 0
+ increment = 1 if partition_result[-1] == [1] else 0
for partition in partition_result:
if len(partition) < 2:
continue
device_mesh = []
for i in range(partition[0]):
device_mesh.append([partition[1], m])
- device_mesh[-1][0] += incerement
+ device_mesh[-1][0] += increment
device_meshes.append(device_mesh)

return device_meshes
22 changes: 11 additions & 11 deletions python/paddle/distributed/auto_parallel/static/utils.py
@@ -319,7 +319,7 @@ def _get_idx_in_axis(processes, shape, axis, rank):
Given a rank and the processes mesh the rank belongs to,
compute the index of the rank in given axis.

- Example: 27 processes managed in a 3-Dimensinal mesh with shape of [3, 3, 3].
+ Example: 27 processes managed in a 3-Dimensional mesh with shape of [3, 3, 3].
the index of rank 22 are:
in axis 0: 1
in axis 1: 1
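Note (not part of the diff): a worked version of the docstring example above, which the collapsed view truncates. It assumes axis 0 is the fastest-varying axis, the only ordering consistent with the values shown.

rank, shape = 22, [3, 3, 3]
coords = []
for dim in shape:
    coords.append(rank % dim)
    rank //= dim
print(coords)  # [1, 1, 2] -> index 1 in axis 0, index 1 in axis 1, index 2 in axis 2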
@@ -861,7 +861,7 @@ def merge_and_slice_parameter(dist_param_dict, pre_dist_attr, cur_dist_attr):
"""
Merge parameters with previous dist_attr and slice parameters with current dist_attr

- Arags:
+ Args:
dist_param_dict(dict): parameters' value of all ranks.
pre_dist_attr(dict): parameters' dist_attr of last training process.
cur_dist_attr(dict): parameters' dist_attr of current training process.
@@ -962,14 +962,14 @@ def _merge_parameter_with_dist_attr(param_list, dist_attr):
)
# merge the parameter with dist_attr
partition_param_list = []
- merged_partiton = []
+ merged_partition = []
for process in process_group:
partition_index = Resharder.compute_partition_index(
process, complete_shape, dims_mapping, process_shape, process_group
)
index = process_group.index(process)
- if partition_index not in merged_partiton:
-     merged_partiton.append(partition_index)
+ if partition_index not in merged_partition:
+     merged_partition.append(partition_index)
_merge_parameter(
partition_param_list,
param_list[index],
@@ -1539,10 +1539,10 @@ def get_all_distributed_main_program(

class SerialProgramInfo:
def __init__(
- self, train_program, satrtup_program, loss, optimizer, cluster=None
+ self, train_program, startup_program, loss, optimizer, cluster=None
):
self._train_program = train_program
- self._startup_program = satrtup_program
+ self._startup_program = startup_program
self._loss = loss
self._optimizer = optimizer
self._cluster = cluster
@@ -1700,7 +1700,7 @@ def set_dist_op_desc_original_id(dist_op_desc, op_desc, dist_context):
elif op_original_id in dist_context._dist_ops_for_program:
dist_op_desc.set_original_id(op_original_id)
return
- # Third, print error infomation if we cannot find the original id
+ # Third, print error information if we cannot find the original id
else:
raise AssertionError(
"Cannot find the original id in the distributed context"
@@ -1748,7 +1748,7 @@ def get_var_numel(var):
input:
- var: variable
return:
- number of elemnet in var
+ number of element in var
"""
assert isinstance(var, Variable)
assert -1 not in var.shape
@@ -1835,7 +1835,7 @@ def initialize_pg_in_full_mode(all_process_groups, cur_rank):
)
client_sockets[send_rank].close()
print(
"It is able to instantiate {} as recver now.".format(
"It is able to instantiate {} as receiver now.".format(
process_group.ranks
)
)
@@ -1982,7 +1982,7 @@ def set_data_parallel(x):


def is_naive_data_parallel(dist_context):
- # Navie data parallel only completes dist_attr once from the front to back.
+ # Naive data parallel only completes dist_attr once from the front to back.
if not dist_context.data_parallel:
return False
