Source code for pydatajson.pydatajson

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Módulo principal de pydatajson

Contiene la clase DataJson que reúne los métodos públicos para trabajar con
archivos data.json.
"""

from __future__ import unicode_literals
from __future__ import print_function
from __future__ import with_statement

import sys
import os.path
from urlparse import urljoin, urlparse
import warnings
import json
import jsonschema
import requests
import unicodecsv as csv

ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))


[docs]class DataJson(object): """Métodos para trabajar con archivos data.json.""" # Variables por default ABSOLUTE_SCHEMA_DIR = os.path.join(ABSOLUTE_PROJECT_DIR, "schemas") DEFAULT_CATALOG_SCHEMA_FILENAME = "catalog.json" def __init__(self, schema_filename=DEFAULT_CATALOG_SCHEMA_FILENAME, schema_dir=ABSOLUTE_SCHEMA_DIR): """Crea un manipulador de `data.json`s. Salvo que se indique lo contrario, el validador de esquemas asociado es el definido por default en las constantes de clase. Args: schema_filename (str): Nombre del archivo que contiene el esquema validador. schema_dir (str): Directorio (absoluto) donde se encuentra el esquema validador (y sus referencias, de tenerlas). """ self.validator = self._create_validator(schema_filename, schema_dir) @classmethod def _create_validator(cls, schema_filename, schema_dir): """Crea el validador necesario para inicializar un objeto DataJson. Para poder resolver referencias inter-esquemas, un Validador requiere que se especifique un RefResolver (Resolvedor de Referencias) con el directorio base (absoluto) y el archivo desde el que se referencia el directorio. Para poder validar formatos, un Validador requiere que se provea explícitamente un FormatChecker. Actualmente se usa el default de la librería, jsonschema.FormatChecker(). Args: schema_filename (str): Nombre del archivo que contiene el esquema validador "maestro". schema_dir (str): Directorio (absoluto) donde se encuentra el esquema validador maestro y sus referencias, de tenerlas. Returns: Draft4Validator: Un validador de JSONSchema Draft #4. El validador se crea con un RefResolver que resuelve referencias de `schema_filename` dentro de `schema_dir`. """ schema_path = os.path.join(schema_dir, schema_filename) schema = cls._json_to_dict(schema_path) # Según https://github.com/Julian/jsonschema/issues/98 # Permite resolver referencias locales a otros esquemas. resolver = jsonschema.RefResolver( base_uri=urljoin('file:', schema_path), referrer=schema) format_checker = jsonschema.FormatChecker() validator = jsonschema.Draft4Validator( schema=schema, resolver=resolver, format_checker=format_checker) return validator @staticmethod def _json_to_dict(dict_or_json_path): """Toma el path a un JSON y devuelve el diccionario que representa. Si el argumento es un dict, lo deja pasar. Si es un string asume que el parámetro es una URL si comienza con 'http' o 'https', o un path local de lo contrario. Args: dict_or_json_path (dict or str): Si es un str, path local o URL remota a un archivo de texto plano en formato JSON. Returns: dict: El diccionario que resulta de deserializar dict_or_json_path. """ assert isinstance(dict_or_json_path, (dict, str, unicode)) if isinstance(dict_or_json_path, dict): return dict_or_json_path parsed_url = urlparse(dict_or_json_path) if parsed_url.scheme in ["http", "https"]: req = requests.get(dict_or_json_path) json_string = req.content else: # En caso de que dict_or_json_path parezca ser una URL remota, # advertirlo path_start = parsed_url.path.split(".")[0] if path_start == "www" or path_start.isdigit(): warnings.warn(""" La dirección del archivo JSON ingresada parece una URL, pero no comienza con 'http' o 'https' así que será tratada como una dirección local. ¿Tal vez quiso decir 'http://{}'? """.format(dict_or_json_path).encode("utf8")) with open(dict_or_json_path) as json_file: json_string = json_file.read() json_dict = json.loads(json_string, encoding="utf8") return json_dict @staticmethod def _traverse_dict(dicc, keys, default_value=None): """Recorre un diccionario siguiendo una lista de claves, y devuelve default_value en caso de que alguna de ellas no exista.""" for key in keys: if isinstance(dicc, dict) and key in dicc: dicc = dicc[key] elif isinstance(dicc, list): dicc = dicc[key] else: return default_value return dicc
[docs] def is_valid_catalog(self, datajson_path): """Valida que un archivo `data.json` cumpla con el schema definido. Chequea que el data.json tiene todos los campos obligatorios y que tanto los campos obligatorios como los opcionales siguen la estructura definida en el schema. Args: datajson_path (str): Path al archivo data.json a ser validado. Returns: bool: True si el data.json cumple con el schema, sino False. """ datajson = self._json_to_dict(datajson_path) res = self.validator.is_valid(datajson) return res
[docs] def validate_catalog(self, datajson_path): """Analiza un data.json registrando los errores que encuentra. Chequea que el data.json tiene todos los campos obligatorios y que tanto los campos obligatorios como los opcionales siguen la estructura definida en el schema. Args: datajson_path (str): Path al archivo data.json a ser validado. Returns: dict: Diccionario resumen de los errores encontrados:: { "status": "OK", # resultado de la validación global "error": { "catalog": { "status": "OK", "errors": [] "title": "Título Catalog"}, "dataset": [ { "status": "OK", "errors": [], "title": "Titulo Dataset 1" }, { "status": "ERROR", "errors": [error1_info, error2_info, ...], "title": "Titulo Dataset 2" } ] } } Donde errorN_info es un dict con la información del N-ésimo error encontrado, con las siguientes claves: "path", "instance", "message", "validator", "validator_value", "error_code". """ datajson = self._json_to_dict(datajson_path) # La respuesta por default se devuelve si no hay errores default_response = { "status": "OK", "error": { "catalog": { "status": "OK", "title": datajson.get("title"), "errors": [] }, # "dataset" contiene lista de rtas default si el catálogo # contiene la clave "dataset" y además su valor es una lista. # En caso contrario "dataset" es None. "dataset": [ { "status": "OK", "title": dataset.get("title"), "errors": [] } for dataset in datajson["dataset"] ] if ("dataset" in datajson and isinstance(datajson["dataset"], list)) else None } } def _update_response(error, response): """Actualiza la respuesta por default acorde a un error de validación.""" new_response = response.copy() # El status del catálogo entero será ERROR new_response["status"] = "ERROR" # Adapto la información del ValidationError recibido a los fines # del validador de DataJsons error_info = { # Error Code 1 para "campo obligatorio faltante" # Error Code 2 para "error en tipo o formato de campo" "error_code": 1 if error.validator == "required" else 2, "message": error.message, "validator": error.validator, "validator_value": error.validator_value, "path": list(error.path), # La instancia validada es irrelevante si el error es de tipo 1 "instance": (None if error.validator == "required" else error.instance) } # Identifico a qué nivel de jerarquía sucedió el error. if len(error.path) >= 2 and error.path[0] == "dataset": # El error está a nivel de un dataset particular o inferior position = new_response["error"]["dataset"][error.path[1]] else: # El error está a nivel de catálogo position = new_response["error"]["catalog"] position["status"] = "ERROR" position["errors"].append(error_info) return new_response # Genero la lista de errores en la instancia a validar errors_iterator = self.validator.iter_errors(datajson) final_response = default_response.copy() for error in errors_iterator: final_response = _update_response(error, final_response) return final_response
def _dataset_report_helper(self, dataset, dataset_validation): """Toma un dict con la metadata de un dataset, y devuelve un dict con los valores que generate_datasets_report() usa para reportar sobre él.""" valid_metadata = 1 if dataset_validation["status"] == "OK" else 0 publisher_name = self._traverse_dict(dataset, ["publisher", "name"]) super_themes = None if isinstance(dataset.get("superTheme"), list): strings = [s for s in dataset.get("superTheme") if isinstance(s, (str, unicode))] super_themes = ", ".join(strings) themes = None if isinstance(dataset.get("theme"), list): strings = [s for s in dataset.get("theme") if isinstance(s, (str, unicode))] themes = ", ".join(strings) def _stringify_distribution(distribution): title = distribution.get("title") url = distribution.get("downloadURL") return "\"{}\": {}".format(title, url) distributions = [d for d in dataset["distribution"] if isinstance(d, dict)] distributions_list = None if isinstance(distributions, list): distributions_strings = [ _stringify_distribution(d) for d in distributions ] distributions_list = "\n".join(distributions_strings) dataset_report = { "dataset_title": dataset.get("title"), "dataset_accrualPeriodicity": dataset.get("accrualPeriodicity"), "valid_dataset_metadata": valid_metadata, "harvest": 0, "dataset_description": dataset.get("description"), "dataset_publisher_name": publisher_name, "dataset_superTheme": super_themes, "dataset_theme": themes, "dataset_landingPage": dataset.get("landingPage"), "distributions_list": distributions_list } return dataset_report
[docs] def generate_datasets_report(self, catalogs, report_path): """Genera un reporte sobre las condiciones de la metadata de los datasets contenidos en uno o varios catálogos. El método no devuelve nada, pero genera un "reporte de datasets" en el `report_path` indicado. Dicho reporte es un CSV que consta de una línea por cada dataset presente en los catálogos provistos, con varios campos útiles (`report_fieldnames`) para decidir si harvestear o no cierto dataset. Args: catalogs (str, dict o list): Uno (str o dict) o varios (list de strs y/o dicts) elementos con la metadata de un catálogo. Tienen que poder ser interpretados por self._json_to_dict() report_path (str): Path donde se espera que se guarde el reporte sobre datasets generado. Returns: None """ report_fieldnames = [ 'catalog_metadata_url', 'catalog_title', 'catalog_description', 'valid_catalog_metadata', 'dataset_index', 'dataset_title', 'dataset_accrualPeriodicity', 'valid_dataset_metadata', 'harvest', 'dataset_description', 'dataset_publisher_name', 'dataset_superTheme', 'dataset_theme', 'dataset_landingPage', 'distributions_list' ] # Si se pasa un único catálogo, convertirlo en lista if isinstance(catalogs, (dict, str, unicode)): catalogs = [catalogs] with open(report_path, 'w') as report_file: writer = csv.DictWriter(report_file, report_fieldnames, lineterminator="\n", encoding="utf-8") writer.writeheader() for index, catalog in enumerate(catalogs): assert isinstance(catalog, (dict, str, unicode)) if isinstance(catalog, (str, unicode)): catalog_metadata_url = catalog catalog = self._json_to_dict(catalog) else: catalog_metadata_url = None if "dataset" not in catalog: warnings.warn(""" El catálogo en la posición {}, "{}", no contiene la clave "dataset", y por ende no se puede reportar sobre él. """.format(index, catalog_metadata_url).encode("utf-8")) continue validation = self.validate_catalog(catalog) datasets = [] if isinstance(catalog["dataset"], list): datasets = [d for d in catalog["dataset"] if isinstance(d, dict)] for index, dataset in enumerate(datasets): dataset_report = { "catalog_metadata_url": catalog_metadata_url, "catalog_title": catalog.get("title"), "catalog_description": catalog.get("description"), "valid_catalog_metadata": (1 if validation["error"][ "catalog"]["status"] == "OK" else 0), "dataset_index": index } dataset_validation = validation["error"]["dataset"][index] dataset_report.update( self._dataset_report_helper(dataset, dataset_validation)) writer.writerow(dataset_report)
@staticmethod
[docs] def generate_harvester_config(report_path, config_path): """Genera un archivo de configuración del harvester según el reporte provisto. Se espera que `report_path` apunte a un archivo producido por `generate_datasets_report(catalogs, report_path)`, al cual se le modificaron algunos 0 (ceros) por 1 (unos) en la columna "harvest". Este método no devuelve nada. Como efecto sencudario, genera un archivo de configuración en `config_path` manteniendo de `report_path` únicamente los campos necesarios para el harvester, **de aquellos datasets para los cuales el valor de "harvest" es igual a 1**. Args: report_path (str): Path a un reporte de datasets procesado. config_path (str): Path donde se generará el archivo de configuración del harvester. Returns: None """ with open(report_path) as report_file: reader = csv.DictReader(report_file) with open(config_path, 'w') as config_file: config_fieldnames = ["catalog_metadata_url", "dataset_title", "dataset_accrualPeriodicity"] writer = csv.DictWriter(config_file, fieldnames=config_fieldnames, lineterminator="\n", extrasaction='ignore', encoding='utf-8') writer.writeheader() for row in reader: if row["harvest"] == "1": writer.writerow(row)
[docs] def generate_harvestable_catalogs(self, catalogs, report_path, write_to_file, files_dir): """Genera archivo de configuración del harvester según el reporte. Args: report_path (str): config_path (str): write_to_file (bool): files_dir (str): """ raise NotImplementedError
[docs]def main(): """Permite ejecutar el módulo por línea de comandos. Valida un path o url a un archivo data.json devolviendo True/False si es válido y luego el resultado completo. Example: python pydatajson.py http://181.209.63.71/data.json python pydatajson.py ~/github/pydatajson/tests/samples/full_data.json """ datajson_file = sys.argv[1] dj_instance = DataJson() bool_res = dj_instance.is_valid_catalog(datajson_file) full_res = dj_instance.validate_catalog(datajson_file) print(bool_res) print(json.dumps(full_res, separators=(",", ": "), indent=4))
if __name__ == '__main__': main()