Código fuente para pydatajson.core

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Módulo principal de pydatajson

Contiene la clase DataJson que reúne los métodos públicos para trabajar con
archivos data.json.
"""

from __future__ import unicode_literals
from __future__ import print_function
from __future__ import with_statement

import sys
import io
import platform
import os.path
import warnings
import re
import json
from collections import OrderedDict
import jsonschema
from . import readers
from . import helpers
from . import writers

ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))


[documentos]class DataJson(object): """Métodos para trabajar con archivos data.json.""" # Variables por default ABSOLUTE_SCHEMA_DIR = os.path.join(ABSOLUTE_PROJECT_DIR, "schemas") DEFAULT_CATALOG_SCHEMA_FILENAME = "catalog.json" def __init__(self, schema_filename=DEFAULT_CATALOG_SCHEMA_FILENAME, schema_dir=ABSOLUTE_SCHEMA_DIR): """Crea un manipulador de `data.json`s. Salvo que se indique lo contrario, el validador de esquemas asociado es el definido por default en las constantes de clase. Args: schema_filename (str): Nombre del archivo que contiene el esquema validador. schema_dir (str): Directorio (absoluto) donde se encuentra el esquema validador (y sus referencias, de tenerlas). """ self.validator = self._create_validator(schema_filename, schema_dir) @classmethod def _create_validator(cls, schema_filename, schema_dir): """Crea el validador necesario para inicializar un objeto DataJson. Para poder resolver referencias inter-esquemas, un Validador requiere que se especifique un RefResolver (Resolvedor de Referencias) con el directorio base (absoluto) y el archivo desde el que se referencia el directorio. Para poder validar formatos, un Validador requiere que se provea explícitamente un FormatChecker. Actualmente se usa el default de la librería, jsonschema.FormatChecker(). Args: schema_filename (str): Nombre del archivo que contiene el esquema validador "maestro". schema_dir (str): Directorio (absoluto) donde se encuentra el esquema validador maestro y sus referencias, de tenerlas. Returns: Draft4Validator: Un validador de JSONSchema Draft #4. El validador se crea con un RefResolver que resuelve referencias de `schema_filename` dentro de `schema_dir`. """ schema_path = os.path.join(schema_dir, schema_filename) schema = readers.read_json(schema_path) # Según https://github.com/Julian/jsonschema/issues/98 # Permite resolver referencias locales a otros esquemas. if platform.system() == 'Windows': base_uri = "file:///" + schema_path.replace("\\", "/") else: base_uri = "file://" + schema_path resolver = jsonschema.RefResolver(base_uri=base_uri, referrer=schema) format_checker = jsonschema.FormatChecker() validator = jsonschema.Draft4Validator( schema=schema, resolver=resolver, format_checker=format_checker) return validator
[documentos] def is_valid_catalog(self, catalog): """Valida que un archivo `data.json` cumpla con el schema definido. Chequea que el data.json tiene todos los campos obligatorios y que tanto los campos obligatorios como los opcionales siguen la estructura definida en el schema. Args: catalog (str o dict): Catálogo (dict, JSON o XLSX) a ser validado. Returns: bool: True si el data.json cumple con el schema, sino False. """ catalog = readers.read_catalog(catalog) res = self.validator.is_valid(catalog) return res
@staticmethod def _update_validation_response(error, response): """Actualiza la respuesta por default acorde a un error de validación.""" new_response = response.copy() # El status del catálogo entero será ERROR new_response["status"] = "ERROR" # Adapto la información del ValidationError recibido a los fines # del validador de DataJsons error_info = { # Error Code 1 para "campo obligatorio faltante" # Error Code 2 para "error en tipo o formato de campo" "error_code": 1 if error.validator == "required" else 2, "message": error.message, "validator": error.validator, "validator_value": error.validator_value, "path": list(error.path), # La instancia validada es irrelevante si el error es de tipo 1 "instance": (None if error.validator == "required" else error.instance) } # Identifico a qué nivel de jerarquía sucedió el error. if len(error.path) >= 2 and error.path[0] == "dataset": # El error está a nivel de un dataset particular o inferior position = new_response["error"]["dataset"][error.path[1]] else: # El error está a nivel de catálogo position = new_response["error"]["catalog"] position["status"] = "ERROR" position["errors"].append(error_info) return new_response
[documentos] def validate_catalog(self, catalog): """Analiza un data.json registrando los errores que encuentra. Chequea que el data.json tiene todos los campos obligatorios y que tanto los campos obligatorios como los opcionales siguen la estructura definida en el schema. Args: catalog (str o dict): Catálogo (dict, JSON o XLSX) a ser validado. Returns: dict: Diccionario resumen de los errores encontrados:: { "status": "OK", # resultado de la validación global "error": { "catalog": { "status": "OK", "errors": [] "title": "Título Catalog"}, "dataset": [ { "status": "OK", "errors": [], "title": "Titulo Dataset 1" }, { "status": "ERROR", "errors": [error1_info, error2_info, ...], "title": "Titulo Dataset 2" } ] } } Donde errorN_info es un dict con la información del N-ésimo error encontrado, con las siguientes claves: "path", "instance", "message", "validator", "validator_value", "error_code". """ catalog = readers.read_catalog(catalog) # La respuesta por default se devuelve si no hay errores default_response = { "status": "OK", "error": { "catalog": { "status": "OK", "title": catalog.get("title"), "errors": [] }, # "dataset" contiene lista de rtas default si el catálogo # contiene la clave "dataset" y además su valor es una lista. # En caso contrario "dataset" es None. "dataset": [ { "status": "OK", "title": dataset.get("title"), "errors": [] } for dataset in catalog["dataset"] ] if ("dataset" in catalog and isinstance(catalog["dataset"], list)) else None } } # Genero la lista de errores en la instancia a validar errors_iterator = self.validator.iter_errors(catalog) final_response = default_response.copy() for error in errors_iterator: final_response = self._update_validation_response(error, final_response) return final_response
@classmethod def _dataset_report_helper(cls, dataset): """Toma un dict con la metadata de un dataset, y devuelve un dict coni los valores que dataset_report() usa para reportar sobre él. Args: dataset (dict): Diccionario con la metadata de un dataset. Returns: dict: Diccionario con los campos a nivel dataset que requiere dataset_report(). """ publisher_name = helpers.traverse_dict(dataset, ["publisher", "name"]) super_themes = None if isinstance(dataset.get("superTheme"), list): strings = [s for s in dataset.get("superTheme") if isinstance(s, (str, unicode))] super_themes = ", ".join(strings) themes = None if isinstance(dataset.get("theme"), list): strings = [s for s in dataset.get("theme") if isinstance(s, (str, unicode))] themes = ", ".join(strings) def _stringify_distribution(distribution): title = distribution.get("title") url = distribution.get("downloadURL") return "\"{}\": {}".format(title, url) distributions = [d for d in dataset["distribution"] if isinstance(d, dict)] distributions_list = None if isinstance(distributions, list): distributions_strings = [ _stringify_distribution(d) for d in distributions ] distributions_list = "\n".join(distributions_strings) fields = OrderedDict() fields["dataset_title"] = dataset.get("title") fields["dataset_accrualPeriodicity"] = dataset.get( "accrualPeriodicity") fields["dataset_description"] = dataset.get("description") fields["dataset_publisher_name"] = publisher_name fields["dataset_superTheme"] = super_themes fields["dataset_theme"] = themes fields["dataset_landingPage"] = dataset.get("landingPage") fields["distributions_list"] = distributions_list return fields @staticmethod def _catalog_report_helper(catalog, catalog_validation, url): """Toma un dict con la metadata de un catálogo, y devuelve un dict con los valores que catalog_report() usa para reportar sobre él. Args: catalog (dict): Diccionario con la metadata de un catálogo. validation (dict): Resultado, únicamente a nivel catálogo, de la validación completa de `catalog`. Returns: dict: Diccionario con los campos a nivel catálogo que requiere catalog_report(). """ fields = OrderedDict() fields["catalog_metadata_url"] = url fields["catalog_title"] = catalog.get("title") fields["catalog_description"] = catalog.get("description") fields["valid_catalog_metadata"] = ( 1 if catalog_validation["status"] == "OK" else 0) return fields def _dataset_report(self, dataset, dataset_validation, dataset_index, catalog_fields, harvest='none', report=None): """ Genera una línea del `catalog_report`, correspondiente a un dataset de los que conforman el catálogo analizado.""" dataset_report = OrderedDict(catalog_fields) dataset_report["valid_dataset_metadata"] = ( 1 if dataset_validation["status"] == "OK" else 0) dataset_report["dataset_index"] = dataset_index if harvest == 'all': dataset_report["harvest"] = 1 elif harvest == 'none': dataset_report["harvest"] = 0 elif harvest == 'valid': dataset_report["harvest"] = ( int(dataset_report["valid_dataset_metadata"])) elif harvest == 'report': if not report: raise ValueError(""" Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para el argumento 'report'. Por favor, intentelo nuevamente.""") datasets_to_harvest = self._extract_datasets_to_harvest(report) dataset_report["harvest"] = ( 1 if (dataset_report["catalog_metadata_url"], dataset.get("title")) in datasets_to_harvest else 0) else: raise ValueError(""" {} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o 'report'.""".format(harvest)) dataset_report.update(self._dataset_report_helper(dataset)) return dataset_report.copy()
[documentos] def catalog_report(self, catalog, harvest='none', report=None): """Genera un reporte sobre los datasets de un único catálogo. Args: catalog (dict, str o unicode): Representación externa (path/URL) o interna (dict) de un catálogo. harvest (str): Criterio de cosecha ('all', 'none', 'valid' o 'report'). Returns: list: Lista de diccionarios, con un elemento por cada dataset presente en `catalog`. """ url = catalog if isinstance(catalog, (str, unicode)) else None catalog = readers.read_catalog(catalog) validation = self.validate_catalog(catalog) catalog_validation = validation["error"]["catalog"] datasets_validations = validation["error"]["dataset"] catalog_fields = self._catalog_report_helper( catalog, catalog_validation, url) if "dataset" in catalog and isinstance(catalog["dataset"], list): datasets = [d if isinstance(d, dict) else {} for d in catalog["dataset"]] else: datasets = [] catalog_report = [ self._dataset_report( dataset, datasets_validations[index], index, catalog_fields, harvest, report=report ) for index, dataset in enumerate(datasets) ] return catalog_report
[documentos] def generate_datasets_report(self, catalogs, harvest='valid', report=None, export_path=None): """Genera un reporte sobre las condiciones de la metadata de los datasets contenidos en uno o varios catálogos. Args: catalogs (str, dict o list): Uno (str o dict) o varios (list de strs y/o dicts) catálogos. harvest (str): Criterio a utilizar para determinar el valor del campo "harvest" en el reporte generado ('all', 'none', 'valid' o 'report'). report (str): Path a un reporte/config especificando qué datasets marcar con harvest=1 (sólo si harvest=='report'). export_path (str): Path donde exportar el reporte generado (en formato XLSX o CSV). Si se especifica, el método no devolverá nada. Returns: list: Contiene tantos dicts como datasets estén presentes en `catalogs`, con la data del reporte generado. """ assert isinstance(catalogs, (str, unicode, dict, list)) # Si se pasa un único catálogo, genero una lista que lo contenga if isinstance(catalogs, (str, unicode, dict)): catalogs = [catalogs] catalogs_reports = [self.catalog_report(catalog, harvest, report) for catalog in catalogs] full_report = [] for report in catalogs_reports: full_report.extend(report) if export_path: writers.write_table(table=full_report, path=export_path) else: return full_report
[documentos] def generate_harvester_config(self, catalogs=None, harvest='valid', report=None, frequency='R/P1D', export_path=None): """Genera un archivo de configuración del harvester a partir de un reporte, o de un conjunto de catálogos y un criterio de cosecha (`harvest`). Args: catalogs (str, dict o list): Uno (str o dict) o varios (list de strs y/o dicts) catálogos. harvest (str): Criterio para determinar qué datasets incluir en el archivo de configuración generado ('all', 'none', 'valid' o 'report'). report (list o str): Tabla de reporte generada por generate_datasets_report() como lista de diccionarios o archivo en formato XLSX o CSV. Sólo se usa cuando `harvest=='report'`, en cuyo caso `catalogs` se ignora. frequency (str): Frecuencia de búsqueda de actualizaciones en los datasets a cosechar. Todo intervalo de frecuencia válido según ISO 8601 es válido. Es 'R/P1D' (diariamiente) por omisión, y si se pasa`None`, se conservará el valor de original de cada dataset, `dataset["accrualPeriodicity"]`. export_path (str): Path donde exportar el reporte generado (en formato XLSX o CSV). Si se especifica, el método no devolverá nada. Returns: list of dicts: Un diccionario con variables de configuración por cada dataset a cosechar. """ # Si se pasa un único catálogo, genero una lista que lo contenga if isinstance(catalogs, (str, unicode, dict)): catalogs = [catalogs] if harvest == 'report': if not report: raise ValueError(""" Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para el argumento 'report'. Por favor, intentelo nuevamente.""") datasets_report = readers.read_table(report) elif harvest in ['valid', 'none', 'all']: # catalogs no puede faltar para estos criterios assert isinstance(catalogs, (str, unicode, dict, list)) datasets_report = self.generate_datasets_report(catalogs, harvest) else: raise ValueError(""" {} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o 'report'.""".format(harvest)) config_keys = ["catalog_metadata_url", "dataset_title", "dataset_accrualPeriodicity"] harvester_config = [ OrderedDict( # Retengo únicamente los campos que necesita el harvester [(k, v) for (k, v) in dataset.items() if k in config_keys] ) # Para aquellost datasets marcados con 'harvest'==1 for dataset in datasets_report if bool(int(dataset["harvest"])) ] if frequency: valid_patterns = [ "^R/P\\d+(\\.\\d+)?[Y|M|W|D]$", "^R/PT\\d+(\\.\\d+)?[H|M|S]$" ] if any([re.match(pat, frequency) for pat in valid_patterns]): for dataset in harvester_config: dataset["dataset_accrualPeriodicity"] = frequency else: warnings.warn(""" {} no es una frecuencia de cosecha valida. Se conservara la frecuencia de actualizacion original de cada dataset.""".format(frequency)) if export_path: writers.write_table(harvester_config, export_path) else: return harvester_config
[documentos] def generate_harvestable_catalogs(self, catalogs, harvest='all', report=None, export_path=None): """Filtra los catálogos provistos según el criterio determinado en `harvest`. Args: catalogs (str, dict o list): Uno (str o dict) o varios (list de strs y/o dicts) catálogos. harvest (str): Criterio para determinar qué datasets conservar de cada catálogo ('all', 'none', 'valid' o 'report'). report (list o str): Tabla de reporte generada por generate_datasets_report() como lista de diccionarios o archivo en formato XLSX o CSV. Sólo se usa cuando `harvest=='report'`. export_path (str): Path a un archivo JSON o directorio donde exportar los catálogos filtrados. Si termina en ".json" se exportará la lista de catálogos a un único archivo. Si es un directorio, se guardará en él un JSON por catálogo. Si se especifica `export_path`, el método no devolverá nada. Returns: list of dicts: Lista de catálogos. """ assert isinstance(catalogs, (str, unicode, dict, list)) # Si se pasa un único catálogo, genero una lista que lo contenga if isinstance(catalogs, (str, unicode, dict)): catalogs = [catalogs] harvestable_catalogs = [readers.read_catalog(c) for c in catalogs] catalogs_urls = [catalog if isinstance(catalog, (str, unicode)) else None for catalog in catalogs] # aplica los criterios de cosecha if harvest == 'all': pass elif harvest == 'none': for catalog in harvestable_catalogs: catalog["dataset"] = [] elif harvest == 'valid': report = self.generate_datasets_report(catalogs, harvest) return self.generate_harvestable_catalogs( catalogs=catalogs, harvest='report', report=report, export_path=export_path) elif harvest == 'report': if not report: raise ValueError(""" Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para el argumento 'report'. Por favor, intentelo nuevamente.""") datasets_to_harvest = self._extract_datasets_to_harvest(report) for idx_cat, catalog in enumerate(harvestable_catalogs): catalog_url = catalogs_urls[idx_cat] if ("dataset" in catalog and isinstance(catalog["dataset"], list)): catalog["dataset"] = [ dataset for dataset in catalog["dataset"] if (catalog_url, dataset.get("title")) in datasets_to_harvest ] else: catalog["dataset"] = [] else: raise ValueError(""" {} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o 'report'.""".format(harvest)) # devuelve los catálogos harvesteables if export_path and os.path.isdir(export_path): # Creo un JSON por catálogo for idx, catalog in enumerate(harvestable_catalogs): filename = os.path.join(export_path, "catalog_{}".format(idx)) writers.write_json(catalog, filename) elif export_path: # Creo un único JSON con todos los catálogos writers.write_json(harvestable_catalogs, export_path) else: return harvestable_catalogs
[documentos] def generate_datasets_summary(self, catalog, export_path=None): """Genera un informe sobre los datasets presentes en un catálogo, indicando para cada uno: - Índice en la lista catalog["dataset"] - Título - Identificador - Cantidad de distribuciones - Estado de sus metadatos ["OK"|"ERROR"] Es utilizada por la rutina diaria de `libreria-catalogos` para reportar sobre los datasets de los catálogos mantenidos. Args: catalog (str o dict): Path a un catálogo en cualquier formato, JSON, XLSX, o diccionario de python. export_path (str): Path donde exportar el informe generado (en formato XLSX o CSV). Si se especifica, el método no devolverá nada. Returns: list: Contiene tantos dicts como datasets estén presentes en `catalogs`, con los datos antes mencionados. """ catalog = readers.read_catalog(catalog) # Trato de leer todos los datasets bien formados de la lista # catalog["dataset"], si existe. if "dataset" in catalog and isinstance(catalog["dataset"], list): datasets = [d if isinstance(d, dict) else {} for d in catalog["dataset"]] else: # Si no, considero que no hay datasets presentes datasets = [] validation = self.validate_catalog(catalog)["error"]["dataset"] def info_dataset(index, dataset): """Recolecta información básica de un dataset.""" info = OrderedDict() info["indice"] = index info["titulo"] = dataset.get("title") info["identificador"] = dataset.get("identifier") info["estado_metadatos"] = validation[index]["status"] info["cant_errores"] = len(validation[index]["errors"]) info["cant_distribuciones"] = len(dataset["distribution"]) return info summary = [info_dataset(i, ds) for i, ds in enumerate(datasets)] if export_path: writers.write_table(summary, export_path) else: return summary
[documentos] def generate_catalog_readme(self, catalog, export_path=None): """Genera una descripción textual en formato Markdown sobre los metadatos generales de un catálogo (título, editor, fecha de publicación, et cetera), junto con: - estado de los metadatos a nivel catálogo, - estado global de los metadatos, y - cantidad de datasets y distribuciones incluidas Es utilizada por la rutina diaria de `libreria-catalogos` para generar un README con información básica sobre los catálogos mantenidos. Args: catalog (str o dict): Path a un catálogo en cualquier formato, JSON, XLSX, o diccionario de python. export_path (str): Path donde exportar el texto generado (en formato Markdown). Si se especifica, el método no devolverá nada. Returns: str: Texto de la descripción generada. """ catalog = readers.read_catalog(catalog) validation = self.validate_catalog(catalog) readme_template = """ # Catálogo: {title} ## Información General - **Autor**: {publisher_name} - **Correo Electrónico**: {publisher_mbox} - **Nombre del catálogo**: {title} - **Descripción**: > {description} ## Estado de los metadatos y cantidad de recursos Estado metadatos globales | Estado metadatos catálogo | # de Datasets | # de Distribuciones --------------------------|---------------------------|---------------|-------------------- {global_status} | {catalog_status} | {no_of_datasets} | {no_of_distributions} ## Datasets incluidos Por favor, consulte el informe [`datasets.csv`](datasets.csv). """ content = { "title": catalog.get("title"), "publisher_name": helpers.traverse_dict( catalog, ["publisher", "name"]), "publisher_mbox": helpers.traverse_dict( catalog, ["publisher", "mbox"]), "description": catalog.get("description"), "global_status": validation["status"], "catalog_status": validation["error"]["catalog"]["status"], "no_of_datasets": len(catalog["dataset"]), "no_of_distributions": sum([len(dataset["distribution"]) for dataset in catalog["dataset"]]) } catalog_readme = readme_template.format(**content) if export_path: with io.open(export_path, 'w', encoding='utf-8') as target: target.write(catalog_readme) else: return catalog_readme
@classmethod def _extract_datasets_to_harvest(cls, report): """Extrae de un reporte los datos necesarios para reconocer qué datasets marcar para cosecha en cualquier generador. Args: report (str o list): Reporte (lista de dicts) o path a uno. Returns: list: Lista de tuplas con los títulos de catálogo y dataset de cada reporte extraído. """ assert isinstance(report, (str, unicode, list)) # Si `report` es una lista de tuplas con longitud 2, asumimos que es un # reporte procesado para extraer los datasets a harvestear. Se devuelve # intacta. if (isinstance(report, list) and all([isinstance(x, tuple) and len(x) == 2 for x in report])): return report table = readers.read_table(report) table_keys = table[0].keys() expected_keys = ["catalog_metadata_url", "dataset_title", "dataset_accrualPeriodicity"] # Verifico la presencia de las claves básicas de un config de harvester for key in expected_keys: if key not in table_keys: raise KeyError(""" El reporte no contiene la clave obligatoria {}. Pruebe con otro archivo. """.format(key)) if "harvest" in table_keys: # El archivo es un reporte de datasets. datasets_to_harvest = [ (row["catalog_metadata_url"], row["dataset_title"]) for row in table if int(row["harvest"])] else: # El archivo es un config de harvester. datasets_to_harvest = [ (row["catalog_metadata_url"], row["dataset_title"]) for row in table] return datasets_to_harvest
[documentos]def main(): """Permite ejecutar el módulo por línea de comandos. Valida un path o url a un archivo data.json devolviendo True/False si es válido y luego el resultado completo. Example: python pydatajson.py http://181.209.63.71/data.json python pydatajson.py ~/github/pydatajson/tests/samples/full_data.json """ try: datajson_file = sys.argv[1] dj_instance = DataJson() bool_res = dj_instance.is_valid_catalog(datajson_file) full_res = dj_instance.validate_catalog(datajson_file) pretty_full_res = json.dumps( full_res, indent=4, separators=(",", ": ")) print(bool_res) print(pretty_full_res) except IndexError as errmsg: format_str = """ {}: pydatajson.py fue ejecutado como script sin proveer un argumento """ print(format_str.format(errmsg))
if __name__ == '__main__': main()