Source code for

Core module defining the functions for converting a consumption Markov Decision 
Process from consMDP model to dot representation and present it. 
import math
import re
import subprocess
import sys

from .objectives import *
from .objectives import _HELPER_AS_REACH, _HELPER_BUCHI

global _show_default
_show_default = "sprb"
global _dot_pr
_dot_pr = "dot"
global _dot_options
_dot_options = {"fillcolor_target": "#ccffcc", "max_states": 50}

tab_state_cell_style = ' rowspan="{}"'
cell_style    = ' align="center" valign="middle"'
targets_style        = f', style="filled,rounded", fillcolor="{_dot_options["fillcolor_target"]}"'
default_table_style         = ' border="0" cellborder="0" cellspacing="0"' +\
                       ' cellpadding="0" align="center" valign="middle"' +\
                       ' style="rounded" bgcolor="#ffffff50"'
legend_style = "border='0' cellborder='0' color='black' cellspacing='0'"

# For each letter set a list of objectives that should be displayed
opt_to_objs = {
    "m": [MIN_INIT_CONS],
    "s": [SAFE],
    "p": [POS_REACH],
    "r": [AS_REACH],
    "b": [BUCHI],

table_shape = [

[docs]class consMDP2dot: """Convert consMDP to dot""" def __init__(self, mdp, solver=None, options=""): self.mdp = mdp self.opt_string = _show_default if options: if options[0] == ".": self.opt_string = options[1:] + _show_default else: self.opt_string = options opt_s = self.opt_string # Parse max states from options max_states = _dot_options["max_states"] max_i = opt_s.find("<") if max_i > -1: pos = max_i + 1 max_v = "" while pos < len(opt_s) and \ (opt_s[pos].isdigit() or (pos == max_i + 1 and opt_s[pos] == "-")): max_v += opt_s[pos] pos += 1 max_states = math.inf if max_v[0] == "-" else int(max_v) self._opts = { "print_legend" : "l" in self.opt_string and "L" not in self.opt_string, "print_MELs" : False, "max_states" : max_states, "names" : _dot_options.get("names", True), "labels" : _dot_options.get("state_labels", True) and \ hasattr(self.mdp, "state_labels"), } # Print names? if "n" in options: self._opts["names"] = True if "N" in options: self._opts["names"] = False # Print labels? if "a" in options: self._opts["labels"] = True if "A" in options: self._opts["labels"] = False # targets given? res ='T{([^}]*)}', opt_s) if res: t_str = self._opts["targets"] = [int(t) for t in t_str.split(",")] self.act_color = "black" self.prob_color = "gray52" self.solver = solver self.targets = self._opts.get("targets", []) if self.targets and solver is not None: raise ValueError("Targets are specified both by the " \ "solver and in options using T{targets}") if solver is not None: self.targets = solver.targets # Trim the mdp if needed self.incomplete = set() if self.mdp.num_states > max_states: from .utils import copy_consmdp self.mdp, state_map = copy_consmdp(self.mdp, max_states=max_states, solver=solver) self.incomplete = self.mdp.incomplete # Update targets accordingly self.targets = [state_map[s] for s in state_map if s in self.targets] if solver is None: self.mel_values = None else: self.mel_values = solver.min_levels.copy() helpers = solver.helper_levels.copy() # Rename states for trimmed automata if self.incomplete: for objective in self.mel_values: vals = self.mel_values[objective] new_vals = [] for s, p in state_map.items(): new_vals.append(vals[s]) self.mel_values[objective] = new_vals for objective in helpers: vals = helpers[objective] new_vals = [] for s, p in state_map.items(): new_vals.append(vals[s]) helpers[objective] = new_vals if AS_REACH in helpers: self.mel_values[_HELPER_AS_REACH] = helpers[AS_REACH] if BUCHI in helpers: self.mel_values[_HELPER_BUCHI] = helpers[BUCHI] # Prepare for printing MELs self._mel_opts = { "print": False, "rows": [False, False], "cols": [False, False, False, False], "shape": table_shape, } self.mel_settings = dict() self.mel_settings[SAFE] = { "name": "Survival", "enabled": False, "color": "red", } self.mel_settings[POS_REACH] = { "name": "Positive reachability", "enabled": False, "color": "deepskyblue", } self.mel_settings[AS_REACH] = { "name": "Reachability", "enabled": False, "color": "dodgerblue4", } self.mel_settings[BUCHI] = { "name": "Büchi", "enabled": False, "color": "forestgreen", } self.mel_settings[MIN_INIT_CONS] = { "name": "MinInitCons", "enabled": False, "color": "orange", } self.mel_settings[_HELPER_AS_REACH] = { "name": "Reachability-safe", "enabled": False, "color": "blue4", } self.mel_settings[_HELPER_BUCHI] = { "name": "Büchi-safe", "enabled": False, "color": "darkgreen", } self.mel_settings["dummy"] = { "enabled": False, } # Resolve which values should be printed requested = set() for opt in opt_to_objs: if opt in self.opt_string: requested.update(opt_to_objs[opt]) if not self.mel_values is None: for row, cols in enumerate(self._mel_opts["shape"]): for col, objective in enumerate(cols): enabled = objective in requested and \ objective in self.mel_values if enabled: self.mel_settings[objective]["enabled"] = True self._opts["print_MELs"] = True self._mel_opts["print"] = True self._mel_opts["rows"][row] = True self._mel_opts["cols"][col] = True self.res = ""
[docs] def get_dot(self): self.start() m = self.mdp for s in range(m.num_states): self.process_state(s) for a in m.actions_for_state(s): self.process_action(a) if s in self.incomplete: self.add_incomplete(s) if self._opts["print_legend"]: self.add_legend() self.finish() return self.res
[docs] def start(self): gr_name = if else "" self.res += f'digraph "{gr_name}" {{\n' self.res += ' rankdir=LR\n' color = _dot_options.get("fillcolor", None) if color is not None: self.res += f' node[style="filled", fillcolor="{color}"]\n' font = _dot_options.get("font", None) if font is not None: self.res += f' node[fontname="{font}"]\n' self.res += f' edge[fontname="{font}"]\n' self.res += f' fontname="{font}"\n' # decide node shapes long_names = False if self._opts["names"]: for s in range(self.mdp.num_states): name = self.mdp.names[s] if name is not None and len(name) > 4: long_names = True break rect = False if self._mel_opts["print"]: rect = sum(self._mel_opts["cols"]) > 1 rect = rect or long_names or self._opts["labels"] if rect: self.res += f' node[shape="box", style="rounded,filled", width="0.5", height="0.4"]\n' elif self.mdp.num_states < 10 and not self._opts["names"]: self.res += f' node[shape="circle"]\n' else: self.res += f' node[shape ="ellipse", width="0.5", height="0.5"]\n'
[docs] def finish(self): self.res += "}\n"
[docs] def get_state_name(self, s): if not self._opts["names"] or self.mdp.names[s] is None: return str(s) return self.mdp.names[s]
[docs] def process_state(self, s): self.res += f"\n {s} [" # name name = self.get_state_name(s) # Create the state label (a table if MELs requested) if self._opts["print_MELs"]: state_str = f"<table{default_table_style}>" # Start with a state name rows = sum(self._mel_opts["rows"]) state_str += f"<tr><td rowspan='{rows}'>{name}</td>" print_tr = False for row, cols in enumerate(self._mel_opts["shape"]): if print_tr: state_str += f"<tr>" for col, objective in enumerate(cols): settings = self.mel_settings[objective] if not self._mel_opts["cols"][col]: continue if not settings["enabled"]: state_str += "<td></td>" continue color = settings["color"] val = self.mel_values[objective][s] if val == math.inf: val = "∞" state_str += f"<td{cell_style}>" \ f"<font color='{color}' point-size='10'>" \ f"{val}</font></td>" state_str += "</tr>" print_tr = True state_str += "</table>" else: state_str = name label_str = "" if self._opts["labels"]: labels = {self.mdp.AP[ap] for ap in self.mdp.state_labels[s]} label_str = "∅" if not labels else labels.__str__() label_str = f'<BR ALIGN="CENTER" />{label_str}' self.res += f'label=<{state_str}{label_str}>' # Reload states are double circled and target states filled if self.mdp.is_reload(s): self.res += ", peripheries=2" if self.targets is not None and s in self.targets: self.res += targets_style self.res += "]\n"
[docs] def add_incomplete(self, s): """ Adds a dashed line from s to a dummy ... node for the given state s. """ self.res += f'\n u{s} [label="...", shape=none, width=0, height=0, ' \ f'tooltip="hidden successors"]\n {s} -> u{s} ' \ f'[style=dashed, tooltip="hidden successors"]'
[docs] def add_legend(self): if not self._opts["print_MELs"]: return cols = sum(self._mel_opts["cols"]) self.res += f'\nsubgraph key {{\n \tlabel="legend"' \ f'\n\tlegend [shape=box, style="rounded,filled", ' \ f'label=<\n\t\t<table {legend_style}>' # Start with a state name rows = sum(self._mel_opts["rows"]) self.res += f"<tr><td rowspan='{rows}'>s</td>" print_tr = False for row, cols in enumerate(self._mel_opts["shape"]): if not self._mel_opts["rows"][row]: continue if print_tr: self.res += f"\n\t\t<tr>" for col, objective in enumerate(cols): settings = self.mel_settings[objective] if not self._mel_opts["cols"][col]: continue if not settings["enabled"]: self.res += "<td></td>" continue color = settings["color"] val = settings["name"] self.res += f"\n\t\t\t<td{cell_style}>" \ f"<font color='{color}' point-size='10'>" \ f"{val}</font></td>" self.res += "\n\t\t</tr>" print_tr = True self.res += "\n\t\t</table>\n\t>];\n}"
[docs] def process_action(self, a): act_id = f"\"{a.src}_{a.label}\"" # Src -> action-node self.res += f" {a.src} -> {act_id}" self.res += f"[arrowhead=onormal,label=\"{a.label}: {a.cons}\"" self.res += f", color={self.act_color}, fontcolor={self.act_color}]\n" # action-node self.res += f' {act_id}[label=<>,shape=point,style="",width=".1",fillcolor="gray52"]\n' # action-node -> dest for dst, p in a.distr.items(): self.res += f" {act_id} -> {dst}[label={p}, color={self.prob_color}, fontcolor={self.prob_color}]"
[docs]def dot_to_svg(dot_str, mdp=None): """ Send some text to dot for conversion to SVG. """ if mdp is not None and mdp.dot_layout is not None: dotpr = mdp.dot_layout else: dotpr = _dot_pr try: dot_pr = subprocess.Popen([dotpr, '-Tsvg'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except FileNotFoundError: print("The command 'dot' seems to be missing on your system.\n" "Please install the GraphViz package " "and make sure 'dot' is in your PATH.", file=sys.stderr) raise stdout, stderr = dot_pr.communicate(dot_str.encode('utf-8')) if stderr: print("Calling 'dot' for the conversion to SVG produced the message:\n" + stderr.decode('utf-8') + dot_str, file=sys.stderr) ret = dot_pr.wait() if ret: raise subprocess.CalledProcessError(ret, 'dot') return stdout.decode('utf-8')