#include "Python.h"
#include "pycore_call.h"
#include "pycore_long.h"
#include "pycore_object.h"
#include <stddef.h>
#include "_iomodule.h"
/* Instance layout used by the _io._IOBase type (see iobase_spec.basicsize). */
typedef struct {
PyObject_HEAD
/* Per-instance slot whose offset is published as "__dictoffset__". */
PyObject *dict;
/* Per-instance slot whose offset is published as "__weaklistoffset__". */
PyObject *weakreflist;
} iobase;
/* Docstring for _io._IOBase; installed through the Py_tp_doc slot.
 *
 * The closing example opens the file for writing ('w') because it calls
 * fp.write(); a file opened with 'r' would reject that call.
 */
PyDoc_STRVAR(iobase_doc,
    "The abstract base class for all I/O classes.\n"
    "\n"
    "This class provides dummy implementations for many methods that\n"
    "derived classes can override selectively; the default implementations\n"
    "represent a file that cannot be read, written or seeked.\n"
    "\n"
    "Even though IOBase does not declare read, readinto, or write because\n"
    "their signatures will vary, implementations and clients should\n"
    "consider those methods part of the interface. Also, implementations\n"
    "may raise UnsupportedOperation when operations they do not support are\n"
    "called.\n"
    "\n"
    "The basic type used for binary data read from or written to a file is\n"
    "bytes. Other bytes-like objects are accepted as method arguments too.\n"
    "In some cases (such as readinto), a writable object is required. Text\n"
    "I/O classes work with str data.\n"
    "\n"
    "Note that calling any method (except additional calls to close(),\n"
    "which are ignored) on a closed stream should raise a ValueError.\n"
    "\n"
    "IOBase (and its subclasses) support the iterator protocol, meaning\n"
    "that an IOBase object can be iterated over yielding the lines in a\n"
    "stream.\n"
    "\n"
    "IOBase also supports the :keyword:`with` statement. In this example,\n"
    "fp is closed after the suite of the with statement is complete:\n"
    "\n"
    "with open('spam.txt', 'w') as fp:\n"
    "    fp.write('Spam and eggs!')\n");
/* Set the exception type stored in state->unsupported_operation, using
 * `message` as its text.
 *
 * Always returns NULL, so callers can `return iobase_unsupported(...)`.
 */
static PyObject *
iobase_unsupported(_PyIO_State *state, const char *message)
{
    PyObject *exc_type = state->unsupported_operation;

    PyErr_SetString(exc_type, message);
    return NULL;
}
/* Default seek(): the base class cannot seek, so this always fails with the
 * unsupported-operation exception (message "seek").  Both arguments are
 * ignored.
 */
static PyObject *
_io__IOBase_seek_impl(PyObject *self, PyTypeObject *cls,
                      int Py_UNUSED(offset), int Py_UNUSED(whence))
{
    return iobase_unsupported(get_io_state_by_cls(cls), "seek");
}
/* Default tell(): delegates to self.seek(0, 1) and returns whatever that
 * call returns (NULL with an exception set on failure).
 */
static PyObject *
_io__IOBase_tell_impl(PyObject *self)
{
    const int offset = 0;
    const int whence = 1;

    return _PyObject_CallMethod(self, &_Py_ID(seek), "ii", offset, whence);
}
/* Default truncate(): always fails with the unsupported-operation exception
 * (message "truncate").  The size argument is ignored.
 */
static PyObject *
_io__IOBase_truncate_impl(PyObject *self, PyTypeObject *cls,
                          PyObject *Py_UNUSED(size))
{
    return iobase_unsupported(get_io_state_by_cls(cls), "truncate");
}
/* Report whether `self` carries the `__IOBase_closed` attribute.
 *
 * Only the attribute's presence is examined; its value is dropped right
 * away.  Returns the result of the lookup unchanged: positive when the
 * attribute exists, 0 when it does not, negative on error.
 */
static int
iobase_is_closed(PyObject *self)
{
    PyObject *attr;
    int found = _PyObject_LookupAttr(self, &_Py_ID(__IOBase_closed), &attr);

    Py_XDECREF(attr);
    return found;
}
/* Default flush(): does nothing on an open object.
 *
 * Returns None when the object is not closed.  Raises ValueError when it is
 * closed, and propagates the pending exception when the closed-state lookup
 * itself failed.
 */
