Código fuente para pydatajson.core

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Módulo principal de pydatajson

Contiene la clase DataJson que reúne los métodos públicos para trabajar con
archivos data.json.
"""

from __future__ import print_function
from __future__ import unicode_literals
from __future__ import with_statement

import io
import json
import os.path
import re
import sys
import warnings
from collections import OrderedDict
from datetime import datetime

from openpyxl.styles import Alignment, Font
from six import string_types, iteritems
from six.moves.urllib_parse import urljoin

from . import documentation
from . import helpers
from . import indicators
from . import readers
from . import search
from . import validation
from . import writers
from . import federation
from . import transformation
from . import backup

ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
CENTRAL_CATALOG = "http://datos.gob.ar/data.json"
DATA_FORMATS = [
    "csv", "xls", "xlsx", "ods", "dta",
    "shp", "kml",
    "json", "xml",
    "zip"
]
MIN_DATASET_TITLE = 10
MIN_DATASET_DESCRIPTION = 20


[documentos]class DataJson(dict): """Métodos para trabajar con archivos data.json.""" # Variables por default CATALOG_FIELDS_PATH = os.path.join(ABSOLUTE_PROJECT_DIR, "fields") def __init__(self, catalog=None, schema_filename=None, schema_dir=None, default_values=None): """Crea un manipulador de `data.json`s. Salvo que se indique lo contrario, el validador de esquemas asociado es el definido por default en las constantes de clase. Args: schema_filename (str): Nombre del archivo que contiene el esquema validador. schema_dir (str): Directorio (absoluto) donde se encuentra el esquema validador (y sus referencias, de tenerlas). """ # se construye el objeto DataJson con la interfaz de un dicconario if catalog: # lee representaciones de un catálogo hacia un diccionario catalog = readers.read_catalog(catalog, default_values=default_values) # copia todos los atributos del diccionario hacia el objeto for key, value in iteritems(catalog): self[key] = value self.has_catalog = True # indexa los ids de datasets, distribuciones y fields self._build_index() else: self.has_catalog = False self.validator = validation.create_validator( schema_filename, schema_dir) # asigno docstrings de los métodos modularizados fn_doc = indicators.generate_catalogs_indicators.__doc__ self.generate_catalogs_indicators.__func__.__doc__ = fn_doc fn_doc = validation.is_valid_catalog.__doc__ self.is_valid_catalog.__func__.__doc__ = fn_doc # metodos para buscar entidades cuando DataJson tiene catalogo cargado get_themes = search.get_themes themes = property(get_themes) get_datasets = search.get_datasets datasets = property(get_datasets) get_distributions = search.get_distributions distributions = property(get_distributions) get_fields = search.get_fields fields = property(get_fields) get_time_series = search.get_time_series time_series = property(get_time_series) get_dataset = search.get_dataset get_distribution = search.get_distribution get_field = search.get_field get_theme = search.get_theme get_field_location = search.get_field_location get_catalog_metadata = search.get_catalog_metadata # metodos para realizar operaciones de transformación de metadatos generate_distribution_ids = transformation.generate_distribution_ids # metodos para guardar el catálogo en otros formatos to_xlsx = writers.write_xlsx_catalog to_json = writers.write_json_catalog # metodos para hacer backups make_catalog_backup = backup.make_catalog_backup # Metodos para interactuar con un portal de CKAN push_dataset_to_ckan = federation.push_dataset_to_ckan harvest_dataset_to_ckan = federation.harvest_dataset_to_ckan restore_dataset_to_ckan = federation.restore_dataset_to_ckan harvest_catalog_to_ckan = federation.harvest_catalog_to_ckan restore_catalog_to_ckan = federation.restore_catalog_to_ckan push_theme_to_ckan = federation.push_theme_to_ckan push_new_themes = federation.push_new_themes remove_harvested_ds_from_ckan = federation.remove_harvested_ds_from_ckan def _build_index(self): """Itera todos los datasets, distribucioens y fields indexandolos.""" datasets_index = {} distributions_index = {} fields_index = {} # recorre todos los datasets for dataset_index, dataset in enumerate(self.datasets): if "identifier" in dataset: datasets_index[dataset["identifier"]] = { "dataset_index": dataset_index } # recorre las distribuciones del dataset for distribution_index, distribution in enumerate( dataset.get("distribution", [])): if "identifier" in distribution: distributions_index[distribution["identifier"]] = { "distribution_index": distribution_index, "dataset_identifier": dataset["identifier"] } # recorre los fields de la distribucion for field_index, field in enumerate( distribution.get("field", [])): if "id" in field: fields_index[field["id"]] = { "field_index": field_index, "dataset_identifier": dataset["identifier"], "distribution_identifier": distribution[ "identifier"] } setattr(self, "_distributions_index", distributions_index) setattr(self, "_datasets_index", datasets_index) setattr(self, "_fields_index", fields_index)
[documentos] def remove_dataset(self, identifier): for index, dataset in enumerate(self["dataset"]): if dataset["identifier"] == identifier: self["dataset"].pop(index) print("Dataset {} en posicion {} fue eliminado.".format( identifier, index)) return print("No se encontro el dataset {}.".format(identifier))
[documentos] def remove_distribution(self, identifier, dataset_identifier=None): for dataset in self["dataset"]: for index, distribution in enumerate(dataset["distribution"]): if (distribution["identifier"] == identifier and (not dataset_identifier or dataset["identifier"] == dataset_identifier)): dataset["distribution"].pop(index) print("Distribution {} del dataset {} en posicion {} fue eliminada.".format( identifier, dataset["identifier"], index)) return print("No se encontro la distribucion {}.".format(identifier))
[documentos] def is_valid_catalog(self, catalog=None): catalog = catalog or self return validation.is_valid_catalog(catalog, validator=self.validator)
@staticmethod def _update_validation_response(error, response): """Actualiza la respuesta por default acorde a un error de validación.""" new_response = response.copy() # El status del catálogo entero será ERROR new_response["status"] = "ERROR" # Adapto la información del ValidationError recibido a los fines # del validador de DataJsons error_info = { # Error Code 1 para "campo obligatorio faltante" # Error Code 2 para "error en tipo o formato de campo" "error_code": 1 if error.validator == "required" else 2, "message": error.message, "validator": error.validator, "validator_value": error.validator_value, "path": list(error.path), # La instancia validada es irrelevante si el error es de tipo 1 "instance": (None if error.validator == "required" else error.instance) } # Identifico a qué nivel de jerarquía sucedió el error. if len(error.path) >= 2 and error.path[0] == "dataset": # El error está a nivel de un dataset particular o inferior position = new_response["error"]["dataset"][error.path[1]] else: # El error está a nivel de catálogo position = new_response["error"]["catalog"] position["status"] = "ERROR" position["errors"].append(error_info) return new_response
[documentos] def validate_catalog(self, catalog=None, only_errors=False, fmt="dict", export_path=None): catalog = catalog or self return validation.validate_catalog( catalog, only_errors, fmt, export_path, validator=self.validator)
@staticmethod def _stringify_list(str_or_list): if isinstance(str_or_list, list): strings = [s for s in str_or_list if isinstance(s, string_types)] stringified_list = ", ".join(strings) elif isinstance(str_or_list, string_types): stringified_list = str_or_list else: stringified_list = None return stringified_list @classmethod def _dataset_report_helper(cls, dataset, catalog_homepage=None): """Toma un dict con la metadata de un dataset, y devuelve un dict coni los valores que dataset_report() usa para reportar sobre él. Args: dataset (dict): Diccionario con la metadata de un dataset. Returns: dict: Diccionario con los campos a nivel dataset que requiere dataset_report(). """ publisher_name = helpers.traverse_dict(dataset, ["publisher", "name"]) languages = cls._stringify_list(dataset.get("language")) super_themes = cls._stringify_list(dataset.get("superTheme")) themes = cls._stringify_list(dataset.get("theme")) def _stringify_distribution(distribution): title = distribution.get("title") url = distribution.get("downloadURL") return "\"{}\": {}".format(title, url) distributions = [d for d in dataset["distribution"] if isinstance(d, dict)] # crea lista de distribuciones distributions_list = None if isinstance(distributions, list): distributions_strings = [ _stringify_distribution(d) for d in distributions ] distributions_list = "\n\n".join(distributions_strings) # crea lista de formatos distributions_formats = json.dumps( cls._count_distribution_formats_dataset(dataset)) fields = OrderedDict() fields["dataset_identifier"] = dataset.get("identifier") fields["dataset_title"] = dataset.get("title") fields["dataset_accrualPeriodicity"] = dataset.get( "accrualPeriodicity") fields["dataset_description"] = dataset.get("description") fields["dataset_publisher_name"] = publisher_name fields["dataset_superTheme"] = super_themes fields["dataset_theme"] = themes fields["dataset_landingPage"] = dataset.get("landingPage") fields["dataset_landingPage_generated"] = cls._generate_landingPage( catalog_homepage, dataset.get("identifier") ) fields["dataset_issued"] = dataset.get("issued") fields["dataset_modified"] = dataset.get("modified") fields["distributions_formats"] = distributions_formats fields["distributions_list"] = distributions_list fields["dataset_license"] = dataset.get("license") fields["dataset_language"] = languages fields["dataset_spatial"] = dataset.get("spatial") fields["dataset_temporal"] = dataset.get("temporal") return fields @classmethod def _generate_landingPage(cls, catalog_homepage, dataset_identifier): return urljoin(catalog_homepage, "dataset/{}".format(dataset_identifier)) @staticmethod def _catalog_report_helper(catalog, catalog_validation, url, catalog_id, catalog_org): """Toma un dict con la metadata de un catálogo, y devuelve un dict con los valores que catalog_report() usa para reportar sobre él. Args: catalog (dict): Diccionario con la metadata de un catálogo. validation (dict): Resultado, únicamente a nivel catálogo, de la validación completa de `catalog`. Returns: dict: Diccionario con los campos a nivel catálogo que requiere catalog_report(). """ fields = OrderedDict() fields["catalog_metadata_url"] = url fields["catalog_federation_id"] = catalog_id fields["catalog_federation_org"] = catalog_org fields["catalog_title"] = catalog.get("title") fields["catalog_description"] = catalog.get("description") fields["valid_catalog_metadata"] = ( 1 if catalog_validation["status"] == "OK" else 0) return fields def _dataset_report( self, dataset, dataset_validation, dataset_index, catalog_fields, harvest='none', report=None, catalog_homepage=None ): """ Genera una línea del `catalog_report`, correspondiente a un dataset de los que conforman el catálogo analizado.""" # hace un breve análisis de qa al dataset good_qa, notes = self._dataset_qa(dataset) dataset_report = OrderedDict(catalog_fields) dataset_report["valid_dataset_metadata"] = ( 1 if dataset_validation["status"] == "OK" else 0) dataset_report["dataset_index"] = dataset_index if isinstance(harvest, list): dataset_report["harvest"] = 1 if dataset["title"] in harvest else 0 elif harvest == 'all': dataset_report["harvest"] = 1 elif harvest == 'none': dataset_report["harvest"] = 0 elif harvest == 'valid': dataset_report["harvest"] = ( int(dataset_report["valid_dataset_metadata"])) elif harvest == 'good': valid_metadata = int(dataset_report["valid_dataset_metadata"]) == 1 dataset_report["harvest"] = 1 if valid_metadata and good_qa else 0 elif harvest == 'report': if not report: raise ValueError(""" Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para el argumento 'report'. Por favor, intentelo nuevamente.""") datasets_to_harvest = self._extract_datasets_to_harvest(report) dataset_report["harvest"] = ( 1 if (dataset_report["catalog_metadata_url"], dataset.get("title")) in datasets_to_harvest else 0) else: raise ValueError(""" {} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o 'report'.""".format(harvest)) dataset_report.update( self._dataset_report_helper( dataset, catalog_homepage=catalog_homepage) ) dataset_report["notas"] = "\n\n".join(notes) return dataset_report.copy() def _dataset_qa(self, dataset): """Chequea si el dataset tiene una calidad mínima para cosechar.""" # 1. VALIDACIONES # chequea que haya por lo menos algún formato de datos reconocido has_data_format = False formats = self._count_distribution_formats_dataset(dataset).keys() for distrib_format in formats: for data_format in DATA_FORMATS: if data_format.lower() in distrib_format.lower(): has_data_format = True break if has_data_format: break # chequea que algunos campos tengan longitudes mínimas has_title = "title" in dataset has_description = "description" in dataset if has_title: has_min_title = len(dataset["title"]) >= MIN_DATASET_TITLE else: has_min_title = False if has_description: has_min_desc = len( dataset["description"]) >= MIN_DATASET_DESCRIPTION else: has_min_desc = False # 2. EVALUACION DE COSECHA: evalua si se cosecha o no el dataset harvest = (has_title and has_description and has_data_format and has_min_title and has_min_desc) # 3. NOTAS: genera notas de validación notes = [] if not has_data_format: notes.append("No tiene distribuciones con datos.") if not has_title: notes.append("Dataset sin titulo {}".format(dataset)) else: if not has_min_title: notes.append("Titulo tiene menos de {} caracteres".format( MIN_DATASET_TITLE)) if not has_description: notes.append("Dataset sin descripcion {}".format(dataset)) else: if not has_min_desc: notes.append("Descripcion tiene menos de {} caracteres".format( MIN_DATASET_DESCRIPTION)) return harvest, notes
[documentos] def catalog_report(self, catalog, harvest='none', report=None, catalog_id=None, catalog_homepage=None, catalog_org=None): """Genera un reporte sobre los datasets de un único catálogo. Args: catalog (dict, str o unicode): Representación externa (path/URL) o interna (dict) de un catálogo. harvest (str): Criterio de cosecha ('all', 'none', 'valid', 'report' o 'good'). Returns: list: Lista de diccionarios, con un elemento por cada dataset presente en `catalog`. """ url = catalog if isinstance(catalog, string_types) else None catalog = readers.read_catalog(catalog) validation = self.validate_catalog(catalog) catalog_validation = validation["error"]["catalog"] datasets_validations = validation["error"]["dataset"] catalog_fields = self._catalog_report_helper( catalog, catalog_validation, url, catalog_id, catalog_org ) if "dataset" in catalog and isinstance(catalog["dataset"], list): datasets = [d if isinstance(d, dict) else {} for d in catalog["dataset"]] else: datasets = [] catalog_report = [ self._dataset_report( dataset, datasets_validations[index], index, catalog_fields, harvest, report=report, catalog_homepage=catalog_homepage ) for index, dataset in enumerate(datasets) ] return catalog_report
[documentos] def generate_datasets_report( self, catalogs, harvest='valid', report=None, export_path=None, catalog_ids=None, catalog_homepages=None, catalog_orgs=None ): """Genera un reporte sobre las condiciones de la metadata de los datasets contenidos en uno o varios catálogos. Args: catalogs (str, dict o list): Uno (str o dict) o varios (list de strs y/o dicts) catálogos. harvest (str): Criterio a utilizar para determinar el valor del campo "harvest" en el reporte generado ('all', 'none', 'valid', 'report' o 'good'). report (str): Path a un reporte/config especificando qué datasets marcar con harvest=1 (sólo si harvest=='report'). export_path (str): Path donde exportar el reporte generado (en formato XLSX o CSV). Si se especifica, el método no devolverá nada. catalog_id (str): Nombre identificador del catálogo para federación catalog_homepage (str): URL del portal de datos donde está implementado el catálogo. Sólo se pasa si el portal es un CKAN o respeta la estructura: https://datos.{organismo}.gob.ar/dataset/{dataset_identifier} Returns: list: Contiene tantos dicts como datasets estén presentes en `catalogs`, con la data del reporte generado. """ assert isinstance(catalogs, string_types + (dict, list)) if isinstance(catalogs, list): assert not catalog_ids or len(catalogs) == len(catalog_ids) assert not catalog_orgs or len(catalogs) == len(catalog_orgs) assert not catalog_homepages or len( catalogs) == len(catalog_homepages) # Si se pasa un único catálogo, genero una lista que lo contenga if isinstance(catalogs, string_types + (dict,)): catalogs = [catalogs] # convierto los catalogos a objetos DataJson catalogs = list(map(readers.read_catalog_obj, catalogs)) if not catalog_ids: catalog_ids = [] for catalog in catalogs: catalog_ids.append(catalog.get("identifier", "")) if isinstance(catalog_ids, string_types + (dict,)): catalog_ids = [catalog_ids] * len(catalogs) if not catalog_orgs or isinstance(catalog_orgs, string_types + (dict,)): catalog_orgs = [catalog_orgs] * len(catalogs) if not catalog_homepages or isinstance(catalog_homepages, string_types + (dict,)): catalog_homepages = [catalog_homepages] * len(catalogs) catalogs_reports = [ self.catalog_report( catalog, harvest, report, catalog_id=catalog_id, catalog_homepage=catalog_homepage, catalog_org=catalog_org ) for catalog, catalog_id, catalog_org, catalog_homepage in zip(catalogs, catalog_ids, catalog_orgs, catalog_homepages) ] full_report = [] for report in catalogs_reports: full_report.extend(report) if export_path: # config styles para reportes en excel alignment = Alignment( wrap_text=True, shrink_to_fit=True, vertical="center" ) column_styles = { "dataset_title": {"width": 35}, "dataset_description": {"width": 35}, "dataset_publisher_name": {"width": 35}, "dataset_issued": {"width": 20}, "dataset_modified": {"width": 20}, "distributions_formats": {"width": 15}, "distributions_list": {"width": 90}, "notas": {"width": 50}, } cell_styles = [ {"alignment": Alignment(vertical="center")}, {"row": 1, "font": Font(bold=True)}, {"col": "dataset_title", "alignment": alignment}, {"col": "dataset_description", "alignment": alignment}, {"col": "dataset_publisher_name", "alignment": alignment}, {"col": "distributions_formats", "alignment": alignment}, {"col": "distributions_list", "alignment": alignment}, {"col": "notas", "alignment": alignment}, ] # crea tabla writers.write_table(table=full_report, path=export_path, column_styles=column_styles, cell_styles=cell_styles) else: return full_report
[documentos] def generate_harvester_config(self, catalogs=None, harvest='valid', report=None, export_path=None): """Genera un archivo de configuración del harvester a partir de un reporte, o de un conjunto de catálogos y un criterio de cosecha (`harvest`). Args: catalogs (str, dict o list): Uno (str o dict) o varios (list de strs y/o dicts) catálogos. harvest (str): Criterio para determinar qué datasets incluir en el archivo de configuración generado ('all', 'none', 'valid', 'report' o 'good'). report (list o str): Tabla de reporte generada por generate_datasets_report() como lista de diccionarios o archivo en formato XLSX o CSV. Sólo se usa cuando `harvest=='report'`, en cuyo caso `catalogs` se ignora. export_path (str): Path donde exportar el reporte generado (en formato XLSX o CSV). Si se especifica, el método no devolverá nada. Returns: list of dicts: Un diccionario con variables de configuración por cada dataset a cosechar. """ # Si se pasa un único catálogo, genero una lista que lo contenga if isinstance(catalogs, string_types + (dict,)): catalogs = [catalogs] if harvest == 'report': if not report: raise ValueError(""" Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para el argumento 'report'. Por favor, intentelo nuevamente.""") datasets_report = readers.read_table(report) elif harvest in ['valid', 'none', 'all']: # catalogs no puede faltar para estos criterios assert isinstance(catalogs, string_types + (dict, list)) datasets_report = self.generate_datasets_report(catalogs, harvest) else: raise ValueError(""" {} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o 'report'.""".format(harvest)) # define los campos del reporte que mantiene para el config file config_keys = [ "catalog_federation_id", "catalog_federation_org", "dataset_identifier" ] # cambia algunos nombres de estos campos para el config file config_translator = { "catalog_federation_id": "catalog_id", "catalog_federation_org": "dataset_organization" } translated_keys = [config_translator.get(k, k) for k in config_keys] harvester_config = [ OrderedDict( # Retengo únicamente los campos que necesita el harvester [(config_translator.get(k, k), v) for (k, v) in dataset.items() if k in config_keys] ) # Para aquellost datasets marcados con 'harvest'==1 for dataset in datasets_report if bool(int(dataset["harvest"])) ] # chequea que el archivo de configuración tiene todos los campos required_keys = set(translated_keys) for row in harvester_config: row_keys = set(row.keys()) msg = "Hay una fila con claves {} y debe tener claves {}".format( row_keys, required_keys) assert row_keys == required_keys, msg if export_path: writers.write_table(harvester_config, export_path) else: return harvester_config
[documentos] def generate_harvestable_catalogs(self, catalogs, harvest='all', report=None, export_path=None): """Filtra los catálogos provistos según el criterio determinado en `harvest`. Args: catalogs (str, dict o list): Uno (str o dict) o varios (list de strs y/o dicts) catálogos. harvest (str): Criterio para determinar qué datasets conservar de cada catálogo ('all', 'none', 'valid' o 'report'). report (list o str): Tabla de reporte generada por generate_datasets_report() como lista de diccionarios o archivo en formato XLSX o CSV. Sólo se usa cuando `harvest=='report'`. export_path (str): Path a un archivo JSON o directorio donde exportar los catálogos filtrados. Si termina en ".json" se exportará la lista de catálogos a un único archivo. Si es un directorio, se guardará en él un JSON por catálogo. Si se especifica `export_path`, el método no devolverá nada. Returns: list of dicts: Lista de catálogos. """ assert isinstance(catalogs, string_types + (dict, list)) # Si se pasa un único catálogo, genero una lista que lo contenga if isinstance(catalogs, string_types + (dict,)): catalogs = [catalogs] harvestable_catalogs = [readers.read_catalog(c) for c in catalogs] catalogs_urls = [catalog if isinstance(catalog, string_types) else None for catalog in catalogs] # aplica los criterios de cosecha if harvest == 'all': pass elif harvest == 'none': for catalog in harvestable_catalogs: catalog["dataset"] = [] elif harvest == 'valid': report = self.generate_datasets_report(catalogs, harvest) return self.generate_harvestable_catalogs( catalogs=catalogs, harvest='report', report=report, export_path=export_path) elif harvest == 'report': if not report: raise ValueError(""" Usted eligio 'report' como criterio de harvest, pero no proveyo un valor para el argumento 'report'. Por favor, intentelo nuevamente.""") datasets_to_harvest = self._extract_datasets_to_harvest(report) for idx_cat, catalog in enumerate(harvestable_catalogs): catalog_url = catalogs_urls[idx_cat] if ("dataset" in catalog and isinstance(catalog["dataset"], list)): catalog["dataset"] = [ dataset for dataset in catalog["dataset"] if (catalog_url, dataset.get("title")) in datasets_to_harvest ] else: catalog["dataset"] = [] else: raise ValueError(""" {} no es un criterio de harvest reconocido. Pruebe con 'all', 'none', 'valid' o 'report'.""".format(harvest)) # devuelve los catálogos harvesteables if export_path and os.path.isdir(export_path): # Creo un JSON por catálogo for idx, catalog in enumerate(harvestable_catalogs): filename = os.path.join(export_path, "catalog_{}".format(idx)) writers.write_json(catalog, filename) elif export_path: # Creo un único JSON con todos los catálogos writers.write_json(harvestable_catalogs, export_path) else: return harvestable_catalogs
[documentos] def generate_datasets_summary(self, catalog, export_path=None): """Genera un informe sobre los datasets presentes en un catálogo, indicando para cada uno: - Índice en la lista catalog["dataset"] - Título - Identificador - Cantidad de distribuciones - Estado de sus metadatos ["OK"|"ERROR"] Es utilizada por la rutina diaria de `libreria-catalogos` para reportar sobre los datasets de los catálogos mantenidos. Args: catalog (str o dict): Path a un catálogo en cualquier formato, JSON, XLSX, o diccionario de python. export_path (str): Path donde exportar el informe generado (en formato XLSX o CSV). Si se especifica, el método no devolverá nada. Returns: list: Contiene tantos dicts como datasets estén presentes en `catalogs`, con los datos antes mencionados. """ catalog = readers.read_catalog(catalog) # Trato de leer todos los datasets bien formados de la lista # catalog["dataset"], si existe. if "dataset" in catalog and isinstance(catalog["dataset"], list): datasets = [d if isinstance(d, dict) else {} for d in catalog["dataset"]] else: # Si no, considero que no hay datasets presentes datasets = [] validation = self.validate_catalog(catalog)["error"]["dataset"] def info_dataset(index, dataset): """Recolecta información básica de un dataset.""" info = OrderedDict() info["indice"] = index info["titulo"] = dataset.get("title") info["identificador"] = dataset.get("identifier") info["estado_metadatos"] = validation[index]["status"] info["cant_errores"] = len(validation[index]["errors"]) info["cant_distribuciones"] = len(dataset["distribution"]) return info summary = [info_dataset(i, ds) for i, ds in enumerate(datasets)] if export_path: writers.write_table(summary, export_path) else: return summary
[documentos] def generate_catalog_readme(self, catalog, export_path=None): """Genera una descripción textual en formato Markdown sobre los metadatos generales de un catálogo (título, editor, fecha de publicación, et cetera), junto con: - estado de los metadatos a nivel catálogo, - estado global de los metadatos, - cantidad de datasets federados y no federados, - detalles de los datasets no federados - cantidad de datasets y distribuciones incluidas Es utilizada por la rutina diaria de `libreria-catalogos` para generar un README con información básica sobre los catálogos mantenidos. Args: catalog (str o dict): Path a un catálogo en cualquier formato, JSON, XLSX, o diccionario de python. export_path (str): Path donde exportar el texto generado (en formato Markdown). Si se especifica, el método no devolverá nada. Returns: str: Texto de la descripción generada. """ # Si se paso una ruta, guardarla if isinstance(catalog, string_types): catalog_path_or_url = catalog else: catalog_path_or_url = None catalog = readers.read_catalog(catalog) validation = self.validate_catalog(catalog) # Solo necesito indicadores para un catalogo indicators = self.generate_catalogs_indicators( catalog, CENTRAL_CATALOG)[0][0] readme_template = """ # Catálogo: {title} ## Información General - **Autor**: {publisher_name} - **Correo Electrónico**: {publisher_mbox} - **Ruta del catálogo**: {catalog_path_or_url} - **Nombre del catálogo**: {title} - **Descripción**: > {description} ## Estado de los metadatos y cantidad de recursos - **Estado metadatos globales**: {global_status} - **Estado metadatos catálogo**: {catalog_status} - **Cantidad Total de Datasets**: {no_of_datasets} - **Cantidad Total de Distribuciones**: {no_of_distributions} - **Cantidad de Datasets Federados**: {federated_datasets} - **Cantidad de Datasets NO Federados**: {not_federated_datasets} - **Porcentaje de Datasets NO Federados**: {not_federated_datasets_pct}% ## Datasets federados que fueron eliminados en el nodo original {federated_removed_datasets_list} ## Datasets no federados {not_federated_datasets_list} ## Datasets federados {federated_datasets_list} ## Reporte Por favor, consulte el informe [`datasets.csv`](datasets.csv). """ not_federated_datasets_list = "\n".join([ "- [{}]({})".format(dataset[0], dataset[1]) for dataset in indicators["datasets_no_federados"] ]) federated_removed_datasets_list = "\n".join([ "- [{}]({})".format(dataset[0], dataset[1]) for dataset in indicators["datasets_federados_eliminados"] ]) federated_datasets_list = "\n".join([ "- [{}]({})".format(dataset[0], dataset[1]) for dataset in indicators["datasets_federados"] ]) content = { "title": catalog.get("title"), "publisher_name": helpers.traverse_dict( catalog, ["publisher", "name"]), "publisher_mbox": helpers.traverse_dict( catalog, ["publisher", "mbox"]), "catalog_path_or_url": catalog_path_or_url, "description": catalog.get("description"), "global_status": validation["status"], "catalog_status": validation["error"]["catalog"]["status"], "no_of_datasets": len(catalog["dataset"]), "no_of_distributions": sum([len(dataset["distribution"]) for dataset in catalog["dataset"]]), "federated_datasets": indicators["datasets_federados_cant"], "not_federated_datasets": indicators["datasets_no_federados_cant"], "not_federated_datasets_pct": ( 100.0 - indicators["datasets_federados_pct"]), "not_federated_datasets_list": not_federated_datasets_list, "federated_removed_datasets_list": federated_removed_datasets_list, "federated_datasets_list": federated_datasets_list, } catalog_readme = readme_template.format(**content) if export_path: with io.open(export_path, 'w', encoding='utf-8') as target: target.write(catalog_readme) else: return catalog_readme
@classmethod def _extract_datasets_to_harvest(cls, report): """Extrae de un reporte los datos necesarios para reconocer qué datasets marcar para cosecha en cualquier generador. Args: report (str o list): Reporte (lista de dicts) o path a uno. Returns: list: Lista de tuplas con los títulos de catálogo y dataset de cada reporte extraído. """ assert isinstance(report, string_types + (list,)) # Si `report` es una lista de tuplas con longitud 2, asumimos que es un # reporte procesado para extraer los datasets a harvestear. Se devuelve # intacta. if (isinstance(report, list) and all([isinstance(x, tuple) and len(x) == 2 for x in report])): return report table = readers.read_table(report) table_keys = table[0].keys() expected_keys = ["catalog_metadata_url", "dataset_title", "dataset_accrualPeriodicity"] # Verifico la presencia de las claves básicas de un config de harvester for key in expected_keys: if key not in table_keys: raise KeyError(""" El reporte no contiene la clave obligatoria {}. Pruebe con otro archivo. """.format(key)) if "harvest" in table_keys: # El archivo es un reporte de datasets. datasets_to_harvest = [ (row["catalog_metadata_url"], row["dataset_title"]) for row in table if int(row["harvest"])] else: # El archivo es un config de harvester. datasets_to_harvest = [ (row["catalog_metadata_url"], row["dataset_title"]) for row in table] return datasets_to_harvest
[documentos] def generate_catalogs_indicators(self, catalogs=None, central_catalog=None): catalogs = catalogs or self return indicators.generate_catalogs_indicators( catalogs, central_catalog, validator=self.validator)
@staticmethod def _count_distribution_formats_dataset(dataset): formats = {} for distribution in dataset['distribution']: # 'format' es recomendado, no obligatorio. Puede no estar. distribution_format = distribution.get('format', None) if distribution_format: # Si no está en el diccionario, devuelvo 0 count = formats.get(distribution_format, 0) formats[distribution_format] = count + 1 return formats def _count_fields_recursive(self, dataset, fields): """Cuenta la información de campos optativos/recomendados/requeridos desde 'fields', y cuenta la ocurrencia de los mismos en 'dataset'. Args: dataset (dict): diccionario con claves a ser verificadas. fields (dict): diccionario con los campos a verificar en dataset como claves, y 'optativo', 'recomendado', o 'requerido' como valores. Puede tener objetios anidados pero no arrays. Returns: dict: diccionario con las claves 'recomendado', 'optativo', 'requerido', 'recomendado_total', 'optativo_total', 'requerido_total', con la cantidad como valores. """ key_count = { 'recomendado': 0, 'optativo': 0, 'requerido': 0, 'total_optativo': 0, 'total_recomendado': 0, 'total_requerido': 0 } for k, v in fields.items(): # Si la clave es un diccionario se implementa recursivamente el # mismo algoritmo if isinstance(v, dict): # dataset[k] puede ser o un dict o una lista, ej 'dataset' es # list, 'publisher' no. Si no es lista, lo metemos en una. # Si no es ninguno de los dos, dataset[k] es inválido # y se pasa un diccionario vacío para poder comparar elements = dataset.get(k) if not isinstance(elements, (list, dict)): elements = [{}] if isinstance(elements, dict): elements = [dataset[k].copy()] for element in elements: # Llamada recursiva y suma del resultado al nuestro result = self._count_fields_recursive(element, v) for key in result: key_count[key] += result[key] # Es un elemento normal (no iterable), se verifica si está en # dataset o no. Se suma 1 siempre al total de su tipo else: # total_requerido, total_recomendado, o total_optativo key_count['total_' + v] += 1 if k in dataset: key_count[v] += 1 return key_count
[documentos] def dataset_is_updated(self, catalog, dataset): catalog = readers.read_catalog(catalog) for catalog_dataset in catalog.get('dataset', []): if catalog_dataset.get('title') == dataset: periodicity = catalog_dataset.get('accrualPeriodicity') if not periodicity: return False if periodicity == 'eventual': return True if "modified" not in catalog_dataset: return False date = helpers.parse_date_string(catalog_dataset['modified']) days_diff = float((datetime.now() - date).days) interval = helpers.parse_repeating_time_interval(periodicity) if days_diff < interval: return True return False return False
[documentos] def generate_dataset_documentation(self, dataset_identifier, export_path=None, catalog=None): """Genera texto en markdown a partir de los metadatos de una `dataset`. Args: dataset_identifier (str): Identificador único de un dataset. export_path (str): Path donde exportar el texto generado. Si se especifica, el método no devolverá nada. catalog (dict, str o unicode): Representación externa (path/URL) o interna (dict) de un catálogo. Si no se especifica se usa el catálogo cargado en `self` (el propio objeto DataJson). Returns: str: Texto que describe una `dataset`. """ catalog = DataJson(catalog) or self dataset = catalog.get_dataset(dataset_identifier) text = documentation.dataset_to_markdown(dataset) if export_path: with open(export_path, "wb") as f: f.write(text.encode("utf-8")) else: return text
[documentos] def make_catalogs_backup(self, catalogs=None, local_catalogs_dir=".", copy_metadata=True, copy_data=True): """Realiza copia de los datos y metadatos de uno o más catálogos. Args: catalogs (list or dict): Lista de catálogos (elementos que pueden ser interpretados por DataJson como catálogos) o diccionario donde las keys se interpretan como los catalog_identifier: { "modernizacion": "http://infra.datos.gob.ar/catalog/modernizacion/data.json" } Cuando es una lista, los ids se toman de catalog_identifer, y se ignoran los catálogos que no tengan catalog_identifier. Cuando se pasa un diccionario, los keys reemplazan a los catalog_identifier (estos no se leeen). local_catalogs_dir (str): Directorio local en el cual se va a crear la carpeta "catalog/..." con todos los catálogos. copy_metadata (bool): Si es verdadero, se generan los archivos data.json y catalog.xlsx. copy_data (bool): Si es verdadero, se descargan todas las distribuciones de todos los catálogos. Return: None """ # TODO: implementar función pass
[documentos]def main(): """Permite ejecutar el módulo por línea de comandos. Valida un path o url a un archivo data.json devolviendo True/False si es válido y luego el resultado completo. Example: python pydatajson.py http://181.209.63.71/data.json python pydatajson.py ~/github/pydatajson/tests/samples/full_data.json """ try: datajson_file = sys.argv[1] dj_instance = DataJson() bool_res = dj_instance.is_valid_catalog(datajson_file) full_res = dj_instance.validate_catalog(datajson_file) pretty_full_res = json.dumps( full_res, indent=4, separators=(",", ": ")) print(bool_res) print(pretty_full_res) except IndexError as errmsg: format_str = """ {}: pydatajson.py fue ejecutado como script sin proveer un argumento """ print(format_str.format(errmsg))
if __name__ == '__main__': main()