##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018-2019, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory, National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
#
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes-pse".
##############################################################################
"""
Resource database.
"""
# system
from datetime import datetime
import logging
import re
# third party
import pendulum
import six
from tinydb import TinyDB, Query
# local
from . import errors
from .resource import Resource
from .resource import Triple, triple_from_resource_relations
__author__ = 'Dan Gunter <dkgunter@lbl.gov>'
_log = logging.getLogger(__name__)
[docs]class ResourceDB(object):
"""A database interface to all the resources within a given DMF workspace.
"""
def __init__(self, dbfile=None, connection=None):
"""Initialize from DMF and given configuration field.
Args:
dbfile (str): DB location
connection: If non-empty, this is an
existing connection that should be re-used, instead of
trying to connect to the location in `dbfile`.
Raises:
ValueError, if dbfile
and connection are both None
"""
self._db = None
self._gr = None
if connection is not None:
self._db = connection
elif dbfile is not None:
try:
db = TinyDB(dbfile)
except IOError:
raise errors.FileError('Cannot open resource DB "{}"'.format(dbfile))
# turn off caching, otherwise update() does not work properly
self._db = db.table('resources', cache_size=0)
def __len__(self):
return len(self._db)
[docs] def find(self, filter_dict, id_only=False, flags=0):
"""Find and return records based on the provided filter.
Args:
filter_dict (dict): Search filter. For syntax, see docs in
:meth:`.dmf.DMF.find`.
id_only (bool): If true, return only the identifier of each
resource; otherwise a Resource object is returned.
flags (int): Flag values for, e.g., regex searches
Returns:
generator of int|Resource, depending on the value of `id_only`
"""
def as_resource(_r):
_log.debug(f"as_resource: id_={_r['id_']}")
rsrc = Resource(value=_r)
rsrc.v['doc_id'] = _r.doc_id
return rsrc
# with no filter, do a find-all
if not filter_dict:
for r in self._db.all():
if id_only:
yield r.eid
else:
yield as_resource(r)
return
filter_expr = self._create_filter_expr(filter_dict, flags)
# return results for query
_log.debug('Find resources matching: {}'.format(filter_expr))
results = self._db.search(filter_expr)
for r in results:
if id_only:
yield r.eid
else:
_log.debug(f"got resource: {r}")
yield as_resource(r)
@classmethod
def _create_filter_expr(cls, filter_dict, flags=0):
# convert filter to expression for TinyDB
q = Query()
filter_expr = None
def fadd(c):
"""Update filter, or set to 1st condition, & return new value."""
return c if filter_expr is None else filter_expr & c
for k, v in six.iteritems(filter_dict):
if not k:
continue # XXX: Issue a warning?
# strip off list-query operator
qry_all = False
if isinstance(v, list) and k.endswith('!'):
k, qry_all = k[:-1], True
# Get the query[field1][field2][..][fieldN] object
qry = q
for field in k.split('.'):
qry = qry[field]
# A list value means find these value(s) in the
# list value of the record. By default, any matching
# value is a match. If the key has a '!' appended, then
# require all values for a match.
if isinstance(v, list):
if len(v) > 0:
# if values are dicts, nest a query
# XXX: this only works one level deep
# XXX: only one nested query is used/allowed
if isinstance(v[0], dict):
list_expr = None
for list_k, list_v in v[0].items():
list_qry = Query()
for field in list_k.split('.'):
list_qry = list_qry[field]
list_cond = cls._expr_to_query(
list_qry, list_v, flags=flags
)
if list_expr is None:
list_expr = list_cond
else:
list_expr = list_expr & list_cond
# otherwise, simply put the values in there
else:
list_expr = tuple(v)
# add nested "query" (or value tuple)
if qry_all:
cond = qry.all(list_expr)
else:
cond = qry.any(list_expr)
else:
cond = cls._expr_to_query(qry, v, flags=flags)
filter_expr = fadd(cond)
return filter_expr
@classmethod
def _expr_to_query(cls, qry, v, flags=None):
"""Get a query from a filter expr.
There are two types of non-list values:
1. equality is just {key: value}
2. inequalities are {key: {op: value, ...}}
operators are "$<shell test op>", such as "$lt",
just like in MongoDB; see _op_cond() method for details.
"""
result = None
if isinstance(v, dict):
for op_key, op_value in v.items():
tv = cls._value_transform(op_value)
cond = cls._op_cond(qry, op_key, tv)
result = cond if result is None else result & cond
else:
tv = cls._value_transform(v)
if v is True:
result = qry.exists()
elif tv is False:
result = ~qry.exists()
elif hasattr(tv, "match"): # regex
result = qry.matches(tv.pattern, flags=flags)
else:
result = qry == tv
return result
@staticmethod
def _value_transform(v):
# transform dates into timestamps
if isinstance(v, datetime) or isinstance(v, pendulum.Pendulum):
if isinstance(v, datetime):
pv = pendulum.create(
v.year,
v.month,
v.day,
v.hour,
v.minute,
v.second,
v.microsecond,
v.tzname(),
)
else:
pv = v
return pv.timestamp()
# support for special string '@' values
elif isinstance(v, str) and len(v) > 0 and v[0] == '@':
if v == '@true':
return True
if v == '@false':
return False
return v
# for a regex, return a Python regex obj
elif isinstance(v, str) and len(v) > 0 and v[0] == '~':
return re.compile(v[1:])
# default is no transformation
else:
return v
@staticmethod
def _op_cond(query, op, value):
# sanity check that operator is truthy
if not op:
raise ValueError(f"empty operator for value `{value}`")
# just a clumsy switch statement..
if op == '$gt':
cond = query > value
elif op == '$ge':
cond = query >= value
elif op == '$lt':
cond = query < value
elif op == '$le':
cond = query <= value
elif op == '$ne':
cond = query != value
else:
raise ValueError('Unexpected operator: {}'.format(op))
return cond
[docs] def find_one(self, *args, **kwargs):
"""Same as `find()`, but returning only first value or None.
"""
result = None
for value in self.find(*args, **kwargs):
result = value
break
return result
[docs] def get(self, identifier):
"""Get a resource by identifier.
Args:
identifier: Internal identifier
Returns:
(Resource) A resource or None
"""
def as_resource(_r):
rsrc = Resource(value=_r)
rsrc.v['doc_id'] = _r.doc_id
return rsrc
item = self._db.get(doc_id=identifier)
if item is None:
return None
return as_resource(item)
[docs] def put(self, resource):
"""Put this resource into the database.
Args:
resource (Resource): The resource to add
Returns:
None
Raises:
errors.DuplicateResourceError: If there is already a resource
in the database with the same "id".
"""
_log.debug(f"put resource id={resource.id}")
# check for same id
qry = Query()
if self._db.contains(qry.id_ == resource.id):
raise errors.DuplicateResourceError("put", resource.id)
# add resource
self._db.insert(resource.v)
[docs] def delete(self, id_=None, idlist=None, filter_dict=None, internal_ids=False):
"""Delete one or more resources with given identifiers.
Args:
id_ (Union[str,int]): If given, delete this id.
idlist (list): If given, delete ids in this list
filter_dict (dict): If given, perform a search and
delete ids it finds.
internal_ids (bool): If True, treat identifiers as numeric
(internal) identifiers. Otherwise treat them as
resource (string) indentifiers.
Returns:
(list[str]) Identifiers
"""
if internal_ids:
doc_ids = idlist if idlist else [id_]
self._db.remove(doc_ids=doc_ids)
else:
ID = Resource.ID_FIELD
if filter_dict:
cond = self._create_filter_expr(filter_dict)
elif id_:
cond = self._create_filter_expr({ID: id_})
elif idlist:
cond = self._create_filter_expr({ID: [idlist]})
else:
return
self._db.remove(cond=cond)
[docs] def update(self, id_, new_dict):
"""Update the identified resource with new values.
Args:
id_ (int): Identifier of resource to update
new_dict (dict): New dictionary of resource values
Returns:
None
Raises:
ValueError: If new resource is of wrong type
KeyError: If old resource is not found
"""
_log.debug("update.start")
id_cond = {Resource.ID_FIELD: id_}
old = self.find_one(id_cond)
if old is None:
raise KeyError('Cannot find resource id={}'.format(id_))
T = Resource.TYPE_FIELD
if old.v[T] != new_dict[T]:
raise ValueError(
'New resource type="{}" does not '
'match current resource type "{}"'.format(new_dict[T], old.v[T])
)
_log.debug(f"update old resource: {old.v}")
changed = {}
for k, v in new_dict.items():
if k not in old.v:
changed[k] = v
elif old.v[k] != v:
changed[k] = v
_log.debug(f"update resource {id_} with new values: {changed}")
self._db.update(changed, self._create_filter_expr(id_cond))