static PyObject *
_io__IOBase_flush_impl(PyObject *self)
{
    int closed = iobase_is_closed(self);

    if (closed < 0) {
        /* Lookup failed; the exception is already set. */
        return NULL;
    }
    if (closed > 0) {
        PyErr_SetString(PyExc_ValueError, "I/O operation on closed file.");
        return NULL;
    }
    Py_RETURN_NONE;
}
/* Getter for the `closed` property: a bool built from iobase_is_closed(),
 * or NULL (exception set) when that check fails.
 */
static PyObject *
iobase_closed_get(PyObject *self, void *context)
{
    int closed = iobase_is_closed(self);

    return (closed < 0) ? NULL : PyBool_FromLong(closed);
}
/* Check the (possibly overridden) `closed` attribute of `self`.
 *
 * Returns 0 when the attribute is missing or falsy.  Returns -1 with an
 * exception set when the lookup or truth test fails, or, with ValueError,
 * when the attribute is truthy.
 */
static int
iobase_check_closed(PyObject *self)
{
    PyObject *attr;
    int rc = _PyObject_LookupAttr(self, &_Py_ID(closed), &attr);

    if (rc <= 0) {
        /* Missing attribute (0) or lookup error (negative). */
        return rc;
    }

    int is_closed = PyObject_IsTrue(attr);
    Py_DECREF(attr);
    if (is_closed > 0) {
        PyErr_SetString(PyExc_ValueError, "I/O operation on closed file.");
        return -1;
    }
    return is_closed;
}
/* Fail with an exception if `self` is closed; otherwise return None.
 *
 * When `args` is exactly Py_True, Py_None is returned directly rather than
 * through Py_RETURN_NONE.
 * NOTE(review): presumably a borrowed-reference convention for internal C
 * callers that pass Py_True -- confirm against those callers.
 */
PyObject *
_PyIOBase_check_closed(PyObject *self, PyObject *args)
{
    if (iobase_check_closed(self) != 0) {
        return NULL;
    }
    if (args != Py_True) {
        Py_RETURN_NONE;
    }
    return Py_None;
}
/* Method-table entry for _checkSeekable(): resolves the module state from
 * the type of `self` and forwards to _PyIOBase_check_seekable().
 */
static PyObject *
iobase_check_seekable(PyObject *self, PyObject *args)
{
    PyTypeObject *tp = Py_TYPE(self);

    return _PyIOBase_check_seekable(find_io_state_by_def(tp), self, args);
}
/* Method-table entry for _checkReadable(): resolves the module state from
 * the type of `self` and forwards to _PyIOBase_check_readable().
 */
static PyObject *
iobase_check_readable(PyObject *self, PyObject *args)
{
    PyTypeObject *tp = Py_TYPE(self);

    return _PyIOBase_check_readable(find_io_state_by_def(tp), self, args);
}
/* Method-table entry for _checkWritable(): resolves the module state from
 * the type of `self` and forwards to _PyIOBase_check_writable().
 */
static PyObject *
iobase_check_writable(PyObject *self, PyObject *args)
{
    PyTypeObject *tp = Py_TYPE(self);

    return _PyIOBase_check_writable(find_io_state_by_def(tp), self, args);
}
/* Always raise TypeError naming the type of `self` as unpicklable.
 * Returns NULL.
 */
PyObject *
_PyIOBase_cannot_pickle(PyObject *self, PyObject *args)
{
    const char *type_name = _PyType_Name(Py_TYPE(self));

    PyErr_Format(PyExc_TypeError,
                 "cannot pickle '%.100s' instances", type_name);
    return NULL;
}
/* Default close(): flush, then mark the object as closed.
 *
 * Calling it again on an already-closed object returns None without doing
 * anything.  The `__IOBase_closed` marker is set even when flush() raised;
 * in that case the flush exception is chained with any error from setting
 * the marker, and NULL is returned.
 */
