"""
gevent internals.
"""
from __future__ import absolute_import, print_function, division
try:
from errno import EBADF
except ImportError:
EBADF = 9
import io
import functools
import sys
import os
from gevent.hub import _get_hub_noargs as get_hub
from gevent._compat import PY2
from gevent._compat import integer_types
from gevent._compat import reraise
from gevent._compat import fspath
from gevent.lock import Semaphore, DummySemaphore
class cancel_wait_ex(IOError):
def __init__(self):
IOError.__init__(
self,
EBADF, 'File descriptor was closed in another greenlet')
class FileObjectClosed(IOError):
def __init__(self):
IOError.__init__(
self,
EBADF, 'Bad file descriptor (FileObject was closed)')
class UniversalNewlineBytesWrapper(io.TextIOWrapper):
"""
Uses TextWrapper to decode universal newlines, but returns the
results as bytes.
This is for Python 2 where the 'rU' mode did that.
"""
mode = None
def __init__(self, fobj, line_buffering):
# latin-1 has the ability to round-trip arbitrary bytes.
io.TextIOWrapper.__init__(self, fobj, encoding='latin-1',
newline=None,
line_buffering=line_buffering)
def read(self, *args, **kwargs):
result = io.TextIOWrapper.read(self, *args, **kwargs)
return result.encode('latin-1')
def readline(self, limit=-1):
result = io.TextIOWrapper.readline(self, limit)
return result.encode('latin-1')
def __iter__(self):
# readlines() is implemented in terms of __iter__
# and TextIOWrapper.__iter__ checks that readline returns
# a unicode object, which we don't, so we override
return self
def __next__(self):
line = self.readline()
if not line:
raise StopIteration
return line
next = __next__
class FlushingBufferedWriter(io.BufferedWriter):
def write(self, b):
ret = io.BufferedWriter.write(self, b)
self.flush()
return ret
class WriteallMixin(object):
def writeall(self, value):
"""
Similar to :meth:`socket.socket.sendall`, ensures that all the contents of
*value* have been written (though not necessarily flushed) before returning.
Returns the length of *value*.
.. versionadded:: 20.12.0
"""
# Do we need to play the same get_memory games we do with sockets?
# And what about chunking for large values? See _socketcommon.py
write = super(WriteallMixin, self).write
total = len(value)
while value:
l = len(value)
w = write(value)
if w == l:
break
value = value[w:]
return total
class FileIO(io.FileIO):
"""A subclass that we can dynamically assign __class__ for."""
__slots__ = ()
class WriteIsWriteallMixin(WriteallMixin):
def write(self, value):
return self.writeall(value)
class WriteallFileIO(WriteIsWriteallMixin, io.FileIO):
pass
class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
"""
Interprets the arguments to `open`. Internal use only.
Originally based on code in the stdlib's _pyio.py (Python implementation of
the :mod:`io` module), but modified for gevent:
- Native strings are returned on Python 2 when neither
'b' nor 't' are in the mode string and no encoding is specified.
- Universal newlines work in that mode.
- Allows externally unbuffered text IO.
:keyword bool atomic_write: If true, then if the opened, wrapped, stream
is unbuffered (meaning that ``write`` can produce short writes and the return
value needs to be checked), then the implementation will be adjusted so that
``write`` behaves like Python 2 on a built-in file object and writes the
entire value. Only set this on Python 2; the only intended user is
:class:`gevent.subprocess.Popen`.
"""
@staticmethod
def _collapse_arg(pref_name, preferred_val, old_name, old_val, default):
# We could play tricks with the callers ``locals()`` to avoid having to specify
# the name (which we only use for error handling) but ``locals()`` may be slow and
# inhibit JIT (on PyPy), so we just write it out long hand.
if preferred_val is not None and old_val is not None:
raise TypeError("Cannot specify both %s=%s and %s=%s" % (
pref_name, preferred_val,
old_name, old_val
))
if preferred_val is None and old_val is None:
return default
return preferred_val if preferred_val is not None else old_val
def __init__(self, fobj, mode='r', bufsize=None, close=None,
encoding=None, errors=None, newline=None,
buffering=None, closefd=None,
atomic_write=False):
# Based on code in the stdlib's _pyio.py from 3.8.
# pylint:disable=too-many-locals,too-many-branches,too-many-statements
closefd = self._collapse_arg('closefd', closefd, 'close', close, True)
del close
buffering = self._collapse_arg('buffering', buffering, 'bufsize', bufsize, -1)
del bufsize
if not hasattr(fobj, 'fileno'):
if not isinstance(fobj, integer_types):
# Not a fd. Support PathLike on Python 2 and Python <= 3.5.
fobj = fspath(fobj)
if not isinstance(fobj, (str, bytes) + integer_types): # pragma: no cover
raise TypeError("invalid file: %r" % fobj)
if isinstance(fobj, (str, bytes)):
closefd = True
if not isinstance(mode, str):
raise TypeError("invalid mode: %r" % mode)
if not isinstance(buffering, integer_types):
raise TypeError("invalid buffering: %r" % buffering)
if encoding is not None and not isinstance(encoding, str):
raise TypeError("invalid encoding: %r" % encoding)
if errors is not None and not isinstance(errors, str):
raise TypeError("invalid errors: %r" % errors)
modes = set(mode)
if modes - set("axrwb+tU") or len(mode) > len(modes):
raise ValueError("invalid mode: %r" % mode)
creating = "x" in modes
reading = "r" in modes
writing = "w" in modes
appending = "a" in modes
updating = "+" in modes
text = "t" in modes
binary = "b" in modes
universal = 'U' in modes
can_write = creating or writing or appending or updating
if universal:
if can_write:
raise ValueError("mode U cannot be combined with 'x', 'w', 'a', or '+'")
# Just because the stdlib deprecates this, no need for us to do so as well.
# Especially not while we still support Python 2.
# import warnings
# warnings.warn("'U' mode is deprecated",
# DeprecationWarning, 4)
reading = True
if text and binary:
raise ValueError("can't have text and binary mode at once")
if creating + reading + writing + appending > 1:
raise ValueError("can't have read/write/append mode at once")
if not (creating or reading or writing or appending):
raise ValueError("must have exactly one of read/write/append mode")
if binary and encoding is not None:
raise ValueError("binary mode doesn't take an encoding argument")
if binary and errors is not None:
raise ValueError("binary mode doesn't take an errors argument")
if binary and newline is not None:
raise ValueError("binary mode doesn't take a newline argument")
if binary and buffering == 1:
import warnings
warnings.warn("line buffering (buffering=1) isn't supported in binary "
"mode, the default buffer size will be used",
RuntimeWarning, 4)
self._fobj = fobj
self.fileio_mode = (
(creating and "x" or "")
+ (reading and "r" or "")
+ (writing and "w" or "")
+ (appending and "a" or "")
+ (updating and "+" or "")
)
self.mode = self.fileio_mode + ('t' if text else '') + ('b' if binary else '')
self.creating = creating
self.reading = reading
self.writing = writing
self.appending = appending
self.updating = updating
self.text = text
self.binary = binary
self.can_write = can_write
self.can_read = reading or updating
self.native = (
not self.text and not self.binary # Neither t nor b given.
and not encoding and not errors # And no encoding or error handling either.
)
self.universal = universal
self.buffering = buffering
self.encoding = encoding
self.errors = errors
self.newline = newline
self.closefd = closefd
self.atomic_write = atomic_write
default_buffer_size = io.DEFAULT_BUFFER_SIZE
_opened = None
_opened_raw = None
def is_fd(self):
return isinstance(self._fobj, integer_types)
def opened(self):
"""
Return the :meth:`wrapped` file object.
"""
if self._opened is None:
raw = self.opened_raw()
try:
self._opened = self.__wrapped(raw)
except:
# XXX: This might be a bug? Could we wind up closing
# something we shouldn't close?
raw.close()
raise
return self._opened
def _raw_object_is_new(self, raw):
return self._fobj is not raw
def opened_raw(self):
if self._opened_raw is None:
self._opened_raw = self._do_open_raw()
return self._opened_raw
def _do_open_raw(self):
if hasattr(self._fobj, 'fileno'):
return self._fobj
# io.FileIO doesn't allow assigning to its __class__,
# and we can't know for sure here whether we need the atomic write()
# method or not (it depends on the layers on top of us),
# so we use a subclass that *does* allow assigning.
return FileIO(self._fobj, self.fileio_mode, self.closefd)
@staticmethod
def is_buffered(stream):
return (
# buffering happens internally in the text codecs
isinstance(stream, (io.BufferedIOBase, io.TextIOBase))
or (hasattr(stream, 'buffer') and stream.buffer is not None)
)
@classmethod
def buffer_size_for_stream(cls, stream):
result = cls.default_buffer_size
try:
bs = os.fstat(stream.fileno()).st_blksize
except (OSError, AttributeError):
pass
else:
if bs > 1:
result = bs
return result
def __buffered(self, stream, buffering):
if self.updating:
Buffer = io.BufferedRandom
elif self.creating or self.writing or self.appending:
Buffer = io.BufferedWriter
elif self.reading:
Buffer = io.BufferedReader
else: # prgama: no cover
raise ValueError("unknown mode: %r" % self.mode)
try:
result = Buffer(stream, buffering)
except AttributeError:
# Python 2 file() objects don't have the readable/writable
# attributes. But they handle their own buffering.
result = stream
return result
def _make_atomic_write(self, result, raw):
# The idea was to swizzle the class with one that defines
# write() to call writeall(). This avoids setting any
# attribute on the return object, avoids an additional layer
# of proxying, and avoids any reference cycles (if setting a
# method on the object).
#
# However, this is not possible with the built-in io classes
# (static types defined in C cannot have __class__ assigned).
# Fortunately, we need this only for the specific case of
# opening a file descriptor (subprocess.py) on Python 2, in
# which we fully control the types involved.
#
# So rather than attempt that, we only implement exactly what we need.
if result is not raw or self._raw_object_is_new(raw):
if result.__class__ is FileIO:
result.__class__ = WriteallFileIO
else: # pragma: no cover
raise NotImplementedError(
"Don't know how to make %s have atomic write. "
"Please open a gevent issue with your use-case." % (
result
)
)
return result
def __wrapped(self, raw):
"""
Wraps the raw IO object (`RawIOBase` or `io.TextIOBase`) in
buffers, text decoding, and newline handling.
"""
if self.binary and isinstance(raw, io.TextIOBase):
# Can't do it. The TextIO object will have its own buffer, and
# trying to read from the raw stream or the buffer without going through
# the TextIO object is likely to lead to problems with the codec.
raise ValueError("Unable to perform binary IO on top of text IO stream")
result = raw
buffering = self.buffering
line_buffering = False
if buffering == 1 or buffering < 0 and raw.isatty():
buffering = -1
line_buffering = True
if buffering < 0:
buffering = self.buffer_size_for_stream(result)
if buffering < 0: # pragma: no cover
raise ValueError("invalid buffering size")
if buffering != 0 and not self.is_buffered(result):
# Need to wrap our own buffering around it. If it
# is already buffered, don't do so.
result = self.__buffered(result, buffering)
if not self.binary:
# Either native or text at this point.
if PY2 and self.native:
# Neither text mode nor binary mode specified.
if self.universal:
# universal was requested, e.g., 'rU'
result = UniversalNewlineBytesWrapper(result, line_buffering)
else:
# Python 2 and text mode, or Python 3 and either text or native (both are the same)
if not isinstance(raw, io.TextIOBase):
# Avoid double-wrapping a TextIOBase in another TextIOWrapper.
# That tends not to work. See https://github.com/gevent/gevent/issues/1542
result = io.TextIOWrapper(result, self.encoding, self.errors, self.newline,
line_buffering)
if result is not raw or self._raw_object_is_new(raw):
# Set the mode, if possible, but only if we created a new
# object.
try:
result.mode = self.mode
except (AttributeError, TypeError):
# AttributeError: No such attribute
# TypeError: Readonly attribute (py2)
pass
if (
self.atomic_write
and not self.is_buffered(result)
and not isinstance(result, WriteIsWriteallMixin)
):
# Let subclasses have a say in how they make this atomic, and
# whether or not they do so even if we're actually returning the raw object.
result = self._make_atomic_write(result, raw)
return result
class _ClosedIO(object):
# Used for FileObjectBase._io when FOB.close()
# is called. Lets us drop references to ``_io``
# for GC/resource cleanup reasons, but keeps some useful
# information around.
__slots__ = ('name',)
def __init__(self, io_obj):
try:
self.name = io_obj.name
except AttributeError:
pass
def __getattr__(self, name):
if name == 'name':
# We didn't set it in __init__ because there wasn't one
raise AttributeError
raise FileObjectClosed
def __bool__(self):
return False
__nonzero__ = __bool__
class FileObjectBase(object):
"""
Internal base class to ensure a level of consistency
between :class:`~.FileObjectPosix`, :class:`~.FileObjectThread`
and :class:`~.FileObjectBlock`.
"""
# List of methods we delegate to the wrapping IO object, if they
# implement them and we do not.
_delegate_methods = (
# General methods
'flush',
'fileno',
'writable',
'readable',
'seek',
'seekable',
'tell',
# Read
'read',
'readline',
'readlines',
'read1',
# Write.
# Note that we do not extend WriteallMixin,
# so writeall will be copied, if it exists, and
# wrapped.
'write',
'writeall',
'writelines',
'truncate',
)
_io = None
def __init__(self, descriptor):
# type: (OpenDescriptor) -> None
self._io = descriptor.opened()
# We don't actually use this property ourself, but we save it (and
# pass it along) for compatibility.
self._close = descriptor.closefd
self._do_delegate_methods()
io = property(lambda s: s._io,
# Historically we either hand-wrote all the delegation methods
# to use self.io, or we simply used __getattr__ to look them up at
# runtime. This meant people could change the io attribute on the fly
# and it would mostly work (subprocess.py used to do that). We don't recommend
# that, but we still support it.
lambda s, nv: setattr(s, '_io', nv) or s._do_delegate_methods())
def _do_delegate_methods(self):
for meth_name in self._delegate_methods:
meth = getattr(self._io, meth_name, None)
implemented_by_class = hasattr(type(self), meth_name)
if meth and not implemented_by_class:
setattr(self, meth_name, self._wrap_method(meth))
elif hasattr(self, meth_name) and not implemented_by_class:
delattr(self, meth_name)
def _wrap_method(self, method):
"""
Wrap a method we're copying into our dictionary from the underlying
io object to do something special or different, if necessary.
"""
return method
@property
def closed(self):
"""True if the file is closed"""
return isinstance(self._io, _ClosedIO)
def close(self):
if isinstance(self._io, _ClosedIO):
return
fobj = self._io
self._io = _ClosedIO(self._io)
try:
self._do_close(fobj, self._close)
finally:
fobj = None
# Remove delegate methods to drop remaining references to
# _io.
d = self.__dict__
for meth_name in self._delegate_methods:
d.pop(meth_name, None)
def _do_close(self, fobj, closefd):
raise NotImplementedError()
def __getattr__(self, name):
return getattr(self._io, name)
def __repr__(self):
return '<%s at 0x%x %s_fobj=%r%s>' % (
self.__class__.__name__,
id(self),
'closed' if self.closed else '',
self.io,
self._extra_repr()
)
def _extra_repr(self):
return ''
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def __iter__(self):
return self
def __next__(self):
line = self.readline()
if not line:
raise StopIteration
return line
next = __next__
def __bool__(self):
return True
__nonzero__ = __bool__
[docs]class FileObjectBlock(FileObjectBase):
"""
FileObjectBlock()
A simple synchronous wrapper around a file object.
Adds no concurrency or gevent compatibility.
"""
def __init__(self, fobj, *args, **kwargs):
descriptor = OpenDescriptor(fobj, *args, **kwargs)
FileObjectBase.__init__(self, descriptor)
def _do_close(self, fobj, closefd):
fobj.close()
[docs]class FileObjectThread(FileObjectBase):
"""
FileObjectThread()
A file-like object wrapping another file-like object, performing all blocking
operations on that object in a background thread.
.. caution::
Attempting to change the threadpool or lock of an existing FileObjectThread
has undefined consequences.
.. versionchanged:: 1.1b1
The file object is closed using the threadpool. Note that whether or
not this action is synchronous or asynchronous is not documented.
"""
def __init__(self, *args, **kwargs):
"""
:keyword bool lock: If True (the default) then all operations will
be performed one-by-one. Note that this does not guarantee that, if using
this file object from multiple threads/greenlets, operations will be performed
in any particular order, only that no two operations will be attempted at the
same time. You can also pass your own :class:`gevent.lock.Semaphore` to synchronize
file operations with an external resource.
:keyword bool closefd: If True (the default) then when this object is closed,
the underlying object is closed as well. If *fobj* is a path, then
*closefd* must be True.
"""
lock = kwargs.pop('lock', True)
threadpool = kwargs.pop('threadpool', None)
descriptor = OpenDescriptor(*args, **kwargs)
self.threadpool = threadpool or get_hub().threadpool
self.lock = lock
if self.lock is True:
self.lock = Semaphore()
elif not self.lock:
self.lock = DummySemaphore()
if not hasattr(self.lock, '__enter__'):
raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
self.__io_holder = [descriptor.opened()] # signal for _wrap_method
FileObjectBase.__init__(self, descriptor)
def _do_close(self, fobj, closefd):
self.__io_holder[0] = None # for _wrap_method
try:
with self.lock:
self.threadpool.apply(fobj.flush)
finally:
if closefd:
# Note that we're not taking the lock; older code
# did fobj.close() without going through the threadpool at all,
# so acquiring the lock could potentially introduce deadlocks
# that weren't present before. Avoiding the lock doesn't make
# the existing race condition any worse.
# We wrap the close in an exception handler and re-raise directly
# to avoid the (common, expected) IOError from being logged by the pool
def close(_fobj=fobj):
try:
_fobj.close()
except: # pylint:disable=bare-except
return sys.exc_info()
finally:
_fobj = None
del fobj
exc_info = self.threadpool.apply(close)
del close
if exc_info:
reraise(*exc_info)
def _do_delegate_methods(self):
FileObjectBase._do_delegate_methods(self)
self.__io_holder[0] = self._io
def _extra_repr(self):
return ' threadpool=%r' % (self.threadpool,)
def _wrap_method(self, method):
# NOTE: We are careful to avoid introducing a refcycle
# within self. Our wrapper cannot refer to self.
io_holder = self.__io_holder
lock = self.lock
threadpool = self.threadpool
@functools.wraps(method)
def thread_method(*args, **kwargs):
if io_holder[0] is None:
# This is different than FileObjectPosix, etc,
# because we want to save the expensive trip through
# the threadpool.
raise FileObjectClosed
with lock:
return threadpool.apply(method, args, kwargs)
return thread_method