import json
import pathlib
from concurrent.futures import ThreadPoolExecutor
import requests
from astropy.table import Table
from .cdaweb import _CDAS_BASEURL, _CDAS_HEADERS, _DATAVIEW
__all__ = ['get_observatory_groups', 'get_datasets']
[docs]
def get_observatory_groups():
"""
Get a list of observatory IDs for each observatory in CDAWeb.
An observatory group is typically a single mission, which can contain
multiple observatories, e.g. for the STEREO observatory group there are two
observatories, STEREO-A and STEREO-B.
Returns
-------
`astropy.table.Table`
Examples
--------
>>> from sunpy.net.cdaweb import get_observatory_groups
>>>
>>> groups = get_observatory_groups() # doctest: +REMOTE_DATA
>>> groups['Group'] # doctest: +REMOTE_DATA
<Column name='Group' dtype='str55' length=...>
ACE
AIM
AMPTE
...
Voyager
Wind
>>> groups.loc['STEREO'] # doctest: +REMOTE_DATA
<Row index=...>
Group Observatories
str55 str...
------ -----------------------------------------------------------------------------
STEREO 'Ahead', 'Behind', 'STA', 'STB', 'STEREO', 'STEREOA', 'STEREOB', 'sta', 'stb'
"""
# Get a list of files for a given dataset between start and end times
url = '/'.join([
_CDAS_BASEURL,
'dataviews', _DATAVIEW,
'observatoryGroups'
])
response = requests.get(url, headers=_CDAS_HEADERS)
obs_groups = response.json()
names = [obs['Name'] for obs in obs_groups['ObservatoryGroupDescription']]
obs_ids = [obs['ObservatoryId'] for obs in obs_groups['ObservatoryGroupDescription']]
# Join all IDs into a single string
obs_ids = ["'" + "', '".join(id) + "'" for id in obs_ids]
t = Table([names, obs_ids], names=['Group', 'Observatories'])
t.add_index('Group')
return t
[docs]
def get_datasets(observatory):
"""
Get a list of datasets for a given observatory.
Parameters
----------
observatory : `str`
Observatory name.
Returns
-------
`astropy.table.Table`
Examples
--------
>>> from sunpy.net.cdaweb import get_datasets
>>>
>>> datasets = get_datasets('STEREOB') # doctest: +REMOTE_DATA
>>> datasets['Id'] # doctest: +REMOTE_DATA
<Column name='Id' dtype='str17' length=5>
STB_LB_IMPACT
STB_L1_IMPACT_HKP
STB_L1_HET
STB_L2_SWEA_PAD
STB_L1_SWEA_SPEC
>>> datasets.loc['STB_L1_SWEA_SPEC']['Label'] # doctest: +REMOTE_DATA
'STEREO Behind IMPACT/SWEA Spectra - J. Luhmann (UCB/SSL)'
>>> datasets.loc['STB_L1_SWEA_SPEC'][['Start', 'End']] # doctest: +REMOTE_DATA
<Row index=4>
Start End
str24 str24
------------------------ ------------------------
2012-12-01T00:00:03.000Z 2013-12-31T23:59:41.000Z
"""
# Get a list of files for a given dataset between start and end times
url = '/'.join([
_CDAS_BASEURL,
'dataviews', _DATAVIEW,
'datasets'
])
url = f'{url}?observatory={observatory}'
response = requests.get(url, headers=_CDAS_HEADERS)
datasets = response.json()['DatasetDescription']
ids = [dataset['Id'] for dataset in datasets]
instruments = [', '.join(dataset['Instrument']) for dataset in datasets]
labels = [dataset['Label'] for dataset in datasets]
stimes = [dataset['TimeInterval']['Start'] for dataset in datasets]
etimes = [dataset['TimeInterval']['End'] for dataset in datasets]
t = Table([ids, instruments, labels, stimes, etimes],
names=['Id', 'Instruments', 'Label', 'Start', 'End'])
t.add_index('Id')
return t
def _update_cdaweb_dataset_data():
all_obs = get_observatory_groups()
url = '/'.join([
_CDAS_BASEURL,
'dataviews', _DATAVIEW,
'datasets'
])
# Mapping from dataset ID to description
all_datasets = {}
# Number of parallel threads we spawn
N = 3
def _fetch_cdaweb_dataset(group, url=url):
print(f'🛰 Getting datasets for {group}')
u = url + f'?observatoryGroup={group}'
res = requests.get(u, headers=_CDAS_HEADERS)
datasets = res.json()['DatasetDescription']
dataset_ids = {ds['Id']: ds['Label'] for ds in datasets}
all_datasets.update(dataset_ids)
with ThreadPoolExecutor(max_workers=N) as executor:
# Submit each URL to the thread pool
futures = [executor.submit(_fetch_cdaweb_dataset, group) for group in all_obs['Group']]
# Wait for all tasks to complete
for future in futures:
future.result()
attr_file = pathlib.Path(__file__).parent / 'data' / 'attrs.json'
with open(attr_file, 'w') as attrs_file:
json.dump(dict(sorted(all_datasets.items())), attrs_file, indent=2)