static PyObject *
_io__IOBase_close_impl(PyObject *self)
{
    int closed = iobase_is_closed(self);

    if (closed < 0) {
        return NULL;
    }
    if (closed > 0) {
        Py_RETURN_NONE;
    }

    PyObject *flush_result = PyObject_CallMethodNoArgs(self, &_Py_ID(flush));

    /* Stash a possible flush() error so that setting the attribute runs
       with a clean error indicator, then chain it back afterwards. */
    PyObject *flush_exc = PyErr_GetRaisedException();
    int rc = PyObject_SetAttr(self, &_Py_ID(__IOBase_closed), Py_True);
    _PyErr_ChainExceptions1(flush_exc);

    if (rc < 0 || flush_result == NULL) {
        Py_XDECREF(flush_result);
        return NULL;
    }
    Py_DECREF(flush_result);
    Py_RETURN_NONE;
}
/* tp_finalize slot: close the object if it still looks open.
 *
 * The exception that was pending on entry is saved and restored, so this
 * function never changes the caller's error state.  If `closed` is missing
 * or cannot be evaluated as a bool, nothing is done.  Otherwise, when it is
 * false, `_finalizing` is set to True (failure ignored) and close() is
 * called; an error from close() is reported as unraisable.
 */
static void
iobase_finalize(PyObject *self)
{
    PyObject *saved_exc = PyErr_GetRaisedException();

    PyObject *closed_attr;
    int is_closed = -1;
    if (_PyObject_LookupAttr(self, &_Py_ID(closed), &closed_attr) > 0) {
        is_closed = PyObject_IsTrue(closed_attr);
        Py_DECREF(closed_attr);
    }
    if (is_closed < 0) {
        /* Attribute missing, lookup failed, or truth test failed. */
        PyErr_Clear();
    }

    if (is_closed == 0) {
        if (PyObject_SetAttr(self, &_Py_ID(_finalizing), Py_True) != 0) {
            PyErr_Clear();
        }
        PyObject *close_result =
            PyObject_CallMethodNoArgs(self, &_Py_ID(close));
        if (close_result != NULL) {
            Py_DECREF(close_result);
        }
        else {
            PyErr_WriteUnraisable(self);
        }
    }

    PyErr_SetRaisedException(saved_exc);
}
/* Run the finalizer of `self`.
 *
 * When the reference count is already zero (the call comes from a dealloc
 * path), PyObject_CallFinalizerFromDealloc() is used and its result is
 * returned.  Otherwise the plain PyObject_CallFinalizer() is used and 0 is
 * returned.
 */
int
_PyIOBase_finalize(PyObject *self)
{
    if (Py_REFCNT(self) == 0) {
        return PyObject_CallFinalizerFromDealloc(self);
    }
    PyObject_CallFinalizer(self);
    return 0;
}
/* tp_traverse slot: visit the instance's type and its `dict` slot. */
static int
iobase_traverse(iobase *self, visitproc visit, void *arg)
{
    /* Py_VISIT relies on the parameter names `visit` and `arg`. */
    Py_VISIT(Py_TYPE(self));
    Py_VISIT(self->dict);
    return 0;
}
/* tp_clear slot: drop the reference held in the `dict` slot. */
static int
iobase_clear(iobase *self)
{
    Py_CLEAR(self->dict);
    return 0;
}
/* tp_dealloc slot for _IOBase.
 *
 * Runs the finalizer first; a negative result aborts the deallocation and
 * leaves the object alive.  Otherwise the object is untracked from the GC,
 * its weak references and dict are cleared, its memory is released through
 * tp_free, and the reference to its type is dropped.
 */
static void
iobase_dealloc(iobase *self)
{
if (_PyIOBase_finalize((PyObject *) self) < 0) {
/* NOTE(review): presumably this extra reference compensates for the type
   decref that a heap-type subclass dealloc performs after we return --
   confirm against subtype_dealloc in typeobject.c. */
if (_PyType_HasFeature(Py_TYPE(self), Py_TPFLAGS_HEAPTYPE)) {
Py_INCREF(Py_TYPE(self));
}
return;
}
/* Fetch the type before the object is freed; it is used afterwards. */
PyTypeObject *tp = Py_TYPE(self);
_PyObject_GC_UNTRACK(self);
if (self->weakreflist != NULL)
PyObject_ClearWeakRefs((PyObject *) self);
Py_CLEAR(self->dict);
tp->tp_free((PyObject *)self);
Py_DECREF(tp);
}
/* Default seekable(): the base class always answers False. */
static PyObject *
_io__IOBase_seekable_impl(PyObject *self)
{
    Py_RETURN_FALSE;
}
/* Call self.seekable() and require that it returns exactly True.
 *
 * Any other result raises the unsupported-operation exception taken from
 * `state`.  On success the True object is returned; when `args` is exactly
 * Py_True the reference obtained from the call is released before
 * returning.
 */
