Source code for eppy.useful_scripts.loopdiagram

# Copyright (c) 2012 Santosh Philip
# Copyright (c) 2016 Jamie Bull
# =======================================================================
#  Distributed under the MIT License.
#  (See accompanying file LICENSE or copy at
#  http://opensource.org/licenses/MIT)
# =======================================================================

"""Draw all the  loops in the IDF file.

There are two output files saved in the same location as the idf file:
- idf_file_location/idf_filename.dot
- idf_file_location/idf_filename.png

"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import sys

pathnameto_eplusscripting = "../../"
sys.path.append(pathnameto_eplusscripting)

import argparse
import eppy
import eppy.EPlusInterfaceFunctions
from eppy.EPlusInterfaceFunctions import readidf

# import pydot3k as pydot
import pydot

import eppy.loops as loops


pathnameto_eplusscripting = "../../"
sys.path.append(pathnameto_eplusscripting)


[docs]def firstisnode(edge): if type(edge[0]) == tuple: return True else: return False
[docs]def secondisnode(edge): if type(edge[1]) == tuple: return True else: return False
[docs]def bothnodes(edge): if type(edge[0]) == tuple and type(edge[1]) == tuple: return True else: return False
[docs]def dropnodes(edges): """draw a graph without the nodes""" newedges = [] added = False for edge in edges: if bothnodes(edge): newtup = (edge[0][0], edge[1][0]) newedges.append(newtup) added = True elif firstisnode(edge): for edge1 in edges: if edge[0] == edge1[1]: newtup = (edge1[0], edge[1]) try: newedges.index(newtup) except ValueError as e: newedges.append(newtup) added = True elif secondisnode(edge): for edge1 in edges: if edge[1] == edge1[0]: newtup = (edge[0], edge1[1]) try: newedges.index(newtup) except ValueError as e: newedges.append(newtup) added = True # gets the hanging nodes - nodes with no connection if not added: if firstisnode(edge): newedges.append((edge[0][0], edge[1])) if secondisnode(edge): newedges.append((edge[0], edge[1][0])) added = False return newedges
[docs]def makeanode(name): return pydot.Node(name, shape="plaintext", label=name)
[docs]def makeabranch(name): return pydot.Node(name, shape="box3d", label=name)
[docs]def makeendnode(name): return pydot.Node( name, shape="doubleoctagon", label=name, style="filled", fillcolor="#e4e4e4" )
[docs]def istuple(x): return type(x) == tuple
[docs]def nodetype(anode): """return the type of node""" try: return anode[1] except IndexError as e: return None
[docs]def edges2nodes(edges): """gather the nodes from the edges""" nodes = [] for e1, e2 in edges: nodes.append(e1) nodes.append(e2) nodedict = dict([(n, None) for n in nodes]) justnodes = list(nodedict.keys()) # justnodes.sort() justnodes = sorted(justnodes, key=lambda x: str(x[0])) return justnodes
[docs]def makediagram(edges): """make the diagram with the edges""" graph = pydot.Dot(graph_type="digraph") nodes = edges2nodes(edges) epnodes = [ (node, makeanode(node[0])) for node in nodes if nodetype(node) == "epnode" ] endnodes = [ (node, makeendnode(node[0])) for node in nodes if nodetype(node) == "EndNode" ] epbr = [(node, makeabranch(node)) for node in nodes if not istuple(node)] nodedict = dict(epnodes + epbr + endnodes) for value in list(nodedict.values()): graph.add_node(value) for e1, e2 in edges: graph.add_edge(pydot.Edge(nodedict[e1], nodedict[e2])) return graph
[docs]def transpose2d(mtx): """Transpose a 2d matrix [ [1,2,3], [4,5,6] ] becomes [ [1,4], [2,5], [3,6] ] """ return zip(*mtx)
[docs]def makebranchcomponents(data, commdct, anode="epnode"): """return the edges jointing the components of a branch""" alledges = [] objkey = "BRANCH" cnamefield = "Component %s Name" inletfield = "Component %s Inlet Node Name" outletfield = "Component %s Outlet Node Name" numobjects = len(data.dt[objkey]) cnamefields = loops.repeatingfields(data, commdct, objkey, cnamefield) inletfields = loops.repeatingfields(data, commdct, objkey, inletfield) outletfields = loops.repeatingfields(data, commdct, objkey, outletfield) inlts = loops.extractfields(data, commdct, objkey, [inletfields] * numobjects) cmps = loops.extractfields(data, commdct, objkey, [cnamefields] * numobjects) otlts = loops.extractfields(data, commdct, objkey, [outletfields] * numobjects) zipped = list(zip(inlts, cmps, otlts)) tzipped = [transpose2d(item) for item in zipped] for i in range(len(data.dt[objkey])): tt = tzipped[i] # branchname = data.dt[objkey][i][1] edges = [] for t0 in tt: edges = edges + [((t0[0], anode), t0[1]), (t0[1], (t0[2], anode))] alledges = alledges + edges return alledges
[docs]def makeairplantloop(data, commdct): """make the edges for the airloop and the plantloop""" anode = "epnode" endnode = "EndNode" # in plantloop get: # demand inlet, outlet, branchlist # supply inlet, outlet, branchlist plantloops = loops.plantloopfields(data, commdct) # splitters # inlet # outlet1 # outlet2 splitters = loops.splitterfields(data, commdct) # # mixer # outlet # inlet1 # inlet2 mixers = loops.mixerfields(data, commdct) # # supply barnchlist # branch1 -> inlet, outlet # branch2 -> inlet, outlet # branch3 -> inlet, outlet # # CONNET INLET OUTLETS edges = [] # get all branches branchkey = "branch".upper() branches = data.dt[branchkey] branch_i_o = {} for br in branches: br_name = br[1] in_out = loops.branch_inlet_outlet(data, commdct, br_name) branch_i_o[br_name] = dict(list(zip(["inlet", "outlet"], in_out))) # for br_name, in_out in branch_i_o.items(): # edges.append(((in_out["inlet"], anode), br_name)) # edges.append((br_name, (in_out["outlet"], anode))) # instead of doing the branch # do the content of the branch edges = makebranchcomponents(data, commdct) # connect splitter to nodes for splitter in splitters: # splitter_inlet = inletbranch.node splittername = splitter[0] inletbranchname = splitter[1] splitter_inlet = branch_i_o[inletbranchname]["outlet"] # edges = splitter_inlet -> splittername edges.append(((splitter_inlet, anode), splittername)) # splitter_outlets = ouletbranches.nodes outletbranchnames = [br for br in splitter[2:]] splitter_outlets = [branch_i_o[br]["inlet"] for br in outletbranchnames] # edges = [splittername -> outlet for outlet in splitter_outlets] moreedges = [(splittername, (outlet, anode)) for outlet in splitter_outlets] edges = edges + moreedges for mixer in mixers: # mixer_outlet = outletbranch.node mixername = mixer[0] outletbranchname = mixer[1] mixer_outlet = branch_i_o[outletbranchname]["inlet"] # edges = mixername -> mixer_outlet edges.append((mixername, (mixer_outlet, anode))) # mixer_inlets = inletbranches.nodes inletbranchnames = [br for br in mixer[2:]] mixer_inlets = [branch_i_o[br]["outlet"] for br in inletbranchnames] # edges = [mixername -> inlet for inlet in mixer_inlets] moreedges = [((inlet, anode), mixername) for inlet in mixer_inlets] edges = edges + moreedges # connect demand and supply side # for plantloop in plantloops: # supplyinlet = plantloop[1] # supplyoutlet = plantloop[2] # demandinlet = plantloop[4] # demandoutlet = plantloop[5] # # edges = [supplyoutlet -> demandinlet, demandoutlet -> supplyinlet] # moreedges = [((supplyoutlet, endnode), (demandinlet, endnode)), # ((demandoutlet, endnode), (supplyinlet, endnode))] # edges = edges + moreedges # # -----------air loop stuff---------------------- # from s_airloop2.py # Get the demand and supply nodes from 'airloophvac' # in airloophvac get: # get branch, supplyinlet, supplyoutlet, demandinlet, demandoutlet objkey = "airloophvac".upper() fieldlists = [ [ "Branch List Name", "Supply Side Inlet Node Name", "Demand Side Outlet Node Name", "Demand Side Inlet Node Names", "Supply Side Outlet Node Names", ] ] * loops.objectcount(data, objkey) airloophvacs = loops.extractfields(data, commdct, objkey, fieldlists) # airloophvac = airloophvacs[0] # in AirLoopHVAC:ZoneSplitter: # get Name, inlet, all outlets objkey = "AirLoopHVAC:ZoneSplitter".upper() singlefields = ["Name", "Inlet Node Name"] fld = "Outlet %s Node Name" repeatfields = loops.repeatingfields(data, commdct, objkey, fld) fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) zonesplitters = loops.extractfields(data, commdct, objkey, fieldlists) # in AirLoopHVAC:SupplyPlenum: # get Name, Zone Name, Zone Node Name, inlet, all outlets objkey = "AirLoopHVAC:SupplyPlenum".upper() singlefields = ["Name", "Zone Name", "Zone Node Name", "Inlet Node Name"] fld = "Outlet %s Node Name" repeatfields = loops.repeatingfields(data, commdct, objkey, fld) fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) supplyplenums = loops.extractfields(data, commdct, objkey, fieldlists) # in AirLoopHVAC:ZoneMixer: # get Name, outlet, all inlets objkey = "AirLoopHVAC:ZoneMixer".upper() singlefields = ["Name", "Outlet Node Name"] fld = "Inlet %s Node Name" repeatfields = loops.repeatingfields(data, commdct, objkey, fld) fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) zonemixers = loops.extractfields(data, commdct, objkey, fieldlists) # in AirLoopHVAC:ReturnPlenum: # get Name, Zone Name, Zone Node Name, outlet, all inlets objkey = "AirLoopHVAC:ReturnPlenum".upper() singlefields = ["Name", "Zone Name", "Zone Node Name", "Outlet Node Name"] fld = "Inlet %s Node Name" repeatfields = loops.repeatingfields(data, commdct, objkey, fld) fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) returnplenums = loops.extractfields(data, commdct, objkey, fieldlists) # connect room to each equip in equiplist # in ZoneHVAC:EquipmentConnections: # get Name, equiplist, zoneairnode, returnnode objkey = "ZoneHVAC:EquipmentConnections".upper() singlefields = [ "Zone Name", "Zone Conditioning Equipment List Name", "Zone Air Node Name", "Zone Return Air Node Name", ] repeatfields = [] fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) equipconnections = loops.extractfields(data, commdct, objkey, fieldlists) # in ZoneHVAC:EquipmentList: # get Name, all equiptype, all equipnames objkey = "ZoneHVAC:EquipmentList".upper() singlefields = ["Name"] fieldlist = singlefields flds = ["Zone Equipment %s Object Type", "Zone Equipment %s Name"] repeatfields = loops.repeatingfields(data, commdct, objkey, flds) fieldlist = fieldlist + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) equiplists = loops.extractfields(data, commdct, objkey, fieldlists) equiplistdct = dict([(ep[0], ep[1:]) for ep in equiplists]) for key, equips in list(equiplistdct.items()): enames = [equips[i] for i in range(1, len(equips), 2)] equiplistdct[key] = enames # adistuunit -> room # adistuunit <- VAVreheat # airinlet -> VAVreheat # in ZoneHVAC:AirDistributionUnit: # get Name, equiplist, zoneairnode, returnnode objkey = "ZoneHVAC:AirDistributionUnit".upper() singlefields = ["Name", "Air Terminal Object Type", "Air Terminal Name"] repeatfields = [] fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) adistuunits = loops.extractfields(data, commdct, objkey, fieldlists) # code only for AirTerminal:SingleDuct:VAV:Reheat # get airinletnodes for vavreheats # in AirTerminal:SingleDuct:VAV:Reheat: # get Name, airinletnode adistuinlets = loops.makeadistu_inlets(data, commdct) alladistu_comps = [] for key in list(adistuinlets.keys()): objkey = key.upper() singlefields = ["Name"] + adistuinlets[key] repeatfields = [] fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) adistu_components = loops.extractfields(data, commdct, objkey, fieldlists) alladistu_comps.append(adistu_components) # in AirTerminal:SingleDuct:Uncontrolled: # get Name, airinletnode objkey = "AirTerminal:SingleDuct:Uncontrolled".upper() singlefields = ["Name", "Zone Supply Air Node Name"] repeatfields = [] fieldlist = singlefields + repeatfields fieldlists = [fieldlist] * loops.objectcount(data, objkey) uncontrolleds = loops.extractfields(data, commdct, objkey, fieldlists) anode = "epnode" endnode = "EndNode" # edges = [] # connect demand and supply side # for airloophvac in airloophvacs: # supplyinlet = airloophvac[1] # supplyoutlet = airloophvac[4] # demandinlet = airloophvac[3] # demandoutlet = airloophvac[2] # # edges = [supplyoutlet -> demandinlet, demandoutlet -> supplyinlet] # moreedges = [((supplyoutlet, endnode), (demandinlet, endnode)), # ((demandoutlet, endnode), (supplyinlet, endnode))] # edges = edges + moreedges # connect zonesplitter to nodes for zonesplitter in zonesplitters: name = zonesplitter[0] inlet = zonesplitter[1] outlets = zonesplitter[2:] edges.append(((inlet, anode), name)) for outlet in outlets: edges.append((name, (outlet, anode))) # connect supplyplenum to nodes for supplyplenum in supplyplenums: name = supplyplenum[0] inlet = supplyplenum[3] outlets = supplyplenum[4:] edges.append(((inlet, anode), name)) for outlet in outlets: edges.append((name, (outlet, anode))) # connect zonemixer to nodes for zonemixer in zonemixers: name = zonemixer[0] outlet = zonemixer[1] inlets = zonemixer[2:] edges.append((name, (outlet, anode))) for inlet in inlets: edges.append(((inlet, anode), name)) # connect returnplenums to nodes for returnplenum in returnplenums: name = returnplenum[0] outlet = returnplenum[3] inlets = returnplenum[4:] edges.append((name, (outlet, anode))) for inlet in inlets: edges.append(((inlet, anode), name)) # connect room to return node for equipconnection in equipconnections: zonename = equipconnection[0] returnnode = equipconnection[-1] edges.append((zonename, (returnnode, anode))) # connect equips to room for equipconnection in equipconnections: zonename = equipconnection[0] zequiplistname = equipconnection[1] for zequip in equiplistdct[zequiplistname]: edges.append((zequip, zonename)) # adistuunit <- adistu_component for adistuunit in adistuunits: unitname = adistuunit[0] compname = adistuunit[2] edges.append((compname, unitname)) # airinlet -> adistu_component for adistu_comps in alladistu_comps: for adistu_comp in adistu_comps: name = adistu_comp[0] for airnode in adistu_comp[1:]: edges.append(((airnode, anode), name)) # supplyairnode -> uncontrolled for uncontrolled in uncontrolleds: name = uncontrolled[0] airnode = uncontrolled[1] edges.append(((airnode, anode), name)) # edges = edges + moreedges return edges
[docs]def getedges(fname, iddfile): """return the edges of the idf file fname""" data, commdct, _idd_index = readidf.readdatacommdct(fname, iddfile=iddfile) edges = makeairplantloop(data, commdct) return edges
[docs]def replace_colon(s, replacewith="__"): """replace the colon with something""" return s.replace(":", replacewith)
[docs]def clean_edges(arg): if isinstance(arg, str): return replace_colon(arg) try: return tuple(clean_edges(x) for x in arg) except TypeError: # catch when for loop fails return replace_colon(arg) # not a sequence so just return repr
[docs]def make_and_save_diagram(fname, iddfile): g = process_idf(fname, iddfile) save_diagram(fname, g)
[docs]def process_idf(fname, iddfile): data, commdct, _iddindex = readidf.readdatacommdct(fname, iddfile=iddfile) print("constructing the loops") edges = makeairplantloop(data, commdct) print("cleaning edges") edges = clean_edges(edges) print("making the diagram") return makediagram(edges)
[docs]def save_diagram(fname, g, silent=False): dotname = "%s.dot" % (os.path.splitext(fname)[0]) pngname = "%s.png" % (os.path.splitext(fname)[0]) g.write(dotname) if not silent: print("saved file: %s" % (dotname)) g.write_png(pngname) if not silent: print("saved file: %s" % (pngname))
[docs]def main(): parser = argparse.ArgumentParser( usage=None, description=__doc__, formatter_class=argparse.RawTextHelpFormatter ) # need the formatter to print newline from __doc__ parser.add_argument( "idd", type=str, action="store", help="location of idd file = ./somewhere/eplusv8-0-1.idd", ) parser.add_argument( "file", type=str, action="store", help="location of idf file = ./somewhere/f1.idf", ) args = parser.parse_args() make_and_save_diagram(args.file, args.idd)
if __name__ == "__main__": sys.exit(main())