
Fix issues with async http downloading. (#4304)
1. Handle the case where the number of URLs is less than the number of
cores (sketched below).
2. Fix return values.
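
A minimal, self-contained sketch of both fixes follows; the names POOL_SIZE, download_one, and download_urls are illustrative assumptions, not the commit's own helpers. It shows why integer division can produce a batch size of zero when there are fewer URLs than workers, and the convention of returning the URL on success and None on failure so callers can count failures.

# Illustrative sketch only; these names are assumptions, not ClusterFuzz APIs.
from typing import List, Optional

POOL_SIZE = 16  # stand-in for the number of worker processes (cores)


def download_one(url: str, path: str) -> Optional[str]:
  """Pretend download: returns the URL on success, None on failure."""
  try:
    with open(path, 'wb') as fp:
      fp.write(b'')  # a real implementation would stream the HTTP response here
    return url
  except OSError:
    return None


def download_urls(urls: List[str], paths: List[str]) -> List[Optional[str]]:
  """Downloads |urls| to |paths| in batches, tolerating fewer URLs than workers."""
  assert len(urls) == len(paths)
  if not urls:
    return []
  # len(urls) // POOL_SIZE is 0 whenever there are fewer URLs than workers,
  # and range(0, len(urls), 0) raises ValueError, so clamp the batch size.
  batch_size = max(len(urls) // POOL_SIZE, 1)
  results = []
  for start in range(0, len(urls), batch_size):
    for url, path in zip(urls[start:start + batch_size],
                         paths[start:start + batch_size]):
      results.append(download_one(url, path))
  return results


# Callers can count failures directly from the returned values.
failures = sum(
    1 for r in download_urls(['https://example.com/a'], ['a.bin']) if r is None)

The commit's actual batching and return-value changes are in fast_http.py and storage.py in the diff below.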
jonathanmetzman authored Oct 9, 2024
1 parent 8857395 commit b0f6627
Showing 3 changed files with 85 additions and 43 deletions.
32 changes: 19 additions & 13 deletions src/clusterfuzz/_internal/fuzzing/corpus_manager.py
@@ -395,14 +395,16 @@ def rsync_from_disk(self,
for filepath in shell.get_files_list(directory):
files_to_upload.append(filepath)
if filepath in files_to_delete:
# Remove it from the delete list if it is still on disk, since that
# means it's still in the corpus.
del self._filenames_to_delete_urls_mapping[filepath]

results = self.upload_files(files_to_upload)

urls_to_delete = []
for filename in files_to_delete:
urls_to_delete.append(self._filenames_to_delete_urls_mapping[filename])

urls_to_delete = [
self._filenames_to_delete_urls_mapping[filename]
for filename in files_to_delete
]
storage.delete_signed_urls(urls_to_delete)
logs.info(f'{results.count(True)} corpus files uploaded.')
return results.count(False) < MAX_SYNC_ERRORS
@@ -428,14 +430,17 @@ def _sync_corpus_to_disk(self, corpus, directory):
"""Syncs a corpus from GCS to disk."""
shell.create_directory(directory, create_intermediates=True)
results = storage.download_signed_urls(corpus.corpus_urls, directory)
for filepath, upload_url in results:
if filepath is None:
fails = 0
for result in results:
if not result.url:
fails += 1
continue
self._filenames_to_delete_urls_mapping[filepath] = (
corpus.corpus_urls[upload_url])

self._filenames_to_delete_urls_mapping[result.filepath] = (
corpus.corpus_urls[result.url])

# TODO(metzman): Add timeout and tolerance for missing URLs.
return results.count(None) < MAX_SYNC_ERRORS
return fails < MAX_SYNC_ERRORS

def upload_files(self, file_paths, timeout=CORPUS_FILES_SYNC_TIMEOUT):
del timeout
@@ -534,8 +539,9 @@ def _get_regressions_corpus_gcs_url(bucket_name, bucket_path):


def download_corpus(corpus, directory):
storage.download_signed_urls(corpus.download_urls, directory)
storage.download_signed_urls(corpus.regression_download_urls, directory)
storage.download_signed_urls(list(corpus.download_urls.keys()), directory)
storage.download_signed_urls(
list(corpus.regression_download_urls.keys()), directory)


def _get_gcs_url(bucket_name, bucket_path, suffix=''):
@@ -588,7 +594,8 @@ def sync_data_bundle_corpus_to_disk(data_bundle_corpus, directory):
data_bundle_corpus.gcs_url, directory, delete=False).return_code == 0
results = storage.download_signed_urls(data_bundle_corpus.corpus_urls,
directory)
return results.count(None) < MAX_SYNC_ERRORS
fails = [result.url for result in results if not result.url]
return len(fails) < MAX_SYNC_ERRORS


def get_proto_corpus(bucket_name,
@@ -648,7 +655,6 @@ def get_fuzz_target_corpus(engine,
bucket_path,
include_delete_urls=include_delete_urls,
max_upload_urls=max_upload_urls)
print('bucket_name', bucket_name, 'bucket_path', bucket_path, corpus.gcs_url)
fuzz_target_corpus.corpus.CopyFrom(corpus)

assert not (include_regressions and quarantine)
66 changes: 42 additions & 24 deletions src/clusterfuzz/_internal/google_cloud_utils/storage.py
@@ -13,6 +13,7 @@
# limitations under the License.
"""Functions for managing Google Cloud Storage."""

import collections
from concurrent import futures
import contextlib
import copy
@@ -22,6 +23,9 @@
import shutil
import threading
import time
from typing import Dict
from typing import List
from typing import Tuple
import uuid

import google.auth.exceptions
Expand Down Expand Up @@ -105,6 +109,9 @@
# We haven't imported this on non-linux platforms.
_TRANSIENT_ERRORS.append(OpenSSL.SSL.Error)

SignedUrlDownloadResult = collections.namedtuple('SignedUrlDownloadResult',
['url', 'filepath'])


class StorageProvider:
"""Core storage provider interface."""
@@ -167,7 +174,7 @@ def download_signed_url(self, signed_url):
"""Downloads |signed_url|."""
raise NotImplementedError

def upload_signed_url(self, data_or_fileobj, signed_url):
def upload_signed_url(self, data_or_fileobj, signed_url: str) -> bool:
"""Uploads |data| to |signed_url|."""
raise NotImplementedError

@@ -394,20 +401,22 @@ def delete(self, remote_path):
return True

def sign_download_url(self,
remote_path,
minutes=SIGNED_URL_EXPIRATION_MINUTES):
remote_path: str,
minutes=SIGNED_URL_EXPIRATION_MINUTES) -> str:
"""Signs a download URL for a remote file."""
return _sign_url(remote_path, method='GET', minutes=minutes)

def sign_upload_url(self, remote_path, minutes=SIGNED_URL_EXPIRATION_MINUTES):
def sign_upload_url(self,
remote_path: str,
minutes=SIGNED_URL_EXPIRATION_MINUTES) -> str:
"""Signs an upload URL for a remote file."""
return _sign_url(remote_path, method='PUT', minutes=minutes)

def download_signed_url(self, signed_url):
"""Downloads |signed_url|."""
return _download_url(signed_url)

def upload_signed_url(self, data_or_fileobj, signed_url):
def upload_signed_url(self, data_or_fileobj, signed_url: str) -> bool:
"""Uploads |data| to |signed_url|."""
requests.put(signed_url, data=data_or_fileobj, timeout=HTTP_TIMEOUT_SECONDS)
return True
@@ -422,7 +431,9 @@ def delete_signed_url(self, signed_url):
requests.delete(signed_url, timeout=HTTP_TIMEOUT_SECONDS)


def _sign_url(remote_path, minutes=SIGNED_URL_EXPIRATION_MINUTES, method='GET'):
def _sign_url(remote_path: str,
minutes=SIGNED_URL_EXPIRATION_MINUTES,
method='GET') -> str:
"""Returns a signed URL for |remote_path| with |method|."""
if _integration_test_env_doesnt_support_signed_urls():
return remote_path
@@ -666,7 +677,7 @@ def download_signed_url(self, signed_url):
"""Downloads |signed_url|."""
return self.read_data(signed_url)

def upload_signed_url(self, data_or_fileobj, signed_url):
def upload_signed_url(self, data_or_fileobj, signed_url: str) -> bool:
"""Uploads |data| to |signed_url|."""
return self.write_data(data_or_fileobj, signed_url)

@@ -1197,7 +1208,7 @@ def _download_url(url):
retries=DEFAULT_FAIL_RETRIES,
delay=DEFAULT_FAIL_WAIT,
function='google_cloud_utils.storage.upload_signed_url')
def upload_signed_url(data_or_fileobj, url):
def upload_signed_url(data_or_fileobj, url: str) -> bool:
"""Uploads data to the |signed_url|."""
return _provider().upload_signed_url(str_to_bytes(data_or_fileobj), url)

@@ -1242,16 +1253,16 @@ def _error_tolerant_upload_signed_url(url_and_path):
return upload_signed_url(fp, url)


def delete_signed_url(url):
def delete_signed_url(url: str):
"""Makes a DELETE HTTP request to |url|."""
_provider().delete_signed_url(url)


def _error_tolerant_delete_signed_url(url):
def _error_tolerant_delete_signed_url(url: str):
try:
return delete_signed_url(url)
delete_signed_url(url)
except Exception:
return False
logs.warning(f'Failed to delete: {url}')


def upload_signed_urls(signed_urls, files):
@@ -1269,36 +1280,41 @@ def sign_delete_url(remote_path, minutes=SIGNED_URL_EXPIRATION_MINUTES):
return _provider().sign_delete_url(remote_path, minutes)


def download_signed_urls(signed_urls: list[str], directory: str) -> list[bool]:
def download_signed_urls(signed_urls: List[str],
directory: str) -> List[SignedUrlDownloadResult]:
"""Download |signed_urls| to |directory|."""
# TODO(metzman): Use the actual names of the files stored on GCS instead of
# renaming them.
if not signed_urls:
return []
os.makedirs(directory, exist_ok=True)
basename = uuid.uuid4().hex
filepaths = [
os.path.join(directory, f'{basename}-{idx}')
for idx in range(len(signed_urls))
]
logs.info('Downloading URLs.')
result = fast_http.download_urls(signed_urls, filepaths)
logs.info('Done downloading URLs.')
return result
urls = fast_http.download_urls(signed_urls, filepaths)
download_results = [
SignedUrlDownloadResult(url, filepaths[idx])
for idx, url in enumerate(urls)
]
return download_results


def delete_signed_urls(urls):
if not urls:
return []
return
logs.info('Deleting URLs.')
with _pool() as pool:
result = list(pool.map(_error_tolerant_delete_signed_url, urls))
pool.map(_error_tolerant_delete_signed_url, urls)
logs.info('Done deleting URLs.')
return result


def _sign_urls_for_existing_file(corpus_element_url,
include_delete_urls,
minutes=SIGNED_URL_EXPIRATION_MINUTES):
def _sign_urls_for_existing_file(
corpus_element_url: str,
include_delete_urls: bool,
minutes: int = SIGNED_URL_EXPIRATION_MINUTES) -> Tuple[str, str]:
download_url = get_signed_download_url(corpus_element_url, minutes)
if include_delete_urls:
delete_url = sign_delete_url(corpus_element_url, minutes)
@@ -1307,7 +1323,8 @@ def _sign_urls_for_existing_file(corpus_element_url,
return (download_url, delete_url)


def sign_urls_for_existing_files(urls, include_delete_urls):
def sign_urls_for_existing_files(urls,
include_delete_urls) -> List[Tuple[str, str]]:
logs.info('Signing URLs for existing files.')
result = [
_sign_urls_for_existing_file(url, include_delete_urls) for url in urls
@@ -1320,7 +1337,8 @@ def get_arbitrary_signed_upload_url(remote_directory):
return get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1)[0]


def get_arbitrary_signed_upload_urls(remote_directory, num_uploads):
def get_arbitrary_signed_upload_urls(remote_directory: str,
num_uploads: int) -> List[str]:
"""Returns |num_uploads| number of signed upload URLs to upload files with
unique arbitrary names to remote_directory."""
# We verify there are no collisions for uuid4s in CF because it would be bad
30 changes: 24 additions & 6 deletions src/clusterfuzz/_internal/system/fast_http.py
@@ -18,6 +18,7 @@
import itertools
import multiprocessing
from typing import List
from typing import Optional
from typing import Tuple

import aiohttp
@@ -40,13 +41,20 @@ def _pool(pool_size=_POOL_SIZE):
yield futures.ProcessPoolExecutor(pool_size)


def download_urls(urls: List[str], filepaths: List[str]) -> List[bool]:
def download_urls(urls: List[str], filepaths: List[str]) -> List[Optional[str]]:
"""Downloads multiple |urls| to |filepaths| in parallel and
asynchronously. Tolerates errors. Returns a list of whether each
download was successful."""
assert len(urls) == len(filepaths)
if len(urls) == 0:
# Do this to avoid issues with the range function.
return []
url_batches = []
url_batch_size = len(urls) // _POOL_SIZE

# Avoid issues with range when urls is less than _POOL_SIZE.
url_batch_size = max(url_batch_size, len(urls))

urls_and_filepaths = list(zip(urls, filepaths))
for idx in range(0, len(urls), url_batch_size):
url_batch = urls_and_filepaths[idx:idx + url_batch_size]
@@ -55,13 +63,14 @@ def download_urls(urls: List[str], filepaths: List[str]) -> List[bool]:
return list(itertools.chain(*pool.map(_download_files, url_batches)))


def _download_files(urls_and_paths: List[Tuple[str, str]]) -> List[bool]:
def _download_files(
urls_and_paths: List[Tuple[str, str]]) -> List[Optional[str]]:
urls, paths = list(zip(*urls_and_paths))
return asyncio.run(_async_download_files(list(urls), list(paths)))


async def _async_download_files(urls: List[str],
paths: List[str]) -> List[bool]:
paths: List[str]) -> List[Optional[str]]:
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(_error_tolerant_download_file(session, url, path))
@@ -71,18 +80,27 @@ async def _async_download_files(urls: List[str],


async def _error_tolerant_download_file(session: aiohttp.ClientSession,
url: str, path: str) -> bool:
url: str, path: str) -> Optional[str]:
try:
await _async_download_file(session, url, path)
return True
return url
except:
logs.warning(f'Failed to download {url}.')
return False
return None


async def _async_download_file(session: aiohttp.ClientSession, url: str,
path: str):
"""Asynchronously downloads |url| and writes it to |path|."""
async with session.get(url) as response:
if response.status != 200:
print(response.status, url)
raise aiohttp.ClientResponseError(
response.request_info,
response.history,
message=f'Failed to download. Code: {response.status}.',
status=response.status,
)
with open(path, 'wb') as fp:
while True:
chunk = await response.content.read(1024)