PyObject *
_PyIOBase_check_seekable(_PyIO_State *state, PyObject *self, PyObject *args)
{
    PyObject *answer = PyObject_CallMethodNoArgs(self, &_Py_ID(seekable));

    if (answer == NULL) {
        return NULL;
    }
    if (answer != Py_True) {
        Py_DECREF(answer);
        return iobase_unsupported(state, "File or stream is not seekable.");
    }
    if (args == Py_True) {
        Py_DECREF(answer);
    }
    return answer;
}
/* Default readable(): the base class always answers False. */
static PyObject *
_io__IOBase_readable_impl(PyObject *self)
{
    Py_RETURN_FALSE;
}
/* Call self.readable() and require that it returns exactly True.
 *
 * Any other result raises the unsupported-operation exception taken from
 * `state`.  On success the True object is returned; when `args` is exactly
 * Py_True the reference obtained from the call is released before
 * returning.
 */
PyObject *
_PyIOBase_check_readable(_PyIO_State *state, PyObject *self, PyObject *args)
{
    PyObject *answer = PyObject_CallMethodNoArgs(self, &_Py_ID(readable));

    if (answer == NULL) {
        return NULL;
    }
    if (answer != Py_True) {
        Py_DECREF(answer);
        return iobase_unsupported(state, "File or stream is not readable.");
    }
    if (args == Py_True) {
        Py_DECREF(answer);
    }
    return answer;
}
/* Default writable(): the base class always answers False. */
static PyObject *
_io__IOBase_writable_impl(PyObject *self)
{
    Py_RETURN_FALSE;
}
/* Call self.writable() and require that it returns exactly True.
 *
 * Any other result raises the unsupported-operation exception taken from
 * `state`.  On success the True object is returned; when `args` is exactly
 * Py_True the reference obtained from the call is released before
 * returning.
 */
PyObject *
_PyIOBase_check_writable(_PyIO_State *state, PyObject *self, PyObject *args)
{
    PyObject *answer = PyObject_CallMethodNoArgs(self, &_Py_ID(writable));

    if (answer == NULL) {
        return NULL;
    }
    if (answer != Py_True) {
        Py_DECREF(answer);
        return iobase_unsupported(state, "File or stream is not writable.");
    }
    if (args == Py_True) {
        Py_DECREF(answer);
    }
    return answer;
}
/* __enter__: return a new reference to `self`, or fail if it is closed. */
static PyObject *
iobase_enter(PyObject *self, PyObject *args)
{
    if (iobase_check_closed(self) != 0) {
        return NULL;
    }
    return Py_NewRef(self);
}
/* __exit__: ignore the arguments and return the result of self.close(). */
static PyObject *
iobase_exit(PyObject *self, PyObject *args)
{
    PyObject *close_result = PyObject_CallMethodNoArgs(self, &_Py_ID(close));

    return close_result;
}
/* Default fileno(): always fails with the unsupported-operation exception
 * (message "fileno").
 */
static PyObject *
_io__IOBase_fileno_impl(PyObject *self, PyTypeObject *cls)
{
    return iobase_unsupported(get_io_state_by_cls(cls), "fileno");
}
/* Default isatty(): False for an open object; fails if it is closed. */
static PyObject *
_io__IOBase_isatty_impl(PyObject *self)
{
    if (iobase_check_closed(self) != 0) {
        return NULL;
    }
    Py_RETURN_FALSE;
}
/* Generic readline() built on top of self.read().
 *
 * Data is accumulated in a temporary bytearray until one of these happens:
 * the accumulated size reaches `limit` (when limit >= 0), read() returns an
 * empty bytes object, or the last byte appended is '\n'.  The result is
 * returned as a new bytes object.
 *
 * If `self` has a `peek` attribute it is called with 1 before every read,
 * and the returned bytes are scanned for '\n' to decide how many bytes to
 * request.  Without `peek`, one byte is requested per read() call.
 *
 * Raises OSError if peek() or read() returns something other than bytes.
 */
