#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: hashivaultlib.py
#
# Copyright 2018 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for hashivaultlib.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""
import concurrent.futures
import json
import logging
from datetime import timedelta
from pathlib import PurePosixPath
from dateutil.parser import parse
from hvac import Client
from hvac.exceptions import InvalidPath
__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''2018-05-25'''
__copyright__ = '''Copyright 2018, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".
# This is the main prefix used for logging
LOGGER_BASENAME = '''hashivaultlib'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())
[docs]class Vault(Client):
"""Extends the hvac client for vault with some extra handy usability."""
def __init__(self, *args, **kwargs):
super(Vault, self).__init__(*args, **kwargs)
logger_name = u'{base}.{suffix}'.format(base=LOGGER_BASENAME,
suffix=self.__class__.__name__)
self._logger = logging.getLogger(logger_name)
self.secrets.kv.v1.delete_path = self.delete_path
self.secrets.kv.v1.retrieve_secrets_from_path = self.retrieve_secrets_from_path
self.secrets.kv.v1.restore_secrets = self.restore_secrets
self.secrets.kv.v2.delete_path = self._delete_path_v2
self.secrets.kv.v2.retrieve_secrets_from_path = self._retrieve_secrets_from_path_v2
self.secrets.kv.v2.restore_secrets = self._restore_secrets_v2
[docs] def delete_path(self, path):
"""Deletes recursively a path from vault.
Args:
path: The path to remove
"""
try:
subdirs = self.list(path).get('data', {}).get('keys')
for subdir in subdirs:
self.delete_path(PurePosixPath(path, subdir))
self._logger.info('Deleting directory %s', path)
self.delete(path)
except AttributeError:
self._logger.info('Deleting secret %s', path)
self.delete(path)
def _delete_path_v2(self, path, mount_point):
"""Deletes recursively a path from vault using v2 engine.
Args:
path: The path to remove
mount_point: Mountpoint for path
"""
secrets = self._retrieve_secrets_from_path_v2(path=path, mount_point=mount_point)
for secret in secrets:
self._logger.info('Deleting %s', secret)
self.secrets.kv.v2.delete_metadata_and_all_versions(path=secret.get('original_path', ''),
mount_point=mount_point)
[docs] def retrieve_secrets_from_path(self, path):
"""Retrieves recursively all the secrets from a path in vault.
Args:
path: The path to retrieve all the secrets for
"""
secrets = []
def recurse(vault, path):
"""Recurses through a path."""
try:
subdirs = vault.list(path).get('data', {}).get('keys')
for subdir in subdirs:
recurse(vault, PurePosixPath(path, subdir))
vault._logger.info('Reached directory %s', path) # pylint: disable=protected-access
except AttributeError:
vault._logger.info('Extracting secret %s', path) # pylint: disable=protected-access
secret = vault.read(path)
secret['original_path'] = path
secrets.append(secret)
recurse(self, path)
return secrets
def _retrieve_secrets_from_path_v2(self, path, mount_point):
"""Retrieves recursively all the secrets from a path in vault using v2 engine.
Args:
path: The path to retrieve all the secrets for
mount_point: Mountpoint for path
"""
secrets = []
def recurse(vault, path):
"""Recurses through a path."""
try:
subdirs = vault.secrets.kv.v2.list_secrets(path=path,
mount_point=mount_point).get('data', {}).get('keys')
for subdir in subdirs:
recurse(vault, PurePosixPath(path, subdir))
vault._logger.info('Reached directory %s', path) # pylint: disable=protected-access
except InvalidPath:
vault._logger.info('Extracting secret %s', path) # pylint: disable=protected-access
secret = vault.secrets.kv.v2.read_secret_version(path=path,
mount_point=mount_point)
secret['original_path'] = path
secrets.append(secret)
recurse(self, path)
return secrets
[docs] def restore_secrets(self, secrets):
"""Restores secrets to vault in their original path.
Args:
secrets: List of secret dictionaries with "original_path" attribute set
Returns:
True on success, False otherwise
"""
if not isinstance(secrets, (list, tuple)):
self._logger.error('Please provide a list or tuple of secrets to restore.')
return False
for secret in secrets:
path = secret.get('original_path')
if not path:
self._logger.error('No "original_path" found, cannot restore.')
continue
data = secret.get('data')
self._logger.info('Adding secrets to path %s', path)
self.write(path, **data)
return True
def _restore_secrets_v2(self, secrets, mount_point):
"""Restores secrets to vault in their original path using v2 engine.
Args:
secrets: List of secret dictionaries with "original_path" attribute set
mount_point: Mountpoint for path
Returns:
True on success, False otherwise
"""
if not isinstance(secrets, (list, tuple)):
self._logger.error('Please provide a list or tuple of secrets to restore.')
return False
for secret in secrets:
path = secret.get('original_path')
if not path:
self._logger.error('No "original_path" found, cannot restore.')
continue
data = secret.get('data', {}).get('data')
self._logger.info('Adding secrets to path %s', path)
self.secrets.kv.v2.create_or_update_secret(mount_point=mount_point,
path=path,
secret=data)
return True
@property
def _token_accessors(self):
headers = {'X-Vault-Token': self.token}
url = '{host}/v1/auth/token/accessors?vaultaddr={host}&list=true'.format(host=self.url)
response = self.session.get(url, headers=headers)
if not response.ok:
self._logger.error('Error retrieving accessors.')
return response.json().get('data', {}).get('keys', []) if response.ok else None
@property
def tokens(self):
"""Models the tokens of a vault installation.
Returns:
list:All tokens of a vault in a Token object format
"""
headers = {'X-Vault-Token': self.token}
url = '{host}/v1/auth/token/lookup-accessor?vaultaddr={host}'.format(host=self.url)
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(self.session.post,
url,
headers=headers,
data=json.dumps({"accessor": accessor}))
for accessor in self._token_accessors]
for future in concurrent.futures.as_completed(futures):
try:
response = future.result()
response_data = response.json()
response.close()
yield TokenFactory(self, response_data)
except Exception: # pylint: disable=broad-except
self._logger.exception('Future failed...')
[docs]class TokenFactory: # pylint: disable=too-few-public-methods
"""Factory to create the appropriate Token type."""
def __new__(cls, vault_instance, data):
try:
if 'errors' in data.keys():
token = BrokenToken(vault_instance, data)
else:
token = Token(vault_instance, data)
except (AttributeError, TypeError):
vault_instance._logger.error('Response for token seems broken, got :%s', data)
return token
[docs]class Token: # pylint: disable=too-many-public-methods
"""Models a vault token and provides delete capabilities."""
def __init__(self, vault_instance, data):
self._vault = vault_instance
self._data = data
@property
def raw_data(self):
"""The raw data of the token.
Returns:
dict: The raw data of the token
"""
return self._data
@property
def auth(self):
"""Auth data for the token.
Returns:
The auth data
"""
return self._data.get('auth')
@staticmethod
def _seconds_to_day_format(seconds_):
str(timedelta(seconds=int(seconds_)))
@property
def lease_duration(self):
"""The duration of the lease of the token.
Returns:
string: The duration of the lease of the token
"""
return self._data.get('lease_duration')
@property
def lease_id(self):
"""The lease ID.
Returns:
string: The lease ID
"""
return self._data.get('lease_id')
@property
def renewable(self):
"""A flag on whether the token is renewable.
Returns:
bool: True if token is renewable, False otherwise
"""
return self._data.get('renewable')
@property
def request_id(self):
"""The id of the request for the token.
Returns:
string: The id of the request for the token
"""
return self._data.get('request_id')
@property
def warnings(self):
"""The warnings of the token.
Returns:
The warnings of the token
"""
return self._data.get('warnings')
@property
def wrap_info(self):
"""The wrap info of the token.
Returns:
The wrap info of the token
"""
return self._data.get('wrap_info')
@property
def accessor(self):
"""The accessor token of the token.
Returns:
string: The accessor token of the token
"""
return self._data.get('data', {}).get('accessor')
@property
def creation_time(self):
"""The creation time of the token in seconds.
Returns:
string: The creation time of the token in seconds
"""
return self._data.get('data', {}).get('creation_time')
@property
def creation_time_day_format(self):
"""The creation time of the token in a day duration format.
Returns:
string: The creation time of the token in a day duration format
"""
return self._seconds_to_day_format(self.creation_time)
@property
def creation_ttl(self):
"""The creation ttl of the token in seconds.
Returns:
string: The creation ttl of the token in seconds
"""
return self._data.get('data', {}).get('creation_ttl')
@property
def creation_ttl_day_format(self):
"""The creation ttl of the token in a day duration format.
Returns:
string: The creation ttl of the token in a day duration format
"""
return self._seconds_to_day_format(self.creation_ttl)
@property
def display_name(self):
"""The display name of the token.
Returns:
string: The display name of the token
"""
return self._data.get('data', {}).get('display_name', '')
@property
def expire_time(self):
"""The expire time of the token.
Returns:
datetime: The expire time of the token if any, None otherwise
"""
try:
date_ = parse(self._data.get('data', {}).get('expire_time'))
except (ValueError, TypeError):
date_ = None
return date_
@property
def issue_time(self):
"""The issue time of the token.
Returns:
datetime: The issue time of the token
"""
try:
date_ = parse(self._data.get('data', {}).get('issue_time'))
except (ValueError, TypeError):
date_ = None
return date_
@property
def explicit_max_ttl(self):
"""The explicit max ttl.
Returns:
string: The explicit max ttl
"""
return self._data.get('data', {}).get('explicit_max_ttl')
@property
def explicit_max_ttl_day_format(self):
"""The explicit max ttl in a day duration format.
Returns:
string: The explicit max ttl in a day duration format
"""
return self._seconds_to_day_format(self.explicit_max_ttl)
@property
def id(self): # pylint: disable=invalid-name
"""The id of the token.
Returns:
string: The id of the token
"""
return self._data.get('data', {}).get('id')
@property
def meta(self):
"""The meta of the token.
Returns:
string: The meta of the token
"""
return self._data.get('data', {}).get('meta')
@property
def num_uses(self):
"""The number of uses of the token.
Returns:
string: The number of uses of the token
"""
return self._data.get('data', {}).get('num_uses')
@property
def orphan(self):
"""Flag on whether the token is orphan.
Returns:
bool: True if the token is orphan, False otherwise
"""
return self._data.get('data', {}).get('orphan')
@property
def path(self):
"""The path to create the token.
Returns:
string: The path to create the token
"""
return self._data.get('data', {}).get('path')
@property
def policies(self):
"""The policies this token has enforced upon.
Returns:
list: The policies of the token
"""
return self._data.get('data', {}).get('policies', [])
@property
def ttl(self):
"""The ttl is seconds.
Returns:
string: The ttl is seconds
"""
return self._data.get('data', {}).get('ttl')
@property
def ttl_day_format(self):
"""The ttl in a day duration format.
Returns:
string: The ttl in a day duration format
"""
return self._seconds_to_day_format(self.ttl)
[docs] def delete(self):
"""Deletes the token by removing the accessor from the vault instance."""
self._vault.revoke_token(self.accessor, accessor=True)
[docs]class BrokenToken(Token):
"""Models a broken token with only an accessor ID and errors messages."""
@property
def errors(self):
"""The errors of the token."""
return self._data.get('errors')