#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Módulo 'indicators' de Pydatajson
Contiene los métodos para monitorear y generar indicadores de un catálogo o de
una red de catálogos.
"""
from __future__ import print_function, absolute_import, unicode_literals, with_statement
import json
import os
from datetime import datetime
from six import string_types
from . import helpers
from . import readers
from .reporting import generate_datasets_summary
CENTRAL_CATALOG = "http://datos.gob.ar/data.json"
ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
CATALOG_FIELDS_PATH = os.path.join(ABSOLUTE_PROJECT_DIR, "fields")
[documentos]def generate_catalogs_indicators(catalogs, central_catalog=None,
validator=None):
"""Genera una lista de diccionarios con varios indicadores sobre
los catálogos provistos, tales como la cantidad de datasets válidos,
días desde su última fecha actualizada, entre otros.
Args:
catalogs (str o list): uno o más catalogos sobre los que se quiera
obtener indicadores
central_catalog (str): catálogo central sobre el cual comparar los
datasets subidos en la lista anterior. De no pasarse no se
generarán indicadores de federación de datasets.
Returns:
tuple: 2 elementos, el primero una lista de diccionarios con los
indicadores esperados, uno por catálogo pasado, y el segundo
un diccionario con indicadores a nivel global,
datos sobre la lista entera en general.
"""
central_catalog = central_catalog or CENTRAL_CATALOG
assert isinstance(catalogs, string_types + (dict, list))
# Si se pasa un único catálogo, genero una lista que lo contenga
if isinstance(catalogs, string_types + (dict,)):
catalogs = [catalogs]
# Leo todos los catálogos
catalogs = [readers.read_catalog(catalog) for catalog in catalogs]
indicators_list = []
# Cuenta la cantidad de campos usados/recomendados a nivel global
fields = {}
for catalog in catalogs:
catalog = readers.read_catalog(catalog)
fields_count, result = _generate_indicators(
catalog, validator=validator)
if central_catalog:
result.update(_federation_indicators(catalog,
central_catalog))
indicators_list.append(result)
# Sumo a la cuenta total de campos usados/totales
fields = helpers.add_dicts(fields_count, fields)
# Indicadores de la red entera
network_indicators = {
'catalogos_cant': len(catalogs)
}
# Sumo los indicadores individuales al total
indicators_total = indicators_list[0].copy()
for i in range(1, len(indicators_list)):
indicators_total = helpers.add_dicts(indicators_total,
indicators_list[i])
network_indicators.update(indicators_total)
# Genero los indicadores de la red entera,
_network_indicator_percentages(fields, network_indicators)
return indicators_list, network_indicators
def _generate_indicators(catalog, validator=None):
"""Genera los indicadores de un catálogo individual.
Args:
catalog (dict): diccionario de un data.json parseado
Returns:
dict: diccionario con los indicadores del catálogo provisto
"""
result = {}
# Obtengo summary para los indicadores del estado de los metadatos
result.update(_generate_status_indicators(catalog, validator=validator))
# Genero los indicadores relacionados con fechas, y los agrego
result.update(_generate_date_indicators(catalog))
# Agrego la cuenta de los formatos de las distribuciones
count = _count_distribution_formats(catalog)
result.update({
'distribuciones_formatos_cant': count
})
# Agrego porcentaje de campos recomendados/optativos usados
fields_count = _count_required_and_optional_fields(catalog)
recomendados_pct = 100 * float(fields_count['recomendado']) / \
fields_count['total_recomendado']
optativos_pct = 100 * float(fields_count['optativo']) / \
fields_count['total_optativo']
result.update({
'campos_recomendados_pct': round(recomendados_pct, 2),
'campos_optativos_pct': round(optativos_pct, 2)
})
return fields_count, result
def _federation_indicators(catalog, central_catalog):
"""Cuenta la cantidad de datasets incluídos tanto en la lista
'catalogs' como en el catálogo central, y genera indicadores a partir
de esa información.
Args:
catalog (dict): catálogo ya parseado
central_catalog (str o dict): ruta a catálogo central, o un dict
con el catálogo ya parseado
"""
central_catalog = readers.read_catalog(central_catalog)
federados = 0 # En ambos catálogos
no_federados = 0
datasets_federados_eliminados_cant = 0
datasets_federados = []
datasets_no_federados = []
datasets_federados_eliminados = []
# busca c/dataset del catálogo específico a ver si está en el central
for dataset in catalog.get('dataset', []):
found = False
for central_dataset in central_catalog.get('dataset', []):
if datasets_equal(dataset, central_dataset):
found = True
federados += 1
datasets_federados.append((dataset.get('title'),
dataset.get('landingPage')))
break
if not found:
no_federados += 1
datasets_no_federados.append((dataset.get('title'),
dataset.get('landingPage')))
# busca c/dataset del central cuyo publisher podría pertenecer al
# catálogo específico, a ver si está en el catálogo específico
# si no está, probablemente signifique que fue eliminado
filtered_central = _filter_by_likely_publisher(
central_catalog.get('dataset', []),
catalog.get('dataset', [])
)
for central_dataset in filtered_central:
found = False
for dataset in catalog.get('dataset', []):
if datasets_equal(dataset, central_dataset):
found = True
break
if not found:
datasets_federados_eliminados_cant += 1
datasets_federados_eliminados.append(
(central_dataset.get('title'),
central_dataset.get('landingPage'))
)
if federados or no_federados: # Evita división por 0
federados_pct = 100 * float(federados) / (federados + no_federados)
else:
federados_pct = 0
result = {
'datasets_federados_cant': federados,
'datasets_no_federados_cant': no_federados,
'datasets_federados_eliminados_cant': datasets_federados_eliminados_cant,
'datasets_federados_eliminados': datasets_federados_eliminados,
'datasets_no_federados': datasets_no_federados,
'datasets_federados': datasets_federados,
'datasets_federados_pct': round(federados_pct, 2)
}
return result
def _network_indicator_percentages(fields, network_indicators):
"""Encapsula el cálculo de indicadores de porcentaje (de errores,
de campos recomendados/optativos utilizados, de datasets actualizados)
sobre la red de nodos entera.
Args:
fields (dict): Diccionario con claves 'recomendado', 'optativo',
'total_recomendado', 'total_optativo', cada uno con valores
que representan la cantidad de c/u en la red de nodos entera.
network_indicators (dict): Diccionario de la red de nodos, con
las cantidades de datasets_meta_ok y datasets_(des)actualizados
calculados previamente. Se modificará este argumento con los
nuevos indicadores.
"""
# Los porcentuales no se pueden sumar, tienen que ser recalculados
# % de datasets cuya metadata está ok
meta_ok = network_indicators['datasets_meta_ok_cant']
meta_error = network_indicators['datasets_meta_error_cant']
total_pct = 0.0
if meta_ok or meta_error: # Evita división por cero
total_pct = 100 * float(meta_ok) / (meta_error + meta_ok)
network_indicators['datasets_meta_ok_pct'] = round(total_pct, 2)
# % de campos recomendados y optativos utilizados en todo el catálogo
if fields: # 'fields' puede estar vacío si ningún campo es válido
rec_pct = 100 * float(fields['recomendado']) / \
fields['total_recomendado']
opt_pct = 100 * float(fields['optativo']) / \
fields['total_optativo']
network_indicators.update({
'campos_recomendados_pct': round(rec_pct, 2),
'campos_optativos_pct': round(opt_pct, 2)
})
# % de datasets actualizados
act = network_indicators['datasets_actualizados_cant']
desact = network_indicators['datasets_desactualizados_cant']
updated_pct = 0
if act or desact: # Evita división por cero
updated_pct = 100 * act / float(act + desact)
network_indicators['datasets_actualizados_pct'] = round(updated_pct, 2)
# % de datasets federados
federados = network_indicators.get('datasets_federados_cant')
no_federados = network_indicators.get('datasets_no_federados_cant')
if federados or no_federados:
federados_pct = 100 * float(federados) / (federados + no_federados)
network_indicators['datasets_federados_pct'] = \
round(federados_pct, 2)
def _generate_status_indicators(catalog, validator=None):
"""Genera indicadores básicos sobre el estado de un catálogo
Args:
catalog (dict): diccionario de un data.json parseado
Returns:
dict: indicadores básicos sobre el catálogo, tal como la cantidad
de datasets, distribuciones y número de errores
"""
summary = generate_datasets_summary(catalog, validator=validator)
cant_ok = 0
cant_error = 0
cant_distribuciones = 0
datasets_total = len(summary)
for dataset in summary:
cant_distribuciones += dataset['cant_distribuciones']
if dataset['estado_metadatos'] == "OK":
cant_ok += 1
else: # == "ERROR"
cant_error += 1
datasets_ok_pct = 0
if datasets_total:
datasets_ok_pct = round(100 * float(cant_ok) / datasets_total, 2)
result = {
'datasets_cant': datasets_total,
'distribuciones_cant': cant_distribuciones,
'datasets_meta_ok_cant': cant_ok,
'datasets_meta_error_cant': cant_error,
'datasets_meta_ok_pct': datasets_ok_pct
}
return result
def _generate_date_indicators(catalog, tolerance=0.2):
"""Genera indicadores relacionados a las fechas de publicación
y actualización del catálogo pasado por parámetro. La evaluación de si
un catálogo se encuentra actualizado o no tiene un porcentaje de
tolerancia hasta que se lo considere como tal, dado por el parámetro
tolerance.
Args:
catalog (dict o str): path de un catálogo en formatos aceptados,
o un diccionario de python
tolerance (float): porcentaje de tolerancia hasta que se considere
un catálogo como desactualizado, por ejemplo un catálogo con
período de actualización de 10 días se lo considera como
desactualizado a partir de los 12 con una tolerancia del 20%.
También acepta valores negativos.
Returns:
dict: diccionario con indicadores
"""
result = {}
dias_ultima_actualizacion = _days_from_last_update(
catalog, "modified")
if not dias_ultima_actualizacion:
dias_ultima_actualizacion = _days_from_last_update(
catalog, "issued")
result['catalogo_ultima_actualizacion_dias'] = \
dias_ultima_actualizacion
actualizados = 0
desactualizados = 0
periodicity_amount = {}
for dataset in catalog.get('dataset', []):
# Parseo la fecha de publicación, y la frecuencia de actualización
periodicity = dataset.get('accrualPeriodicity')
if not periodicity:
continue
# Si la periodicity es eventual, se considera como actualizado
if periodicity == 'eventual':
actualizados += 1
prev_periodicity = periodicity_amount.get(periodicity, 0)
periodicity_amount[periodicity] = prev_periodicity + 1
continue
# dataset sin fecha de última actualización es desactualizado
if "modified" not in dataset:
desactualizados += 1
else:
# Calculo el período de días que puede pasar sin actualizarse
# Se parsea el período especificado por accrualPeriodicity,
# cumple con el estándar ISO 8601 para tiempos con repetición
date = helpers.parse_date_string(dataset['modified'])
days_diff = float((datetime.now() - date).days)
interval = helpers.parse_repeating_time_interval(
periodicity) * \
(1 + tolerance)
if days_diff < interval:
actualizados += 1
else:
desactualizados += 1
prev_periodicity = periodicity_amount.get(periodicity, 0)
periodicity_amount[periodicity] = prev_periodicity + 1
datasets_total = len(catalog.get('dataset', []))
actualizados_pct = 0
if datasets_total:
actualizados_pct = float(actualizados) / datasets_total
result.update({
'datasets_desactualizados_cant': desactualizados,
'datasets_actualizados_cant': actualizados,
'datasets_actualizados_pct': 100 * round(actualizados_pct, 2),
'datasets_frecuencia_cant': periodicity_amount
})
return result
def _count_distribution_formats(catalog):
"""Cuenta los formatos especificados por el campo 'format' de cada
distribución de un catálogo o de un dataset.
Args:
catalog (str o dict): path a un catálogo, o un dict de python que
Returns:
dict: diccionario con los formatos de las distribuciones
encontradas como claves, con la cantidad de ellos en sus valores.
"""
# Leo catálogo
catalog = readers.read_catalog(catalog)
catalog_formats = {}
for dataset in catalog.get('dataset', []):
dataset_formats = _count_distribution_formats_dataset(dataset)
for distribution_format in dataset_formats:
count_catalog = catalog_formats.get(distribution_format, 0)
count_dataset = dataset_formats.get(distribution_format, 0)
catalog_formats[
distribution_format] = count_catalog + count_dataset
return catalog_formats
def _count_distribution_formats_dataset(dataset):
formats = {}
for distribution in dataset['distribution']:
# 'format' es recomendado, no obligatorio. Puede no estar.
distribution_format = distribution.get('format', None)
if distribution_format:
# Si no está en el diccionario, devuelvo 0
count = formats.get(distribution_format, 0)
formats[distribution_format] = count + 1
return formats
def _days_from_last_update(catalog, date_field="modified"):
"""Calcula días desde la última actualización del catálogo.
Args:
catalog (dict): Un catálogo.
date_field (str): Campo de metadatos a utilizar para considerar
los días desde la última actualización del catálogo.
Returns:
int or None: Cantidad de días desde la última actualización del
catálogo o None, si no pudo ser calculada.
"""
# el "date_field" se busca primero a nivel catálogo, luego a nivel
# de cada dataset, y nos quedamos con el que sea más reciente
date_modified = catalog.get(date_field, None)
dias_ultima_actualizacion = None
# "date_field" a nivel de catálogo puede no ser obligatorio,
# si no está pasamos
if isinstance(date_modified, string_types):
date = helpers.parse_date_string(date_modified)
dias_ultima_actualizacion = (
datetime.now() - date).days if date else None
for dataset in catalog.get('dataset', []):
date = helpers.parse_date_string(dataset.get(date_field, ""))
days_diff = float((datetime.now() - date).days) if date else None
# Actualizo el indicador de días de actualización si corresponde
if not dias_ultima_actualizacion or \
(days_diff and days_diff < dias_ultima_actualizacion):
dias_ultima_actualizacion = days_diff
if dias_ultima_actualizacion:
return int(dias_ultima_actualizacion)
else:
return None
def _count_required_and_optional_fields(catalog):
"""Cuenta los campos obligatorios/recomendados/requeridos usados en
'catalog', junto con la cantidad máxima de dichos campos.
Args:
catalog (str o dict): path a un catálogo, o un dict de python que
contenga a un catálogo ya leído
Returns:
dict: diccionario con las claves 'recomendado', 'optativo',
'requerido', 'recomendado_total', 'optativo_total',
'requerido_total', con la cantidad como valores.
"""
catalog = readers.read_catalog(catalog)
# Archivo .json con el uso de cada campo. Lo cargamos a un dict
catalog_fields_path = os.path.join(CATALOG_FIELDS_PATH,
'fields.json')
with open(catalog_fields_path) as f:
catalog_fields = json.load(f)
# Armado recursivo del resultado
return _count_fields_recursive(catalog, catalog_fields)
def _count_fields_recursive(dataset, fields):
"""Cuenta la información de campos optativos/recomendados/requeridos
desde 'fields', y cuenta la ocurrencia de los mismos en 'dataset'.
Args:
dataset (dict): diccionario con claves a ser verificadas.
fields (dict): diccionario con los campos a verificar en dataset
como claves, y 'optativo', 'recomendado', o 'requerido' como
valores. Puede tener objetios anidados pero no arrays.
Returns:
dict: diccionario con las claves 'recomendado', 'optativo',
'requerido', 'recomendado_total', 'optativo_total',
'requerido_total', con la cantidad como valores.
"""
key_count = {
'recomendado': 0,
'optativo': 0,
'requerido': 0,
'total_optativo': 0,
'total_recomendado': 0,
'total_requerido': 0
}
for k, v in fields.items():
# Si la clave es un diccionario se implementa recursivamente el
# mismo algoritmo
if isinstance(v, dict):
# dataset[k] puede ser o un dict o una lista, ej 'dataset' es
# list, 'publisher' no. Si no es lista, lo metemos en una.
# Si no es ninguno de los dos, dataset[k] es inválido
# y se pasa un diccionario vacío para poder comparar
elements = dataset.get(k)
if not isinstance(elements, (list, dict)):
elements = [{}]
if isinstance(elements, dict):
elements = [dataset[k].copy()]
for element in elements:
# Llamada recursiva y suma del resultado al nuestro
result = _count_fields_recursive(element, v)
for key in result:
key_count[key] += result[key]
# Es un elemento normal (no iterable), se verifica si está en
# dataset o no. Se suma 1 siempre al total de su tipo
else:
# total_requerido, total_recomendado, o total_optativo
key_count['total_' + v] += 1
if k in dataset:
key_count[v] += 1
return key_count
[documentos]def datasets_equal(dataset, other, fields_dataset=None,
fields_distribution=None, return_diff=False):
"""Función de igualdad de dos datasets: se consideran iguales si
los valores de los campos 'title', 'publisher.name',
'accrualPeriodicity' e 'issued' son iguales en ambos.
Args:
dataset (dict): un dataset, generado por la lectura de un catálogo
other (dict): idem anterior
Returns:
bool: True si son iguales, False en caso contrario
"""
dataset_is_equal = True
dataset_diff = []
# Campos a comparar. Si es un campo anidado escribirlo como lista
if not fields_dataset:
fields_dataset = [
'title',
['publisher', 'name']
]
for field_dataset in fields_dataset:
if isinstance(field_dataset, list):
value = helpers.traverse_dict(dataset, field_dataset)
other_value = helpers.traverse_dict(other, field_dataset)
else:
value = dataset.get(field_dataset)
other_value = other.get(field_dataset)
if value != other_value:
dataset_diff.append({
"error_location": field_dataset,
"dataset_value": value,
"other_value": other_value
})
dataset_is_equal = False
if fields_distribution:
dataset_distributions = dataset.get("distribution")
other_distributions = other.get("distribution")
if len(dataset_distributions) != len(other_distributions):
print("{} distribuciones en origen y {} en destino".format(
len(dataset_distributions), len(other_distributions)))
dataset_is_equal = False
distributions_equal = True
for dataset_distribution, other_distribution in zip(
dataset_distributions, other_distributions):
for field_distribution in fields_distribution:
if isinstance(field_distribution, list):
value = helpers.traverse_dict(
dataset_distribution, field_distribution)
other_value = helpers.traverse_dict(
other_distribution, field_distribution)
else:
value = dataset_distribution.get(field_distribution)
other_value = other_distribution.get(field_distribution)
if value != other_value:
dataset_diff.append({
"error_location": "{} ({})".format(
field_distribution,
dataset_distribution.get("title")
),
"dataset_value": value,
"other_value": other_value
})
distributions_equal = False
if not distributions_equal:
dataset_is_equal = False
if return_diff:
return dataset_diff
else:
return dataset_is_equal
def _filter_by_likely_publisher(central_datasets, catalog_datasets):
publisher_names = [
catalog_dataset["publisher"]["name"]
for catalog_dataset in catalog_datasets
if "name" in catalog_dataset["publisher"]
]
filtered_central_datasets = []
for central_dataset in central_datasets:
if "name" in central_dataset["publisher"] and \
central_dataset["publisher"]["name"] in publisher_names:
filtered_central_datasets.append(central_dataset)
return filtered_central_datasets