static PyObject *
_io__IOBase_readline_impl(PyObject *self, Py_ssize_t limit)
{
PyObject *peek, *buffer, *result;
Py_ssize_t old_size = -1;
/* `peek` stays NULL when the attribute does not exist. */
if (_PyObject_LookupAttr(self, &_Py_ID(peek), &peek) < 0) {
return NULL;
}
buffer = PyByteArray_FromStringAndSize(NULL, 0);
if (buffer == NULL) {
Py_XDECREF(peek);
return NULL;
}
while (limit < 0 || PyByteArray_GET_SIZE(buffer) < limit) {
/* Number of bytes to request from read() in this iteration. */
Py_ssize_t nreadahead = 1;
PyObject *b;
if (peek != NULL) {
PyObject *readahead = PyObject_CallOneArg(peek, _PyLong_GetOne());
if (readahead == NULL) {
/* Retry when _PyIO_trap_eintr() reports the error as handled
   (presumably an interrupted system call -- see its definition). */
if (_PyIO_trap_eintr()) {
continue;
}
goto fail;
}
if (!PyBytes_Check(readahead)) {
PyErr_Format(PyExc_OSError,
"peek() should have returned a bytes object, "
"not '%.200s'", Py_TYPE(readahead)->tp_name);
Py_DECREF(readahead);
goto fail;
}
if (PyBytes_GET_SIZE(readahead) > 0) {
/* Count bytes up to and including the first '\n'; the count is capped
   by the peeked length and, when a limit is given, by `limit`. */
Py_ssize_t n = 0;
const char *buf = PyBytes_AS_STRING(readahead);
/* NOTE(review): the cap below is `limit` itself, not the room left in
   `buffer`, so the final result can apparently exceed `limit` -- confirm
   whether this is intended. */
if (limit >= 0) {
do {
if (n >= PyBytes_GET_SIZE(readahead) || n >= limit)
break;
if (buf[n++] == '\n')
break;
} while (1);
}
else {
do {
if (n >= PyBytes_GET_SIZE(readahead))
break;
if (buf[n++] == '\n')
break;
} while (1);
}
nreadahead = n;
}
Py_DECREF(readahead);
}
b = _PyObject_CallMethod(self, &_Py_ID(read), "n", nreadahead);
if (b == NULL) {
if (_PyIO_trap_eintr()) {
continue;
}
goto fail;
}
if (!PyBytes_Check(b)) {
PyErr_Format(PyExc_OSError,
"read() should have returned a bytes object, "
"not '%.200s'", Py_TYPE(b)->tp_name);
Py_DECREF(b);
goto fail;
}
/* An empty read ends the line. */
if (PyBytes_GET_SIZE(b) == 0) {
Py_DECREF(b);
break;
}
/* Append the new chunk to the accumulated data. */
old_size = PyByteArray_GET_SIZE(buffer);
if (PyByteArray_Resize(buffer, old_size + PyBytes_GET_SIZE(b)) < 0) {
Py_DECREF(b);
goto fail;
}
memcpy(PyByteArray_AS_STRING(buffer) + old_size,
PyBytes_AS_STRING(b), PyBytes_GET_SIZE(b));
Py_DECREF(b);
/* Stop once the accumulated data ends with a newline. */
if (PyByteArray_AS_STRING(buffer)[PyByteArray_GET_SIZE(buffer) - 1] == '\n')
break;
}
result = PyBytes_FromStringAndSize(PyByteArray_AS_STRING(buffer),
PyByteArray_GET_SIZE(buffer));
Py_XDECREF(peek);
Py_DECREF(buffer);
return result;
fail:
Py_XDECREF(peek);
Py_DECREF(buffer);
return NULL;
}
/* tp_iter slot: the object is its own iterator; fails if it is closed. */
static PyObject *
iobase_iter(PyObject *self)
{
    if (iobase_check_closed(self) != 0) {
        return NULL;
    }
    return Py_NewRef(self);
}
/* tp_iternext slot: return the next line from self.readline().
 *
 * Returns NULL when readline() fails, when the size of the line cannot be
 * determined, or when the line is empty (end of iteration).
 */
