Source code for astroquery.utils.progressbar
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import gzip
import sys
from io import StringIO
from urllib.request import build_opener
from astropy.io import fits
__all__ = ['chunk_report', 'chunk_read']
[docs]def chunk_report(bytes_so_far, chunk_size, total_size):
if total_size > 0:
percent = float(bytes_so_far) / total_size
percent = round(percent * 100, 2)
sys.stdout.write("Downloaded %12.2g of %12.2g Mb (%6.2f%%)\r" %
(bytes_so_far / 1024. ** 2, total_size / 1024. ** 2,
percent))
else:
sys.stdout.write("Downloaded %10.2g Mb\r" %
(bytes_so_far / 1024. ** 2))
[docs]def chunk_read(response, chunk_size=1024, report_hook=None):
content_length = response.info().get('Content-Length')
if content_length is None:
total_size = 0
else:
total_size = content_length.strip()
total_size = int(total_size)
bytes_so_far = 0
result_string = b""
# sys.stdout.write("Beginning download.\n")
while True:
chunk = response.read(chunk_size)
result_string += chunk
bytes_so_far += len(chunk)
if not chunk:
if report_hook:
sys.stdout.write('\n')
break
if report_hook:
report_hook(bytes_so_far, chunk_size, total_size)
return result_string
def retrieve(url, outfile, opener=None, overwrite=False):
"""
"retrieve" (i.e., download to file) a URL.
"""
if opener is None:
opener = build_opener()
page = opener.open(url)
results = chunk_read(page, report_hook=chunk_report)
S = StringIO(results)
try:
fitsfile = fits.open(S, ignore_missing_end=True)
except IOError:
S.seek(0)
G = gzip.GzipFile(fileobj=S)
fitsfile = fits.open(G, ignore_missing_end=True)
fitsfile.writeto(outfile, clobber=overwrite)