Source code for src.yolov5.utils.google_utils

# Google utils: https://cloud.google.com/storage/docs/reference/libraries

import os
import platform
import subprocess
import time
from pathlib import Path

import requests
import torch


[docs]def gsutil_getsize(url=''): # gs://bucket/file size https://cloud.google.com/storage/docs/gsutil/commands/du s = subprocess.check_output(f'gsutil du {url}', shell=True).decode('utf-8') return eval(s.split(' ')[0]) if len(s) else 0 # bytes
[docs]def safe_download(file, url, url2=None, min_bytes=1E0, error_msg=''): # Attempts to download file from url or url2, checks and removes incomplete downloads < min_bytes file = Path(file) try: # GitHub print(f'Downloading {url} to {file}...') torch.hub.download_url_to_file(url, str(file)) assert file.exists() and file.stat().st_size > min_bytes # check except Exception as e: # GCP file.unlink(missing_ok=True) # remove partial downloads print(f'Download error: {e}\nRe-attempting {url2 or url} to {file}...') os.system(f"curl -L '{url2 or url}' -o '{file}' --retry 3 -C -") # curl download, retry and resume on fail finally: if not file.exists() or file.stat().st_size < min_bytes: # check file.unlink(missing_ok=True) # remove partial downloads print(f'ERROR: Download failure: {error_msg or url}') print('')
[docs]def attempt_download(file, repo='ultralytics/yolov5'): # Attempt file download if does not exist file = Path(str(file).strip().replace("'", '')) if not file.exists(): # URL specified name = file.name if str(file).startswith(('http:/', 'https:/')): # download url = str(file).replace(':/', '://') # Pathlib turns :// -> :/ safe_download(file=name, url=url, min_bytes=1E5) return name # GitHub assets file.parent.mkdir(parents=True, exist_ok=True) # make parent dir (if required) try: response = requests.get(f'https://api.github.com/repos/{repo}/releases/latest').json() # github api assets = [x['name'] for x in response['assets']] # release assets, i.e. ['yolov5s.pt', 'yolov5m.pt', ...] tag = response['tag_name'] # i.e. 'v1.0' except: # fallback plan assets = ['yolov5s.pt', 'yolov5m.pt', 'yolov5l.pt', 'yolov5x.pt', 'yolov5s6.pt', 'yolov5m6.pt', 'yolov5l6.pt', 'yolov5x6.pt'] try: tag = subprocess.check_output('git tag', shell=True, stderr=subprocess.STDOUT).decode().split()[-1] except: tag = 'v5.0' # current release if name in assets: safe_download(file, url=f'https://github.com/{repo}/releases/download/{tag}/{name}', # url2=f'https://storage.googleapis.com/{repo}/ckpt/{name}', # backup url (optional) min_bytes=1E5, error_msg=f'{file} missing, try downloading from https://github.com/{repo}/releases/') return str(file)
[docs]def gdrive_download(id='16TiPfZj7htmTyhntwcZyEEAejOUxuT6m', file='tmp.zip'): # Downloads a file from Google Drive. from yolov5.utils.google_utils import *; gdrive_download() t = time.time() file = Path(file) cookie = Path('cookie') # gdrive cookie print(f'Downloading https://drive.google.com/uc?export=download&id={id} as {file}... ', end='') file.unlink(missing_ok=True) # remove existing file cookie.unlink(missing_ok=True) # remove existing cookie # Attempt file download out = "NUL" if platform.system() == "Windows" else "/dev/null" os.system(f'curl -c ./cookie -s -L "drive.google.com/uc?export=download&id={id}" > {out}') if os.path.exists('cookie'): # large file s = f'curl -Lb ./cookie "drive.google.com/uc?export=download&confirm={get_token()}&id={id}" -o {file}' else: # small file s = f'curl -s -L -o {file} "drive.google.com/uc?export=download&id={id}"' r = os.system(s) # execute, capture return cookie.unlink(missing_ok=True) # remove existing cookie # Error check if r != 0: file.unlink(missing_ok=True) # remove partial print('Download error ') # raise Exception('Download error') return r # Unzip if archive if file.suffix == '.zip': print('unzipping... ', end='') os.system(f'unzip -q {file}') # unzip file.unlink() # remove zip to free space print(f'Done ({time.time() - t:.1f}s)') return r
[docs]def get_token(cookie="./cookie"): with open(cookie) as f: for line in f: if "download" in line: return line.split()[-1] return ""
# def upload_blob(bucket_name, source_file_name, destination_blob_name): # # Uploads a file to a bucket # # https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python # # storage_client = storage.Client() # bucket = storage_client.get_bucket(bucket_name) # blob = bucket.blob(destination_blob_name) # # blob.upload_from_filename(source_file_name) # # print('File {} uploaded to {}.'.format( # source_file_name, # destination_blob_name)) # # # def download_blob(bucket_name, source_blob_name, destination_file_name): # # Uploads a blob from a bucket # storage_client = storage.Client() # bucket = storage_client.get_bucket(bucket_name) # blob = bucket.blob(source_blob_name) # # blob.download_to_filename(destination_file_name) # # print('Blob {} downloaded to {}.'.format( # source_blob_name, # destination_file_name))