static PyObject *
iobase_iternext(PyObject *self)
{
    PyObject *line = PyObject_CallMethodNoArgs(self, &_Py_ID(readline));

    if (line == NULL) {
        return NULL;
    }
    if (PyObject_Size(line) > 0) {
        return line;
    }
    /* Size error (exception set) or empty line (no exception). */
    Py_DECREF(line);
    return NULL;
}
/* readlines(hint): collect lines from iterating over `self` into a list.
 *
 * With hint <= 0 every line is collected via list.extend(self).  With a
 * positive hint, lines are appended one at a time and collection stops
 * after the first line whose size exceeds the budget still left from
 * `hint`; that line is kept in the result.
 */
static PyObject *
_io__IOBase_readlines_impl(PyObject *self, Py_ssize_t hint)
{
    PyObject *lines = PyList_New(0);
    if (lines == NULL) {
        return NULL;
    }

    if (hint <= 0) {
        PyObject *ret = PyObject_CallMethodObjArgs(lines, &_Py_ID(extend),
                                                   self, NULL);
        if (ret == NULL) {
            Py_DECREF(lines);
            return NULL;
        }
        Py_DECREF(ret);
        return lines;
    }

    PyObject *it = PyObject_GetIter(self);
    if (it == NULL) {
        Py_DECREF(lines);
        return NULL;
    }

    Py_ssize_t total = 0;
    for (;;) {
        PyObject *line = PyIter_Next(it);
        if (line == NULL) {
            if (PyErr_Occurred()) {
                goto error;
            }
            break;  /* iterator exhausted */
        }

        if (PyList_Append(lines, line) < 0) {
            Py_DECREF(line);
            goto error;
        }
        Py_ssize_t line_length = PyObject_Size(line);
        Py_DECREF(line);
        if (line_length < 0) {
            goto error;
        }
        if (line_length > hint - total) {
            break;
        }
        total += line_length;
    }

    Py_DECREF(it);
    return lines;

error:
    Py_DECREF(it);
    Py_DECREF(lines);
    return NULL;
}
/* writelines(lines): call self.write(line) for every item of `lines`.
 *
 * Fails up front if the object is closed.  A failed write() is retried for
 * as long as _PyIO_trap_eintr() reports the error as handled; any other
 * failure stops the iteration and is propagated.  Returns None on success.
 */
static PyObject *
_io__IOBase_writelines(PyObject *self, PyObject *lines)
{
    if (iobase_check_closed(self) != 0) {
        return NULL;
    }

    PyObject *iter = PyObject_GetIter(lines);
    if (iter == NULL) {
        return NULL;
    }

    PyObject *line;
    while ((line = PyIter_Next(iter)) != NULL) {
        PyObject *res;
        do {
            res = PyObject_CallMethodObjArgs(self, &_Py_ID(write), line, NULL);
        } while (res == NULL && _PyIO_trap_eintr());
        Py_DECREF(line);
        if (res == NULL) {
            Py_DECREF(iter);
            return NULL;
        }
        Py_DECREF(res);
    }

    /* PyIter_Next() returned NULL: either exhaustion or an error. */
    int iteration_failed = (PyErr_Occurred() != NULL);
    Py_DECREF(iter);
    if (iteration_failed) {
        return NULL;
    }
    Py_RETURN_NONE;
}
#define clinic_state() (find_io_state_by_def(Py_TYPE(self)))
#include "clinic/iobase.c.h"
#undef clinic_state
/* Method table for _io._IOBase.
 *
 * The _IO__IOBASE_*_METHODDEF entries come from the generated
 * "clinic/iobase.c.h" header; the remaining entries are written by hand.
 */
