Source code for idaes.dmf.util

##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018-2019, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory,  National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
#
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes-pse".
##############################################################################
"""
Utility functions.
"""
# stdlib
import importlib
import json

from json import JSONDecodeError
import logging
import os
import re
import shutil
import sys
import tempfile
import time

__author__ = 'Dan Gunter'

_log = logging.getLogger(__name__)


def strlist(x, sep=', '):
    # type: (list, str) -> str
    return sep.join([str(item) for item in x])


# def pluck(obj, key):
#     """Remove and return obj[key].
#     """
#     value = obj[key]
#     del obj[key]
#     return value


[docs]def get_file(file_or_path, mode='r'): """Open a file for reading, or simply return the file object. """ if hasattr(file_or_path, 'read'): return file_or_path return open(file_or_path, mode)
def import_module(name): mod = importlib.import_module(name) return mod
[docs]def get_module_version(mod): """Find and return the module version. Version must look like a semantic version with `<a>.<b>.<c>` parts; there can be arbitrary extra stuff after the `<c>`. For example:: 1.0.12 0.3.6 1.2.3-alpha-rel0 Args: mod (module): Python module Returns: (str) Version string or None if not found Raises: ValueError if version is found but not valid """ v = getattr(mod, '__version__', None) if v is None: return None pat = r'\d+\.\d+\.\d+.*' if not re.match(pat, v): raise ValueError( 'Version "{}" does not match regular expression ' 'pattern "{}"'.format(v, pat) ) return v
[docs]def get_module_author(mod): """Find and return the module author. Args: mod (module): Python module Returns: (str) Author string or None if not found Raises: nothing """ return getattr(mod, '__author__', None)
[docs]class TempDir(object): """Simple context manager for mkdtemp(). """ def __init__(self, *args): self._d = None self._a = args def __enter__(self): self._d = tempfile.mkdtemp(*self._a) return self._d def __exit__(self, *args): if self._d: shutil.rmtree(self._d) self._d = None
[docs]def is_jupyter_notebook(filename, check_contents=True): # type: (str) -> bool """See if this is a Jupyter notebook. """ if not filename.endswith('.ipynb'): return False if check_contents: try: nb = json.load(open(filename)) except (UnicodeDecodeError, JSONDecodeError): return False for key in 'cells', 'metadata', 'nbformat': if key not in nb: return False return True
[docs]def is_python(filename): # type: (str) -> bool """See if this is a Python file. Do *not* import the source code. """ if not filename.endswith('.py'): return False return True # XXX: look inside?
[docs]def is_resource_json(filename, max_bytes=1e6): """Is this file a JSON Resource? Args: filename (str): Full path to file max_bytes (int): Max. allowable size. Since we try to parse the file, this saves potential DoS issues. Large files are a bad idea anyways, since this is metadata and may be stored somewhere with a record size limit (like MongoDB). Returns: (bool) Whether it's a resource JSON file. """ if not filename.endswith('.json'): return False # get size st = os.stat(filename) # if it's under max_bytes, parse it if st.st_size <= max_bytes: try: d = json.load(open(filename)) except (UnicodeDecodeError, JSONDecodeError): return False # look for a couple distinctive keys for key in 'id_', 'type': if key not in d: return False return True else: # if it's over max_bytes, it's "bad" return False
[docs]def datetime_timestamp(v): """Get numeric timestamp. This will work under both Python 2 and 3. Args: v (datetime.datetime): Date/time value Returns: (float) Floating point timestamp """ if hasattr(v, 'timestamp'): # Python 2/3 test # Python 2 result = v.timestamp() else: # Python 3 result = time.mktime(v.timetuple()) + v.microsecond / 1e6 return result
# # XXX: Replace this with 'blessings' module #
[docs]class CPrint(object): """Colorized terminal printing. Codes are below. To use: cprint = CPrint() cprint('This has no colors') # just like print() cprint('This is @b[blue] and @_r[red underlined]') You can use the same class as a no-op by just passing `color=False` to the constructor. """ COLORS = { 'h': '\033[1m\033[95m', 'r': '\033[91m', 'g': '\033[92m', 'y': '\033[93m', 'b': '\033[94m', 'm': '\033[95m', 'c': '\033[96m', 'w': '\033[97m', '.': '\033[0m', '*': '\033[1m', '-': '\033[2m', '_': '\033[4m', } _styled = re.compile(r'@([*_-]?[hbgyrwcm*_-])\[([^]]*)\]') def __init__(self, color=True): self._c = color def println(self, s): print(self.colorize(s)) def __call__(self, *args): return self.println(args[0]) def write(self, s): sys.stdout.write(self.colorize(s)) def colorize(self, s): chunks = [] last = 0 c, stop = '', '' for m in re.finditer(self._styled, s): code, text = m.groups() clen = len(code) if self._c: if clen == 2: if code[0] == code[1]: c = self.COLORS[code] else: c = self.COLORS[code[0]] + self.COLORS[code[1]] else: c = self.COLORS[code] stop = self.COLORS['.'] x, y = m.span() chunks.append(s[last:x]) # text since last found piece chunks.append(c + s[x + 2 + clen : y - 1] + stop) # colorized last = y chunks.append(s[last:]) # to end of string return ''.join(chunks)
[docs]def mkdir_p(path, *args): """Try to create all non-existent components of a path. Args: path (str): Path to create args: Other arguments for `os.mkdir()`. Returns: None Raises: os.error: Raised from `os.mkdir()` """ plist = path.split(os.path.sep) if plist[0] == '': # do not try to create filesystem root dir_name = os.path.sep plist = plist[1:] else: dir_name = '' for p in plist: dir_name = os.path.join(dir_name, p) if not os.path.exists(dir_name): os.mkdir(dir_name, *args)
[docs]def uuid_prefix_len(uuids, step=4, maxlen=32): """Get smallest multiple of `step` len prefix that gives unique values. The algorithm is not fancy, but good enough: build *sets* of the ids at increasing prefix lengths until the set has all ids (no duplicates). Experimentally this takes ~.1ms for 1000 duplicate ids (the worst case). """ full = set(uuids) all_of_them = len(full) for n in range(step, maxlen, step): prefixes = {u[:n] for u in uuids} if len(prefixes) == all_of_them: return n return maxlen