#include "Python.h"
#include "pycore_call.h"
#include "pycore_runtime.h"
/* Character-reading and stream-locking primitives used by the line reader in
 * this file.
 *
 * When HAVE_GETC_UNLOCKED is defined and _Py_MEMORY_SANITIZER is not, GETC()
 * maps to getc_unlocked() and FLOCKFILE()/FUNLOCKFILE() map to
 * flockfile()/funlockfile(), so a caller can lock the stream once around a
 * run of reads.  Otherwise GETC() is plain getc() and the two locking macros
 * expand to nothing. */
#if defined(HAVE_GETC_UNLOCKED) && !defined(_Py_MEMORY_SANITIZER)
#define GETC(f) getc_unlocked(f)
#define FLOCKFILE(f) flockfile(f)
#define FUNLOCKFILE(f) funlockfile(f)
#else
#define GETC(f) getc(f)
#define FLOCKFILE(f)
#define FUNLOCKFILE(f)
#endif
/* Newline-style constants.  The non-zero values (1, 2, 4) are distinct bits,
 * so they can presumably be OR-ed together to record several styles at once.
 * NOTE(review): no use of these macros is visible in this part of the file;
 * confirm the intended meaning against their users. */
#define NEWLINE_UNKNOWN 0
#define NEWLINE_CR 1
#define NEWLINE_LF 2
#define NEWLINE_CRLF 4
#ifdef __cplusplus
extern "C" {
#endif
/* Create a Python file object for an existing file descriptor by calling
 * _io.open(fd, mode, buffering, encoding, errors, newline, closefd).
 *
 * `name` is accepted but not used by this function.  `closefd` is passed to
 * _io.open() as True when non-zero and as False otherwise.
 *
 * Returns whatever _io.open() returned, or NULL when "_io.open" could not be
 * obtained or the call itself returned NULL. */
PyObject *
PyFile_FromFd(int fd, const char *name, const char *mode, int buffering, const char *encoding,
              const char *errors, const char *newline, int closefd)
{
    PyObject *io_open = _PyImport_GetModuleAttrString("_io", "open");
    if (io_open == NULL) {
        return NULL;
    }

    PyObject *closefd_flag = closefd ? Py_True : Py_False;
    PyObject *file = PyObject_CallFunction(io_open, "isisssO", fd, mode,
                                           buffering, encoding, errors,
                                           newline, closefd_flag);
    Py_DECREF(io_open);

    /* A NULL result from the call is handed back unchanged. */
    return file;
}
/* Read one line from the file-like object `f` by calling its readline()
 * method.
 *
 *   n > 0   readline(n) is called and the result is returned as-is.
 *   n == 0  readline() is called and the result is returned as-is.
 *   n < 0   readline() is called; an empty result raises EOFError, and a
 *           single trailing '\n' (if present) is removed from the result.
 *
 * The readline() result must be a bytes or str object, otherwise TypeError
 * is raised.  A NULL `f` is reported through PyErr_BadInternalCall().
 *
 * Returns the line object, or NULL on any of the failures above. */
PyObject *
PyFile_GetLine(PyObject *f, int n)
{
    if (f == NULL) {
        PyErr_BadInternalCall();
        return NULL;
    }

    PyObject *line;
    if (n > 0) {
        line = _PyObject_CallMethod(f, &_Py_ID(readline), "i", n);
    }
    else {
        line = PyObject_CallMethodNoArgs(f, &_Py_ID(readline));
    }
    if (line == NULL) {
        return NULL;
    }

    const int is_bytes = PyBytes_Check(line);
    if (!is_bytes && !PyUnicode_Check(line)) {
        Py_DECREF(line);
        PyErr_SetString(PyExc_TypeError,
                        "object.readline() returned non-string");
        return NULL;
    }

    /* Only the n < 0 mode post-processes the line. */
    if (n >= 0) {
        return line;
    }

    if (is_bytes) {
        const char *data = PyBytes_AS_STRING(line);
        const Py_ssize_t size = PyBytes_GET_SIZE(line);
        if (size == 0) {
            Py_DECREF(line);
            PyErr_SetString(PyExc_EOFError,
                            "EOF when reading a line");
            return NULL;
        }
        if (data[size - 1] == '\n') {
            if (Py_REFCNT(line) == 1) {
                /* Sole owner: shrink in place.  The return value is not
                   inspected; `line` is returned as the call leaves it. */
                _PyBytes_Resize(&line, size - 1);
            }
            else {
                /* Shared object: build a shortened copy instead. */
                PyObject *trimmed = PyBytes_FromStringAndSize(data, size - 1);
                Py_SETREF(line, trimmed);
            }
        }
        return line;
    }

    /* Not bytes, so the type check above guarantees a str object. */
    const Py_ssize_t length = PyUnicode_GET_LENGTH(line);
    if (length == 0) {
        Py_DECREF(line);
        PyErr_SetString(PyExc_EOFError,
                        "EOF when reading a line");
        return NULL;
    }
    if (PyUnicode_READ_CHAR(line, length - 1) == '\n') {
        PyObject *trimmed = PyUnicode_Substring(line, 0, length - 1);
        Py_SETREF(line, trimmed);
    }
    return line;
}
/* Write a textual form of `v` to the file-like object `f` by calling
 * f.write(text).
 *
 * When `flags` has Py_PRINT_RAW set the text is PyObject_Str(v); otherwise it
 * is PyObject_Repr(v).
 *
 * Returns 0 on success.  Returns -1 when `f` is NULL (TypeError is set), when
 * the "write" attribute cannot be fetched, when the conversion fails, or when
 * the write() call returns NULL. */
int
PyFile_WriteObject(PyObject *v, PyObject *f, int flags)
{
    if (f == NULL) {
        PyErr_SetString(PyExc_TypeError, "writeobject with NULL file");
        return -1;
    }

    PyObject *write_method = PyObject_GetAttr(f, &_Py_ID(write));
    if (write_method == NULL) {
        return -1;
    }

    PyObject *text = (flags & Py_PRINT_RAW) ? PyObject_Str(v)
                                            : PyObject_Repr(v);
    if (text == NULL) {
        Py_DECREF(write_method);
        return -1;
    }

    PyObject *outcome = PyObject_CallOneArg(write_method, text);
    Py_DECREF(text);
    Py_DECREF(write_method);
    if (outcome == NULL) {
        return -1;
    }

    /* The value returned by write() is not needed. */
    Py_DECREF(outcome);
    return 0;
}
/* Write the C string `s` to the file-like object `f`.
 *
 * The string is converted with PyUnicode_FromString() and written through
 * PyFile_WriteObject() with Py_PRINT_RAW.
 *
 * Returns the PyFile_WriteObject() result on the normal path.  Returns -1
 * without writing anything when `f` is NULL (SystemError is set unless an
 * exception is already pending), when an exception is already pending on
 * entry, or when the string conversion fails. */
int
PyFile_WriteString(const char *s, PyObject *f)
{
    if (f == NULL) {
        /* Keep an already-pending exception; only set one if none exists. */
        if (!PyErr_Occurred()) {
            PyErr_SetString(PyExc_SystemError,
                            "null file for PyFile_WriteString");
        }
        return -1;
    }

    if (PyErr_Occurred()) {
        return -1;
    }

    PyObject *text = PyUnicode_FromString(s);
    if (text == NULL) {
        return -1;
    }

    int status = PyFile_WriteObject(text, f, Py_PRINT_RAW);
    Py_DECREF(text);
    return status;
}
/* Derive a file descriptor from `o`.
 *
 * If `o` is an int it is converted directly.  Otherwise its fileno()
 * method is looked up and called, and the result must be an int.
 *
 * Returns the non-negative descriptor on success.  Returns -1 with an
 * exception set when:
 *   - the attribute lookup or the fileno() call fails,
 *   - `o` is not an int and has no fileno attribute (TypeError),
 *   - fileno() returns a non-int (TypeError),
 *   - the int conversion reports an error,
 *   - the resulting value is negative (ValueError). */
int
PyObject_AsFileDescriptor(PyObject *o)
{
    int fd;

    if (PyLong_Check(o)) {
        fd = _PyLong_AsInt(o);
    }
    else {
        PyObject *fileno_method;
        if (_PyObject_LookupAttr(o, &_Py_ID(fileno), &fileno_method) < 0) {
            return -1;
        }
        if (fileno_method == NULL) {
            PyErr_SetString(PyExc_TypeError,
                            "argument must be an int, or have a fileno() method.");
            return -1;
        }

        PyObject *fileno_result = _PyObject_CallNoArgs(fileno_method);
        Py_DECREF(fileno_method);
        if (fileno_result == NULL) {
            return -1;
        }
        if (!PyLong_Check(fileno_result)) {
            PyErr_SetString(PyExc_TypeError,
                            "fileno() returned a non-integer");
            Py_DECREF(fileno_result);
            return -1;
        }
        fd = _PyLong_AsInt(fileno_result);
        Py_DECREF(fileno_result);
    }

    if (fd < 0) {
        /* -1 with a pending exception is a conversion failure: keep that
           exception.  Any other negative value gets a ValueError. */
        if (fd == -1 && PyErr_Occurred()) {
            return -1;
        }
        PyErr_Format(PyExc_ValueError,
                     "file descriptor cannot be a negative integer (%i)",
                     fd);
        return -1;
    }
    return fd;
}
/* Converter wrapper around PyObject_AsFileDescriptor().
 *
 * `ptr` must point to an int.  On success the descriptor is stored there and
 * 1 is returned.  When PyObject_AsFileDescriptor() returns -1, *ptr is left
 * untouched and 0 is returned. */
int
_PyLong_FileDescriptor_Converter(PyObject *o, void *ptr)
{
    const int fd = PyObject_AsFileDescriptor(o);
    if (fd != -1) {
        int *out = (int *)ptr;
        *out = fd;
        return 1;
    }
    return 0;
}
/* fgets()-like line reader that translates newlines.
 *
 * Reads at most n-1 characters from `stream` into `buf`, stopping after a
 * newline or at EOF, and always NUL-terminates what was stored.  A '\r' is
 * stored as '\n'; a '\n' immediately following that '\r' is consumed as part
 * of the same line ending, while any other following character is pushed
 * back with ungetc().
 *
 * `fobj` must be NULL: a non-NULL value makes the function set errno to ENXIO
 * and return NULL without touching the stream.
 *
 * Returns `buf` and stores the number of characters placed in it (excluding
 * the NUL) in *size.  Returns NULL when nothing was stored; in that case
 * *size is not written. */
char *
_Py_UniversalNewlineFgetsWithSize(char *buf, int n, FILE *stream, PyObject *fobj, size_t* size)
{
    char *out = buf;

    if (fobj) {
        errno = ENXIO;
        return NULL;
    }

    FLOCKFILE(stream);
    /* `remaining` counts the characters that may still be stored while
       leaving room for the terminating NUL. */
    for (int remaining = n - 1; remaining > 0; remaining--) {
        int ch = GETC(stream);
        if (ch == EOF) {
            break;
        }
        if (ch == '\r') {
            int next = GETC(stream);
            if (next != '\n') {
                ungetc(next, stream);
            }
            ch = '\n';
        }
        *out++ = ch;
        if (ch == '\n') {
            break;
        }
    }
    FUNLOCKFILE(stream);

    *out = '\0';
    if (out == buf) {
        return NULL;
    }
    *size = out - buf;
    return buf;
}
/* Same as _Py_UniversalNewlineFgetsWithSize(), for callers that do not need
 * the stored length: the size is collected in a local and discarded. */
char *
Py_UniversalNewlineFgets(char *buf, int n, FILE *stream, PyObject *fobj)
{
    size_t discarded_length;

    return _Py_UniversalNewlineFgetsWithSize(buf, n, stream, fobj,
                                             &discarded_length);
}
/* Instance layout of the "std printer" object: the object header followed by
 * the file descriptor the object reports and writes to. */
typedef struct {
PyObject_HEAD
int fd;  /* set by PyFile_NewStdPrinter(); methods treat fd < 0 as unusable */
} PyStdPrinter_Object;
/* Create a std printer object bound to `fd`.
 *
 * Only the descriptors of stdout and stderr are accepted; for any other
 * value NULL is returned and this function does not set an exception
 * itself.  NULL is also returned when the object allocation fails. */
PyObject *
PyFile_NewStdPrinter(int fd)
{
    if (!(fd == fileno(stdout) || fd == fileno(stderr))) {
        return NULL;
    }

    PyStdPrinter_Object *printer = PyObject_New(PyStdPrinter_Object,
                                                &PyStdPrinter_Type);
    if (printer == NULL) {
        return NULL;
    }
    printer->fd = fd;
    return (PyObject *)printer;
}
/* write(str) method of the std printer.
 *
 * Encodes the str argument as UTF-8 and writes it to self->fd with
 * _Py_write().  If the direct UTF-8 conversion fails, the error is cleared
 * and the string is encoded again with the "backslashreplace" error handler.
 *
 * Returns None without writing when self->fd is negative, and None when the
 * write fails with EAGAIN (the pending exception is cleared).  Otherwise
 * returns the number of bytes written as an int, or NULL on argument,
 * encoding or write errors. */
static PyObject *
stdprinter_write(PyStdPrinter_Object *self, PyObject *args)
{
    assert(!PyErr_Occurred());

    if (self->fd < 0) {
        Py_RETURN_NONE;
    }

    PyObject *text;
    if (!PyArg_ParseTuple(args, "U", &text)) {
        return NULL;
    }

    PyObject *encoded = NULL;   /* owned only on the fallback path */
    Py_ssize_t size;
    const char *data = PyUnicode_AsUTF8AndSize(text, &size);
    if (data == NULL) {
        PyErr_Clear();
        encoded = _PyUnicode_AsUTF8String(text, "backslashreplace");
        if (encoded == NULL) {
            return NULL;
        }
        data = PyBytes_AS_STRING(encoded);
        size = PyBytes_GET_SIZE(encoded);
    }

    const Py_ssize_t written = _Py_write(self->fd, data, size);
    /* Capture errno before releasing the bytes object. */
    const int saved_errno = errno;
    Py_XDECREF(encoded);

    if (written != -1) {
        return PyLong_FromSsize_t(written);
    }
    if (saved_errno == EAGAIN) {
        PyErr_Clear();
        Py_RETURN_NONE;
    }
    return NULL;
}
/* fileno() method: return the stored file descriptor as a Python int. */
static PyObject *
stdprinter_fileno(PyStdPrinter_Object *self, PyObject *Py_UNUSED(ignored))
{
    const long descriptor = (long)self->fd;

    return PyLong_FromLong(descriptor);
}
/* repr(): a str showing the stored file descriptor and the object's address. */
static PyObject *
stdprinter_repr(PyStdPrinter_Object *self)
{
    const int descriptor = self->fd;

    return PyUnicode_FromFormat("<stdprinter(fd=%d) object at %p>",
                                descriptor, self);
}
/* Method body that does nothing and returns None; the method table binds it
 * to both close() and flush(). */
static PyObject *
stdprinter_noop(PyStdPrinter_Object *Py_UNUSED(self),
                PyObject *Py_UNUSED(ignored))
{
    Py_RETURN_NONE;
}
/* isatty() method: return the truth value of isatty(self->fd) as a bool.
 * A negative descriptor yields False without calling isatty().  The
 * isatty() call is made between Py_BEGIN_ALLOW_THREADS and
 * Py_END_ALLOW_THREADS. */
static PyObject *
stdprinter_isatty(PyStdPrinter_Object *self, PyObject *Py_UNUSED(ignored))
{
    if (self->fd < 0) {
        Py_RETURN_FALSE;
    }

    long is_tty;

    Py_BEGIN_ALLOW_THREADS
    is_tty = isatty(self->fd);
    Py_END_ALLOW_THREADS

    return PyBool_FromLong(is_tty);
}
static PyMethodDef stdprinter_methods[] = {
{"close", (PyCFunction)stdprinter_noop, METH_NOARGS, ""},
{"flush", (PyCFunction)stdprinter_noop, METH_NOARGS, ""},
{"fileno", (PyCFunction)stdprinter_fileno, METH_NOARGS, ""},
{"isatty", (PyCFunction)stdprinter_isatty, METH_NOARGS, ""},
{"write", (PyCFunction)stdprinter_write, METH_VARARGS, ""},
{NULL, NULL}
};
/* Getter for the "closed" attribute: always False. */
static PyObject *
get_closed(PyStdPrinter_Object *Py_UNUSED(self), void *Py_UNUSED(closure))
{
    Py_RETURN_FALSE;
}
/* Getter for the "mode" attribute: always a new str "w". */
static PyObject *
get_mode(PyStdPrinter_Object *Py_UNUSED(self), void *Py_UNUSED(closure))
{
    static const char mode_text[] = "w";

    return PyUnicode_FromString(mode_text);
}
/* Getter for the "encoding" attribute: always None. */
static PyObject *
get_encoding(PyStdPrinter_Object *Py_UNUSED(self), void *Py_UNUSED(closure))
{
    Py_RETURN_NONE;
}
static PyGetSetDef stdprinter_getsetlist[] = {
{"closed", (getter)get_closed, NULL, "True if the file is closed"},
{"encoding", (getter)get_encoding, NULL, "Encoding of the file"},
{"mode", (getter)get_mode, NULL, "String giving the file mode"},
{0},
};
PyTypeObject PyStdPrinter_Type = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
"stderrprinter",
sizeof(PyStdPrinter_Object),
0,
0,
0,
0,
0,
0,
(reprfunc)stdprinter_repr,
0,
0,
0,
0,
0,
0,
PyObject_GenericGetAttr,
0,
0,
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_DISALLOW_INSTANTIATION,
0,
0,
0,
0,
0,
0,
0,
stdprinter_methods,
0,
stdprinter_getsetlist,
0,
0,
0,
0,
0,
0,
PyType_GenericAlloc,
0,
PyObject_Del,
};
/* Install the hook that PyFile_OpenCodeObject() calls to open a path.
 *
 * `hook` and `userData` are stored in _PyRuntime.  A hook can be installed
 * only while none is set.  When the interpreter is initialized, a
 * "setopencodehook" audit event is raised first.
 *
 * Returns 0 on success.  Returns -1 when the audit call fails or when a hook
 * is already installed; in the latter case SystemError is set only if the
 * interpreter is initialized. */