static PyMethodDef iobase_methods[] = {
_IO__IOBASE_SEEK_METHODDEF
_IO__IOBASE_TELL_METHODDEF
_IO__IOBASE_TRUNCATE_METHODDEF
_IO__IOBASE_FLUSH_METHODDEF
_IO__IOBASE_CLOSE_METHODDEF
_IO__IOBASE_SEEKABLE_METHODDEF
_IO__IOBASE_READABLE_METHODDEF
_IO__IOBASE_WRITABLE_METHODDEF
/* Private state-checking helpers, registered without arguments. */
{"_checkClosed", _PyIOBase_check_closed, METH_NOARGS},
{"_checkSeekable", iobase_check_seekable, METH_NOARGS},
{"_checkReadable", iobase_check_readable, METH_NOARGS},
{"_checkWritable", iobase_check_writable, METH_NOARGS},
_IO__IOBASE_FILENO_METHODDEF
_IO__IOBASE_ISATTY_METHODDEF
/* Context-manager protocol. */
{"__enter__", iobase_enter, METH_NOARGS},
{"__exit__", iobase_exit, METH_VARARGS},
_IO__IOBASE_READLINE_METHODDEF
_IO__IOBASE_READLINES_METHODDEF
_IO__IOBASE_WRITELINES_METHODDEF
{NULL, NULL}
};
/* Computed attributes of _io._IOBase.  Both entries have a getter only
 * (the setter field is NULL).
 */
static PyGetSetDef iobase_getset[] = {
{"__dict__", PyObject_GenericGetDict, NULL, NULL},
{"closed", (getter)iobase_closed_get, NULL, NULL},
{NULL}
};
/* Special members publishing the offsets of the weakref-list and dict
 * slots inside the `iobase` struct.
 */
static struct PyMemberDef iobase_members[] = {
{"__weaklistoffset__", T_PYSSIZET, offsetof(iobase, weakreflist), READONLY},
{"__dictoffset__", T_PYSSIZET, offsetof(iobase, dict), READONLY},
{NULL},
};
/* Slot table wiring the functions and tables above into _io._IOBase. */
static PyType_Slot iobase_slots[] = {
{Py_tp_dealloc, iobase_dealloc},
{Py_tp_doc, (void *)iobase_doc},
{Py_tp_traverse, iobase_traverse},
{Py_tp_clear, iobase_clear},
{Py_tp_iter, iobase_iter},
{Py_tp_iternext, iobase_iternext},
{Py_tp_methods, iobase_methods},
{Py_tp_members, iobase_members},
{Py_tp_getset, iobase_getset},
{Py_tp_finalize, iobase_finalize},
{0, NULL},
};
/* Type specification for _io._IOBase: a subclassable, GC-enabled type
 * flagged as immutable, with instances laid out as `iobase`.
 */
PyType_Spec iobase_spec = {
.name = "_io._IOBase",
.basicsize = sizeof(iobase),
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC |
Py_TPFLAGS_IMMUTABLETYPE),
.slots = iobase_slots,
};
/* Docstring for _io._RawIOBase; installed through the Py_tp_doc slot. */
PyDoc_STRVAR(rawiobase_doc,
"Base class for raw binary I/O.");
/* Default RawIOBase.read(n), implemented on top of readinto().
 *
 * A negative `n` delegates to self.readall().  Otherwise a bytearray of
 * `n` bytes is allocated and passed to self.readinto(); the first
 * `bytes_filled` bytes of it are returned as a new bytes object, where
 * `bytes_filled` is the integer returned by readinto().
 *
 * Returns NULL if readinto() fails, and passes a None result from
 * readinto() straight through.
 *
 * Raises ValueError when readinto() reports a count that is negative or
 * larger than the buffer it was given.  Without this check the count would
 * be used as-is to copy out of the bytearray, reading past its end.
 */
