#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Módulo principal de pydatajson
Contiene la clase DataJson que reúne los métodos públicos para trabajar con
archivos data.json.
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import with_statement
import sys
import io
import platform
import os.path
import warnings
import re
import json
from collections import OrderedDict
import jsonschema
from . import readers
from . import helpers
from . import writers
ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
[documentos]class DataJson(object):
"""Métodos para trabajar con archivos data.json."""
# Variables por default
ABSOLUTE_SCHEMA_DIR = os.path.join(ABSOLUTE_PROJECT_DIR, "schemas")
DEFAULT_CATALOG_SCHEMA_FILENAME = "catalog.json"
def __init__(self,
schema_filename=DEFAULT_CATALOG_SCHEMA_FILENAME,
schema_dir=ABSOLUTE_SCHEMA_DIR):
"""Crea un manipulador de `data.json`s.
Salvo que se indique lo contrario, el validador de esquemas asociado
es el definido por default en las constantes de clase.
Args:
schema_filename (str): Nombre del archivo que contiene el esquema
validador.
schema_dir (str): Directorio (absoluto) donde se encuentra el
esquema validador (y sus referencias, de tenerlas).
"""
self.validator = self._create_validator(schema_filename, schema_dir)
@classmethod
def _create_validator(cls, schema_filename, schema_dir):
"""Crea el validador necesario para inicializar un objeto DataJson.
Para poder resolver referencias inter-esquemas, un Validador requiere
que se especifique un RefResolver (Resolvedor de Referencias) con el
directorio base (absoluto) y el archivo desde el que se referencia el
directorio.
Para poder validar formatos, un Validador requiere que se provea
explícitamente un FormatChecker. Actualmente se usa el default de la
librería, jsonschema.FormatChecker().
Args:
schema_filename (str): Nombre del archivo que contiene el esquema
validador "maestro".
schema_dir (str): Directorio (absoluto) donde se encuentra el
esquema validador maestro y sus referencias, de tenerlas.
Returns:
Draft4Validator: Un validador de JSONSchema Draft #4. El validador
se crea con un RefResolver que resuelve referencias de
`schema_filename` dentro de `schema_dir`.
"""
schema_path = os.path.join(schema_dir, schema_filename)
schema = readers.read_json(schema_path)
# Según https://github.com/Julian/jsonschema/issues/98
# Permite resolver referencias locales a otros esquemas.
if platform.system() == 'Windows':
base_uri = "file:///" + schema_path.replace("\\", "/")
else:
base_uri = "file://" + schema_path
resolver = jsonschema.RefResolver(base_uri=base_uri, referrer=schema)
format_checker = jsonschema.FormatChecker()
validator = jsonschema.Draft4Validator(
schema=schema, resolver=resolver, format_checker=format_checker)
return validator
[documentos] def is_valid_catalog(self, catalog):
"""Valida que un archivo `data.json` cumpla con el schema definido.
Chequea que el data.json tiene todos los campos obligatorios y que
tanto los campos obligatorios como los opcionales siguen la estructura
definida en el schema.
Args:
catalog (str o dict): Catálogo (dict, JSON o XLSX) a ser validado.
Returns:
bool: True si el data.json cumple con el schema, sino False.
"""
catalog = readers.read_catalog(catalog)
res = self.validator.is_valid(catalog)
return res
@staticmethod
def _update_validation_response(error, response):
"""Actualiza la respuesta por default acorde a un error de
validación."""
new_response = response.copy()
# El status del catálogo entero será ERROR
new_response["status"] = "ERROR"
# Adapto la información del ValidationError recibido a los fines
# del validador de DataJsons
error_info = {
# Error Code 1 para "campo obligatorio faltante"
# Error Code 2 para "error en tipo o formato de campo"
"error_code": 1 if error.validator == "required" else 2,
"message": error.message,
"validator": error.validator,
"validator_value": error.validator_value,
"path": list(error.path),
# La instancia validada es irrelevante si el error es de tipo 1
"instance": (None if error.validator == "required" else
error.instance)
}
# Identifico a qué nivel de jerarquía sucedió el error.
if len(error.path) >= 2 and error.path[0] == "dataset":
# El error está a nivel de un dataset particular o inferior
position = new_response["error"]["dataset"][error.path[1]]
else:
# El error está a nivel de catálogo
position = new_response["error"]["catalog"]
position["status"] = "ERROR"
position["errors"].append(error_info)
return new_response
[documentos] def validate_catalog(self, catalog):
"""Analiza un data.json registrando los errores que encuentra.
Chequea que el data.json tiene todos los campos obligatorios y que
tanto los campos obligatorios como los opcionales siguen la estructura
definida en el schema.
Args:
catalog (str o dict): Catálogo (dict, JSON o XLSX) a ser validado.
Returns:
dict: Diccionario resumen de los errores encontrados::
{
"status": "OK", # resultado de la validación global
"error": {
"catalog": {
"status": "OK",
"errors": []
"title": "Título Catalog"},
"dataset": [
{
"status": "OK",
"errors": [],
"title": "Titulo Dataset 1"
},
{
"status": "ERROR",
"errors": [error1_info, error2_info, ...],
"title": "Titulo Dataset 2"
}
]
}
}
Donde errorN_info es un dict con la información del N-ésimo
error encontrado, con las siguientes claves: "path", "instance",
"message", "validator", "validator_value", "error_code".
"""
catalog = readers.read_catalog(catalog)
# La respuesta por default se devuelve si no hay errores
default_response = {
"status": "OK",
"error": {
"catalog": {
"status": "OK",
"title": catalog.get("title"),
"errors": []
},
# "dataset" contiene lista de rtas default si el catálogo
# contiene la clave "dataset" y además su valor es una lista.
# En caso contrario "dataset" es None.
"dataset": [
{
"status": "OK",
"title": dataset.get("title"),
"errors": []
} for dataset in catalog["dataset"]
] if ("dataset" in catalog and
isinstance(catalog["dataset"], list)) else None
}
}
# Genero la lista de errores en la instancia a validar
errors_iterator = self.validator.iter_errors(catalog)
final_response = default_response.copy()
for error in errors_iterator:
final_response = self._update_validation_response(error,
final_response)
return final_response
@classmethod
def _dataset_report_helper(cls, dataset):
"""Toma un dict con la metadata de un dataset, y devuelve un dict coni
los valores que dataset_report() usa para reportar sobre él.
Args:
dataset (dict): Diccionario con la metadata de un dataset.
Returns:
dict: Diccionario con los campos a nivel dataset que requiere
dataset_report().
"""
publisher_name = helpers.traverse_dict(dataset, ["publisher", "name"])
super_themes = None
if isinstance(dataset.get("superTheme"), list):
strings = [s for s in dataset.get("superTheme")
if isinstance(s, (str, unicode))]
super_themes = ", ".join(strings)
themes = None
if isinstance(dataset.get("theme"), list):
strings = [s for s in dataset.get("theme")
if isinstance(s, (str, unicode))]
themes = ", ".join(strings)
def _stringify_distribution(distribution):
title = distribution.get("title")
url = distribution.get("downloadURL")
return "\"{}\": {}".format(title, url)
distributions = [d for d in dataset["distribution"]
if isinstance(d, dict)]
distributions_list = None
if isinstance(distributions, list):
distributions_strings = [
_stringify_distribution(d) for d in distributions
]
distributions_list = "\n".join(distributions_strings)
fields = OrderedDict()
fields["dataset_title"] = dataset.get("title")
fields["dataset_accrualPeriodicity"] = dataset.get(
"accrualPeriodicity")
fields["dataset_description"] = dataset.get("description")
fields["dataset_publisher_name"] = publisher_name
fields["dataset_superTheme"] = super_themes
fields["dataset_theme"] = themes
fields["dataset_landingPage"] = dataset.get("landingPage")
fields["distributions_list"] = distributions_list
return fields
@staticmethod
def _catalog_report_helper(catalog, catalog_validation, url):
"""Toma un dict con la metadata de un catálogo, y devuelve un dict con
los valores que catalog_report() usa para reportar sobre él.
Args:
catalog (dict): Diccionario con la metadata de un catálogo.
validation (dict): Resultado, únicamente a nivel catálogo, de la
validación completa de `catalog`.
Returns:
dict: Diccionario con los campos a nivel catálogo que requiere
catalog_report().
"""
fields = OrderedDict()
fields["catalog_metadata_url"] = url
fields["catalog_title"] = catalog.get("title")
fields["catalog_description"] = catalog.get("description")
fields["valid_catalog_metadata"] = (
1 if catalog_validation["status"] == "OK" else 0)
return fields
def _dataset_report(self, dataset, dataset_validation, dataset_index,
catalog_fields, harvest='none', report=None):
""" Genera una línea del `catalog_report`, correspondiente a un dataset
de los que conforman el catálogo analizado."""
dataset_report = OrderedDict(catalog_fields)
dataset_report["valid_dataset_metadata"] = (
1 if dataset_validation["status"] == "OK" else 0)
dataset_report["dataset_index"] = dataset_index
if harvest == 'all':
dataset_report["harvest"] = 1
elif harvest == 'none':
dataset_report["harvest"] = 0
elif harvest == 'valid':
dataset_report["harvest"] = (
int(dataset_report["valid_dataset_metadata"]))
elif harvest == 'report':
if not report:
raise ValueError("""
Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para
el argumento 'report'. Por favor, intentelo nuevamente.""")
datasets_to_harvest = self._extract_datasets_to_harvest(report)
dataset_report["harvest"] = (
1 if (dataset_report["catalog_metadata_url"],
dataset.get("title")) in datasets_to_harvest
else 0)
else:
raise ValueError("""
{} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o
'report'.""".format(harvest))
dataset_report.update(self._dataset_report_helper(dataset))
return dataset_report.copy()
[documentos] def catalog_report(self, catalog, harvest='none', report=None):
"""Genera un reporte sobre los datasets de un único catálogo.
Args:
catalog (dict, str o unicode): Representación externa (path/URL) o
interna (dict) de un catálogo.
harvest (str): Criterio de cosecha ('all', 'none',
'valid' o 'report').
Returns:
list: Lista de diccionarios, con un elemento por cada dataset
presente en `catalog`.
"""
url = catalog if isinstance(catalog, (str, unicode)) else None
catalog = readers.read_catalog(catalog)
validation = self.validate_catalog(catalog)
catalog_validation = validation["error"]["catalog"]
datasets_validations = validation["error"]["dataset"]
catalog_fields = self._catalog_report_helper(
catalog, catalog_validation, url)
if "dataset" in catalog and isinstance(catalog["dataset"], list):
datasets = [d if isinstance(d, dict) else {} for d in
catalog["dataset"]]
else:
datasets = []
catalog_report = [
self._dataset_report(
dataset, datasets_validations[index], index,
catalog_fields, harvest, report=report
) for index, dataset in enumerate(datasets)
]
return catalog_report
[documentos] def generate_datasets_report(self, catalogs, harvest='valid',
report=None, export_path=None):
"""Genera un reporte sobre las condiciones de la metadata de los
datasets contenidos en uno o varios catálogos.
Args:
catalogs (str, dict o list): Uno (str o dict) o varios (list de
strs y/o dicts) catálogos.
harvest (str): Criterio a utilizar para determinar el valor del
campo "harvest" en el reporte generado ('all', 'none',
'valid' o 'report').
report (str): Path a un reporte/config especificando qué
datasets marcar con harvest=1 (sólo si harvest=='report').
export_path (str): Path donde exportar el reporte generado (en
formato XLSX o CSV). Si se especifica, el método no devolverá
nada.
Returns:
list: Contiene tantos dicts como datasets estén presentes en
`catalogs`, con la data del reporte generado.
"""
assert isinstance(catalogs, (str, unicode, dict, list))
# Si se pasa un único catálogo, genero una lista que lo contenga
if isinstance(catalogs, (str, unicode, dict)):
catalogs = [catalogs]
catalogs_reports = [self.catalog_report(catalog, harvest, report)
for catalog in catalogs]
full_report = []
for report in catalogs_reports:
full_report.extend(report)
if export_path:
writers.write_table(table=full_report, path=export_path)
else:
return full_report
[documentos] def generate_harvester_config(self, catalogs=None, harvest='valid',
report=None, frequency='R/P1D',
export_path=None):
"""Genera un archivo de configuración del harvester a partir de un
reporte, o de un conjunto de catálogos y un criterio de cosecha
(`harvest`).
Args:
catalogs (str, dict o list): Uno (str o dict) o varios (list de
strs y/o dicts) catálogos.
harvest (str): Criterio para determinar qué datasets incluir en el
archivo de configuración generado ('all', 'none',
'valid' o 'report').
report (list o str): Tabla de reporte generada por
generate_datasets_report() como lista de diccionarios o archivo
en formato XLSX o CSV. Sólo se usa cuando `harvest=='report'`,
en cuyo caso `catalogs` se ignora.
frequency (str): Frecuencia de búsqueda de actualizaciones en los
datasets a cosechar. Todo intervalo de frecuencia válido según
ISO 8601 es válido. Es 'R/P1D' (diariamiente) por omisión, y
si se pasa`None`, se conservará el valor de original de cada
dataset, `dataset["accrualPeriodicity"]`.
export_path (str): Path donde exportar el reporte generado (en
formato XLSX o CSV). Si se especifica, el método no devolverá
nada.
Returns:
list of dicts: Un diccionario con variables de configuración
por cada dataset a cosechar.
"""
# Si se pasa un único catálogo, genero una lista que lo contenga
if isinstance(catalogs, (str, unicode, dict)):
catalogs = [catalogs]
if harvest == 'report':
if not report:
raise ValueError("""
Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para
el argumento 'report'. Por favor, intentelo nuevamente.""")
datasets_report = readers.read_table(report)
elif harvest in ['valid', 'none', 'all']:
# catalogs no puede faltar para estos criterios
assert isinstance(catalogs, (str, unicode, dict, list))
datasets_report = self.generate_datasets_report(catalogs, harvest)
else:
raise ValueError("""
{} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o
'report'.""".format(harvest))
config_keys = ["catalog_metadata_url", "dataset_title",
"dataset_accrualPeriodicity"]
harvester_config = [
OrderedDict(
# Retengo únicamente los campos que necesita el harvester
[(k, v) for (k, v) in dataset.items() if k in config_keys]
)
# Para aquellost datasets marcados con 'harvest'==1
for dataset in datasets_report if bool(int(dataset["harvest"]))
]
if frequency:
valid_patterns = [
"^R/P\\d+(\\.\\d+)?[Y|M|W|D]$",
"^R/PT\\d+(\\.\\d+)?[H|M|S]$"
]
if any([re.match(pat, frequency) for pat in valid_patterns]):
for dataset in harvester_config:
dataset["dataset_accrualPeriodicity"] = frequency
else:
warnings.warn("""
{} no es una frecuencia de cosecha valida. Se conservara la frecuencia de
actualizacion original de cada dataset.""".format(frequency))
if export_path:
writers.write_table(harvester_config, export_path)
else:
return harvester_config
[documentos] def generate_harvestable_catalogs(self, catalogs, harvest='all',
report=None, export_path=None):
"""Filtra los catálogos provistos según el criterio determinado en
`harvest`.
Args:
catalogs (str, dict o list): Uno (str o dict) o varios (list de
strs y/o dicts) catálogos.
harvest (str): Criterio para determinar qué datasets conservar de
cada catálogo ('all', 'none', 'valid' o 'report').
report (list o str): Tabla de reporte generada por
generate_datasets_report() como lista de diccionarios o archivo
en formato XLSX o CSV. Sólo se usa cuando `harvest=='report'`.
export_path (str): Path a un archivo JSON o directorio donde
exportar los catálogos filtrados. Si termina en ".json" se
exportará la lista de catálogos a un único archivo. Si es un
directorio, se guardará en él un JSON por catálogo. Si se
especifica `export_path`, el método no devolverá nada.
Returns:
list of dicts: Lista de catálogos.
"""
assert isinstance(catalogs, (str, unicode, dict, list))
# Si se pasa un único catálogo, genero una lista que lo contenga
if isinstance(catalogs, (str, unicode, dict)):
catalogs = [catalogs]
harvestable_catalogs = [readers.read_catalog(c) for c in catalogs]
catalogs_urls = [catalog if isinstance(catalog, (str, unicode))
else None for catalog in catalogs]
# aplica los criterios de cosecha
if harvest == 'all':
pass
elif harvest == 'none':
for catalog in harvestable_catalogs:
catalog["dataset"] = []
elif harvest == 'valid':
report = self.generate_datasets_report(catalogs, harvest)
return self.generate_harvestable_catalogs(
catalogs=catalogs, harvest='report', report=report,
export_path=export_path)
elif harvest == 'report':
if not report:
raise ValueError("""
Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para
el argumento 'report'. Por favor, intentelo nuevamente.""")
datasets_to_harvest = self._extract_datasets_to_harvest(report)
for idx_cat, catalog in enumerate(harvestable_catalogs):
catalog_url = catalogs_urls[idx_cat]
if ("dataset" in catalog and
isinstance(catalog["dataset"], list)):
catalog["dataset"] = [
dataset for dataset in catalog["dataset"]
if (catalog_url, dataset.get("title")) in
datasets_to_harvest
]
else:
catalog["dataset"] = []
else:
raise ValueError("""
{} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o
'report'.""".format(harvest))
# devuelve los catálogos harvesteables
if export_path and os.path.isdir(export_path):
# Creo un JSON por catálogo
for idx, catalog in enumerate(harvestable_catalogs):
filename = os.path.join(export_path, "catalog_{}".format(idx))
writers.write_json(catalog, filename)
elif export_path:
# Creo un único JSON con todos los catálogos
writers.write_json(harvestable_catalogs, export_path)
else:
return harvestable_catalogs
[documentos] def generate_datasets_summary(self, catalog, export_path=None):
"""Genera un informe sobre los datasets presentes en un catálogo,
indicando para cada uno:
- Índice en la lista catalog["dataset"]
- Título
- Identificador
- Cantidad de distribuciones
- Estado de sus metadatos ["OK"|"ERROR"]
Es utilizada por la rutina diaria de `libreria-catalogos` para reportar
sobre los datasets de los catálogos mantenidos.
Args:
catalog (str o dict): Path a un catálogo en cualquier formato,
JSON, XLSX, o diccionario de python.
export_path (str): Path donde exportar el informe generado (en
formato XLSX o CSV). Si se especifica, el método no devolverá
nada.
Returns:
list: Contiene tantos dicts como datasets estén presentes en
`catalogs`, con los datos antes mencionados.
"""
catalog = readers.read_catalog(catalog)
# Trato de leer todos los datasets bien formados de la lista
# catalog["dataset"], si existe.
if "dataset" in catalog and isinstance(catalog["dataset"], list):
datasets = [d if isinstance(d, dict) else {} for d in
catalog["dataset"]]
else:
# Si no, considero que no hay datasets presentes
datasets = []
validation = self.validate_catalog(catalog)["error"]["dataset"]
def info_dataset(index, dataset):
"""Recolecta información básica de un dataset."""
info = OrderedDict()
info["indice"] = index
info["titulo"] = dataset.get("title")
info["identificador"] = dataset.get("identifier")
info["estado_metadatos"] = validation[index]["status"]
info["cant_errores"] = len(validation[index]["errors"])
info["cant_distribuciones"] = len(dataset["distribution"])
return info
summary = [info_dataset(i, ds) for i, ds in enumerate(datasets)]
if export_path:
writers.write_table(summary, export_path)
else:
return summary
[documentos] def generate_catalog_readme(self, catalog, export_path=None):
"""Genera una descripción textual en formato Markdown sobre los
metadatos generales de un catálogo (título, editor, fecha de
publicación, et cetera), junto con:
- estado de los metadatos a nivel catálogo,
- estado global de los metadatos, y
- cantidad de datasets y distribuciones incluidas
Es utilizada por la rutina diaria de `libreria-catalogos` para generar
un README con información básica sobre los catálogos mantenidos.
Args:
catalog (str o dict): Path a un catálogo en cualquier formato,
JSON, XLSX, o diccionario de python.
export_path (str): Path donde exportar el texto generado (en
formato Markdown). Si se especifica, el método no devolverá
nada.
Returns:
str: Texto de la descripción generada.
"""
catalog = readers.read_catalog(catalog)
validation = self.validate_catalog(catalog)
readme_template = """
# Catálogo: {title}
## Información General
- **Autor**: {publisher_name}
- **Correo Electrónico**: {publisher_mbox}
- **Nombre del catálogo**: {title}
- **Descripción**:
> {description}
## Estado de los metadatos y cantidad de recursos
Estado metadatos globales | Estado metadatos catálogo | # de Datasets | # de Distribuciones
--------------------------|---------------------------|---------------|--------------------
{global_status} | {catalog_status} | {no_of_datasets} | {no_of_distributions}
## Datasets incluidos
Por favor, consulte el informe [`datasets.csv`](datasets.csv).
"""
content = {
"title": catalog.get("title"),
"publisher_name": helpers.traverse_dict(
catalog, ["publisher", "name"]),
"publisher_mbox": helpers.traverse_dict(
catalog, ["publisher", "mbox"]),
"description": catalog.get("description"),
"global_status": validation["status"],
"catalog_status": validation["error"]["catalog"]["status"],
"no_of_datasets": len(catalog["dataset"]),
"no_of_distributions": sum([len(dataset["distribution"]) for
dataset in catalog["dataset"]])
}
catalog_readme = readme_template.format(**content)
if export_path:
with io.open(export_path, 'w', encoding='utf-8') as target:
target.write(catalog_readme)
else:
return catalog_readme
@classmethod
def _extract_datasets_to_harvest(cls, report):
"""Extrae de un reporte los datos necesarios para reconocer qué
datasets marcar para cosecha en cualquier generador.
Args:
report (str o list): Reporte (lista de dicts) o path a uno.
Returns:
list: Lista de tuplas con los títulos de catálogo y dataset de cada
reporte extraído.
"""
assert isinstance(report, (str, unicode, list))
# Si `report` es una lista de tuplas con longitud 2, asumimos que es un
# reporte procesado para extraer los datasets a harvestear. Se devuelve
# intacta.
if (isinstance(report, list) and all([isinstance(x, tuple) and
len(x) == 2 for x in report])):
return report
table = readers.read_table(report)
table_keys = table[0].keys()
expected_keys = ["catalog_metadata_url", "dataset_title",
"dataset_accrualPeriodicity"]
# Verifico la presencia de las claves básicas de un config de harvester
for key in expected_keys:
if key not in table_keys:
raise KeyError("""
El reporte no contiene la clave obligatoria {}. Pruebe con otro archivo.
""".format(key))
if "harvest" in table_keys:
# El archivo es un reporte de datasets.
datasets_to_harvest = [
(row["catalog_metadata_url"], row["dataset_title"]) for row in
table if int(row["harvest"])]
else:
# El archivo es un config de harvester.
datasets_to_harvest = [
(row["catalog_metadata_url"], row["dataset_title"]) for row in
table]
return datasets_to_harvest
[documentos]def main():
"""Permite ejecutar el módulo por línea de comandos.
Valida un path o url a un archivo data.json devolviendo True/False si es
válido y luego el resultado completo.
Example:
python pydatajson.py http://181.209.63.71/data.json
python pydatajson.py ~/github/pydatajson/tests/samples/full_data.json
"""
try:
datajson_file = sys.argv[1]
dj_instance = DataJson()
bool_res = dj_instance.is_valid_catalog(datajson_file)
full_res = dj_instance.validate_catalog(datajson_file)
pretty_full_res = json.dumps(
full_res, indent=4, separators=(",", ": "))
print(bool_res)
print(pretty_full_res)
except IndexError as errmsg:
format_str = """
{}: pydatajson.py fue ejecutado como script sin proveer un argumento
"""
print(format_str.format(errmsg))
if __name__ == '__main__':
main()