# linux-utils: Linux system administration tools for Python.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: June 24, 2017
# URL: https://linux-utils.readthedocs.io
"""
Atomic filesystem operations for Linux in Python.
The most useful functions in this module are :func:`make_dirs()`,
:func:`touch()`, :func:`write_contents()` and :func:`write_file()`.
The :func:`copy_stat()` and :func:`get_temporary_file()` functions were
originally part of the logic in :func:`write_file()` but have since been
extracted to improve the readability and reusability of the code.
"""
# Standard library modules.
import codecs
import contextlib
import errno
import logging
import os
import stat
# External dependencies.
from humanfriendly import Timer
from six import text_type
# Public identifiers that require documentation.
__all__ = (
'copy_stat',
'get_temporary_file',
'make_dirs',
'touch',
'write_contents',
'write_file',
)
# Initialize a logger for this module.
logger = logging.getLogger(__name__)
[docs]def copy_stat(filename, reference=None, mode=None, uid=None, gid=None):
"""
The Python equivalent of ``chmod --reference && chown --reference``.
:param filename: The pathname of the file whose permissions and
ownership should be modified (a string).
:param reference: The pathname of the file to use as
reference (a string or :data:`None`).
:param mode: The permissions to set when `reference` isn't given or doesn't
exist (a number or :data:`None`).
:param uid: The user id to set when `reference` isn't given or doesn't
exist (a number or :data:`None`).
:param gid: The group id to set when `reference` isn't given or doesn't
exist (a number or :data:`None`).
"""
# Try to get the metadata from the reference file.
try:
if reference:
metadata = os.stat(reference)
mode = stat.S_IMODE(metadata.st_mode)
uid = metadata.st_uid
gid = metadata.st_gid
logger.debug("Copying permissions and ownership (%s) ..", reference)
except OSError as e:
# The only exception that we want to swallow
# is when the reference file doesn't exist.
if e.errno != errno.ENOENT:
raise
# Change the file's permissions?
if mode is not None:
logger.debug("Changing file permissions (%s) to %o ..", filename, mode)
os.chmod(filename, mode)
# Change the file's ownership?
if uid is not None or gid is not None:
logger.debug("Changing owner (%s) and group (%s) of file (%s) ..",
"unchanged" if uid is None else uid,
"unchanged" if gid is None else gid,
filename)
os.chown(filename, -1 if uid is None else uid, -1 if gid is None else gid)
[docs]def get_temporary_file(filename):
"""
Generate a non-obtrusive temporary filename.
:param filename: The filename on which the name of the temporary file
should be based (a string).
:returns: The filename of a temporary file (a string).
This function tries to generate the most non-obtrusive temporary filenames:
1. The temporary file will be located in the same directory as the file to
replace, because this is the only location somewhat guaranteed to
support "rename into place" semantics (see :func:`write_file()`).
2. The temporary file will be hidden from directory listings and common
filename patterns because it has a leading dot.
3. The temporary file will have a different extension then the file to
replace (in case of filename patterns that do match dotfiles).
4. The temporary filename has a decent chance of not conflicting with
temporary filenames generated by concurrent processes.
"""
directory, basename = os.path.split(filename)
return os.path.join(directory, '.%s.tmp-%i' % (basename, os.getpid()))
[docs]def make_dirs(directory, mode=0o777):
"""
Create a directory if it doesn't already exist (keeping concurrency in mind).
:param directory: The pathname of a directory (a string).
:returns: :data:`True` if the directory was created,
:data:`False` if it already existed.
:raises: Any exceptions raised by :func:`os.makedirs()`.
This function is a wrapper for :func:`os.makedirs()` that swallows
:exc:`~exceptions.OSError` in the case of :data:`~errno.EEXIST`.
"""
try:
logger.debug("Trying to create directory (%s) ..", directory)
os.makedirs(directory, mode)
logger.debug("Successfully created directory.")
return True
except OSError as e:
if e.errno == errno.EEXIST:
# The directory already exists.
logger.debug("Directory already exists.")
return False
else:
# Don't swallow errors other than EEXIST because we don't
# want to obscure real problems (e.g. permission denied).
logger.debug("Failed to create directory, propagating exception!")
raise
[docs]def touch(filename):
"""
The equivalent of the touch_ program in Python.
:param filename: The pathname of the file to touch (a string).
This function uses :func:`make_dirs()` to automatically create missing
directory components in `filename`.
.. _touch: https://manpages.debian.org/touch
"""
logger.debug("Touching file: %s", filename)
make_dirs(os.path.dirname(filename))
with open(filename, 'a'):
os.utime(filename, None)
[docs]def write_contents(filename, contents, encoding='UTF-8', mode=None):
"""
Atomically create or update a file's contents.
:param filename: The pathname of the file (a string).
:param contents: The (new) contents of the file (a
byte string or a Unicode string).
:param encoding: The text encoding used to encode `contents`
when it is a Unicode string.
:param mode: The permissions to use when the file doesn't exist yet (a
number like accepted by :func:`os.chmod()` or :data:`None`).
"""
if isinstance(contents, text_type):
contents = codecs.encode(contents, encoding)
with write_file(filename, mode=mode) as handle:
handle.write(contents)
[docs]@contextlib.contextmanager
def write_file(filename, mode=None):
"""
Atomically create or update a file (avoiding partial reads).
:param filename: The pathname of the file (a string).
:param mode: The permissions to use when the file doesn't exist yet (a
number like accepted by :func:`os.chmod()` or :data:`None`).
:returns: A writable file object whose contents will be used to create or
atomically replace `filename`.
"""
timer = Timer()
logger.debug("Preparing to create or atomically replace file (%s) ..", filename)
make_dirs(os.path.dirname(filename))
temporary_file = get_temporary_file(filename)
logger.debug("Opening temporary file for writing (%s) ..", temporary_file)
with open(temporary_file, 'wb') as handle:
yield handle
copy_stat(filename=temporary_file, reference=filename, mode=mode)
logger.debug("Moving new contents into place (%s -> %s) ..", temporary_file, filename)
os.rename(temporary_file, filename)
logger.debug("Took %s to create or replace file.", timer)