#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Extensión de pydatajson para leer catálogos de metadatos a través de la API
de CKAN v3.
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import with_statement
import os.path
import logging
import json
from urlparse import urljoin
from ckanapi import RemoteCKAN
from helpers import clean_str, title_to_name
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(ABSOLUTE_PROJECT_DIR, "schemas",
"accrualPeriodicity.json")) as frequencies:
RAW_FREQUENCIES = json.load(frequencies)
FREQUENCIES = {row["description"]: row["id"] for row in RAW_FREQUENCIES}
with open(os.path.join(ABSOLUTE_PROJECT_DIR, "schemas",
"superThemeTaxonomy.json")) as super_themes:
RAW_SUPER_THEMES = json.load(super_themes)
SUPER_THEMES = {row["label"]: row["id"] for row in RAW_SUPER_THEMES}
logging.basicConfig(format='%(asctime)s [%(levelname)s]: %(message)s',
datefmt='%m/%d/%Y %I:%M:%S')
[documentos]def read_ckan_catalog(portal_url):
"""Convierte los metadatos de un portal disponibilizados por la Action API
v3 de CKAN al estándar data.json.
Args:
portal_url (str): URL de un portal de datos CKAN que soporte la API v3.
Returns:
dict: Representación interna de un catálogo para uso en las funciones
de esta librería.
"""
portal = RemoteCKAN(portal_url)
catalog = {}
try:
status = portal.call_action(
'status_show', requests_kwargs={"verify": False})
packages_list = portal.call_action(
'package_list', requests_kwargs={"verify": False})
groups_list = portal.call_action(
'group_list', requests_kwargs={"verify": False})
packages = [portal.call_action(
'package_show', {'name_or_id': pkg},
requests_kwargs={"verify": False})
for pkg in packages_list]
groups = [portal.call_action(
'group_show', {'id': grp},
requests_kwargs={"verify": False})
for grp in groups_list]
catalog = map_status_to_catalog(status)
catalog["dataset"] = map_packages_to_datasets(
packages, portal_url)
catalog["themeTaxonomy"] = map_groups_to_themes(groups)
except:
logging.error(
'Error al procesar el portal %s', portal_url, exc_info=True)
return catalog
[documentos]def map_status_to_catalog(status):
"""Convierte el resultado de action.status_show() en metadata a nivel de
catálogo."""
catalog = dict()
catalog_mapping = {
"site_title": "title",
"site_description": "description"
}
for status_key, catalog_key in catalog_mapping.iteritems():
try:
catalog[catalog_key] = status[status_key]
except:
logging.info("""
La clave '%s' no está en el endpoint de status. No se puede completar
catalog['%s'].""", status_key, catalog_key)
publisher_mapping = {
"site_title": "name",
"error_emails_to": "mbox"
}
if any([k in status for k in publisher_mapping.keys()]):
catalog["publisher"] = dict()
for status_key, publisher_key in publisher_mapping.iteritems():
try:
catalog['publisher'][publisher_key] = status[status_key]
except:
logging.info("""
La clave '%s' no está en el endpoint de status. No se puede completar
catalog['publisher'['%s'].""", status_key, publisher_key)
else:
logging.info("""
No hay ninguna información sobre catalog['publisher'] en el endpoint
de 'status'.""")
catalog['superThemeTaxonomy'] = (
'http://datos.gob.ar/superThemeTaxonomy.json')
return catalog
[documentos]def map_packages_to_datasets(packages, portal_url):
"""Mapea una lista de 'packages' de CKAN a 'datasets' de data.json."""
return [map_package_to_dataset(pkg, portal_url)
for pkg in packages]
[documentos]def map_package_to_dataset(package, portal_url):
"""Mapea un diccionario con metadatos de cierto 'package' de CKAN a un
diccionario con metadatos de un 'dataset' según el estándar data.json."""
dataset = dict()
resources = package["resources"]
groups = package["groups"]
tags = package["tags"]
dataset_mapping = {
'title': 'title',
'notes': 'description',
'metadata_created': 'issued',
'metadata_modified': 'modified',
'license_title': 'license',
'id': 'identifier',
'url': 'landingPage'
}
for package_key, dataset_key in dataset_mapping.iteritems():
try:
dataset[dataset_key] = package[package_key]
except:
logging.info("""
La clave '%s' no está en el endpoint 'package_show' para el package '%s'. No
se puede completar dataset['%s'].""",
package_key, package['name'], dataset_key)
publisher_mapping = {
'author': 'name',
'author_email': 'mbox'
}
if any([k in package for k in publisher_mapping.keys()]):
dataset["publisher"] = dict()
for package_key, publisher_key in publisher_mapping.iteritems():
try:
dataset['publisher'][publisher_key] = package[package_key]
except:
logging.info("""
La clave '%s' no está en el endpoint 'package_show' para el package '%s'. No
se puede completar dataset['publisher']['%s'].""",
package_key, package['name'], publisher_key)
contact_point_mapping = {
'maintainer': 'fn',
'maintainer_email': 'hasEmail'
}
if any([k in package for k in contact_point_mapping.keys()]):
dataset["contactPoint"] = dict()
for package_key, contact_key in contact_point_mapping.iteritems():
try:
dataset['contactPoint'][contact_key] = package[package_key]
except:
logging.info("""
La clave '%s' no está en el endpoint 'package_show' para el package '%s'. No
se puede completar dataset['contactPoint']['%s'].""",
package_key, package['name'], contact_key)
# Si existen campos extras en la información del package, busco las claves
# "Frecuencia de actualización" y "Temática global" para completar los
# campos "accrualPeriodicity" y "superTheme" del dataset, respectivamente.
if "extras" in package:
add_accrualPeriodicity(dataset, package)
add_superTheme(dataset, package)
add_temporal(dataset, package)
dataset["distribution"] = map_resources_to_distributions(resources,
portal_url)
dataset["theme"] = [grp['name'] for grp in groups]
dataset['keyword'] = [tag['name'] for tag in tags]
return dataset
[documentos]def add_temporal(dataset, package):
# "Cobertura temporal" => "temporal"
temporal = [
extra["value"] for extra in package["extras"] if
title_to_name(extra["key"]) == title_to_name("Cobertura temporal")
]
if len(temporal) > 1:
logging.info("""
Se encontro mas de un valor de cobertura temporal en 'extras' para el
'package' '%s'. No se puede completar dataset['temporal'].\n %s""",
package['name'], temporal)
elif len(temporal) == 1:
try:
dataset["temporal"] = temporal[0]
except KeyError:
logging.warn("""
Se encontró '%s' como cobertura temporal, pero no es mapeable a un
'temporal' conocido. La clave no se pudo completar.""", temporal[0])
# Busco claves que son casi "Cobertura temporal" para lanzar
# advertencias si las hay.
almost_temporal = [
extra for extra in package["extras"] if
clean_str(extra["key"]) == "cobertura temporal" and
extra["key"] != "Cobertura temporal"]
if almost_temporal:
logging.warn("""
Se encontraron claves con nombres similares pero no idénticos a
"Cobertura temporal" en 'extras' para el 'package' '%s'. Por favor, considere
corregirlas:
\n%s""", package['name'], almost_temporal)
[documentos]def add_superTheme(dataset, package):
# "Temática global" => "superTheme"
super_theme = [
extra["value"] for extra in package["extras"] if
title_to_name(extra["key"]) == title_to_name("Temática global")
]
if len(super_theme) == 0:
logging.info("""
No se encontraron valores de temática global en 'extras' para el
'package' '%s'. No se puede completar dataset['superTheme'].""",
package['name'])
elif len(super_theme) > 1:
logging.info("""
Se encontro mas de un valor de temática global en 'extras' para el
'package' '%s'. No se puede completar dataset['superTheme'].\n %s""",
package['name'], super_theme)
else:
try:
dataset["superTheme"] = [SUPER_THEMES[super_theme[0]]]
except KeyError:
logging.warn("""
Se encontró '%s' como temática global, pero no es mapeable a un
'superTheme' conocido. La clave no se pudo completar.""", super_theme[0])
# Busco claves que son casi "Temática global" para lanzar
# advertencias si las hay.
almost_super_theme = [
extra for extra in package["extras"] if
clean_str(extra["key"]) == "tematica global" and
extra["key"] != "Temática global"]
if almost_super_theme:
logging.warn("""
Se encontraron claves con nombres similares pero no idénticos a "Temática
global" en 'extras' para el 'package' '%s'. Por favor, considere corregirlas:
\n%s""", package['name'], almost_accrual)
[documentos]def add_accrualPeriodicity(dataset, package):
# "Frecuencia de actualización" => "accrualPeriodicity"
accrual = [
extra["value"] for extra in package["extras"] if
title_to_name(extra["key"]) == title_to_name(
"Frecuencia de actualización")
]
if len(accrual) == 0:
logging.info("""
No se encontraron valores de frecuencia de actualización en 'extras' para el
'package' '%s'. No se puede completar dataset['accrualPeriodicity'].""",
package['name'])
elif len(accrual) > 1:
logging.info("""
Se encontro mas de un valor de frecuencia de actualización en 'extras' para el
'package' '%s'. No se puede completar dataset['accrualPeriodicity'].\n %s""",
package['name'], accrual)
else:
try:
dataset["accrualPeriodicity"] = FREQUENCIES[accrual[0]]
except KeyError:
logging.warn("""
Se encontró '%s' como frecuencia de actualización, pero no es mapeable a una
'accrualPeriodicity' conocida. La clave no se pudo completar.""", accrual[0])
# Busco claves que son casi "Frecuencia de actualización" para lanzar
# advertencias si las hay.
almost_accrual = [
extra for extra in package["extras"] if
clean_str(extra["key"]) == "frecuencia de actualizacion" and
extra["key"] != "Frecuencia de actualización"]
if almost_accrual:
logging.warn("""
Se encontraron claves con nombres similares pero no idénticos a "Frecuencia de
actualización" en 'extras' para el 'package' '%s'. Por favor, considere
corregirlas:\n%s""", package['name'], almost_accrual)
[documentos]def map_resources_to_distributions(resources, portal_url):
"""Mapea una lista de 'resources' CKAN a 'distributions' de data.json."""
return [map_resource_to_distribution(res, portal_url) for res in resources]
[documentos]def map_resource_to_distribution(resource, portal_url):
"""Mapea un diccionario con metadatos de cierto 'resource' CKAN a dicts
con metadatos de una 'distribution' según el estándar data.json."""
distribution = dict()
distribution_mapping = {
'url': 'downloadURL',
'name': 'title',
'created': 'issued',
'description': 'description',
'format': 'format',
'last_modified': 'modified',
'mimetype': 'mediaType',
'size': 'byteSize',
'id': 'identifier' # No es parte del estandar de PAD pero es relevante
}
for resource_key, distribution_key in distribution_mapping.iteritems():
try:
distribution[distribution_key] = resource[resource_key]
except:
logging.info("""
La clave '%s' no está en la metadata del 'resource' '%s'. No
se puede completar distribution['%s'].""",
resource_key, resource['name'], distribution_key)
url_path = ['dataset', resource['package_id'], 'resource', resource['id']]
distribution["accessURL"] = urljoin(portal_url, "/".join(url_path))
return distribution
[documentos]def map_groups_to_themes(groups):
"""Mapea una lista de 'groups' de CKAN a 'themes' de data.json."""
return [map_group_to_theme(grp) for grp in groups]
[documentos]def map_group_to_theme(group):
"""Mapea un diccionario con metadatos de cierto 'group' de CKAN a un
diccionario con metadatos de un 'theme' según el estándar data.json."""
theme = dict()
theme_mapping = {
'name': 'id',
'display_name': 'label',
'description': 'description'
}
for group_key, theme_key in theme_mapping.iteritems():
try:
theme[theme_key] = group[group_key]
except:
logging.info("""
La clave '%s' no está en la metadata del 'group' '%s'. No
se puede completar theme['%s'].""",
group_key, theme['name'], theme_key)
return theme