##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018-2019, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory, National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
#
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes-pse".
##############################################################################
"""
Perform all logic, input, output of commands that is
particular to the CLI.
Call functions defined in 'api' module to handle logic
that is common to the API and CLI.
"""
# stdlib
import glob
import json
import logging
import math
import os
import re
import sys
# Third-party
import jsonschema
import pendulum
# Local
from .dmfbase import DMF, DMFConfig
from .util import strlist
from .util import is_jupyter_notebook, is_python, is_resource_json
from .util import ColorTerm
from .errors import (
ParseError,
CommandError,
WorkspaceNotFoundError,
WorkspaceConfNotFoundError,
WorkspaceError,
BadResourceError,
)
from . import resource
from .workspace import Workspace, find_workspaces
__author__ = "Dan Gunter"
_log = logging.getLogger(__name__)
[docs]def workspace_init(dirname, metadata):
# type: (str, dict) -> None
"""Initialize from root at `dirname`, set environment variable
for other commands, and parse config file.
"""
try:
ws = Workspace(dirname, create=True, add_defaults=True)
except OSError as err:
raise CommandError("init", "initialize workspace", str(err))
except ParseError as err:
raise CommandError("init", "parse config", str(err))
except WorkspaceError as err:
raise CommandError("init", "initialize workspace", str(err))
_log.info("Created new workspace in: {}".format(dirname))
if metadata:
ws.set_meta(metadata)
_log.info("Set metadata for: {}".format(strlist(list(metadata))))
def workspace_info(dirname):
# type: (str) -> None
t = ColorTerm()
try:
ws = Workspace(dirname, create=False)
except WorkspaceNotFoundError:
print("Workspace not found at path: {}".format(dirname))
raise CommandError("info", "find workspace", "not found at: {}".format(dirname))
except WorkspaceConfNotFoundError:
print("No configuration found for workspace for path: {}".format(dirname))
raise CommandError(
"info", "find workspace configuration", "not found at: {}".format(dirname)
)
num_obj = DMF(path=ws.root).count()
bullet = " - "
print(f"\n{t.blue}Workspace")
if ws.name and (ws.name != "none"):
if ws.description and (ws.description != "none"):
print(f" {t.blue}[{ws.name}] - {ws.description}")
else:
print(" {t.blue}{ws.name} - (no description)")
elif ws.description and (ws.description != "none"):
print(f" {t.blue}(no name) - {ws.description}")
else:
print(f" {t.blue}(no name or description)")
print("\nGeneral information")
print(f"{bullet}{t.blue}Location = {ws.root}")
info = ws.meta.copy()
if "_id" in info:
print(f"{bullet}{t.blue}Workspace identifier (_id) = {info['_id']}")
del info["_id"]
else:
print(f"{bullet}{t.blue}Workspace identifier (_id) = unknown")
if "created" in info:
print(f"{bullet}{t.blue}Created = {info[ws.CONF_CREATED]}")
else:
print(f"{bullet}{t.blue}Created = unknown")
if "modified" in info:
print(f"{bullet}{t.blue}Modified = {info[ws.CONF_MODIFIED]}")
else:
print(f"{bullet}{t.blue}Modified = unknown")
print(f"{bullet}{t.blue}Num. resources = {num_obj}")
print(f"\n{t.magenta}{t.bold}Configuration")
already_shown = (ws.CONF_MODIFIED, ws.CONF_CREATED, ws.CONF_NAME, ws.CONF_DESC)
for k in info.keys():
if k in already_shown:
continue
v = info[k]
print(f"{bullet}{t.blue}{k} = {v}")
print("")
[docs]def init_conf(workspace):
# type: (str) -> int
"""Initialize the workspace.
"""
t = ColorTerm()
# Open/create configuration file
try:
conf = DMFConfig()
except IOError as err:
print(f"Failed to open global configuration: {err}")
try:
open(DMFConfig._filename, "w")
except IOError:
print("Failed to create new configuration file")
return -1
print("Created new configuration file")
conf = DMFConfig()
# If a workspace argument is given, save this value,
# as the default workspace, in the configuration file
if workspace:
fullpath = os.path.abspath(workspace)
conf.c[conf.WORKSPACE] = fullpath
conf.save()
# Print contents of configuration file to standard output
print(
f"{t.magenta}{t.bold}DMF global configuration{t.reset} "
f"<{t.green}{conf._filename}>"
)
keys = conf.c.keys()
if keys:
for k in sorted(keys):
print(f" > {t.blue}{k}{t.reset} = {t.bold}{conf.c[k]}]")
else:
print(f"{t.blue}(empty)")
return 0
[docs]def workspace_import(path, patterns, exit_on_error):
# type: (str, list, bool) -> int
"""Import files into workspace.
Args:
path (str): Target workspace directory
patterns (list): List of Unix-style glob for files to import.
Files are expected to be resource JSON or a
Jupyter Notebook.
exit_on_error (bool): If False, continue trying to import resources
even if one or more fail.
Returns:
int: Number of things imported
Raises:
BadResourceError, if there is a problem
"""
d = DMF(path)
count = 0
for pattern in patterns:
for filename in glob.glob(pattern):
# Skip directories
if os.path.isdir(filename):
_log.warning('Not importing directory "{}"'.format(filename))
continue
# For Jupyter Notebooks, first create a (temporary)
# JSON resource from the original data.
if is_jupyter_notebook(filename):
try:
rsrc = _import_jupyternb(filename)
except ValueError as e:
msg = (
"Cannot create resource from Jupyter Notebook "
'"{}": {}'.format(filename, e)
)
if exit_on_error:
raise BadResourceError(msg)
_log.error(msg)
continue
# For Python files, first create a (temporary)
# JSON resource from the original data.
elif is_python(filename):
try:
rsrc = _import_python(filename)
except ValueError as e:
msg = "Cannot create resource from Python file " '"{}": {}'.format(
filename, e
)
if exit_on_error:
raise BadResourceError(msg)
_log.error(msg)
continue
# JSON resource file
elif is_resource_json(filename):
try:
rsrc = _import_resource(filename)
except ValueError as e:
msg = 'Bad resource from file "{}": {}'.format(filename, e)
if exit_on_error:
raise BadResourceError(msg)
_log.error(msg)
continue
# Generic file import
else:
try:
rsrc = _import_file(filename)
except ValueError as e:
msg = "Cannot create resource from file " '"{}": {}'.format(
filename, e
)
if exit_on_error:
raise BadResourceError(msg)
_log.error(msg)
continue
# Resource in hand. Now add it.
d.add(rsrc)
count += 1
return count
[docs]def list_workspaces(root, stream=None):
"""List workspaces found from a given root path.
Args:
root: root path
stream: Output stream (must have .write() method)
"""
workspaces = find_workspaces(root)
if stream is None or stream == sys.stdout:
colors = True
else:
colors = False
t = ColorTerm(enabled=colors)
if colors:
output_table = [("Path", "Name")]
else:
output_table = [("Path", "Name"), ("----", "----")]
widths = [4, 4]
any_good_workspaces = False
for w in sorted(workspaces):
try:
ws = Workspace(w)
output_table.append((w, ws.name))
widths = [max(len(w), widths[0]), max(len(ws.name), widths[1])]
any_good_workspaces = True
except WorkspaceError:
pass # XXX: Should we print a warning?
if not any_good_workspaces:
# either no paths, or all paths raised an error
stream.write("ERROR: No valid workspaces found\n")
else:
colfmts = ["{{:{:d}s}}".format(width) for width in widths]
first_row = True
for row in output_table:
for i in (0, 1):
if colors:
if first_row:
fmt = f"{t.bold}{colfmts[i]}"
else:
fmt = f"{[t.blue, t.white][i]}{colfmts[i]}"
fmt += t.reset
else:
fmt = colfmts[i]
stream.write(fmt.format(row[i]))
stream.write("\n" if i == 1 else " ")
first_row = False
[docs]def list_resources(path, long_format=None, relations=False):
"""List resources in a given DMF workspace.
Args:
path (str): Path to the workspace
long_format (bool): List in long format flag
relations (bool): Show relationships, in long format
Returns:
None
"""
t = ColorTerm()
d = DMF(path)
if long_format:
resources = list(d.find())
uuid_pfx = _uuid_prefix([r.uuid for r in resources])
fields = ("uuid", "name", "type", "modified", "created")
widths = (uuid_pfx, 30, 20, 19, 19)
colors = (t.green, t.white, t.yellow, t.white, t.white)
fmts = [f"{{:{w}s}}" for w in widths]
left_gutter = "| " if relations else ""
# table header
print(
" " * len(left_gutter)
+ t.bold
+ " ".join([f.format(v) for f, v in zip(fmts, fields)])
+ t.reset
)
def datestr(t):
p = pendulum.from_timestamp(t)
return p.to_datetime_string()
# table body
for r in resources:
values = list(getattr(r, k) for k in fields[:-2])
values.append(datestr(r.modified))
values.append(datestr(r.created))
if not values[1] and r.desc:
values[1] = r.desc[: widths[1]]
else:
values[1] = values[1][: widths[1]]
if uuid_pfx < 32:
values[0] = values[0][:uuid_pfx]
print(
left_gutter
+ " ".join([c + f.format(v) for c, f, v in zip(colors, fmts, values)])
+ t.reset
)
if relations and len(r.relations) > 0:
relitems = []
for rel in r.relations:
if rel.subject == r.uuid:
fmt = f"{t.white}{{p}}->{t.blue}{{o}}"
else:
fmt = f"{t.blue}{{s}}->{t.white}{{p}}"
item = fmt.format(
s=rel.subject[:uuid_pfx],
p=rel.predicate,
o=rel.object[:uuid_pfx],
)
relitems.append(item)
print(f"+-- {' / '.join(relitems)}")
else:
items = []
for r in d.find():
name_color = "w"
if r.name:
name = r.name
elif r.desc:
name = r.desc[:40]
name_color = t.blue
else:
name = r.uuid
name_color = t.green
item = f"{name_color}{name}{t.yellow}:{r.type}"
items.append(item)
if items:
columnized = _display_in_columns(items, max_line=t.width)
print(columnized + t.reset)
def _uuid_prefix(uuids, step=4, maxlen=32):
"""Get smallest multiple of `step` len prefix that gives unique values.
"""
full = set(uuids)
for n in range(step, maxlen, step):
prefixes = {u[:n] for u in uuids}
if len(prefixes) == len(full):
return n
return maxlen
def cat_resources(path, objects=(), color=True):
d = DMF(path=path)
t = ColorTerm(enabled=color)
unmatched = set(objects)
first = True
# get all resources,
# display any that match an object as a prefix
for r in d.find():
for oid in unmatched:
if r.uuid.startswith(oid):
unmatched.remove(oid) # don't show twice
if not first:
_cat_resource_sep(t)
_cat_resource_show(t, r)
first = False
break
def _cat_resource_sep(t):
print(f"{t.blue}{'-' * 60}")
def _cat_resource_show(cp, r):
d = r.as_dict()
json.dump(d, cp, indent=2)
print()
# regular expression to find VT100 color escape codes
_noprint_re = re.compile(r"\033\[[0-9]+m")
def _display_in_columns(items, max_line=80, col_sep=" ", row_sep="\n"):
"""Take a list of items and max line width, and calculate display
of the items in columns.
The algorithm is simple, just trying increasing numbers of columns and
picking the largest number that did not result in a row that was too wide.
The input items are not re-ordered.
Args:
items (List[str]): String items
max_line (int): Maximum width for any displayed line (row)
col_sep (str): Separator between columns, after each item
row_sep (str): Separator between rows, at the end of each line
Returns:
str:
"""
if not items:
return ""
# Calculate item lengths, stripping terminal escapes
lengths, nplengths = [], []
for item in items:
clean = _noprint_re.sub("", item)
lengths.append(len(clean))
nplengths.append(len(item) - len(clean))
col_sep_len = len(col_sep) # useful later
# Give up immediately, putting everything in one column,
# if any single item doesn't fit
if max_line <= max(lengths) + col_sep_len:
return row_sep.join(items)
# Determine maximum number of columns
max_columns = 1 # number of columns
max_widths = [max_line] # width of each column
n = len(lengths)
# Determine number of columns.
# Start at 2 columns, stop when cannot fit items side-by-side
for i in range(2, n):
# initialize calculated widths of each column
widths = [0] * i
# for display where all columns are same length except last,
# number of items per column is ceiling of items/#col
nitems = int(math.ceil(n / i))
# put items in each column
for col in range(i):
pad = 0 if col == (i - 1) else col_sep_len # sep. between columns
# put items in the current column, adjusting the column
# max width to widest item
maxj = min(n, (col + 1) * nitems) # don't overshoot on last col
for j in range(col * nitems, maxj):
widths[col] = max(widths[col], lengths[j] + pad)
# total width is sum of column widths
line_len = sum(widths)
# if we went over, then stop
if line_len > max_line:
break
# otherwise, this is valid -- save and continue
max_columns, max_widths = i, widths[:]
# Put items into rows of max. number of columns determined above
nrows, rows = int(math.ceil(n / max_columns)), []
for row in range(nrows):
col, row_items = 0, []
# skip through items by nrows at a time, to move across the columns,
# and fill in the items for the current row (which acts as an offset)
for i in range(row, len(items), nrows):
# create format string with width = col. max including esc chars,
# but without padding since we will add that when we join
# the row items together
pad = 0 if col == (max_columns - 1) else col_sep_len
fmt = "{{:{n}s}}".format(n=max_widths[col] + nplengths[i] - pad)
# format row item for column
row_items.append(fmt.format(items[i]))
col += 1 # move to next column
# append the row items as big string
rows.append(col_sep.join(row_items))
# Final result is a big string of the rows joined together
return row_sep.join(rows)
def _import_resource(filename):
"""Import a resource from 'filename'. Raises a ValueError if that
fails. Most of the code is simply generating error messages.
"""
if not os.path.exists(filename):
raise ValueError('File "{}" not found'.format(filename))
try:
f = open(filename)
except Exception as e:
raise ValueError('Cannot open file "{}": {}'.format(filename, e))
try:
j = json.load(f)
except json.JSONDecodeError as e:
raise ValueError('Cannot parse JSON file "{}": {}'.format(filename, e))
try:
r = resource.Resource(value=j)
r.validate()
except (ValueError, jsonschema.ValidationError) as err:
raise ValueError("Invalid resource: {}".format(err))
return r
def _import_jupyternb(path):
"""Create & import a resource from a Jupyter Notebook file at `path`.
Assume that `path` exists and is a Jupyter Notebook.
Args:
path (str): Jupyter Notebook file.
Returns:
(Resource) DMF Resource representing the notebook.
"""
r = resource.Resource(type_=resource.TY_NOTEBOOK)
filename = os.path.splitext(os.path.split(path)[1])[0]
# XXX: add notebook 'metadata' as FilePath metadata attr
r.v["datafiles"].append({"desc": filename, "path": path})
r.v["desc"] = filename
r.validate()
return r
def _import_python(path):
"""Create & import a resource from a Python file at `path`.
Assume that `path` exists and is a valid Python file.
Args:
path (str): Python file name.
Returns:
(Resource) DMF Resource representing the notebook.
"""
r = resource.Resource(type_=resource.TY_CODE)
filename = os.path.splitext(os.path.split(path)[1])[0]
r.v["codes"].append({"name": filename, "language": "python", "type": "module"})
r.v["datafiles"].append({"desc": filename, "path": path})
r.validate()
return r
def _import_file(path):
"""Create & import a resource from a generic file at `path`.
Assume that `path` exists.
Args:
path (str): File name.
Returns:
(Resource) DMF Resource representing the notebook.
"""
r = resource.Resource(type_=resource.TY_DATA)
filename = os.path.split(path)[1]
r.v["datafiles"].append({"desc": filename, "path": path})
r.v["desc"] = filename
r.validate()
return r