From 48b3e16f6b53fc96578dc57310a57626a3f819d1 Mon Sep 17 00:00:00 2001 From: "Nikolaus C. Awtrey" Date: Thu, 15 Aug 2024 21:35:19 -0700 Subject: [PATCH 1/3] MAINT: Clean up code in `calculations.py` module * Restructures all calculation functions so the symbolic output is first in the code path * Changes all `output_strings`/`key` mismatch errors to use the same error message * Various variable name changes to make code more readable --- kda/calculations.py | 288 +++++++++++++++++++------------------------- 1 file changed, 125 insertions(+), 163 deletions(-) diff --git a/kda/calculations.py b/kda/calculations.py index 2f12cec..17fe74c 100644 --- a/kda/calculations.py +++ b/kda/calculations.py @@ -128,11 +128,10 @@ def calc_sigma(G, dirpar_edges, key="name", output_strings=True): Returns ------- - sigma : float - Normalization factor for state probabilities. - sigma_str : str - Sum of rate products of all directional diagrams for input - diagram ``G``, in string form. + sigma : float or str + Sum of rate products of all directional diagrams for the input + diagram ``G`` as a float (``output_strings=False``) for a string + (``output_strings=True``). Notes ----- @@ -160,41 +159,33 @@ def calc_sigma(G, dirpar_edges, key="name", output_strings=True): of all directional diagrams for the kinetic diagram. """ - # Number of nodes/states - n_states = G.number_of_nodes() - n_dirpars = dirpar_edges.shape[0] - edge_value = G.edges[list(G.edges)[0]][key] + edge_is_str = isinstance(G.edges[list(G.edges)[0]][key], str) + if output_strings != edge_is_str: + msg = f"""Inputs `key={key}` and `output_strings={output_strings}` + do not match. If symbolic outputs are requested the input `key` + should retrieve edge data from `G` that corresponds to symbolic + variable names for all edges.""" + raise TypeError(msg) - if not output_strings: - if isinstance(edge_value, str): - raise TypeError( - "To enter variable strings set parameter output_strings=True." - ) - dirpar_rate_products = np.ones(n_dirpars, dtype=float) + n_dir_diagrams = dirpar_edges.shape[0] + if output_strings: + rate_products = np.empty(shape=(n_dir_diagrams,), dtype=object) # iterate over the directional diagrams for i, edge_list in enumerate(dirpar_edges): - # iterate over the edges in the given directional diagram i - for edge in edge_list: - # multiply the rate of each edge in edge_list - dirpar_rate_products[i] *= G.edges[edge][key] - sigma = math.fsum(dirpar_rate_products) - return sigma - elif output_strings: - if not isinstance(edge_value, str): - raise TypeError( - "To enter variable values set parameter output_strings=False." - ) - dirpar_rate_products = np.empty(shape=(n_dirpars,), dtype=object) + rates = [G.edges[edge][key] for edge in edge_list] + rate_products[i] = "*".join(rates) + # sum all terms to get normalization factor + sigma = "+".join(rate_products) + else: + rate_products = np.ones(n_dir_diagrams, dtype=float) # iterate over the directional diagrams for i, edge_list in enumerate(dirpar_edges): - rate_product_vals = [] + # iterate over the edges in the given directional diagram i for edge in edge_list: - # append rate constant names from dir_par to list - rate_product_vals.append(G.edges[edge][key]) - dirpar_rate_products[i] = "*".join(rate_product_vals) - # sum all terms to get normalization factor - sigma_str = "+".join(dirpar_rate_products) - return sigma_str + # multiply the rate of each edge in edge_list + rate_products[i] *= G.edges[edge][key] + sigma = math.fsum(rate_products) + return sigma def calc_sigma_K(G, cycle, flux_diags, key="name", output_strings=True): @@ -228,12 +219,10 @@ def calc_sigma_K(G, cycle, flux_diags, key="name", output_strings=True): Returns ------- - sigma_K : float - Sum of rate products of directional flux diagram edges pointing to - input cycle. - sigma_K_str : str + sigma_K : float or str Sum of rate products of directional flux diagram edges pointing to - input cycle in string form. + input cycle as a float (``output_strings=False``) or as a string + (``output_strings=True``). Notes ----- @@ -252,55 +241,45 @@ def calc_sigma_K(G, cycle, flux_diags, key="name", output_strings=True): sum. For cycles with no flux diagrams, :math:`\Sigma_{k} = 1`. """ - if isinstance(flux_diags, list) == False: - print( - "No flux diagrams detected for cycle {}. Sigma K value is 1.".format(cycle) - ) + if not isinstance(flux_diags, list): + print(f"No flux diagrams detected for cycle {cycle}. Sigma K value is 1.") return 1 + edge_is_str = isinstance(G.edges[list(G.edges)[0]][key], str) + if output_strings != edge_is_str: + msg = f"""Inputs `key={key}` and `output_strings={output_strings}` + do not match. If symbolic outputs are requested the input `key` + should retrieve edge data from `G` that corresponds to symbolic + variable names for all edges.""" + raise TypeError(msg) + + # check that the input cycle is in the correct order + ordered_cycle = _get_ordered_cycle(G, cycle) + cycle_edges = diagrams._construct_cycle_edges(ordered_cycle) + if output_strings: + rate_products = [] + for diagram in flux_diags: + diag = diagram.copy() + for edge in cycle_edges: + diag.remove_edge(edge[0], edge[1], edge[2]) + diag.remove_edge(edge[1], edge[0], edge[2]) + rates = [] + for edge in diag.edges: + rates.append(G.edges[edge[0], edge[1], edge[2]][key]) + rate_products.append("*".join(rates)) + sigma_K = "+".join(rate_products) else: - # check that the input cycle is in the correct order - ordered_cycle = _get_ordered_cycle(G, cycle) - cycle_edges = diagrams._construct_cycle_edges(ordered_cycle) - if output_strings == False: - if isinstance( - G.edges[cycle_edges[0][0], cycle_edges[0][1], cycle_edges[0][2]][key], - str, - ): - raise TypeError( - "To enter variable strings set parameter output_strings=True." - ) - rate_products = [] - for diagram in flux_diags: - diag = diagram.copy() - for edge in cycle_edges: - diag.remove_edge(edge[0], edge[1], edge[2]) - diag.remove_edge(edge[1], edge[0], edge[2]) - vals = 1 - for edge in diag.edges: - vals *= G.edges[edge[0], edge[1], edge[2]][key] - rate_products.append(vals) - sigma_K = math.fsum(rate_products) - return sigma_K - elif output_strings == True: - if not isinstance( - G.edges[cycle_edges[0][0], cycle_edges[0][1], cycle_edges[0][2]][key], - str, - ): - raise TypeError( - "To enter variable values set parameter output_strings=False." - ) - rate_products = [] - for diagram in flux_diags: - diag = diagram.copy() - for edge in cycle_edges: - diag.remove_edge(edge[0], edge[1], edge[2]) - diag.remove_edge(edge[1], edge[0], edge[2]) - rates = [] - for edge in diag.edges: - rates.append(G.edges[edge[0], edge[1], edge[2]][key]) - rate_products.append("*".join(rates)) - sigma_K_str = "+".join(rate_products) - return sigma_K_str + rate_products = [] + for diagram in flux_diags: + diag = diagram.copy() + for edge in cycle_edges: + diag.remove_edge(edge[0], edge[1], edge[2]) + diag.remove_edge(edge[1], edge[0], edge[2]) + vals = 1 + for edge in diag.edges: + vals *= G.edges[edge[0], edge[1], edge[2]][key] + rate_products.append(vals) + sigma_K = math.fsum(rate_products) + return sigma_K def calc_pi_difference(G, cycle, order, key="name", output_strings=True): @@ -336,12 +315,10 @@ def calc_pi_difference(G, cycle, order, key="name", output_strings=True): Returns ------- - pi_diff : float - Difference of product of counter clockwise cycle rates and clockwise - cycle rates. - pi_diff_str : str - String of difference of product of counter clockwise cycle rates and - clockwise cycle rates. + pi_difference : float or str + Difference of the counter-clockwise and clockwise cycle + rate-products as a float (``output_strings=False``) + or a string (``output_strings=True``). Notes ----- @@ -362,38 +339,33 @@ def calc_pi_difference(G, cycle, order, key="name", output_strings=True): the forward (i.e. positive) direction is counter-clockwise (CCW). """ + edge_is_str = isinstance(G.edges[list(G.edges)[0]][key], str) + if output_strings != edge_is_str: + msg = f"""Inputs `key={key}` and `output_strings={output_strings}` + do not match. If symbolic outputs are requested the input `key` + should retrieve edge data from `G` that corresponds to symbolic + variable names for all edges.""" + raise TypeError(msg) + # check that the input cycle is in the correct order ordered_cycle = _get_ordered_cycle(G, cycle) CCW_cycle = graph_utils.get_ccw_cycle(ordered_cycle, order) cycle_edges = diagrams._construct_cycle_edges(CCW_cycle) - if output_strings == False: - if isinstance( - G.edges[cycle_edges[0][0], cycle_edges[0][1], cycle_edges[0][2]][key], str - ): - raise TypeError( - "To enter variable strings set parameter output_strings=True." - ) - ccw_rates = 1 - cw_rates = 1 - for edge in cycle_edges: - ccw_rates *= G.edges[edge[0], edge[1], edge[2]][key] - cw_rates *= G.edges[edge[1], edge[0], edge[2]][key] - pi_difference = ccw_rates - cw_rates - return pi_difference - elif output_strings == True: - if not isinstance( - G.edges[cycle_edges[0][0], cycle_edges[0][1], cycle_edges[0][2]][key], str - ): - raise TypeError( - "To enter variable values set parameter output_strings=False." - ) + if output_strings: ccw_rates = [] cw_rates = [] for edge in cycle_edges: ccw_rates.append(G.edges[edge[0], edge[1], edge[2]][key]) cw_rates.append(G.edges[edge[1], edge[0], edge[2]][key]) pi_difference = "-".join(["*".join(ccw_rates), "*".join(cw_rates)]) - return pi_difference + else: + ccw_rates = 1 + cw_rates = 1 + for edge in cycle_edges: + ccw_rates *= G.edges[edge[0], edge[1], edge[2]][key] + cw_rates *= G.edges[edge[1], edge[0], edge[2]][key] + pi_difference = ccw_rates - cw_rates + return pi_difference def calc_thermo_force(G, cycle, order, key="name", output_strings=True): @@ -427,12 +399,12 @@ def calc_thermo_force(G, cycle, order, key="name", output_strings=True): Returns ------- - thermo_force : float - The calculated thermodynamic force for the input cycle. This value is - unitless and should be multiplied by ``kT``. - parsed_thermo_force_str : ``SymPy`` expression - Symbolic thermodynamic force expression. Should be - multiplied by ``kT`` to get actual thermodynamic force. + thermo_force : float or ``SymPy`` expression + The thermodynamic force for the input ``cycle`` returned + as a float (``output_strings=False``) or a ``SymPy`` expression + (``output_strings=True`). The returned value is unitless and + should be multiplied by ``kT`` to calculate the actual + thermodynamic force. Notes ----- @@ -452,41 +424,36 @@ def calc_thermo_force(G, cycle, order, key="name", output_strings=True): (i.e. :math:`\chi_{k} = 0`). """ + edge_is_str = isinstance(G.edges[list(G.edges)[0]][key], str) + if output_strings != edge_is_str: + msg = f"""Inputs `key={key}` and `output_strings={output_strings}` + do not match. If symbolic outputs are requested the input `key` + should retrieve edge data from `G` that corresponds to symbolic + variable names for all edges.""" + raise TypeError(msg) + # check that the input cycle is in the correct order ordered_cycle = _get_ordered_cycle(G, cycle) CCW_cycle = graph_utils.get_ccw_cycle(ordered_cycle, order) cycle_edges = diagrams._construct_cycle_edges(CCW_cycle) - if output_strings == False: - if isinstance( - G.edges[cycle_edges[0][0], cycle_edges[0][1], cycle_edges[0][2]][key], str - ): - raise TypeError( - "To enter variable strings set parameter output_strings=True." - ) - ccw_rates = 1 - cw_rates = 1 - for edge in cycle_edges: - ccw_rates *= G.edges[edge[0], edge[1], edge[2]][key] - cw_rates *= G.edges[edge[1], edge[0], edge[2]][key] - thermo_force = np.log(ccw_rates / cw_rates) - return thermo_force - elif output_strings == True: - if not isinstance( - G.edges[cycle_edges[0][0], cycle_edges[0][1], cycle_edges[0][2]][key], str - ): - raise TypeError( - "To enter variable values set parameter output_strings=False." - ) + if output_strings: ccw_rates = [] cw_rates = [] for edge in cycle_edges: ccw_rates.append(G.edges[edge[0], edge[1], edge[2]][key]) cw_rates.append(G.edges[edge[1], edge[0], edge[2]][key]) - thermo_force_str = ( + thermo_force = ( "ln(" + "*".join(ccw_rates) + ") - ln(" + "*".join(cw_rates) + ")" ) - parsed_thermo_force_str = logcombine(parse_expr(thermo_force_str), force=True) - return parsed_thermo_force_str + thermo_force = logcombine(parse_expr(thermo_force), force=True) + else: + ccw_rates = 1 + cw_rates = 1 + for edge in cycle_edges: + ccw_rates *= G.edges[edge[0], edge[1], edge[2]][key] + cw_rates *= G.edges[edge[1], edge[0], edge[2]][key] + thermo_force = np.log(ccw_rates / cw_rates) + return thermo_force def calc_state_probs(G, key="name", output_strings=True, dir_edges=None): @@ -561,27 +528,24 @@ def calc_state_probs(G, key="name", output_strings=True, dir_edges=None): # get the number of nodes/states n_states = G.number_of_nodes() # get the number of directional diagrams - n_dirpars = dir_edges.shape[0] + n_dir_diagrams = dir_edges.shape[0] # get the number of partial diagrams - n_partials = int(n_dirpars / n_states) + n_partials = int(n_dir_diagrams / n_states) if output_strings: - dirpar_rate_products = np.empty(shape=(n_dirpars,), dtype=object) + rate_products = np.empty(shape=(n_dir_diagrams,), dtype=object) for i, edge_list in enumerate(dir_edges): - rate_product_vals = [] - for edge in edge_list: - rate_product_vals.append(G.edges[edge][key]) - dirpar_rate_products[i] = "*".join(rate_product_vals) - + rates = [G.edges[edge][key] for edge in edge_list] + rate_products[i] = "*".join(rates) + rate_products = rate_products.reshape(n_states, n_partials) state_mults = np.empty(shape=(n_states,), dtype=object) - dirpar_rate_products = dirpar_rate_products.reshape(n_states, n_partials) - for i, arr in enumerate(dirpar_rate_products): + for i, arr in enumerate(rate_products): state_mults[i] = "+".join(arr) state_probs = expressions.construct_sympy_prob_funcs(state_mult_funcs=state_mults) else: # retrieve the rate matrix from G Kij = graph_utils.retrieve_rate_matrix(G) # create array of ones for storing rate products - dirpar_rate_products = np.ones(n_dirpars, dtype=float) + rate_products = np.ones(n_dir_diagrams, dtype=float) # iterate over the directional diagrams for i, edge_list in enumerate(dir_edges): # for each edge list, retrieve an array of the ith and jth indices, @@ -589,15 +553,13 @@ def calc_state_probs(G, key="name", output_strings=True, dir_edges=None): # calculate the product of those values Ki = edge_list[:, 0] Kj = edge_list[:, 1] - dirpar_rate_products[i] = np.prod(Kij[Ki, Kj]) - - state_mults = dirpar_rate_products.reshape(n_states, n_partials).sum(axis=1) - state_probs = state_mults / math.fsum(dirpar_rate_products) + rate_products[i] = np.prod(Kij[Ki, Kj]) + state_mults = rate_products.reshape(n_states, n_partials).sum(axis=1) + state_probs = state_mults / math.fsum(rate_products) if any(elem < 0 for elem in state_probs): - raise ValueError( - "Calculated negative state probabilities, overflow or underflow occurred." - ) - + msg = """Calculated negative state probabilities, + overflow or underflow occurred.""" + raise ValueError(msg) return state_probs From 4cbe049f5b7e958856ae92cff264a4f832a84b01 Mon Sep 17 00:00:00 2001 From: "Nikolaus C. Awtrey" Date: Fri, 16 Aug 2024 08:09:43 -0700 Subject: [PATCH 2/3] Misc docstring changes --- kda/calculations.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kda/calculations.py b/kda/calculations.py index 17fe74c..af32abe 100644 --- a/kda/calculations.py +++ b/kda/calculations.py @@ -130,7 +130,7 @@ def calc_sigma(G, dirpar_edges, key="name", output_strings=True): ------- sigma : float or str Sum of rate products of all directional diagrams for the input - diagram ``G`` as a float (``output_strings=False``) for a string + diagram ``G`` as a float (``output_strings=False``) or a string (``output_strings=True``). Notes @@ -487,9 +487,9 @@ def calc_state_probs(G, key="name", output_strings=True, dir_edges=None): state_probs : ndarray or list of ``SymPy`` expressions Array of state probabilities for ``N`` states of the form ``[p1, p2, p3, ..., pN]`` - (for ``output_strings=False``), or a + (``output_strings=False``), or a list of symbolic state probability expressions - in the same order (for ``output_strings=True``). + in the same order (``output_strings=True``). Notes ----- @@ -675,9 +675,9 @@ def calc_state_probs_from_diags(G, dirpar_edges, key="name", output_strings=True state_probs : ndarray or list of ``SymPy`` expressions Array of state probabilities for ``N`` states of the form ``[p1, p2, p3, ..., pN]`` - (for ``output_strings=False``), or a + (``output_strings=False``), or a list of symbolic state probability expressions - in the same order (for ``output_strings=True``). + in the same order (``output_strings=True``). """ msg = """`kda.calculations.calc_state_probs_from_diags` will be deprecated. From d8afca1968e088068c10db09f6b1645b8a34aa69 Mon Sep 17 00:00:00 2001 From: "Nikolaus C. Awtrey" Date: Fri, 16 Aug 2024 08:22:01 -0700 Subject: [PATCH 3/3] Fix docstring --- kda/calculations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kda/calculations.py b/kda/calculations.py index af32abe..8a474bc 100644 --- a/kda/calculations.py +++ b/kda/calculations.py @@ -402,7 +402,7 @@ def calc_thermo_force(G, cycle, order, key="name", output_strings=True): thermo_force : float or ``SymPy`` expression The thermodynamic force for the input ``cycle`` returned as a float (``output_strings=False``) or a ``SymPy`` expression - (``output_strings=True`). The returned value is unitless and + (``output_strings=True``). The returned value is unitless and should be multiplied by ``kT`` to calculate the actual thermodynamic force.