int
PyFile_SetOpenCodeHook(Py_OpenCodeHookFunction hook, void *userData)
{
    if (Py_IsInitialized()) {
        if (PySys_Audit("setopencodehook", NULL) < 0) {
            return -1;
        }
    }

    if (!_PyRuntime.open_code_hook) {
        _PyRuntime.open_code_hook = hook;
        _PyRuntime.open_code_userdata = userData;
        return 0;
    }

    /* A hook is already present and is left in place. */
    if (Py_IsInitialized()) {
        PyErr_SetString(PyExc_SystemError,
                        "failed to change existing open_code hook");
    }
    return -1;
}
/* Open the file named by the str object `path`.
 *
 * If a hook is stored in _PyRuntime.open_code_hook, the result of
 * hook(path, userdata) is returned.  Otherwise the result of
 * _io.open(path, "rb") is returned.
 *
 * Returns NULL with TypeError set when `path` is not a str, and NULL when
 * "_io.open" cannot be obtained or the hook / open call returns NULL. */
PyObject *
PyFile_OpenCodeObject(PyObject *path)
{
    if (!PyUnicode_Check(path)) {
        PyErr_Format(PyExc_TypeError, "'path' must be 'str', not '%.200s'",
                     Py_TYPE(path)->tp_name);
        return NULL;
    }

    Py_OpenCodeHookFunction hook = _PyRuntime.open_code_hook;
    if (hook) {
        return hook(path, _PyRuntime.open_code_userdata);
    }

    PyObject *io_open = _PyImport_GetModuleAttrString("_io", "open");
    if (io_open == NULL) {
        return NULL;
    }
    PyObject *file = PyObject_CallFunction(io_open, "Os", path, "rb");
    Py_DECREF(io_open);
    return file;
}
/* C-string front end for PyFile_OpenCodeObject(): `utf8path` is converted to
 * a str with PyUnicode_FromString() and passed on.
 *
 * Returns the PyFile_OpenCodeObject() result, or NULL when the conversion
 * fails. */
PyObject *
PyFile_OpenCode(const char *utf8path)
{
    PyObject *path = PyUnicode_FromString(utf8path);
    if (path == NULL) {
        return NULL;
    }

    PyObject *file = PyFile_OpenCodeObject(path);
    Py_DECREF(path);
    return file;
}
#ifdef __cplusplus
}
#endif