Source code for hdfs.ext.avro

#!/usr/bin/env python
# encoding: utf-8
# pylint: disable=protected-access

"""Read and write Avro_ files directly from HDFS.

This extension enables streaming decoding and encoding of files from and to
HDFS. It requires the `fastavro` library.

+ :class:`AvroWriter` writes Avro files on HDFS from python objects.
+ :class:`AvroReader` reads Avro files from HDFS into an iterable of records.

Sample usage:

.. literalinclude:: ../examples/avro.py

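A minimal end-to-end sketch along the same lines (assuming a client configured
under the `'dev'` alias; the paths and records below are illustrative):

.. code-block:: python

  from hdfs import Config
  from hdfs.ext.avro import AvroReader, AvroWriter

  client = Config().get_client('dev')

  records = [
    {'name': 'Ann', 'salary': 11.3},
    {'name': 'Bob', 'salary': 9.8},
  ]

  # Write the records to a new Avro file, inferring the schema from the first.
  with AvroWriter(client, 'names.avro', overwrite=True) as writer:
    for record in records:
      writer.write(record)

  # Stream the records back, decoding them as the data is transferred.
  with AvroReader(client, 'names.avro') as reader:
    schema = reader.writer_schema # Schema stored in the remote file.
    loaded = list(reader)
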
It also features an entry point (named `hdfscli-avro` by default) which
provides access to the above functionality from the shell. For usage examples
and more information:

.. code-block:: bash

  $ hdfscli-avro --help

.. _Avro: https://avro.apache.org/docs/1.7.7/index.html

"""

from ...util import AsyncWriter, HdfsError
from json import dumps
from six import integer_types, string_types
import fastavro
import io
import logging as lg
import os
import posixpath as psp
import sys


_logger = lg.getLogger(__name__)


# The number of bytes in a sync marker (http://mtth.xyz/_9lc9t3hjtx69x54).
SYNC_SIZE = 16

class _SchemaInferrer(object):

  """Utility to infer Avro schemas from python values."""

  def __init__(self):
    self.record_index = 0

  def infer(self, obj):
    """Infer Avro type corresponding to a python object.

    :param obj: Python primitive.

    There are multiple limitations with this function, among which:

    + Nullable fields aren't supported.
    + Only Avro integers will be inferred, so some values may overflow.
    + Record names are auto-generated.

    """
    if isinstance(obj, bool):
      return 'boolean'
    elif isinstance(obj, string_types):
      return 'string'
    elif isinstance(obj, integer_types): # Python 3 doesn't have `long`.
      return 'int'
    elif isinstance(obj, float):
      return 'float'
    elif isinstance(obj, list):
      if not obj:
        raise ValueError('Cannot infer type of empty array.')
      return {
        'type': 'array',
        'items': self.infer(obj[0])
      }
    elif isinstance(obj, dict):
      if not obj:
        raise ValueError('Cannot infer type of empty record.')
      self.record_index += 1
      return {
        'name': '__Record{}'.format(self.record_index),
        'type': 'record',
        'fields': [
          {'name': k, 'type': self.infer(v)}
          for k, v in sorted(obj.items()) # Sort fields by name.
        ]
      }
    raise ValueError('Cannot infer type from {}: {!r}'.format(type(obj), obj))

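# For example, `_SchemaInferrer().infer({'name': 'Ann', 'age': 23})` (sample
# values for illustration) returns an auto-named record type whose fields are
# sorted by name:
#
#   {
#     'name': '__Record1',
#     'type': 'record',
#     'fields': [
#       {'name': 'age', 'type': 'int'},
#       {'name': 'name', 'type': 'string'},
#     ],
#   }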

class _SeekableReader(object):

  """Customized reader for Avro.

  :param reader: Non-seekable reader.
  :param size: Size of the reads to cache, defaulting to the sync marker
    size. Mainly exposed for testing.

  It detects reads of sync markers' sizes and will buffer these. Note that this
  reader is heavily particularized to how the `fastavro` library performs Avro
  decoding.

  """

  def __init__(self, reader, size=None):
    self._reader = reader
    self._size = size or SYNC_SIZE
    self._buffer = None
    self._saught = False

  def read(self, nbytes):
    """Read bytes, caching the read if the size matches."""
    buf = self._buffer
    if self._saught:
      assert buf
      missing_bytes = nbytes - len(buf)
      if missing_bytes < 0:
        chunk = buf[:nbytes]
        self._buffer = buf[nbytes:]
      else:
        chunk = buf
        if missing_bytes:
          chunk += self._reader.read(missing_bytes)
        self._buffer = None
        self._saught = False
    else:
      self._buffer = None
      chunk = self._reader.read(nbytes)
      if nbytes == self._size:
        self._buffer = chunk
    return chunk

  def seek(self, offset, whence):
    """Go back using the cached bytes."""
    assert offset == - self._size
    assert whence == os.SEEK_CUR
    assert self._buffer
    self._saught = True


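# For example (using an in-memory stream in place of an HDFS file object), a
# read of exactly `SYNC_SIZE` bytes is cached so that the subsequent relative
# seek can be served even though the underlying reader cannot seek:
#
#   reader = _SeekableReader(io.BytesIO(b'x' * 64))
#   marker = reader.read(SYNC_SIZE) # Cached because the size matches.
#   reader.seek(-SYNC_SIZE, os.SEEK_CUR) # Rewinds onto the cached bytes.
#   assert reader.read(SYNC_SIZE) == marker
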
class AvroReader(object):

  """HDFS Avro file reader.

  :param client: :class:`hdfs.client.Client` instance.
  :param hdfs_path: Remote path.
  :param parts: Part-files to read, when reading a distributed file. The
    default is to read all part-files in order. See
    :meth:`hdfs.client.Client.parts` for details.
  :param reader_schema: Schema to read the data as. If specified, it must be
    compatible with the writer's schema (the default).

  The contents of the file will be decoded in a streaming manner, as the data
  is transferred. This makes it possible to use it on files of arbitrary size.
  As a convenience, the content summary object of the remote file is available
  on the reader's `content` attribute.

  Usage:

  .. code-block:: python

    with AvroReader(client, 'foo.avro') as reader:
      schema = reader.writer_schema # The remote file's Avro schema.
      content = reader.content # Content metadata (e.g. size).
      for record in reader:
        pass # and its records

  """

  def __init__(self, client, hdfs_path, parts=None, reader_schema=None):
    self.content = client.content(hdfs_path) #: Content summary of Avro file.
    self.metadata = None #: Avro header metadata.
    self.reader_schema = reader_schema #: Input reader schema.
    self._writer_schema = None
    if self.content['directoryCount']:
      # This is a folder.
      self._paths = [
        psp.join(hdfs_path, fname)
        for fname in client.parts(hdfs_path, parts)
      ]
    else:
      # This is a single file.
      self._paths = [hdfs_path]
    self._client = client
    self._records = None
    _logger.debug('Instantiated %r.', self)

  def __repr__(self):
    return '<AvroReader(paths={!r})>'.format(self._paths)

  def __enter__(self):

    def _reader():
      """Record generator over all part-files."""
      for path in self._paths:
        with self._client.read(path) as bytes_reader:
          reader = fastavro.reader(
            _SeekableReader(bytes_reader),
            reader_schema=self.reader_schema
          )
          if not self._writer_schema:
            schema = reader.writer_schema
            _logger.debug('Read schema from %r.', path)
            yield (schema, reader.metadata)
          for record in reader:
            yield record

    self._records = _reader()
    self._writer_schema, self.metadata = next(self._records)
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self._records.close()
    _logger.debug('Closed records iterator for %r.', self)

  def __iter__(self): # pylint: disable=non-iterator-returned
    if not self._records:
      raise HdfsError('Iteration is only supported inside a `with` block.')
    return self._records

  @property
  def writer_schema(self):
    """Get the underlying file's schema.

    The schema will only be available after entering the reader's
    corresponding `with` block.

    """
    if not self._writer_schema:
      raise HdfsError('Schema not yet inferred.')
    return self._writer_schema

  # Legacy property, preserved for backwards-compatibility.
  schema = writer_schema

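# For example, records can be aggregated in a streaming fashion without ever
# holding the whole file in memory (the path and field name below are
# placeholders):
#
#   with AvroReader(client, 'names.avro') as reader:
#     remote_size = reader.content['length'] # From the content summary.
#     total = sum(record['salary'] for record in reader)
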
class AvroWriter(object):

  r"""Write an Avro file on HDFS from python dictionaries.

  :param client: :class:`hdfs.client.Client` instance.
  :param hdfs_path: Remote path.
  :param schema: Avro schema. If not specified, the writer will try to infer
    it from the first record sent. There are however limitations regarding
    what can be inferred.
  :param codec: Compression codec. The default is `'null'` (no compression).
  :param sync_interval: Number of bytes after which a block will be written.
  :param sync_marker: 16 byte tag used for synchronization. If not specified,
    one will be generated at random.
  :param metadata: Additional metadata to include in the container file's
    header. Keys starting with `'avro.'` are reserved.
  :param \*\*kwargs: Keyword arguments forwarded to
    :meth:`hdfs.client.Client.write`.

  Usage:

  .. code-block:: python

    with AvroWriter(client, 'data.avro') as writer:
      for record in records:
        writer.write(record)

  """

  def __init__(self, client, hdfs_path, schema=None, codec=None,
    sync_interval=None, sync_marker=None, metadata=None, **kwargs):
    self._hdfs_path = hdfs_path
    self._fo = client.write(hdfs_path, **kwargs)
    self._schema = schema
    self._writer_kwargs = {
      'codec': codec or 'null',
      'metadata': metadata,
      'sync_interval': sync_interval or 1000 * SYNC_SIZE,
      'sync_marker': sync_marker or os.urandom(SYNC_SIZE),
    }
    self._entered = False
    self._writer = None
    _logger.info('Instantiated %r.', self)

  def __repr__(self):
    return '<AvroWriter(hdfs_path={!r})>'.format(self._hdfs_path)

  def __enter__(self):
    if self._entered:
      raise HdfsError('Avro writer cannot be reused.')
    self._entered = True
    if self._schema:
      self._start_writer()
    return self

  def __exit__(self, *exc_info):
    if not self._writer:
      return # No header or records were written.
    try:
      self._writer.__exit__(*exc_info)
      _logger.debug('Closed underlying writer.')
    finally:
      self._fo.__exit__(*exc_info)

  @property
  def schema(self):
    """Avro schema."""
    if not self._schema:
      raise HdfsError('Schema not yet inferred.')
    return self._schema

  def write(self, record):
    """Store a record.

    :param record: Record object to store.

    Only available inside the `with` block.

    """
    if not self._entered:
      raise HdfsError('Avro writer not available outside context block.')
    if not self._schema:
      self._schema = _SchemaInferrer().infer(record)
      _logger.info('Inferred schema: %s', dumps(self._schema))
      self._start_writer()
    self._writer.write(record)

  def _start_writer(self):
    _logger.debug('Starting underlying writer.')

    def write(records):
      fastavro.writer(
        fo=self._fo.__enter__(),
        schema=self._schema,
        records=records,
        **self._writer_kwargs
      )

    self._writer = AsyncWriter(write).__enter__()

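# For example, a writer can also be given an explicit schema and a compression
# codec rather than relying on inference (the schema and path below are
# placeholders; `'deflate'` is one of the codecs supported by `fastavro`):
#
#   schema = {
#     'name': 'Employee',
#     'type': 'record',
#     'fields': [
#       {'name': 'name', 'type': 'string'},
#       {'name': 'salary', 'type': 'float'},
#     ],
#   }
#   with AvroWriter(client, 'names.avro', schema=schema, codec='deflate') as writer:
#     writer.write({'name': 'Ann', 'salary': 11.3})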