#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Métodos auxiliares"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import with_statement
import datetime
import os
import json
ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
ABSOLUTE_SCHEMA_DIR = os.path.join(ABSOLUTE_PROJECT_DIR, "schemas")
[documentos]def ensure_dir_exists(directory):
"""Se asegura de que un directorio exista."""
if not os.path.exists(directory):
os.makedirs(directory)
[documentos]def traverse_dict(dicc, keys, default_value=None):
"""Recorre un diccionario siguiendo una lista de claves, y devuelve
default_value en caso de que alguna de ellas no exista.
Args:
dicc (dict): Diccionario a ser recorrido.
keys (list): Lista de claves a ser recorrida. Puede contener
índices de listas y claves de diccionarios mezcladas.
default_value: Valor devuelto en caso de que `dicc` no se pueda
recorrer siguiendo secuencialmente la lista de `keys` hasta
el final.
Returns:
object: El valor obtenido siguiendo la lista de `keys` dentro de
`dicc`.
"""
for key in keys:
if isinstance(dicc, dict) and key in dicc:
dicc = dicc[key]
elif (isinstance(dicc, list)
and isinstance(key, int)
and key < len(dicc)):
dicc = dicc[key]
else:
return default_value
return dicc
[documentos]def is_list_of_matching_dicts(list_of_dicts, expected_keys=None):
"""Comprueba que una lista esté compuesta únicamente por diccionarios,
que comparten exactamente las mismas claves.
Args:
list_of_dicts (list): Lista de diccionarios a comparar.
expected_keys (set): Conjunto de las claves que cada diccionario debe
tener. Si no se incluye, se asume que son las claves del primer
diccionario de la lista.
Returns:
bool: True si todos los diccionarios comparten sus claves.
"""
if isinstance(list_of_dicts, list) and len(list_of_dicts) == 0:
return False
is_not_list_msg = """
{} no es una lista. Debe ingresar una lista""".format(list_of_dicts)
assert isinstance(list_of_dicts, list), is_not_list_msg
not_all_dicts_msg = """
No todos los elementos de {} son diccionarios. Ingrese una lista compuesta
solo por diccionarios.""".format(list_of_dicts)
assert all([isinstance(d, dict) for d in list_of_dicts]), not_all_dicts_msg
# Si no se pasan expected_keys, se las toma del primer diccionario
expected_keys = expected_keys or set(list_of_dicts[0].keys())
elements = [set(d.keys()) == expected_keys for d in list_of_dicts]
return all(elements)
[documentos]def parse_value(cell):
"""Extrae el valor de una celda de Excel como texto."""
value = cell.value
# stripea espacios en strings
if isinstance(value, (str, unicode)):
value = value.strip()
# convierte a texto ISO 8601 las fechas
if isinstance(value, (datetime.datetime)):
value = value.isoformat()
return value
[documentos]def sheet_to_table(worksheet):
"""Transforma una hoja de libro de Excel en una lista de diccionarios.
Args:
worksheet (Workbook.worksheet): Hoja de cálculo de un archivo XLSX
según los lee `openpyxl`
Returns:
list_of_dicts: Lista de diccionarios, con tantos elementos como
registros incluya la hoja, y con tantas claves por diccionario como
campos tenga la hoja.
"""
headers = []
value_rows = []
for row_i, row in enumerate(worksheet.iter_rows()):
# lee los headers y el tamaño máximo de la hoja en columnas en fila 1
if row_i == 0:
for header_cell in row:
if header_cell.value:
headers.append(parse_value(header_cell))
else:
break
continue
# limita la cantidad de celdas a considerar, por la cantidad de headers
row_cells = [parse_value(cell) for index, cell in enumerate(row)
if index < len(headers)]
# agrega las filas siguientes que tengan al menos un campo no nulo
if any(row_cells):
value_rows.append(row_cells)
# no se admiten filas vacías, eso determina el fin de la hoja
else:
break
# convierte las filas en diccionarios con los headers como keys
table = [
# Ignoro los campos con valores nulos (None)
{k: v for (k, v) in zip(headers, row) if v is not None}
for row in value_rows
]
return table
[documentos]def string_to_list(string, sep=","):
"""Transforma una string con elementos separados por `sep` en una lista."""
return [value.strip() for value in string.split(sep)]
[documentos]def add_dicts(one_dict, other_dict):
"""Suma clave a clave los dos diccionarios. Si algún valor es un
diccionario, llama recursivamente a la función. Ambos diccionarios deben
tener exactamente las mismas claves, y los valores asociados deben ser
sumables, o diccionarios.
Args:
one_dict (dict)
other_dict (dict)
Returns:
dict: resultado de la suma
"""
result = other_dict.copy()
for k, v in one_dict.items():
if v is None:
v = 0
if isinstance(v, dict):
result[k] = add_dicts(v, other_dict.get(k, {}))
else:
other_value = result.get(k, 0)
if other_value is None:
other_value = 0
result[k] = other_value + v
return result
[documentos]def parse_repeating_time_interval(date_str, to="days"):
if to == "days":
return parse_repeating_time_interval_to_days(date_str)
elif to == "string":
return parse_repeating_time_interval_to_str(date_str)
else:
raise NotImplementedError(
"Falta implementar parsing a {}".format(to)
)
[documentos]def parse_repeating_time_interval_to_days(date_str):
"""Parsea un string con un intervalo de tiempo con repetición especificado
por la norma ISO 8601 en una cantidad de días que representa ese intervalo.
Devuelve 0 en caso de que el intervalo sea inválido.
"""
intervals = {
'Y': 365,
'M': 30,
'W': 7,
'D': 1,
'H': 0,
'S': 0
}
if date_str.find('R/P') != 0: # Periodicity mal formada
return 0
date_str = date_str.strip('R/P')
days = 0
index = 0
for interval in intervals:
value_end = date_str.find(interval)
if value_end < 0:
continue
try:
days += int(float(date_str[index:value_end]) * intervals[interval])
# Valor de accrualPeriodicity inválido, se toma como 0
except ValueError:
continue
index = value_end
# Si el número de días es menor lo redondeamos a 1
return max(days, 1)
[documentos]def parse_repeating_time_interval_to_str(date_str):
"""Devuelve descripción humana de un intervalo de repetición.
TODO: Por ahora sólo interpreta una lista fija de intervalos. Debería poder
parsear cualquier caso.
"""
with open(os.path.join(ABSOLUTE_SCHEMA_DIR,
"accrualPeriodicity.json"), "r") as f:
freqs_map = {freq["id"]: freq["description"]
for freq in json.load(f)}
return freqs_map[date_str]
[documentos]def get_ws_case_insensitive(wb, title):
"""Devuelve una hoja en un workbook sin importar mayúsculas/minúsculas."""
return wb[find_ws_name(wb, title)]
[documentos]def find_ws_name(wb, name):
"""Busca una hoja en un workbook sin importar mayúsculas/minúsculas."""
if type(wb) == str or type(wb) == unicode:
wb = load_workbook(wb, read_only=True, data_only=True)
for sheetname in wb.sheetnames:
if sheetname.lower() == name.lower():
return sheetname
raise Exception("No existe la hoja {}".format(name))