/* API for managing interactions between isolated interpreters */
#include "Python.h"
#include "marshal.h" // PyMarshal_WriteObjectToString()
#include "osdefs.h" // MAXPATHLEN
#include "pycore_ceval.h" // _Py_simple_func
#include "pycore_crossinterp.h" // _PyXIData_t
#include "pycore_function.h" // _PyFunction_VerifyStateless()
#include "pycore_global_strings.h" // _Py_ID()
#include "pycore_import.h" // _PyImport_SetModule()
#include "pycore_initconfig.h" // _PyStatus_OK()
#include "pycore_namespace.h" // _PyNamespace_New()
#include "pycore_pythonrun.h" // _Py_SourceAsString()
#include "pycore_runtime.h" // _PyRuntime
#include "pycore_setobject.h" // _PySet_NextEntry()
#include "pycore_typeobject.h" // _PyStaticType_InitBuiltin()
static Py_ssize_t
_Py_GetMainfile(char *buffer, size_t maxlen)
{
// We don't expect subinterpreters to have the __main__ module's
// __name__ set, but proceed just in case.
PyThreadState *tstate = _PyThreadState_GET();
PyObject *module = _Py_GetMainModule(tstate);
if (_Py_CheckMainModule(module) < 0) {
Py_XDECREF(module);
return -1;
}
Py_ssize_t size = _PyModule_GetFilenameUTF8(module, buffer, maxlen);
Py_DECREF(module);
return size;
}
static PyObject *
runpy_run_path(const char *filename, const char *modname)
{
PyObject *run_path = PyImport_ImportModuleAttrString("runpy", "run_path");
if (run_path == NULL) {
return NULL;
}
PyObject *args = Py_BuildValue("(sOs)", filename, Py_None, modname);
if (args == NULL) {
Py_DECREF(run_path);
return NULL;
}
PyObject *ns = PyObject_Call(run_path, args, NULL);
Py_DECREF(run_path);
Py_DECREF(args);
return ns;
}
static void
set_exc_with_cause(PyObject *exctype, const char *msg)
{
PyObject *cause = PyErr_GetRaisedException();
PyErr_SetString(exctype, msg);
PyObject *exc = PyErr_GetRaisedException();
PyException_SetCause(exc, cause);
PyErr_SetRaisedException(exc);
}
/****************************/
/* module duplication utils */
/****************************/
struct sync_module_result {
PyObject *module;
PyObject *loaded;
PyObject *failed;
};
struct sync_module {
const char *filename;
char _filename[MAXPATHLEN+1];
struct sync_module_result cached;
};
static void
sync_module_clear(struct sync_module *data)
{
data->filename = NULL;
Py_CLEAR(data->cached.module);
Py_CLEAR(data->cached.loaded);
Py_CLEAR(data->cached.failed);
}
static void
sync_module_capture_exc(PyThreadState *tstate, struct sync_module *data)
{
assert(_PyErr_Occurred(tstate));
PyObject *context = data->cached.failed;
PyObject *exc = _PyErr_GetRaisedException(tstate);
_PyErr_SetRaisedException(tstate, Py_NewRef(exc));
if (context != NULL) {
PyException_SetContext(exc, context);
}
data->cached.failed = exc;
}
static int
ensure_isolated_main(PyThreadState *tstate, struct sync_module *main)
{
// Load the module from the original file (or from a cache).
// First try the local cache.
if (main->cached.failed != NULL) {
// We'll deal with this in apply_isolated_main().
assert(main->cached.module == NULL);
assert(main->cached.loaded == NULL);
return 0;
}
else if (main->cached.loaded != NULL) {
assert(main->cached.module != NULL);
return 0;
}
assert(main->cached.module == NULL);
if (main->filename == NULL) {
_PyErr_SetString(tstate, PyExc_NotImplementedError, "");
return -1;
}
// It wasn't in the local cache so we'll need to populate it.
PyObject *mod = _Py_GetMainModule(tstate);
if (_Py_CheckMainModule(mod) < 0) {
// This is probably unrecoverable, so don't bother caching the error.
assert(_PyErr_Occurred(tstate));
Py_XDECREF(mod);
return -1;
}
PyObject *loaded = NULL;
// Try the per-interpreter cache for the loaded module.
// XXX Store it in sys.modules?
PyObject *interpns = PyInterpreterState_GetDict(tstate->interp);
assert(interpns != NULL);
PyObject *key = PyUnicode_FromString("CACHED_MODULE_NS___main__");
if (key == NULL) {
// It's probably unrecoverable, so don't bother caching the error.
Py_DECREF(mod);
return -1;
}
else if (PyDict_GetItemRef(interpns, key, &loaded) < 0) {
// It's probably unrecoverable, so don't bother caching the error.
Py_DECREF(mod);
Py_DECREF(key);
return -1;
}
else if (loaded == NULL) {
// It wasn't already loaded from file.
loaded = PyModule_NewObject(&_Py_ID(__main__));
if (loaded == NULL) {
goto error;
}
PyObject *ns = _PyModule_GetDict(loaded);
// We don't want to trigger "if __name__ == '__main__':",
// so we use a bogus module name.
PyObject *loaded_ns =
runpy_run_path(main->filename, "");
if (loaded_ns == NULL) {
goto error;
}
int res = PyDict_Update(ns, loaded_ns);
Py_DECREF(loaded_ns);
if (res < 0) {
goto error;
}
// Set the per-interpreter cache entry.
if (PyDict_SetItem(interpns, key, loaded) < 0) {
goto error;
}
}
Py_DECREF(key);
main->cached = (struct sync_module_result){
.module = mod,
.loaded = loaded,
};
return 0;
error:
sync_module_capture_exc(tstate, main);
Py_XDECREF(loaded);
Py_DECREF(mod);
Py_XDECREF(key);
return -1;
}
#ifndef NDEBUG
static int
main_mod_matches(PyObject *expected)
{
PyObject *mod = PyImport_GetModule(&_Py_ID(__main__));
Py_XDECREF(mod);
return mod == expected;
}
#endif
static int
apply_isolated_main(PyThreadState *tstate, struct sync_module *main)
{
assert((main->cached.loaded == NULL) == (main->cached.loaded == NULL));
if (main->cached.failed != NULL) {
// It must have failed previously.
assert(main->cached.loaded == NULL);
_PyErr_SetRaisedException(tstate, main->cached.failed);
return -1;
}
assert(main->cached.loaded != NULL);
assert(main_mod_matches(main->cached.module));
if (_PyImport_SetModule(&_Py_ID(__main__), main->cached.loaded) < 0) {
sync_module_capture_exc(tstate, main);
return -1;
}
return 0;
}
static void
restore_main(PyThreadState *tstate, struct sync_module *main)
{
assert(main->cached.failed == NULL);
assert(main->cached.module != NULL);
assert(main->cached.loaded != NULL);
PyObject *exc = _PyErr_GetRaisedException(tstate);
assert(main_mod_matches(main->cached.loaded));
int res = _PyImport_SetModule(&_Py_ID(__main__), main->cached.module);
assert(res == 0);
if (res < 0) {
PyErr_FormatUnraisable("Exception ignored while restoring __main__");
}
_PyErr_SetRaisedException(tstate, exc);
}
/**************/
/* exceptions */
/**************/
typedef struct xi_exceptions exceptions_t;
static int init_static_exctypes(exceptions_t *, PyInterpreterState *);
static void fini_static_exctypes(exceptions_t *, PyInterpreterState *);
static int init_heap_exctypes(exceptions_t *);
static void fini_heap_exctypes(exceptions_t *);
#include "crossinterp_exceptions.h"
/***************************/
/* cross-interpreter calls */
/***************************/
int
_Py_CallInInterpreter(PyInterpreterState *interp,
_Py_simple_func func, void *arg)
{
if (interp == PyInterpreterState_Get()) {
return func(arg);
}
// XXX Emit a warning if this fails?
_PyEval_AddPendingCall(interp, (_Py_pending_call_func)func, arg, 0);
return 0;
}
int
_Py_CallInInterpreterAndRawFree(PyInterpreterState *interp,
_Py_simple_func func, void *arg)
{
if (interp == PyInterpreterState_Get()) {
int res = func(arg);
PyMem_RawFree(arg);
return res;
}
// XXX Emit a warning if this fails?
_PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_RAWFREE);
return 0;
}
/**************************/
/* cross-interpreter data */
/**************************/
/* registry of {type -> _PyXIData_getdata_t} */
/* For now we use a global registry of shareable classes.
An alternative would be to add a tp_* slot for a class's
_PyXIData_getdata_t. It would be simpler and more efficient. */
static void xid_lookup_init(_PyXIData_lookup_t *);
static void xid_lookup_fini(_PyXIData_lookup_t *);
struct _dlcontext;
static _PyXIData_getdata_t lookup_getdata(struct _dlcontext *, PyObject *);
#include "crossinterp_data_lookup.h"
/* lifecycle */
_PyXIData_t *
_PyXIData_New(void)
{
_PyXIData_t *xid = PyMem_RawCalloc(1, sizeof(_PyXIData_t));
if (xid == NULL) {
PyErr_NoMemory();
}
return xid;
}
void
_PyXIData_Free(_PyXIData_t *xid)
{
PyInterpreterState *interp = PyInterpreterState_Get();
_PyXIData_Clear(interp, xid);
PyMem_RawFree(xid);
}
/* defining cross-interpreter data */
static inline void
_xidata_init(_PyXIData_t *xidata)
{
// If the value is being reused
// then _xidata_clear() should have been called already.
assert(xidata->data == NULL);
assert(xidata->obj == NULL);
*xidata = (_PyXIData_t){0};
_PyXIData_INTERPID(xidata) = -1;
}
static inline void
_xidata_clear(_PyXIData_t *xidata)
{
// _PyXIData_t only has two members that need to be
// cleaned up, if set: "xidata" must be freed and "obj" must be decref'ed.
// In both cases the original (owning) interpreter must be used,
// which is the caller's responsibility to ensure.
if (xidata->data != NULL) {
if (xidata->free != NULL) {
xidata->free(xidata->data);
}
xidata->data = NULL;
}
Py_CLEAR(xidata->obj);
}
void
_PyXIData_Init(_PyXIData_t *xidata,
PyInterpreterState *interp,
void *shared, PyObject *obj,
xid_newobjfunc new_object)
{
assert(xidata != NULL);
assert(new_object != NULL);
_xidata_init(xidata);
xidata->data = shared;
if (obj != NULL) {
assert(interp != NULL);
// released in _PyXIData_Clear()
xidata->obj = Py_NewRef(obj);
}
// Ideally every object would know its owning interpreter.
// Until then, we have to rely on the caller to identify it
// (but we don't need it in all cases).
_PyXIData_INTERPID(xidata) = (interp != NULL)
? PyInterpreterState_GetID(interp)
: -1;
xidata->new_object = new_object;
}
int
_PyXIData_InitWithSize(_PyXIData_t *xidata,
PyInterpreterState *interp,
const size_t size, PyObject *obj,
xid_newobjfunc new_object)
{
assert(size > 0);
// For now we always free the shared data in the same interpreter
// where it was allocated, so the interpreter is required.
assert(interp != NULL);
_PyXIData_Init(xidata, interp, NULL, obj, new_object);
xidata->data = PyMem_RawCalloc(1, size);
if (xidata->data == NULL) {
return -1;
}
xidata->free = PyMem_RawFree;
return 0;
}
void
_PyXIData_Clear(PyInterpreterState *interp, _PyXIData_t *xidata)
{
assert(xidata != NULL);
// This must be called in the owning interpreter.
assert(interp == NULL
|| _PyXIData_INTERPID(xidata) == -1
|| _PyXIData_INTERPID(xidata) == PyInterpreterState_GetID(interp));
_xidata_clear(xidata);
}
/* getting cross-interpreter data */
static inline void
_set_xid_lookup_failure(PyThreadState *tstate, PyObject *obj, const char *msg,
PyObject *cause)
{
if (msg != NULL) {
assert(obj == NULL);
set_notshareableerror(tstate, cause, 0, msg);
}
else if (obj == NULL) {
msg = "object does not support cross-interpreter data";
set_notshareableerror(tstate, cause, 0, msg);
}
else {
msg = "%R does not support cross-interpreter data";
format_notshareableerror(tstate, cause, 0, msg, obj);
}
}
int
_PyObject_CheckXIData(PyThreadState *tstate, PyObject *obj)
{
dlcontext_t ctx;
if (get_lookup_context(tstate, &ctx) < 0) {
return -1;
}
_PyXIData_getdata_t getdata = lookup_getdata(&ctx, obj);
if (getdata.basic == NULL && getdata.fallback == NULL) {
if (!_PyErr_Occurred(tstate)) {
_set_xid_lookup_failure(tstate, obj, NULL, NULL);
}
return -1;
}
return 0;
}
static int
_check_xidata(PyThreadState *tstate, _PyXIData_t *xidata)
{
// xidata->data can be anything, including NULL, so we don't check it.
// xidata->obj may be NULL, so we don't check it.
if (_PyXIData_INTERPID(xidata) < 0) {
PyErr_SetString(PyExc_SystemError, "missing interp");
return -1;
}
if (xidata->new_object == NULL) {
PyErr_SetString(PyExc_SystemError, "missing new_object func");
return -1;
}
// xidata->free may be NULL, so we don't check it.
return 0;
}
static int
_get_xidata(PyThreadState *tstate,
PyObject *obj, xidata_fallback_t fallback, _PyXIData_t *xidata)
{
PyInterpreterState *interp = tstate->interp;
assert(xidata->data == NULL);
assert(xidata->obj == NULL);
if (xidata->data != NULL || xidata->obj != NULL) {
_PyErr_SetString(tstate, PyExc_ValueError, "xidata not cleared");
return -1;
}
// Call the "getdata" func for the object.
dlcontext_t ctx;
if (get_lookup_context(tstate, &ctx) < 0) {
return -1;
}
Py_INCREF(obj);
_PyXIData_getdata_t getdata = lookup_getdata(&ctx, obj);
if (getdata.basic == NULL && getdata.fallback == NULL) {
if (PyErr_Occurred()) {
Py_DECREF(obj);
return -1;
}
// Fall back to obj
Py_DECREF(obj);
if (!_PyErr_Occurred(tstate)) {
_set_xid_lookup_failure(tstate, obj, NULL, NULL);
}
return -1;
}
int res = getdata.basic != NULL
? getdata.basic(tstate, obj, xidata)
: getdata.fallback(tstate, obj, fallback, xidata);
Py_DECREF(obj);
if (res != 0) {
PyObject *cause = _PyErr_GetRaisedException(tstate);
assert(cause != NULL);
_set_xid_lookup_failure(tstate, obj, NULL, cause);
Py_XDECREF(cause);
return -1;
}
// Fill in the blanks and validate the result.
_PyXIData_INTERPID(xidata) = PyInterpreterState_GetID(interp);
if (_check_xidata(tstate, xidata) != 0) {
(void)_PyXIData_Release(xidata);
return -1;
}
return 0;
}
int
_PyObject_GetXIDataNoFallback(PyThreadState *tstate,
PyObject *obj, _PyXIData_t *xidata)
{
return _get_xidata(tstate, obj, _PyXIDATA_XIDATA_ONLY, xidata);
}
int
_PyObject_GetXIData(PyThreadState *tstate,
PyObject *obj, xidata_fallback_t fallback,
_PyXIData_t *xidata)
{
switch (fallback) {
case _PyXIDATA_XIDATA_ONLY:
return _get_xidata(tstate, obj, fallback, xidata);
case _PyXIDATA_FULL_FALLBACK:
if (_get_xidata(tstate, obj, fallback, xidata) == 0) {
return 0;
}
PyObject *exc = _PyErr_GetRaisedException(tstate);
if (PyFunction_Check(obj)) {
if (_PyFunction_GetXIData(tstate, obj, xidata) == 0) {
Py_DECREF(exc);
return 0;
}
_PyErr_Clear(tstate);
}
// We could try _PyMarshal_GetXIData() but we won't for now.
if (_PyPickle_GetXIData(tstate, obj, xidata) == 0) {
Py_DECREF(exc);
return 0;
}
// Raise the original exception.
_PyErr_SetRaisedException(tstate, exc);
return -1;
default:
#ifdef Py_DEBUG
Py_FatalError("unsupported xidata fallback option");
#endif
_PyErr_SetString(tstate, PyExc_SystemError,
"unsupported xidata fallback option");
return -1;
}
}
/* pickle C-API */
struct _pickle_context {
PyThreadState *tstate;
};
static PyObject *
_PyPickle_Dumps(struct _pickle_context *ctx, PyObject *obj)
{
PyObject *dumps = PyImport_ImportModuleAttrString("pickle", "dumps");
if (dumps == NULL) {
return NULL;
}
PyObject *bytes = PyObject_CallOneArg(dumps, obj);
Py_DECREF(dumps);
return bytes;
}
struct _unpickle_context {
PyThreadState *tstate;
// We only special-case the __main__ module,
// since other modules behave consistently.
struct sync_module main;
};
static void
_unpickle_context_clear(struct _unpickle_context *ctx)
{
sync_module_clear(&ctx->main);
}
static int
check_missing___main___attr(PyObject *exc)
{
assert(!PyErr_Occurred());
if (!PyErr_GivenExceptionMatches(exc, PyExc_AttributeError)) {
return 0;
}
// Get the error message.
PyObject *args = PyException_GetArgs(exc);
if (args == NULL || args == Py_None || PyObject_Size(args) < 1) {
Py_XDECREF(args);
assert(!PyErr_Occurred());
return 0;
}
PyObject *msgobj = args;
if (!PyUnicode_Check(msgobj)) {
msgobj = PySequence_GetItem(args, 0);
Py_DECREF(args);
if (msgobj == NULL) {
PyErr_Clear();
return 0;
}
}
const char *err = PyUnicode_AsUTF8(msgobj);
// Check if it's a missing __main__ attr.
int cmp = strncmp(err, "module '__main__' has no attribute '", 36);
Py_DECREF(msgobj);
return cmp == 0;
}
static PyObject *
_PyPickle_Loads(struct _unpickle_context *ctx, PyObject *pickled)
{
PyThreadState *tstate = ctx->tstate;
PyObject *exc = NULL;
PyObject *loads = PyImport_ImportModuleAttrString("pickle", "loads");
if (loads == NULL) {
return NULL;
}
// Make an initial attempt to unpickle.
PyObject *obj = PyObject_CallOneArg(loads, pickled);
if (obj != NULL) {
goto finally;
}
assert(_PyErr_Occurred(tstate));
if (ctx == NULL) {
goto finally;
}
exc = _PyErr_GetRaisedException(tstate);
if (!check_missing___main___attr(exc)) {
goto finally;
}
// Temporarily swap in a fake __main__ loaded from the original
// file and cached. Note that functions will use the cached ns
// for __globals__, // not the actual module.
if (ensure_isolated_main(tstate, &ctx->main) < 0) {
goto finally;
}
if (apply_isolated_main(tstate, &ctx->main) < 0) {
goto finally;
}
// Try to unpickle once more.
obj = PyObject_CallOneArg(loads, pickled);
restore_main(tstate, &ctx->main);
if (obj == NULL) {
goto finally;
}
Py_CLEAR(exc);
finally:
if (exc != NULL) {
if (_PyErr_Occurred(tstate)) {
sync_module_capture_exc(tstate, &ctx->main);
}
// We restore the original exception.
// It might make sense to chain it (__context__).
_PyErr_SetRaisedException(tstate, exc);
}
Py_DECREF(loads);
return obj;
}
/* pickle wrapper */
struct _pickle_xid_context {
// __main__.__file__
struct {
const char *utf8;
size_t len;
char _utf8[MAXPATHLEN+1];
} mainfile;
};
static int
_set_pickle_xid_context(PyThreadState *tstate, struct _pickle_xid_context *ctx)
{
// Set mainfile if possible.
Py_ssize_t len = _Py_GetMainfile(ctx->mainfile._utf8, MAXPATHLEN);
if (len < 0) {
// For now we ignore any exceptions.
PyErr_Clear();
}
else if (len > 0) {
ctx->mainfile.utf8 = ctx->mainfile._utf8;
ctx->mainfile.len = (size_t)len;
}
return 0;
}
struct _shared_pickle_data {
_PyBytes_data_t pickled; // Must be first if we use _PyBytes_FromXIData().
struct _pickle_xid_context ctx;
};
PyObject *
_PyPickle_LoadFromXIData(_PyXIData_t *xidata)
{
PyThreadState *tstate = _PyThreadState_GET();
struct _shared_pickle_data *shared =
(struct _shared_pickle_data *)xidata->data;
// We avoid copying the pickled data by wrapping it in a memoryview.
// The alternative is to get a bytes object using _PyBytes_FromXIData().
PyObject *pickled = PyMemoryView_FromMemory(
(char *)shared->pickled.bytes, shared->pickled.len, PyBUF_READ);
if (pickled == NULL) {
return NULL;
}
// Unpickle the object.
struct _unpickle_context ctx = {
.tstate = tstate,
.main = {
.filename = shared->ctx.mainfile.utf8,
},
};
PyObject *obj = _PyPickle_Loads(&ctx, pickled);
Py_DECREF(pickled);
_unpickle_context_clear(&ctx);
if (obj == NULL) {
PyObject *cause = _PyErr_GetRaisedException(tstate);
assert(cause != NULL);
_set_xid_lookup_failure(
tstate, NULL, "object could not be unpickled", cause);
Py_DECREF(cause);
}
return obj;
}
int
_PyPickle_GetXIData(PyThreadState *tstate, PyObject *obj, _PyXIData_t *xidata)
{
// Pickle the object.
struct _pickle_context ctx = {
.tstate = tstate,
};
PyObject *bytes = _PyPickle_Dumps(&ctx, obj);
if (bytes == NULL) {
PyObject *cause = _PyErr_GetRaisedException(tstate);
assert(cause != NULL);
_set_xid_lookup_failure(
tstate, NULL, "object could not be pickled", cause);
Py_DECREF(cause);
return -1;
}
// If we had an "unwrapper" mechnanism, we could call
// _PyObject_GetXIData() on the bytes object directly and add
// a simple unwrapper to call pickle.loads() on the bytes.
size_t size = sizeof(struct _shared_pickle_data);
struct _shared_pickle_data *shared =
(struct _shared_pickle_data *)_PyBytes_GetXIDataWrapped(
tstate, bytes, size, _PyPickle_LoadFromXIData, xidata);
Py_DECREF(bytes);
if (shared == NULL) {
return -1;
}
// If it mattered, we could skip getting __main__.__file__
// when "__main__" doesn't show up in the pickle bytes.
if (_set_pickle_xid_context(tstate, &shared->ctx) < 0) {
_xidata_clear(xidata);
return -1;
}
return 0;
}
/* marshal wrapper */
PyObject *
_PyMarshal_ReadObjectFromXIData(_PyXIData_t *xidata)
{
PyThreadState *tstate = _PyThreadState_GET();
_PyBytes_data_t *shared = (_PyBytes_data_t *)xidata->data;
PyObject *obj = PyMarshal_ReadObjectFromString(shared->bytes, shared->len);
if (obj == NULL) {
PyObject *cause = _PyErr_GetRaisedException(tstate);
assert(cause != NULL);
_set_xid_lookup_failure(
tstate, NULL, "object could not be unmarshalled", cause);
Py_DECREF(cause);
return NULL;
}
return obj;
}
int
_PyMarshal_GetXIData(PyThreadState *tstate, PyObject *obj, _PyXIData_t *xidata)
{
PyObject *bytes = PyMarshal_WriteObjectToString(obj, Py_MARSHAL_VERSION);
if (bytes == NULL) {
PyObject *cause = _PyErr_GetRaisedException(tstate);
assert(cause != NULL);
_set_xid_lookup_failure(
tstate, NULL, "object could not be marshalled", cause);
Py_DECREF(cause);
return -1;
}
size_t size = sizeof(_PyBytes_data_t);
_PyBytes_data_t *shared = _PyBytes_GetXIDataWrapped(
tstate, bytes, size, _PyMarshal_ReadObjectFromXIData, xidata);
Py_DECREF(bytes);
if (shared == NULL) {
return -1;
}
return 0;
}
/* script wrapper */
static int
verify_script(PyThreadState *tstate, PyCodeObject *co, int checked, int pure)
{
// Make sure it isn't a closure and (optionally) doesn't use globals.
PyObject *builtins = NULL;
if (pure) {
builtins = _PyEval_GetBuiltins(tstate);
assert(builtins != NULL);
}
if (checked) {
assert(_PyCode_VerifyStateless(tstate, co, NULL, NULL, builtins) == 0);
}
else if (_PyCode_VerifyStateless(tstate, co, NULL, NULL, builtins) < 0) {
return -1;
}
// Make sure it doesn't have args.
if (co->co_argcount > 0
|| co->co_posonlyargcount > 0
|| co->co_kwonlyargcount > 0
|| co->co_flags & (CO_VARARGS | CO_VARKEYWORDS))
{
_PyErr_SetString(tstate, PyExc_ValueError,
"code with args not supported");
return -1;
}
// Make sure it doesn't return anything.
if (!_PyCode_ReturnsOnlyNone(co)) {
_PyErr_SetString(tstate, PyExc_ValueError,
"code that returns a value is not a script");
return -1;
}
return 0;
}
static int
get_script_xidata(PyThreadState *tstate, PyObject *obj, int pure,
_PyXIData_t *xidata)
{
// Get the corresponding code object.
PyObject *code = NULL;
int checked = 0;
if (PyCode_Check(obj)) {
code = obj;
Py_INCREF(code);
}
else if (PyFunction_Check(obj)) {
code = PyFunction_GET_CODE(obj);
assert(code != NULL);
Py_INCREF(code);
if (pure) {
if (_PyFunction_VerifyStateless(tstate, obj) < 0) {
goto error;
}
checked = 1;
}
}
else {
const char *filename = "