static PyObject *
_io__RawIOBase_read_impl(PyObject *self, Py_ssize_t n)
{
    if (n < 0) {
        return PyObject_CallMethodNoArgs(self, &_Py_ID(readall));
    }

    PyObject *b = PyByteArray_FromStringAndSize(NULL, n);
    if (b == NULL) {
        return NULL;
    }

    PyObject *res = PyObject_CallMethodObjArgs(self, &_Py_ID(readinto),
                                               b, NULL);
    if (res == NULL || res == Py_None) {
        Py_DECREF(b);
        return res;
    }

    Py_ssize_t bytes_filled = PyNumber_AsSsize_t(res, PyExc_ValueError);
    Py_DECREF(res);
    if (bytes_filled == -1 && PyErr_Occurred()) {
        Py_DECREF(b);
        return NULL;
    }

    /* The count comes from user code, so it must be validated.  Check it
       against the current size of the bytearray as well as `n`, because
       readinto() received the bytearray itself and may have shrunk it. */
    if (bytes_filled < 0
        || bytes_filled > n
        || bytes_filled > PyByteArray_GET_SIZE(b))
    {
        Py_DECREF(b);
        PyErr_Format(PyExc_ValueError,
                     "readinto returned %zd outside buffer size %zd",
                     bytes_filled, n);
        return NULL;
    }

    res = PyBytes_FromStringAndSize(PyByteArray_AsString(b), bytes_filled);
    Py_DECREF(b);
    return res;
}
/* Default RawIOBase.readall(): call self.read(DEFAULT_BUFFER_SIZE)
 * repeatedly and join the chunks.
 *
 * Reading stops when read() returns an empty bytes object or None.  If the
 * very first read() returns None, None itself is returned.  A failed read()
 * is retried when _PyIO_trap_eintr() reports the error as handled.
 *
 * Raises TypeError if read() returns anything other than bytes or None.
 */
static PyObject *
_io__RawIOBase_readall_impl(PyObject *self)
{
    PyObject *chunks = PyList_New(0);
    if (chunks == NULL) {
        return NULL;
    }

    for (;;) {
        PyObject *data = _PyObject_CallMethod(self, &_Py_ID(read),
                                              "i", DEFAULT_BUFFER_SIZE);
        if (data == NULL) {
            if (_PyIO_trap_eintr()) {
                continue;
            }
            Py_DECREF(chunks);
            return NULL;
        }

        if (data == Py_None) {
            if (PyList_GET_SIZE(chunks) == 0) {
                /* Nothing was read at all: hand the None back. */
                Py_DECREF(chunks);
                return data;
            }
            Py_DECREF(data);
            break;
        }

        if (!PyBytes_Check(data)) {
            Py_DECREF(chunks);
            Py_DECREF(data);
            PyErr_SetString(PyExc_TypeError, "read() should return bytes");
            return NULL;
        }

        int at_eof = (PyBytes_GET_SIZE(data) == 0);
        int append_rc = at_eof ? 0 : PyList_Append(chunks, data);
        Py_DECREF(data);
        if (at_eof) {
            break;
        }
        if (append_rc < 0) {
            Py_DECREF(chunks);
            return NULL;
        }
    }

    PyObject *joined = _PyBytes_Join((PyObject *)&_Py_SINGLETON(bytes_empty),
                                     chunks);
    Py_DECREF(chunks);
    return joined;
}
/* Placeholder readinto(): always raises NotImplementedError with no
 * message.  Returns NULL.
 */
static PyObject *
rawiobase_readinto(PyObject *self, PyObject *args)
{
    PyErr_SetNone(PyExc_NotImplementedError);
    return NULL;
}
/* Placeholder write(): always raises NotImplementedError with no message.
 * Returns NULL.
 */
static PyObject *
rawiobase_write(PyObject *self, PyObject *args)
{
    PyErr_SetNone(PyExc_NotImplementedError);
    return NULL;
}
/* Method table for _io._RawIOBase: the clinic-generated read/readall
 * entries plus the readinto/write placeholders that raise
 * NotImplementedError.
 */
static PyMethodDef rawiobase_methods[] = {
_IO__RAWIOBASE_READ_METHODDEF
_IO__RAWIOBASE_READALL_METHODDEF
{"readinto", rawiobase_readinto, METH_VARARGS},
{"write", rawiobase_write, METH_VARARGS},
{NULL, NULL}
};
/* Slot table for _io._RawIOBase: only a docstring and a method table. */
static PyType_Slot rawiobase_slots[] = {
{Py_tp_doc, (void *)rawiobase_doc},
{Py_tp_methods, rawiobase_methods},
{0, NULL},
};
/* Type specification for _io._RawIOBase: a subclassable type flagged as
 * immutable.
 * NOTE(review): no .basicsize is given here; presumably the size is
 * inherited from the base type supplied at type creation -- confirm at
 * the call site.
 */
PyType_Spec rawiobase_spec = {
.name = "_io._RawIOBase",
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_IMMUTABLETYPE),
.slots = rawiobase_slots,
};