Source code for hdfs.client

#!/usr/bin/env python
# encoding: utf-8

"""WebHDFS API clients."""

from .util import AsyncWriter, HdfsError
from collections import deque
from contextlib import closing, contextmanager
from getpass import getuser
from itertools import repeat
from multiprocessing.pool import ThreadPool
from random import sample
from shutil import move, rmtree
from six import add_metaclass
from six.moves.urllib.parse import quote
from threading import Lock
import codecs
import logging as lg
import os
import os.path as osp
import posixpath as psp
import re
import requests as rq
import sys
import time


_logger = lg.getLogger(__name__)


def _to_error(response):
  """Callback when an API response has a non 2XX status code.

  :param response: Response.

  """
  if response.status_code == 401:
    _logger.error(response.content)
    raise HdfsError('Authentication failure. Check your credentials.')
  try:
    # Cf. http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#Error+Responses
    message = response.json()['RemoteException']['message']
  except ValueError:
    # The body isn't valid JSON; fall back to the raw response content.
    message = response.content
  try:
    exception = response.json()['RemoteException']['exception']
  except ValueError:
    exception = None
  return HdfsError(message, exception=exception)
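# For reference, a WebHDFS error body has roughly the following shape (see the
# "Error Responses" section of the WebHDFS documentation linked above; the
# values below are only illustrative):
#
#   {
#     "RemoteException": {
#       "exception": "FileNotFoundException",
#       "javaClassName": "java.io.FileNotFoundException",
#       "message": "File does not exist: /foo/a.patch"
#     }
#   }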


class _Request(object):

  """Class to define API requests.

  :param method: HTTP verb (`'GET'`, `'PUT'`, etc.).
  :param kwargs: Keyword arguments passed to the request handler.

  """

  webhdfs_prefix = '/webhdfs/v1'
  doc_url = 'https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html'

  def __init__(self, method, **kwargs):
    self.method = method
    self.kwargs = kwargs

  def __call__(self):
    pass # make pylint happy

  def to_method(self, operation):
    """Returns method associated with request to attach to client.

    :param operation: operation name.

    This is called inside the metaclass to switch :class:`_Request` objects
    with the method they represent.

    """

    def api_handler(client, hdfs_path, data=None, strict=True, **params):
      """Wrapper function."""
      params['op'] = operation
      if client._proxy is not None:
        params['doas'] = client._proxy

      attempted_hosts = set()
      while True:
        with client._lock:
          while client._urls[0] in attempted_hosts:
            client._urls.rotate(-1)
          host = client._urls[0]
        url = '{}{}{}'.format(
          host.rstrip('/'),
          self.webhdfs_prefix,
          quote(client.resolve(hdfs_path), '/= '),
        )

        err = None
        try:
          res = client._request(
            method=self.method,
            url=url,
            data=data,
            params=params,
            **self.kwargs
          )
        except (rq.exceptions.ReadTimeout, rq.exceptions.ConnectTimeout,
                rq.exceptions.ConnectionError) as retriable_err:
          err = retriable_err # Retry.
        else:
          if res: # 2XX status code.
            return res
          err = _to_error(res)
          if err.exception not in ('RetriableException', 'StandbyException'):
            if strict:
              raise err
            return res

        attempted_hosts.add(host)
        if len(attempted_hosts) == len(client._urls):
          if len(client._urls) > 1:
            _logger.warning('No reachable host, raising last error.')
          raise err

    api_handler.__name__ = '{}_handler'.format(operation.lower())
    api_handler.__doc__ = 'Cf. {}#{}'.format(self.doc_url, operation)
    return api_handler


class _ClientType(type):

  """Metaclass that enables short and dry request definitions.

  This metaclass transforms any :class:`_Request` instances into their
  corresponding API handlers. Note that the operation used is determined
  directly from the name of the attribute (stripping underscores and digits,
  then uppercasing it).

  """

  pattern = re.compile(r'_|\d')

  def __new__(mcs, name, bases, attrs):
    for key, value in attrs.items():
      if isinstance(value, _Request):
        attrs[key] = value.to_method(mcs.pattern.sub('', key).upper())
    client = super(_ClientType, mcs).__new__(mcs, name, bases, attrs)
    client.__registry__[client.__name__] = client
    return client
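  # As an illustration (not part of the library's code): an attribute defined
  # as `_get_file_status = _Request('GET')` is picked up here, its name is
  # stripped of underscores and digits and uppercased, and the resulting
  # handler targets the `GETFILESTATUS` WebHDFS operation:
  #
  #   _ClientType.pattern.sub('', '_get_file_status').upper() # 'GETFILESTATUS'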


@add_metaclass(_ClientType)
class Client(object):

  """Base HDFS web client.

  :param url: Hostname or IP address of HDFS namenode, prefixed with protocol,
    followed by WebHDFS port on namenode. You may also specify multiple URLs
    separated by semicolons for High Availability support.
  :param proxy: User to proxy as.
  :param root: Root path, this will be prefixed to all HDFS paths passed to the
    client. If the root is relative, the path will be assumed relative to the
    user's home directory.
  :param timeout: Connection timeouts, forwarded to the request handler. How
    long to wait for the server to send data before giving up, as a float, or a
    `(connect_timeout, read_timeout)` tuple. If the timeout is reached, an
    appropriate exception will be raised. See the requests_ documentation for
    details.
  :param session: `requests.Session` instance, used to emit all requests.

  In general, this client should only be used directly when its subclasses
  (e.g. :class:`InsecureClient`, :class:`TokenClient`, and others provided by
  extensions) do not provide enough flexibility.

  .. _requests: http://docs.python-requests.org/en/latest/api/#requests.request

  """

  __registry__ = {}

  def __init__(self, url, root=None, proxy=None, timeout=None, session=None):
    self.root = root
    self.url = url
    self.urls = [u for u in url.split(';') if u]
    self._urls = deque(self.urls) # this is rotated and used internally
    self._session = session or rq.Session()
    self._proxy = proxy
    self._timeout = timeout
    self._lock = Lock()
    _logger.info('Instantiated %r.', self)

  def __repr__(self):
    return '<{}(url={!r})>'.format(self.__class__.__name__, self.url)

  # Generic request handler

  def _request(self, method, url, **kwargs):
    r"""Send request to WebHDFS API.

    :param method: HTTP verb.
    :param url: Url to send the request to.
    :param \*\*kwargs: Extra keyword arguments forwarded to the request
      handler. If any `params` are defined, these will take precedence over
      the instance's defaults.

    """
    return self._session.request(
      method=method,
      url=url,
      timeout=self._timeout,
      headers={'content-type': 'application/octet-stream'}, # For HttpFS.
      **kwargs
    )

  # Raw API endpoints

  _append = _Request('POST', allow_redirects=False) # cf. `read`
  _create = _Request('PUT', allow_redirects=False) # cf. `write`
  _delete = _Request('DELETE')
  _get_acl_status = _Request('GET')
  _get_content_summary = _Request('GET')
  _get_file_checksum = _Request('GET')
  _get_file_status = _Request('GET')
  _get_home_directory = _Request('GET')
  _get_trash_root = _Request('GET')
  _list_status = _Request('GET')
  _mkdirs = _Request('PUT')
  _modify_acl_entries = _Request('PUT')
  _remove_acl_entries = _Request('PUT')
  _remove_default_acl = _Request('PUT')
  _remove_acl = _Request('PUT')
  _open = _Request('GET', stream=True)
  _rename = _Request('PUT')
  _set_acl = _Request('PUT')
  _set_owner = _Request('PUT')
  _set_permission = _Request('PUT')
  _set_replication = _Request('PUT')
  _set_times = _Request('PUT')
  _allow_snapshot = _Request('PUT')
  _disallow_snapshot = _Request('PUT')
  _create_snapshot = _Request('PUT')
  _delete_snapshot = _Request('DELETE')
  _rename_snapshot = _Request('PUT')

  # Exposed endpoints

  def resolve(self, hdfs_path):
    """Return absolute, normalized path, with special markers expanded.

    :param hdfs_path: Remote path.

    Currently supported markers:

    * `'#LATEST'`: this marker gets expanded to the most recently updated file
      or folder. They can be combined using the `'{N}'` suffix. For example,
      `'foo/#LATEST{2}'` is equivalent to `'foo/#LATEST/#LATEST'`.

    """
    path = hdfs_path
    if not psp.isabs(path):
      if not self.root or not psp.isabs(self.root):
        root = self._get_home_directory('/').json()['Path']
        self.root = psp.join(root, self.root) if self.root else root
        _logger.debug('Updated root to %r.', self.root)
      path = psp.join(self.root, path)
    path = psp.normpath(path)

    def expand_latest(match):
      """Substitute #LATEST marker."""
      prefix = match.string[:match.start()]
      suffix = ''
      n = match.group(1) # n as in {N} syntax
      for _ in repeat(None, int(n) if n else 1):
        statuses = self._list_status(psp.join(prefix, suffix)).json()
        candidates = sorted(
          (-status['modificationTime'], status['pathSuffix'])
          for status in statuses['FileStatuses']['FileStatus']
        )
        if not candidates:
          raise HdfsError('Cannot expand #LATEST. %r is empty.', prefix)
        elif len(candidates) == 1 and candidates[0][1] == '':
          raise HdfsError('Cannot expand #LATEST. %r is a file.', prefix)
        suffix = psp.join(suffix, candidates[0][1])
      return '/' + suffix

    path = re.sub(r'/?#LATEST(?:{(\d+)})?(?=/|$)', expand_latest, path)
    # #LATEST expansion (could cache the pattern, but not worth it)

    _logger.debug('Resolved path %r to %r.', hdfs_path, path)
    return path

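  # For example (hypothetical paths, shown only to illustrate the marker):
  # `client.resolve('logs/#LATEST')` expands to the most recently modified
  # entry under `logs`, and `client.resolve('logs/#LATEST{2}')` descends two
  # levels, equivalent to `'logs/#LATEST/#LATEST'`.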
  def content(self, hdfs_path, strict=True):
    """Get ContentSummary_ for a file or folder on HDFS.

    :param hdfs_path: Remote path.
    :param strict: If `False`, return `None` rather than raise an exception if
      the path doesn't exist.

    .. _ContentSummary: CS_
    .. _CS: http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#ContentSummary

    """
    _logger.info('Fetching content summary for %r.', hdfs_path)
    res = self._get_content_summary(hdfs_path, strict=strict)
    return res.json()['ContentSummary'] if res else None

  def status(self, hdfs_path, strict=True):
    """Get FileStatus_ for a file or folder on HDFS.

    :param hdfs_path: Remote path.
    :param strict: If `False`, return `None` rather than raise an exception if
      the path doesn't exist.

    .. _FileStatus: FS_
    .. _FS: http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#FileStatus

    """
    _logger.info('Fetching status for %r.', hdfs_path)
    res = self._get_file_status(hdfs_path, strict=strict)
    return res.json()['FileStatus'] if res else None

  def acl_status(self, hdfs_path, strict=True):
    """Get AclStatus_ for a file or folder on HDFS.

    :param hdfs_path: Remote path.
    :param strict: If `False`, return `None` rather than raise an exception if
      the path doesn't exist.

    .. _AclStatus: https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Get_ACL_Status

    """
    _logger.info('Fetching ACL status for %r.', hdfs_path)
    res = self._get_acl_status(hdfs_path, strict=strict)
    return res.json()['AclStatus'] if res else None

  def set_acl(self, hdfs_path, acl_spec, clear=True):
    """SetAcl_ or ModifyAcl_ for a file or folder on HDFS.

    :param hdfs_path: Path to an existing remote file or directory. An
      :class:`HdfsError` will be raised if the path doesn't exist.
    :param acl_spec: String representation of an ACL spec. Must be a valid
      string with entries for user, group and other. For example:
      `"user::rwx,user:foo:rw-,group::r--,other::---"`.
    :param clear: Clear existing ACL entries. If set to `False`, all existing
      ACL entries that are not specified in this call are retained without
      changes, behaving like ModifyAcl_ (e.g. a partial spec such as
      `"user:foo:rwx"` then only updates that entry).

    .. _SetAcl: https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Set_ACL
    .. _ModifyAcl: https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Modify_ACL_Entries

    """
    if clear:
      _logger.info('Setting ACL spec for %r to %r.', hdfs_path, acl_spec)
      self._set_acl(hdfs_path, aclspec=acl_spec)
    else:
      _logger.info('Modifying ACL spec for %r to %r.', hdfs_path, acl_spec)
      self._modify_acl_entries(hdfs_path, aclspec=acl_spec)

  def remove_acl_entries(self, hdfs_path, acl_spec):
    """RemoveAclEntries_ for a file or folder on HDFS.

    :param hdfs_path: Path to an existing remote file or directory. An
      :class:`HdfsError` will be raised if the path doesn't exist.
    :param acl_spec: String representation of an ACL spec. Must be a valid
      string with entries for user, group and other. For example:
      `"user::rwx,user:foo:rw-,group::r--,other::---"`.

    .. _RemoveAclEntries: https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Remove_ACL_Entries

    """
    _logger.info('Removing ACL spec on %r for %r.', hdfs_path, acl_spec)
    self._remove_acl_entries(hdfs_path, aclspec=acl_spec)

  def remove_default_acl(self, hdfs_path):
    """RemoveDefaultAcl_ for a file or folder on HDFS.

    :param hdfs_path: Path to an existing remote file or directory. An
      :class:`HdfsError` will be raised if the path doesn't exist.

    .. _RemoveDefaultAcl: https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Remove_Default_ACL

    """
    _logger.info('Removing default acl for %r', hdfs_path)
    self._remove_default_acl(hdfs_path)

  def remove_acl(self, hdfs_path):
    """RemoveAcl_ for a file or folder on HDFS.

    :param hdfs_path: Path to an existing remote file or directory. An
      :class:`HdfsError` will be raised if the path doesn't exist.

    .. _RemoveAcl: https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Remove_ACL

    """
    _logger.info('Removing all ACL for %r', hdfs_path)
    self._remove_acl(hdfs_path)

  def parts(self, hdfs_path, parts=None, status=False):
    """Returns a dictionary of part-files corresponding to a path.

    :param hdfs_path: Remote path. This directory should contain at most one
      part file per partition (otherwise one will be picked arbitrarily).
    :param parts: List of part-files numbers or total number of part-files to
      select. If a number, that many partitions will be chosen at random. By
      default all part-files are returned. If `parts` is a list and one of the
      parts is not found or too many samples are demanded, an
      :class:`~hdfs.util.HdfsError` is raised.
    :param status: Also return each file's corresponding FileStatus_.

    """
    _logger.debug('Fetching parts for %r.', hdfs_path)
    pattern = re.compile(r'^part-(?:(?:m|r)-|)(\d+)[^/]*$')
    matches = (
      (name, pattern.match(name), s)
      for name, s in self.list(hdfs_path, status=True)
    )
    part_files = {
      int(match.group(1)): (name, s)
      for name, match, s in matches
      if match
    }
    if not part_files:
      raise HdfsError('No part-files found in %r.', hdfs_path)
    _logger.debug('Found %s part-files for %r.', len(part_files), hdfs_path)
    if parts:
      if isinstance(parts, int):
        _logger.debug('Choosing %s parts randomly.', parts)
        if parts > len(part_files):
          raise HdfsError('Not enough part-files in %r.', hdfs_path)
        parts = sample(part_files, parts)
      try:
        infos = list(part_files[p] for p in parts)
      except KeyError as err:
        raise HdfsError('No part-file %r in %r.', err.args[0], hdfs_path)
      _logger.info(
        'Returning %s of %s part-files for %r: %s.',
        len(infos), len(part_files), hdfs_path,
        ', '.join(name for name, _ in infos)
      )
    else:
      infos = list(sorted(part_files.values()))
      _logger.info('Returning all %s part-files at %r.', len(infos), hdfs_path)
    return infos if status else [name for name, _ in infos]

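  # For instance (hypothetical layout): for a job output directory containing
  # `part-r-00000.avro` and `part-r-00001.avro`, `client.parts(path)` returns
  # both names, `client.parts(path, parts=1)` picks one of them at random, and
  # `client.parts(path, parts=[1])` selects `part-r-00001.avro` specifically.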
  def write(self, hdfs_path, data=None, overwrite=False, permission=None,
    blocksize=None, replication=None, buffersize=None, append=False,
    encoding=None):
    """Create a file on HDFS.

    :param hdfs_path: Path where to create file. The necessary directories will
      be created appropriately.
    :param data: Contents of file to write. Can be a string, a generator or a
      file object. The last two options will allow streaming upload (i.e.
      without having to load the entire contents into memory). If `None`, this
      method will return a file-like object and should be called using a `with`
      block (see below for examples).
    :param overwrite: Overwrite any existing file or directory.
    :param permission: Octal permission to set on the newly created file.
      Leading zeros may be omitted.
    :param blocksize: Block size of the file.
    :param replication: Number of replications of the file.
    :param buffersize: Size of upload buffer.
    :param append: Append to a file rather than create a new one.
    :param encoding: Encoding used to serialize data written.

    Sample usages:

    .. code-block:: python

      from json import dump, dumps

      records = [
        {'name': 'foo', 'weight': 1},
        {'name': 'bar', 'weight': 2},
      ]

      # As a context manager:
      with client.write('data/records.jsonl', encoding='utf-8') as writer:
        dump(records, writer)

      # Or, passing in a generator directly:
      client.write('data/records.jsonl', data=dumps(records), encoding='utf-8')

    """
    # TODO: Figure out why this function generates a "Connection pool is full,
    # discarding connection" warning when passed a generator.
    if append:
      if overwrite:
        raise ValueError('Cannot both overwrite and append.')
      if permission or blocksize or replication:
        raise ValueError('Cannot change file properties while appending.')
      _logger.info('Appending to %r.', hdfs_path)
      res = self._append(hdfs_path, buffersize=buffersize)
    else:
      _logger.info('Writing to %r.', hdfs_path)
      res = self._create(
        hdfs_path,
        overwrite=overwrite,
        permission=permission,
        blocksize=blocksize,
        replication=replication,
        buffersize=buffersize,
      )
    loc = res.headers['location']

    def consumer(_data):
      """Thread target."""
      res = self._request(
        method='POST' if append else 'PUT',
        url=loc,
        data=(c.encode(encoding) for c in _data) if encoding else _data,
      )
      if not res:
        raise _to_error(res)

    if data is None:
      return AsyncWriter(consumer)
    else:
      consumer(data)

  def upload(self, hdfs_path, local_path, n_threads=1, temp_dir=None,
    chunk_size=2 ** 16, progress=None, cleanup=True, **kwargs):
    r"""Upload a file or directory to HDFS.

    :param hdfs_path: Target HDFS path. If it already exists and is a
      directory, files will be uploaded inside.
    :param local_path: Local path to file or folder. If a folder, all the files
      inside of it will be uploaded (note that this implies that folders empty
      of files will not be created remotely).
    :param n_threads: Number of threads to use for parallelization. A value of
      `0` (or negative) uses as many threads as there are files.
    :param temp_dir: Directory under which the files will first be uploaded
      when `overwrite=True` and the final remote path already exists. Once the
      upload successfully completes, it will be swapped in.
    :param chunk_size: Interval in bytes by which the files will be uploaded.
    :param progress: Callback function to track progress, called every
      `chunk_size` bytes. It will be passed two arguments, the path to the
      file being uploaded and the number of bytes transferred so far. On
      completion, it will be called once with `-1` as second argument.
    :param cleanup: Delete any uploaded files if an error occurs during the
      upload.
    :param \*\*kwargs: Keyword arguments forwarded to :meth:`write`. In
      particular, set `overwrite` to overwrite any existing file or directory.

    On success, this method returns the remote upload path.

    """
    if chunk_size <= 0:
      raise ValueError('Upload chunk size must be positive.')
    _logger.info('Uploading %r to %r.', local_path, hdfs_path)

    def _upload(_path_tuple):
      """Upload a single file."""
      _local_path, _temp_path = _path_tuple
      _logger.debug('Uploading %r to %r.', _local_path, _temp_path)

      def wrap(_reader, _chunk_size, _progress):
        """Generator that can track progress."""
        nbytes = 0
        while True:
          chunk = _reader.read(_chunk_size)
          if chunk:
            if _progress:
              nbytes += len(chunk)
              _progress(_local_path, nbytes)
            yield chunk
          else:
            break
        if _progress:
          _progress(_local_path, -1)

      with open(_local_path, 'rb') as reader:
        self.write(_temp_path, wrap(reader, chunk_size, progress), **kwargs)

    # First, we gather information about remote paths.
    hdfs_path = self.resolve(hdfs_path)
    temp_path = None
    try:
      statuses = [status for _, status in self.list(hdfs_path, status=True)]
    except HdfsError as err:
      if 'not a directory' in err.message:
        # Remote path is a normal file.
        if not kwargs.get('overwrite'):
          raise HdfsError('Remote path %r already exists.', hdfs_path)
      elif 'does not exist' in err.message:
        # Remote path doesn't exist.
        temp_path = hdfs_path
      else:
        # An unexpected error occurred.
        raise err
    else:
      # Remote path is a directory.
      suffixes = {status['pathSuffix'] for status in statuses}
      local_name = osp.basename(local_path)
      hdfs_path = psp.join(hdfs_path, local_name)
      if local_name in suffixes:
        if not kwargs.get('overwrite'):
          raise HdfsError('Remote path %r already exists.', hdfs_path)
      else:
        temp_path = hdfs_path
    if not temp_path:
      # The remote path already exists, we need to generate a temporary one.
      remote_dpath, remote_name = psp.split(hdfs_path)
      temp_dir = temp_dir or remote_dpath
      temp_path = psp.join(
        temp_dir,
        '{}.temp-{}'.format(remote_name, _current_micros())
      )
      _logger.debug(
        'Upload destination %r already exists. Using temporary path %r.',
        hdfs_path, temp_path
      )
    # Then we figure out which files we need to upload, and where.
    if osp.isdir(local_path):
      local_fpaths = [
        osp.join(dpath, fpath)
        for dpath, _, fpaths in os.walk(local_path)
        for fpath in fpaths
      ]
      if not local_fpaths:
        raise HdfsError('No files to upload found inside %r.', local_path)
      offset = len(local_path.rstrip(os.sep)) + len(os.sep)
      fpath_tuples = [
        (fpath, psp.join(temp_path, fpath[offset:].replace(os.sep, '/')))
        for fpath in local_fpaths
      ]
    elif osp.exists(local_path):
      fpath_tuples = [(local_path, temp_path)]
    else:
      raise HdfsError('Local path %r does not exist.', local_path)
    # Finally, we upload all files (optionally, in parallel).
    if n_threads <= 0:
      n_threads = len(fpath_tuples)
    else:
      n_threads = min(n_threads, len(fpath_tuples))
    _logger.debug(
      'Uploading %s files using %s thread(s).', len(fpath_tuples), n_threads
    )
    try:
      if n_threads == 1:
        for path_tuple in fpath_tuples:
          _upload(path_tuple)
      else:
        _map_async(n_threads, _upload, fpath_tuples)
    except Exception as err: # pylint: disable=broad-except
      if cleanup:
        _logger.exception('Error while uploading. Attempting cleanup.')
        try:
          self.delete(temp_path, recursive=True)
        except Exception:
          _logger.error('Unable to cleanup temporary folder.')
        finally:
          raise err
      else:
        raise err
    else:
      if temp_path != hdfs_path:
        _logger.debug(
          'Upload of %r complete. Moving from %r to %r.',
          local_path, temp_path, hdfs_path
        )
        self.delete(hdfs_path, recursive=True)
        self.rename(temp_path, hdfs_path)
      else:
        _logger.debug(
          'Upload of %r to %r complete.', local_path, hdfs_path
        )
    return hdfs_path

  @contextmanager
  def read(self, hdfs_path, offset=0, length=None, buffer_size=None,
    encoding=None, chunk_size=0, delimiter=None, progress=None):
    """Read a file from HDFS.

    :param hdfs_path: HDFS path.
    :param offset: Starting byte position.
    :param length: Number of bytes to be processed. `None` will read the
      entire file.
    :param buffer_size: Size of the buffer in bytes used for transferring the
      data. Defaults to the value set in the HDFS configuration.
    :param encoding: Encoding used to decode the request. By default the raw
      data is returned. This is mostly helpful in python 3, for example to
      deserialize JSON data (as the decoder expects unicode).
    :param chunk_size: If set to a positive number, the context manager will
      return a generator yielding every `chunk_size` bytes instead of a
      file-like object (unless `delimiter` is also set, see below).
    :param delimiter: If set, the context manager will return a generator
      yielding each time the delimiter is encountered. This parameter requires
      the `encoding` to be specified.
    :param progress: Callback function to track progress, called every
      `chunk_size` bytes (not available if the chunk size isn't specified). It
      will be passed two arguments, the path to the file being read and the
      number of bytes transferred so far. On completion, it will be called
      once with `-1` as second argument.

    This method must be called using a `with` block:

    .. code-block:: python

      with client.read('foo') as reader:
        content = reader.read()

    This ensures that connections are always properly closed.

    .. note::

      The raw file-like object returned by this method (when called without an
      encoding, chunk size, or delimiter) can have a very different performance
      profile than local files. In particular, line-oriented methods are often
      slower. The recommended workaround is to specify an encoding when
      possible or read the entire file before splitting it.

    """
    if chunk_size < 0:
      raise ValueError('Read chunk size must be non-negative.')
    if progress and not chunk_size:
      raise ValueError('Progress callback requires a positive chunk size.')
    if delimiter:
      if not encoding:
        raise ValueError('Delimiter splitting requires an encoding.')
      if chunk_size:
        raise ValueError('Delimiter splitting incompatible with chunk size.')
    _logger.info('Reading file %r.', hdfs_path)
    res = self._open(
      hdfs_path,
      offset=offset,
      length=length,
      buffersize=buffer_size,
    )
    try:
      if not chunk_size and not delimiter:
        yield codecs.getreader(encoding)(res.raw) if encoding else res.raw
      else:
        # Patch in encoding on the response object so that `iter_content` and
        # `iter_lines` can pick it up. If `None`, it is ignored and no decoding
        # happens (which is why we can always set `decode_unicode=True`).
        res.encoding = encoding
        if delimiter:
          data = res.iter_lines(delimiter=delimiter, decode_unicode=True)
        else:
          data = res.iter_content(chunk_size=chunk_size, decode_unicode=True)
        if progress:

          def reader(_hdfs_path, _progress):
            """Generator that tracks progress."""
            nbytes = 0
            for chunk in data:
              nbytes += len(chunk)
              _progress(_hdfs_path, nbytes)
              yield chunk
            _progress(_hdfs_path, -1)

          yield reader(hdfs_path, progress)
        else:
          yield data
    finally:
      res.close()
      _logger.debug('Closed response for reading file %r.', hdfs_path)

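  # A short usage sketch (the path and the `process` callable are
  # placeholders): reading a UTF-8 file line by line without loading it
  # entirely, using the `delimiter` option described above:
  #
  #   with client.read('data/records.jsonl', encoding='utf-8', delimiter='\n') as lines:
  #     for line in lines:
  #       process(line)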
  def download(self, hdfs_path, local_path, overwrite=False, n_threads=1,
    temp_dir=None, **kwargs):
    r"""Download a file or folder from HDFS and save it locally.

    :param hdfs_path: Path on HDFS of the file or folder to download. If a
      folder, all the files under it will be downloaded.
    :param local_path: Local path. If it already exists and is a directory,
      the files will be downloaded inside of it.
    :param overwrite: Overwrite any existing file or directory.
    :param n_threads: Number of threads to use for parallelization. A value of
      `0` (or negative) uses as many threads as there are files.
    :param temp_dir: Directory under which the files will first be downloaded
      when `overwrite=True` and the final destination path already exists.
      Once the download successfully completes, it will be swapped in.
    :param \*\*kwargs: Keyword arguments forwarded to :meth:`read`. If no
      `chunk_size` argument is passed, a default value of 64 kB will be used.
      If a `progress` argument is passed and threading is used, care must be
      taken to ensure correct behavior.

    On success, this method returns the local download path.

    """
    _logger.info('Downloading %r to %r.', hdfs_path, local_path)
    kwargs.setdefault('chunk_size', 2 ** 16)
    lock = Lock()

    def _download(_path_tuple):
      """Download a single file."""
      _remote_path, _temp_path = _path_tuple
      _logger.debug('Downloading %r to %r.', _remote_path, _temp_path)
      _dpath = osp.dirname(_temp_path)
      with lock:
        # Prevent race condition when using multiple threads.
        if not osp.exists(_dpath):
          os.makedirs(_dpath)
      with open(_temp_path, 'wb') as _writer:
        with self.read(_remote_path, **kwargs) as reader:
          for chunk in reader:
            _writer.write(chunk)

    # First, we figure out where we will download the files to.
    hdfs_path = self.resolve(hdfs_path)
    local_path = osp.realpath(local_path)
    if osp.isdir(local_path):
      local_path = osp.join(local_path, psp.basename(hdfs_path))
    if osp.exists(local_path):
      if not overwrite:
        raise HdfsError('Path %r already exists.', local_path)
      local_dpath, local_name = osp.split(local_path)
      temp_dir = temp_dir or local_dpath
      temp_path = osp.join(
        temp_dir,
        '{}.temp-{}'.format(local_name, _current_micros())
      )
      _logger.debug(
        'Download destination %r already exists. Using temporary path %r.',
        local_path, temp_path
      )
    else:
      if not osp.isdir(osp.dirname(local_path)):
        raise HdfsError('Parent directory of %r does not exist.', local_path)
      temp_path = local_path
    # Then we figure out which files we need to download and where.
    remote_paths = list(self.walk(hdfs_path, depth=0, status=False))
    if not remote_paths:
      # This is a single file.
      remote_fpaths = [hdfs_path]
    else:
      remote_fpaths = [
        psp.join(dpath, fname)
        for dpath, _, fnames in remote_paths
        for fname in fnames
      ]
      if not remote_fpaths:
        raise HdfsError('No files to download found inside %r.', hdfs_path)
    offset = len(hdfs_path) + 1 # Prefix length.
    fpath_tuples = [
      (
        fpath,
        osp.join(temp_path, fpath[offset:].replace('/', os.sep)).rstrip(os.sep)
      )
      for fpath in remote_fpaths
    ]
    # Finally, we download all of them.
    if n_threads <= 0:
      n_threads = len(fpath_tuples)
    else:
      n_threads = min(n_threads, len(fpath_tuples))
    _logger.debug(
      'Downloading %s files using %s thread(s).', len(fpath_tuples), n_threads
    )
    try:
      if n_threads == 1:
        for fpath_tuple in fpath_tuples:
          _download(fpath_tuple)
      else:
        _map_async(n_threads, _download, fpath_tuples)
    except Exception as err: # pylint: disable=broad-except
      _logger.exception('Error while downloading. Attempting cleanup.')
      try:
        if osp.isdir(temp_path):
          rmtree(temp_path)
        else:
          os.remove(temp_path)
      except Exception:
        _logger.error('Unable to cleanup temporary folder.')
      finally:
        raise err
    else:
      if temp_path != local_path:
        _logger.debug(
          'Download of %r complete. Moving from %r to %r.',
          hdfs_path, temp_path, local_path
        )
        if osp.isdir(local_path):
          rmtree(local_path)
        else:
          os.remove(local_path)
        move(temp_path, local_path)
      else:
        _logger.debug(
          'Download of %s to %r complete.', hdfs_path, local_path
        )
    return local_path

  def delete(self, hdfs_path, recursive=False, skip_trash=True):
    """Remove a file or directory from HDFS.

    :param hdfs_path: HDFS path.
    :param recursive: Recursively delete files and directories. By default,
      this method will raise an :class:`HdfsError` if trying to delete a
      non-empty directory.
    :param skip_trash: When false, the deleted path will be moved to an
      appropriate trash folder rather than deleted. This requires Hadoop 2.9+
      and trash to be enabled on the cluster.

    This function returns `True` if the deletion was successful and `False` if
    no file or directory previously existed at `hdfs_path`.

    """
    verb = 'Deleting' if skip_trash else 'Trashing'
    _logger.info(
      '%s %r%s.', verb, hdfs_path, ' recursively' if recursive else ''
    )
    if skip_trash:
      return self._delete(hdfs_path, recursive=recursive).json()['boolean']
    hdfs_path = self.resolve(hdfs_path)
    status = self.status(hdfs_path, strict=False)
    if not status:
      return False
    if status['type'] == 'DIRECTORY' and not recursive:
      raise HdfsError('Non-recursive trashing of directory %r.', hdfs_path)
    _logger.info('Fetching trash root for %r.', hdfs_path)
    trash_path = self._get_trash_root(hdfs_path).json()['Path']
    # The default trash policy (http://mtth.xyz/_9lc9t3hjtz276rx) expects
    # folders to be under a `"Current"` subfolder. We also add a timestamped
    # folder as a simple safeguard against path conflicts (note that the above
    # policy implements a more involved variant).
    dst_path = psp.join(trash_path, 'Current', _current_micros())
    self.makedirs(dst_path)
    # Note that there is a (hopefully small) race condition here: the path
    # might have been deleted between the status call above and the rename
    # here.
    self.rename(hdfs_path, dst_path)
    _logger.info('%r moved to trash at %r.', hdfs_path, dst_path)
    return True

  def rename(self, hdfs_src_path, hdfs_dst_path):
    """Move a file or folder.

    :param hdfs_src_path: Source path.
    :param hdfs_dst_path: Destination path. If the path already exists and is
      a directory, the source will be moved into it. If the path exists and is
      a file, or if a parent destination directory is missing, this method
      will raise an :class:`HdfsError`.

    """
    _logger.info('Renaming %r to %r.', hdfs_src_path, hdfs_dst_path)
    hdfs_dst_path = self.resolve(hdfs_dst_path)
    res = self._rename(hdfs_src_path, destination=hdfs_dst_path)
    if not res.json()['boolean']:
      raise HdfsError(
        'Unable to rename %r to %r.',
        self.resolve(hdfs_src_path), hdfs_dst_path
      )

  def set_owner(self, hdfs_path, owner=None, group=None):
    """Change the owner of a file.

    :param hdfs_path: HDFS path.
    :param owner: Optional, new owner for file.
    :param group: Optional, new group for file.

    At least one of `owner` and `group` must be specified.

    """
    if not owner and not group:
      raise ValueError('Must set at least one of owner or group.')
    messages = []
    if owner:
      messages.append('owner to {!r}'.format(owner))
    if group:
      messages.append('group to {!r}'.format(group))
    _logger.info('Changing %s of %r.', ', and '.join(messages), hdfs_path)
    self._set_owner(hdfs_path, owner=owner, group=group)

  def set_permission(self, hdfs_path, permission):
    """Change the permissions of a file.

    :param hdfs_path: HDFS path.
    :param permission: New octal permissions string of file.

    """
    _logger.info(
      'Changing permissions of %r to %r.', hdfs_path, permission
    )
    self._set_permission(hdfs_path, permission=permission)

  def set_times(self, hdfs_path, access_time=None, modification_time=None):
    """Change remote timestamps.

    :param hdfs_path: HDFS path.
    :param access_time: Timestamp of last file access.
    :param modification_time: Timestamp of last file modification.

    At least one of the timestamps must be specified.

    """
    if not access_time and not modification_time:
      raise ValueError('At least one of the times must be specified.')
    msgs = []
    if access_time:
      msgs.append('access time to {!r}'.format(access_time))
    if modification_time:
      msgs.append('modification time to {!r}'.format(modification_time))
    _logger.info('Updating %s of %r.', ' and '.join(msgs), hdfs_path)
    self._set_times(
      hdfs_path,
      accesstime=access_time,
      modificationtime=modification_time,
    )

  def set_replication(self, hdfs_path, replication):
    """Set file replication.

    :param hdfs_path: Path to an existing remote file. An :class:`HdfsError`
      will be raised if the path doesn't exist or points to a directory.
    :param replication: Replication factor.

    """
    _logger.info(
      'Setting replication factor to %r for %r.', replication, hdfs_path
    )
    res = self._set_replication(hdfs_path, replication=replication)
    if not res.json()['boolean']:
      raise HdfsError('%r is not a file.', hdfs_path)

  def makedirs(self, hdfs_path, permission=None):
    """Create a remote directory, recursively if necessary.

    :param hdfs_path: Remote path. Intermediate directories will be created
      appropriately.
    :param permission: Octal permission to set on the newly created directory.
      These permissions will only be set on directories that do not already
      exist.

    This function currently has no return value as WebHDFS doesn't return a
    meaningful flag.

    """
    _logger.info('Creating directories to %r.', hdfs_path)
    self._mkdirs(hdfs_path, permission=permission)

  def checksum(self, hdfs_path):
    """Get a remote file's checksum.

    :param hdfs_path: Remote path. Must point to a file.

    """
    _logger.info('Getting checksum for %r.', hdfs_path)
    return self._get_file_checksum(hdfs_path).json()['FileChecksum']

  def allow_snapshot(self, hdfs_path):
    """Allow snapshots for a remote folder.

    :param hdfs_path: Remote path to a directory. If `hdfs_path` doesn't exist
      or points to a normal file, an :class:`HdfsError` will be raised.

    No-op if snapshotting is already allowed.

    """
    _logger.info('Allowing snapshots in %r.', hdfs_path)
    hdfs_path = self.resolve(hdfs_path)
    self._allow_snapshot(hdfs_path)

  def disallow_snapshot(self, hdfs_path):
    """Disallow snapshots for a remote folder.

    :param hdfs_path: Remote path to a directory. If `hdfs_path` doesn't
      exist, points to a normal file, or still contains snapshots, an
      :class:`HdfsError` will be raised.

    No-op if snapshotting is already disallowed or was never allowed.

    """
    _logger.info('Disallowing snapshots in %r.', hdfs_path)
    hdfs_path = self.resolve(hdfs_path)
    self._disallow_snapshot(hdfs_path)

  def create_snapshot(self, hdfs_path, snapshotname=None):
    """Create a snapshot of a remote folder where snapshots are allowed.

    :param hdfs_path: Remote path to a directory. If `hdfs_path` doesn't
      exist, doesn't allow snapshots, or points to a normal file, an
      :class:`HdfsError` will be raised.
    :param snapshotname: Snapshot name; if absent, a name is generated by the
      server.

    Returns the path to the created snapshot.

    """
    _logger.info('Creating snapshot %r in %r.', snapshotname, hdfs_path)
    hdfs_path = self.resolve(hdfs_path)
    return self._create_snapshot(
      hdfs_path, snapshotname=snapshotname
    ).json()['Path']

  def delete_snapshot(self, hdfs_path, snapshotname):
    """Remove a snapshot of a remote folder.

    :param hdfs_path: Remote path to a directory. If `hdfs_path` doesn't exist
      or points to a normal file, an :class:`HdfsError` will be raised.
    :param snapshotname: Snapshot name; if it does not exist, an
      :class:`HdfsError` will be raised.

    """
    _logger.info('Deleting snapshot %r in %r.', snapshotname, hdfs_path)
    hdfs_path = self.resolve(hdfs_path)
    self._delete_snapshot(hdfs_path, snapshotname=snapshotname)

  def rename_snapshot(self, hdfs_path, oldsnapshotname, snapshotname):
    """Rename a snapshot of a remote folder.

    :param hdfs_path: Remote path to a directory. If `hdfs_path` doesn't exist
      or points to a normal file, an :class:`HdfsError` will be raised.
    :param oldsnapshotname: Current snapshot name; if it does not exist, an
      :class:`HdfsError` will be raised.
    :param snapshotname: New snapshot name; if it already exists, an
      :class:`HdfsError` will be raised.

    """
    _logger.info(
      'Renaming snapshot %r to %r in %r.',
      oldsnapshotname, snapshotname, hdfs_path
    )
    hdfs_path = self.resolve(hdfs_path)
    self._rename_snapshot(
      hdfs_path, oldsnapshotname=oldsnapshotname, snapshotname=snapshotname
    )

  def list(self, hdfs_path, status=False):
    """Return names of files contained in a remote folder.

    :param hdfs_path: Remote path to a directory. If `hdfs_path` doesn't exist
      or points to a normal file, an :class:`HdfsError` will be raised.
    :param status: Also return each file's corresponding FileStatus_.

    """
    _logger.info('Listing %r.', hdfs_path)
    hdfs_path = self.resolve(hdfs_path)
    statuses = self._list_status(hdfs_path).json()['FileStatuses']['FileStatus']
    if len(statuses) == 1 and (
      not statuses[0]['pathSuffix'] or self.status(hdfs_path)['type'] == 'FILE'
      # HttpFS behaves incorrectly here, we sometimes need an extra call to
      # make sure we always identify if we are dealing with a file.
    ):
      raise HdfsError('%r is not a directory.', hdfs_path)
    if status:
      return [(s['pathSuffix'], s) for s in statuses]
    else:
      return [s['pathSuffix'] for s in statuses]

  def walk(self, hdfs_path, depth=0, status=False, ignore_missing=False,
    allow_dir_changes=False):
    """Depth-first walk of remote filesystem.

    :param hdfs_path: Starting path. If the path doesn't exist, an
      :class:`HdfsError` will be raised. If it points to a file, the returned
      generator will be empty.
    :param depth: Maximum depth to explore. `0` for no limit.
    :param status: Also return each file or folder's corresponding FileStatus_.
    :param ignore_missing: Ignore missing nested folders rather than raise an
      exception. This can be useful when the tree is modified during a walk.
    :param allow_dir_changes: Allow changes to the directories' list to affect
      the walk. For example clearing it by setting `dirs[:] = []` would
      prevent the walk from entering any nested directories. This option can
      only be set when `status` is false.

    This method returns a generator yielding tuples `(path, dirs, files)`
    where `path` is the absolute path to the current directory, `dirs` is the
    list of directory names it contains, and `files` is the list of file names
    it contains.

    """
    _logger.info('Walking %r (depth %r).', hdfs_path, depth)
    if status and allow_dir_changes:
      raise ValueError('Cannot set both status and allow_dir_changes')

    def _walk(dir_path, dir_status, depth):
      """Recursion helper."""
      try:
        infos = self.list(dir_path, status=True)
      except HdfsError as err:
        if ignore_missing and 'does not exist' in err.message:
          return
        raise
      dir_infos = [info for info in infos if info[1]['type'] == 'DIRECTORY']
      file_infos = [info for info in infos if info[1]['type'] == 'FILE']
      if status:
        yield ((dir_path, dir_status), dir_infos, file_infos)
      else:
        dir_names = [dir_name for dir_name, _ in dir_infos]
        yield (
          dir_path,
          dir_names,
          [file_name for file_name, _ in file_infos],
        )
        if allow_dir_changes:
          infos_by_name = dict(dir_infos)
          strict = not ignore_missing
          dir_infos = []
          for dir_name in dir_names:
            info = infos_by_name.get(dir_name)
            if not info:
              info = self.status(psp.join(dir_path, dir_name), strict=strict)
            if info:
              dir_infos.append((dir_name, info))
      if depth != 1:
        for name, s in dir_infos:
          path = psp.join(dir_path, name)
          for infos in _walk(path, s, depth - 1):
            yield infos

    hdfs_path = self.resolve(hdfs_path) # Cache resolution.
    s = self.status(hdfs_path)
    if s['type'] == 'DIRECTORY':
      for infos in _walk(hdfs_path, s, depth):
        yield infos

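  # Usage sketch (hypothetical paths): pruning directories in place, which
  # requires `allow_dir_changes=True` as described above:
  #
  #   for dpath, dnames, fnames in client.walk('logs', allow_dir_changes=True):
  #     dnames[:] = [d for d in dnames if not d.startswith('_')]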
  # Class loader.

  @classmethod
  def from_options(cls, options, class_name='Client'):
    """Load client from options.

    :param options: Options dictionary.
    :param class_name: Client class name. Defaults to the base :class:`Client`
      class.

    This method provides a single entry point to instantiate any registered
    :class:`Client` subclass. To register a subclass, simply load its
    containing module. If using the CLI, you can use the `autoload.modules`
    and `autoload.paths` options.

    """
    try:
      return cls.__registry__[class_name](**options)
    except KeyError:
      raise HdfsError('Unknown client class: %r', class_name)
    except TypeError:
      raise HdfsError('Invalid options: %r', options)

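  # For example (the options shown are illustrative): loading a registered
  # subclass by name, forwarding the dictionary as constructor keyword
  # arguments:
  #
  #   client = Client.from_options(
  #     {'url': 'http://namenode:50070', 'user': 'ann'},
  #     class_name='InsecureClient',
  #   )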
# Custom client classes
# ---------------------

class InsecureClient(Client):

  r"""HDFS web client to use when security is off.

  :param url: Hostname or IP address of HDFS namenode, prefixed with protocol,
    followed by WebHDFS port on namenode.
  :param user: User default. Defaults to the current user's (as determined by
    `whoami`).
  :param \*\*kwargs: Keyword arguments passed to the base class' constructor.

  Note that if a session argument is passed in, it will be modified in-place to
  support authentication.

  """

  def __init__(self, url, user=None, **kwargs):
    user = user or getuser()
    session = kwargs.setdefault('session', rq.Session())
    if not session.params:
      session.params = {}
    session.params['user.name'] = user
    super(InsecureClient, self).__init__(url, **kwargs)

class TokenClient(Client):

  r"""HDFS web client using Hadoop token delegation security.

  :param url: Hostname or IP address of HDFS namenode, prefixed with protocol,
    followed by WebHDFS port on namenode.
  :param token: Hadoop delegation token.
  :param \*\*kwargs: Keyword arguments passed to the base class' constructor.

  Note that if a session argument is passed in, it will be modified in-place to
  support authentication.

  """

  def __init__(self, url, token, **kwargs):
    session = kwargs.setdefault('session', rq.Session())
    if not session.params:
      session.params = {}
    session.params['delegation'] = token
    super(TokenClient, self).__init__(url, **kwargs)

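# Example round trip (a sketch only; the URL, user and paths are made up):
#
#   client = InsecureClient('http://namenode:50070', user='ann')
#   client.makedirs('data')
#   with client.write('data/hello.txt', encoding='utf-8') as writer:
#     writer.write(u'hello from hdfs\n')
#   with client.read('data/hello.txt', encoding='utf-8') as reader:
#     print(reader.read())
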
# Helpers
# -------

def _current_micros():
  """Returns a string representing the current time in microseconds."""
  return str(int(time.time() * 1e6))


def _map_async(pool_size, func, args):
  """Async map (threading), handling python 2.6 edge case.

  :param pool_size: Maximum number of threads.
  :param func: Function to run.
  :param args: Iterable of arguments (one per thread).

  This is necessary since using `map` will in general prevent keyboard
  interrupts from functioning properly (see this thread for more details -
  http://stackoverflow.com/a/1408476/1062617), but `map_async` hangs in python
  2.6.

  """
  pool = ThreadPool(pool_size)
  with closing(pool):
    if sys.version_info <= (2, 6):
      results = pool.map(func, args)
    else:
      results = pool.map_async(func, args).get(1 << 22) # 6+ weeks.
  pool.join()
  return results