Source code for sunpy.data.data_manager.storage
"""
Storage module contains the abstract implementation of storage
for `sunpy.data.data_manager.Cache` and a concrete implementation
using sqlite.
"""
import sqlite3
from abc import ABCMeta, abstractmethod
from pathlib import Path
from contextlib import contextmanager
__all__ = [
'StorageProviderBase',
'SqliteStorage',
'InMemStorage',
]
[docs]
class StorageProviderBase(metaclass=ABCMeta):
"""
Base class for remote data manager storage providers.
"""
[docs]
@abstractmethod
def find_by_key(self, key, value):
"""
Returns the file details if value corresponding to the key
found in storage. Returns `None` if hash not found.
Parameters
----------
key : `str`
The key/column name of the field.
value : `str`
The value associated with the key of the entry.
Returns
-------
`dict` or `None`
`dict` contains the details of the file. `None` if hash not found.
Raises
------
``KeyError``
KeyError is raised if key does not exist.
"""
[docs]
@abstractmethod
def delete_by_key(self, key, value):
"""
Deletes the matching entry from the store.
Parameters
----------
key : `str`
The key/column name of the field.
value : `str`
The value associated with the key of the entry.
Raises
------
``KeyError``
KeyError is raised if key does not exist.
"""
[docs]
@abstractmethod
def store(self, details):
"""
Stores the details in the storage.
Parameters
----------
details : `dict`
Details to be stored.
"""
[docs]
class InMemStorage(StorageProviderBase):
"""
This provides a storage stored in memory.
"""
def __init__(self):
self._store = []
[docs]
def store(self, details):
self._store += [details]
[docs]
def delete_by_key(self, key, value):
for i in self._store:
if i[key] == value:
self._store.remove(i)
[docs]
def find_by_key(self, key, value):
for i in self._store:
if i[key] == value:
return i
return None
[docs]
class SqliteStorage(StorageProviderBase):
"""
This provides a sqlite backend for storage.
Parameters
----------
path: `str`
Path to the database file.
"""
COLUMN_NAMES = [
'file_hash',
'file_path',
'url',
'time',
]
def __init__(self, path):
self._db_path = Path(path)
self._table_name = 'cache_storage'
self._db_path.parent.mkdir(parents=True, exist_ok=True)
if not self._db_path.exists():
# setup database
self._setup()
def _setup(self):
with self.connection(commit=True) as conn:
self._create_table(conn)
def _create_table(self, conn):
schema = ' text, '.join(self.COLUMN_NAMES) + ' text'
conn.execute(f'''CREATE TABLE IF NOT EXISTS {self._table_name} ({schema})''')
[docs]
@contextmanager
def connection(self, commit=False):
"""
A context manager which provides an easy way to handle db connections.
Parameters
----------
commit : `bool`
Whether to commit after successful execution of db command.
"""
conn = sqlite3.connect(str(self._db_path))
self._create_table(conn)
try:
yield conn
if commit:
conn.commit()
finally:
conn.close()
[docs]
def find_by_key(self, key, value):
if key not in self.COLUMN_NAMES:
raise KeyError
with self.connection() as conn:
cursor = conn.cursor()
cursor.execute(f'''SELECT * FROM {self._table_name}
WHERE {key}="{value}"''')
row = cursor.fetchone()
if row:
return dict(zip(self.COLUMN_NAMES, row))
return None
[docs]
def delete_by_key(self, key, value):
if key not in self.COLUMN_NAMES:
raise KeyError
with self.connection(commit=True) as conn:
cursor = conn.cursor()
cursor.execute(f'''DELETE FROM {self._table_name}
WHERE {key}="{value}"''')
[docs]
def store(self, details):
values = [details[k] for k in self.COLUMN_NAMES]
placeholder = '?,' * len(values)
placeholder = placeholder[:-1]
with self.connection(commit=True) as conn:
conn.execute(f'''INSERT INTO {self._table_name}
VALUES ({placeholder})''', list(values))