# Source code for hscquery

# -*- coding: utf-8 -*-
"""
Module for accessing the Hyper Suprime-Cam Subaru Strategic Program database.

A valid account for the HSC Archive is needed to use this module.
See `HSC Online Registration
<https://hsc-release.mtk.nao.ac.jp/datasearch/new_user/new>`_.

Based on the python script developed by michitaro, NAOJ / HSC Collaboration.
[`Source <https://hsc-gitlab.mtk.nao.ac.jp/snippets/17>`_]
"""
from __future__ import print_function

from future import standard_library
standard_library.install_aliases()
from builtins import input
from builtins import object
from builtins import bytes, str

import os
import json
import urllib.request, urllib.error, urllib.parse
import time
import sys
import csv
import getpass
import tempfile

from astropy import units as u
from astropy.table import Table


class QueryError(Exception):
    """
    Query error class.

    Raised inside this module when the HSC-SSP server reports that a
    submitted job ended in an error state, or when a preview had to be
    truncated because the query matched more rows than were returned.
    """
class HSC(object):
    """
    Main class for accessing the HSC-SSP database.

    Parameters
    ----------
    survey : str, optional
        Available surveys: 'wide', 'deep', 'udeep'. By default is 'wide'.
    release_version : str, optional
        For the moment, only 'pdr1' is available (Public Data Release 1)
    columns : str, optional
        List of selected columns for query results. See the
        `HSC-SSP schema <https://hsc-release.mtk.nao.ac.jp/schema/>`_
        for details. By default is 'object_id, ra, dec'.
    user : str or `None`, optional
        Account name in the HSC-SSP database. If `None`, the account name
        is requested interactively when the ``HSC`` object is created.
    password_env : str, optional
        The account's password can be stored in a system environment
        variable. By default the password is searched at ``HSCPASSW``.
        If this environment variable doesn't exist (or is empty), the
        user is asked to type the password.
        Use the `password_env` option with caution, since your password
        can be easily exposed!

    Raises
    ------
    ValueError
        If `survey` is not one of the available surveys.
    """
    # Sent to the server as 'clientVersion' with every request.
    _version = 20181012.1
    _url = 'https://hsc-release.mtk.nao.ac.jp/datasearch/api/catalog_jobs/'

    def __init__(self, survey='wide', release_version='pdr1',
                 columns='object_id, ra, dec', user=None,
                 password_env='HSCPASSW'):
        surveys = ['wide', 'deep', 'udeep']
        # Validate before login so a typo never triggers a password prompt.
        if survey not in surveys:
            error_message = 'Unknown survey: {}'
            raise ValueError(error_message.format(survey))

        user, passw = self.__login(user, password_env)
        self.credential = {'account_name': user, 'password': passw}
        self.columns = columns
        self.survey = survey
        self.release_version = release_version

    def query_region(self, coords, radius, catalog='forced'):
        """
        Returns an astropy ``Table`` object with all sources from catalog
        `catalog` within radius `radius` around sky position `coords`.

        Parameters
        ----------
        coords : ``SkyCoord``
            Search around this position.
        radius : ``Quantity``
            Search radius (angular units)
        catalog : str, optional
            Available options: 'forced', 'meas', 'specz', or 'random'.
            See the
            `HSC-SSP schema <https://hsc-release.mtk.nao.ac.jp/schema/>`_
            for details. By default is 'forced'.

        Raises
        ------
        ValueError
            If `catalog` is not one of the available options.
        """
        catalogs = ['forced', 'meas', 'specz', 'random']
        if catalog not in catalogs:
            error_message = 'Unknown catalog: {}'
            raise ValueError(error_message.format(catalog))

        table = '{}_{}.{}'.format(self.release_version, self.survey, catalog)
        data_raw = self.__cone_search(coords, radius, self.columns, table)
        data = self.__clean_fits_output(data_raw)

        return data

    def send_query(self, sql, output_format='csv', output_file=None,
                   delete_job=True):
        """
        Send an SQL query `sql`. If `output_file` is ``None``, a preview of
        the results is shown. Otherwise, results are saved in a file with
        name `output_file` and in the format defined by `output_format`.

        HTTP errors and query errors are reported on ``stderr`` and are
        not raised to the caller.

        Parameters
        ----------
        sql : str
            SQL query.
        output_format : str, optional
            Available formats: 'csv', 'csv.gz', 'sqlite3', or 'fits'.
            Only checked when `output_file` is given.
        output_file : str or ``None``
            Name of the file for storing the query results.
            If ``None``, a preview of the results is shown.
        delete_job : bool
            Delete job and results from the user space.
            By default is ``True``.

        Raises
        ------
        ValueError
            If `output_file` is given and `output_format` is unknown.
        KeyboardInterrupt
            Re-raised after asking the server to cancel a submitted job.
        """
        formats = ['csv', 'csv.gz', 'sqlite3', 'fits']
        # Reject a bad format up front, before anything is submitted.
        if output_file is not None and output_format not in formats:
            error_message = 'Unknown output format: {}'
            raise ValueError(error_message.format(output_format))

        # Must exist before the try block: the KeyboardInterrupt handler
        # inspects it, and the interrupt may arrive before a job is created.
        job = None
        try:
            if output_file is None:
                self.__preview(self.credential, sql, sys.stdout)
            else:
                job = self.__submit_job(self.credential, sql, output_format)
                self.__block_until_job_finishes(self.credential, job['id'])

                with open(output_file, 'wb') as output:
                    self.__download(self.credential, job['id'], output)

                if delete_job:
                    self.__delete_job(self.credential, job['id'])

        except urllib.error.HTTPError as error:
            if error.code == 401:
                print('invalid id or password.', file=sys.stderr)
            elif error.code == 406:
                # The response body carries the server's explanation.
                body = error.read()
                if isinstance(body, bytes):
                    body = body.decode('utf-8', 'replace')
                print(body, file=sys.stderr)
            else:
                print(error, file=sys.stderr)

        except QueryError as error:
            print(error, file=sys.stderr)

        except KeyboardInterrupt:
            if job is not None:
                self.__job_cancel(self.credential, job['id'])
            raise

    def __login(self, user, password_env):
        """
        Return ``(user, password)``, prompting for whatever is missing.

        The password is taken from the environment variable named
        `password_env` when it is set and non-empty; otherwise it is read
        with ``getpass``.
        """
        if user is None:
            user = input('HSC-SSP user: ')

        password_from_envvar = os.environ.get(password_env, '')
        if password_from_envvar != '':
            passw = password_from_envvar
        else:
            passw = getpass.getpass('password: ')

        return user, passw

    def __cone_search(self, coords, radius, columns='object_id, ra, dec',
                      table='pdr1_udeep.forced'):
        """
        Run a ``coneSearch`` query and return the result as an astropy
        ``Table`` read from a temporary FITS file.
        """
        query = 'SELECT {} FROM {} WHERE coneSearch(coord, {}, {}, {})'
        query = query.format(columns, table, coords.ra.deg, coords.dec.deg,
                             radius.to(u.arcsec).value)

        with tempfile.NamedTemporaryFile() as temp:
            # NOTE: send_query reports server errors on stderr instead of
            # raising, so on failure the file read below is still empty.
            self.send_query(query, output_format='fits',
                            output_file=temp.name)
            temp.seek(0)
            data = Table.read(temp.name, format='fits')

        return data

    def __clean_fits_output(self, fits_table):
        """Return `fits_table` without its ``*_isnull`` columns."""
        # Remove isnull columns
        columns = [col for col in fits_table.colnames
                   if not col.endswith('_isnull')]

        return fits_table[columns]

    def __http_json_post(self, url, data):
        """
        POST `data` (plus the client version) as JSON to `url` and return
        the open response object. The caller is responsible for closing it.
        """
        # Work on a copy so the caller's dictionary is not modified.
        payload = dict(data)
        payload['clientVersion'] = self._version
        post_data = json.dumps(payload)

        headers = {'Content-type': 'application/json'}
        req = urllib.request.Request(url, post_data.encode(), headers)
        res = urllib.request.urlopen(req)

        return res

    def __http_json_fetch(self, url, data):
        """POST `data` to `url`, decode the JSON reply, close the response."""
        res = self.__http_json_post(url, data)
        try:
            return json.load(res)
        finally:
            res.close()

    def __submit_job(self, credential, sql, out_format, nomail=True,
                     skip_syntax_check=True):
        """Submit `sql` as a new catalog job and return the decoded reply."""
        url = self._url + 'submit'
        catalog_job = {
            'sql': sql,
            'out_format': out_format,
            'include_metainfo_to_body': True,
            'release_version': self.release_version,
        }
        post_data = {'credential': credential,
                     'catalog_job': catalog_job,
                     'nomail': nomail,
                     'skip_syntax_check': skip_syntax_check}

        return self.__http_json_fetch(url, post_data)

    def __job_status(self, credential, job_id):
        """Return the decoded status reply for job `job_id`."""
        url = self._url + 'status'
        post_data = {'credential': credential, 'id': job_id}

        return self.__http_json_fetch(url, post_data)

    def __job_cancel(self, credential, job_id):
        """Ask the server to cancel job `job_id`."""
        url = self._url + 'cancel'
        post_data = {'credential': credential, 'id': job_id}
        # The reply body is not used; close the response straight away.
        self.__http_json_post(url, post_data).close()

    def __preview(self, credential, sql, out):
        """
        Run `sql` in preview mode and write the returned rows to `out`
        as CSV.

        Raises
        ------
        QueryError
            If the server reports more matching rows than it returned.
            The rows that were returned have already been written.
        """
        url = self._url + 'preview'
        catalog_job = {
            'sql': sql,
            'release_version': self.release_version,
        }
        post_data = {'credential': credential, 'catalog_job': catalog_job}
        result = self.__http_json_fetch(url, post_data)

        rows = result['result']['rows']
        writer = csv.writer(out)
        for row in rows:
            writer.writerow(row)

        result_nrows = len(rows)
        if result['result']['count'] > result_nrows:
            error_message = 'only top {:d} records are displayed!'
            raise QueryError(error_message.format(result_nrows))

    def __block_until_job_finishes(self, credential, job_id):
        """
        Poll the status of job `job_id` until it is 'done'.

        The wait between polls starts at one second and doubles after
        each poll, up to five minutes.

        Raises
        ------
        QueryError
            If the job status becomes 'error'.
        """
        max_interval = 5 * 60  # sec.
        interval = 1

        while True:
            time.sleep(interval)
            job = self.__job_status(credential, job_id)

            if job['status'] == 'error':
                raise QueryError('query error: {}'.format(job['error']))

            if job['status'] == 'done':
                break

            interval = min(interval * 2, max_interval)

    def __download(self, credential, job_id, out):
        """Copy the results of job `job_id` into the binary file `out`."""
        url = self._url + 'download'
        post_data = {'credential': credential, 'id': job_id}
        res = self.__http_json_post(url, post_data)

        buffer_size = 64 * (1 << 10)  # 64k
        try:
            while True:
                buf = res.read(buffer_size)
                # Only an empty read means end of data; a short read may
                # simply be a partial chunk and must not stop the copy.
                if not buf:
                    break
                out.write(buf)
        finally:
            res.close()

    def __delete_job(self, credential, job_id):
        """Delete job `job_id` and its results from the user space."""
        url = self._url + 'delete'
        post_data = {'credential': credential, 'id': job_id}
        # The reply body is not used; close the response straight away.
        self.__http_json_post(url, post_data).close()