use crate::{
conversion::FromPyObjectBound,
exceptions::PyTypeError,
ffi,
pyclass::boolean_struct::False,
types::{any::PyAnyMethods, dict::PyDictMethods, tuple::PyTupleMethods, PyDict, PyTuple},
Borrowed, Bound, PyAny, PyClass, PyErr, PyRef, PyRefMut, PyResult, PyTypeCheck, Python,
};
type PyArg<'py> = Borrowed<'py, 'py, PyAny>;
pub trait PyFunctionArgument<'a, 'py>: Sized + 'a {
type Holder: FunctionArgumentHolder;
fn extract(obj: &'a Bound<'py, PyAny>, holder: &'a mut Self::Holder) -> PyResult<Self>;
}
impl<'a, 'py, T> PyFunctionArgument<'a, 'py> for T
where
T: FromPyObjectBound<'a, 'py> + 'a,
{
type Holder = ();
#[inline]
fn extract(obj: &'a Bound<'py, PyAny>, _: &'a mut ()) -> PyResult<Self> {
obj.extract()
}
}
impl<'a, 'py, T: 'py> PyFunctionArgument<'a, 'py> for &'a Bound<'py, T>
where
T: PyTypeCheck,
{
type Holder = Option<()>;
#[inline]
fn extract(obj: &'a Bound<'py, PyAny>, _: &'a mut Option<()>) -> PyResult<Self> {
obj.downcast().map_err(Into::into)
}
}
impl<'a, 'py, T: 'py> PyFunctionArgument<'a, 'py> for Option<&'a Bound<'py, T>>
where
T: PyTypeCheck,
{
type Holder = ();
#[inline]
fn extract(obj: &'a Bound<'py, PyAny>, _: &'a mut ()) -> PyResult<Self> {
if obj.is_none() {
Ok(None)
} else {
Ok(Some(obj.downcast()?))
}
}
}
#[cfg(all(Py_LIMITED_API, not(any(feature = "gil-refs", Py_3_10))))]
impl<'a> PyFunctionArgument<'a, '_> for &'a str {
type Holder = Option<std::borrow::Cow<'a, str>>;
#[inline]
fn extract(
obj: &'a Bound<'_, PyAny>,
holder: &'a mut Option<std::borrow::Cow<'a, str>>,
) -> PyResult<Self> {
Ok(holder.insert(obj.extract()?))
}
}
pub trait FunctionArgumentHolder: Sized {
const INIT: Self;
}
impl FunctionArgumentHolder for () {
const INIT: Self = ();
}
impl<T> FunctionArgumentHolder for Option<T> {
const INIT: Self = None;
}
#[inline]
pub fn extract_pyclass_ref<'a, 'py: 'a, T: PyClass>(
obj: &'a Bound<'py, PyAny>,
holder: &'a mut Option<PyRef<'py, T>>,
) -> PyResult<&'a T> {
Ok(&*holder.insert(obj.extract()?))
}
#[inline]
pub fn extract_pyclass_ref_mut<'a, 'py: 'a, T: PyClass<Frozen = False>>(
obj: &'a Bound<'py, PyAny>,
holder: &'a mut Option<PyRefMut<'py, T>>,
) -> PyResult<&'a mut T> {
Ok(&mut *holder.insert(obj.extract()?))
}
#[doc(hidden)]
pub fn extract_argument<'a, 'py, T>(
obj: &'a Bound<'py, PyAny>,
holder: &'a mut T::Holder,
arg_name: &str,
) -> PyResult<T>
where
T: PyFunctionArgument<'a, 'py>,
{
match PyFunctionArgument::extract(obj, holder) {
Ok(value) => Ok(value),
Err(e) => Err(argument_extraction_error(obj.py(), arg_name, e)),
}
}
#[doc(hidden)]
pub fn extract_optional_argument<'a, 'py, T>(
obj: Option<&'a Bound<'py, PyAny>>,
holder: &'a mut T::Holder,
arg_name: &str,
default: fn() -> Option<T>,
) -> PyResult<Option<T>>
where
T: PyFunctionArgument<'a, 'py>,
{
match obj {
Some(obj) => {
if obj.is_none() {
Ok(None)
} else {
extract_argument(obj, holder, arg_name).map(Some)
}
}
_ => Ok(default()),
}
}
#[doc(hidden)]
pub fn extract_argument_with_default<'a, 'py, T>(
obj: Option<&'a Bound<'py, PyAny>>,
holder: &'a mut T::Holder,
arg_name: &str,
default: fn() -> T,
) -> PyResult<T>
where
T: PyFunctionArgument<'a, 'py>,
{
match obj {
Some(obj) => extract_argument(obj, holder, arg_name),
None => Ok(default()),
}
}
#[doc(hidden)]
pub fn from_py_with<'a, 'py, T>(
obj: &'a Bound<'py, PyAny>,
arg_name: &str,
extractor: impl Into<super::frompyobject::Extractor<'a, 'py, T>>,
) -> PyResult<T> {
match extractor.into().call(obj) {
Ok(value) => Ok(value),
Err(e) => Err(argument_extraction_error(obj.py(), arg_name, e)),
}
}
#[doc(hidden)]
pub fn from_py_with_with_default<'a, 'py, T>(
obj: Option<&'a Bound<'py, PyAny>>,
arg_name: &str,
extractor: impl Into<super::frompyobject::Extractor<'a, 'py, T>>,
default: fn() -> T,
) -> PyResult<T> {
match obj {
Some(obj) => from_py_with(obj, arg_name, extractor),
None => Ok(default()),
}
}
#[doc(hidden)]
#[cold]
pub fn argument_extraction_error(py: Python<'_>, arg_name: &str, error: PyErr) -> PyErr {
if error
.get_type_bound(py)
.is(&py.get_type_bound::<PyTypeError>())
{
let remapped_error = PyTypeError::new_err(format!(
"argument '{}': {}",
arg_name,
error.value_bound(py)
));
remapped_error.set_cause(py, error.cause(py));
remapped_error
} else {
error
}
}
#[doc(hidden)]
#[inline]
pub unsafe fn unwrap_required_argument<'a, 'py>(
argument: Option<&'a Bound<'py, PyAny>>,
) -> &'a Bound<'py, PyAny> {
match argument {
Some(value) => value,
#[cfg(debug_assertions)]
None => unreachable!("required method argument was not extracted"),
#[cfg(not(debug_assertions))]
None => std::hint::unreachable_unchecked(),
}
}
pub struct KeywordOnlyParameterDescription {
pub name: &'static str,
pub required: bool,
}
pub struct FunctionDescription {
pub cls_name: Option<&'static str>,
pub func_name: &'static str,
pub positional_parameter_names: &'static [&'static str],
pub positional_only_parameters: usize,
pub required_positional_parameters: usize,
pub keyword_only_parameters: &'static [KeywordOnlyParameterDescription],
}
impl FunctionDescription {
fn full_name(&self) -> String {
if let Some(cls_name) = self.cls_name {
format!("{}.{}()", cls_name, self.func_name)
} else {
format!("{}()", self.func_name)
}
}
#[cfg(not(Py_LIMITED_API))]
pub unsafe fn extract_arguments_fastcall<'py, V, K>(
&self,
py: Python<'py>,
args: *const *mut ffi::PyObject,
nargs: ffi::Py_ssize_t,
kwnames: *mut ffi::PyObject,
output: &mut [Option<PyArg<'py>>],
) -> PyResult<(V::Varargs, K::Varkeywords)>
where
V: VarargsHandler<'py>,
K: VarkeywordsHandler<'py>,
{
let num_positional_parameters = self.positional_parameter_names.len();
debug_assert!(nargs >= 0);
debug_assert!(self.positional_only_parameters <= num_positional_parameters);
debug_assert!(self.required_positional_parameters <= num_positional_parameters);
debug_assert_eq!(
output.len(),
num_positional_parameters + self.keyword_only_parameters.len()
);
let args: *const Option<PyArg<'py>> = args.cast();
let positional_args_provided = nargs as usize;
let remaining_positional_args = if args.is_null() {
debug_assert_eq!(positional_args_provided, 0);
&[]
} else {
let positional_args_to_consume =
num_positional_parameters.min(positional_args_provided);
let (positional_parameters, remaining) =
std::slice::from_raw_parts(args, positional_args_provided)
.split_at(positional_args_to_consume);
output[..positional_args_to_consume].copy_from_slice(positional_parameters);
remaining
};
let varargs = V::handle_varargs_fastcall(py, remaining_positional_args, self)?;
let mut varkeywords = K::Varkeywords::default();
let kwnames: Option<Borrowed<'_, '_, PyTuple>> =
Borrowed::from_ptr_or_opt(py, kwnames).map(|kwnames| kwnames.downcast_unchecked());
if let Some(kwnames) = kwnames {
let kwargs = ::std::slice::from_raw_parts(
args.offset(nargs).cast::<PyArg<'py>>(),
kwnames.len(),
);
self.handle_kwargs::<K, _>(
kwnames.iter_borrowed().zip(kwargs.iter().copied()),
&mut varkeywords,
num_positional_parameters,
output,
)?
}
self.ensure_no_missing_required_positional_arguments(output, positional_args_provided)?;
self.ensure_no_missing_required_keyword_arguments(output)?;
Ok((varargs, varkeywords))
}
pub unsafe fn extract_arguments_tuple_dict<'py, V, K>(
&self,
py: Python<'py>,
args: *mut ffi::PyObject,
kwargs: *mut ffi::PyObject,
output: &mut [Option<PyArg<'py>>],
) -> PyResult<(V::Varargs, K::Varkeywords)>
where
V: VarargsHandler<'py>,
K: VarkeywordsHandler<'py>,
{
let args: Borrowed<'py, 'py, PyTuple> =
Borrowed::from_ptr(py, args).downcast_unchecked::<PyTuple>();
let kwargs: Option<Borrowed<'py, 'py, PyDict>> =
Borrowed::from_ptr_or_opt(py, kwargs).map(|kwargs| kwargs.downcast_unchecked());
let num_positional_parameters = self.positional_parameter_names.len();
debug_assert!(self.positional_only_parameters <= num_positional_parameters);
debug_assert!(self.required_positional_parameters <= num_positional_parameters);
debug_assert_eq!(
output.len(),
num_positional_parameters + self.keyword_only_parameters.len()
);
for (i, arg) in args
.iter_borrowed()
.take(num_positional_parameters)
.enumerate()
{
output[i] = Some(arg);
}
let varargs = V::handle_varargs_tuple(&args, self)?;
let mut varkeywords = K::Varkeywords::default();
if let Some(kwargs) = kwargs {
self.handle_kwargs::<K, _>(
kwargs.iter_borrowed(),
&mut varkeywords,
num_positional_parameters,
output,
)?
}
self.ensure_no_missing_required_positional_arguments(output, args.len())?;
self.ensure_no_missing_required_keyword_arguments(output)?;
Ok((varargs, varkeywords))
}
#[inline]
fn handle_kwargs<'py, K, I>(
&self,
kwargs: I,
varkeywords: &mut K::Varkeywords,
num_positional_parameters: usize,
output: &mut [Option<PyArg<'py>>],
) -> PyResult<()>
where
K: VarkeywordsHandler<'py>,
I: IntoIterator<Item = (PyArg<'py>, PyArg<'py>)>,
{
debug_assert_eq!(
num_positional_parameters,
self.positional_parameter_names.len()
);
debug_assert_eq!(
output.len(),
num_positional_parameters + self.keyword_only_parameters.len()
);
let mut positional_only_keyword_arguments = Vec::new();
for (kwarg_name_py, value) in kwargs {
#[cfg(any(Py_3_10, not(Py_LIMITED_API)))]
let kwarg_name =
unsafe { kwarg_name_py.downcast_unchecked::<crate::types::PyString>() }.to_str();
#[cfg(all(not(Py_3_10), Py_LIMITED_API))]
let kwarg_name = kwarg_name_py.extract::<crate::pybacked::PyBackedStr>();
if let Ok(kwarg_name_owned) = kwarg_name {
#[cfg(any(Py_3_10, not(Py_LIMITED_API)))]
let kwarg_name = kwarg_name_owned;
#[cfg(all(not(Py_3_10), Py_LIMITED_API))]
let kwarg_name: &str = &kwarg_name_owned;
if let Some(i) = self.find_keyword_parameter_in_keyword_only(kwarg_name) {
if output[i + num_positional_parameters]
.replace(value)
.is_some()
{
return Err(self.multiple_values_for_argument(kwarg_name));
}
continue;
}
if let Some(i) = self.find_keyword_parameter_in_positional(kwarg_name) {
if i < self.positional_only_parameters {
if K::handle_varkeyword(varkeywords, kwarg_name_py, value, self).is_err() {
positional_only_keyword_arguments.push(kwarg_name_owned);
}
} else if output[i].replace(value).is_some() {
return Err(self.multiple_values_for_argument(kwarg_name));
}
continue;
}
};
K::handle_varkeyword(varkeywords, kwarg_name_py, value, self)?
}
if !positional_only_keyword_arguments.is_empty() {
#[cfg(all(not(Py_3_10), Py_LIMITED_API))]
let positional_only_keyword_arguments: Vec<_> = positional_only_keyword_arguments
.iter()
.map(std::ops::Deref::deref)
.collect();
return Err(self.positional_only_keyword_arguments(&positional_only_keyword_arguments));
}
Ok(())
}
#[inline]
fn find_keyword_parameter_in_positional(&self, kwarg_name: &str) -> Option<usize> {
self.positional_parameter_names
.iter()
.position(|¶m_name| param_name == kwarg_name)
}
#[inline]
fn find_keyword_parameter_in_keyword_only(&self, kwarg_name: &str) -> Option<usize> {
self.keyword_only_parameters
.iter()
.position(|param_desc| param_desc.name == kwarg_name)
}
#[inline]
fn ensure_no_missing_required_positional_arguments(
&self,
output: &[Option<PyArg<'_>>],
positional_args_provided: usize,
) -> PyResult<()> {
if positional_args_provided < self.required_positional_parameters {
for out in &output[positional_args_provided..self.required_positional_parameters] {
if out.is_none() {
return Err(self.missing_required_positional_arguments(output));
}
}
}
Ok(())
}
#[inline]
fn ensure_no_missing_required_keyword_arguments(
&self,
output: &[Option<PyArg<'_>>],
) -> PyResult<()> {
let keyword_output = &output[self.positional_parameter_names.len()..];
for (param, out) in self.keyword_only_parameters.iter().zip(keyword_output) {
if param.required && out.is_none() {
return Err(self.missing_required_keyword_arguments(keyword_output));
}
}
Ok(())
}
#[cold]
fn too_many_positional_arguments(&self, args_provided: usize) -> PyErr {
let was = if args_provided == 1 { "was" } else { "were" };
let msg = if self.required_positional_parameters != self.positional_parameter_names.len() {
format!(
"{} takes from {} to {} positional arguments but {} {} given",
self.full_name(),
self.required_positional_parameters,
self.positional_parameter_names.len(),
args_provided,
was
)
} else {
format!(
"{} takes {} positional arguments but {} {} given",
self.full_name(),
self.positional_parameter_names.len(),
args_provided,
was
)
};
PyTypeError::new_err(msg)
}
#[cold]
fn multiple_values_for_argument(&self, argument: &str) -> PyErr {
PyTypeError::new_err(format!(
"{} got multiple values for argument '{}'",
self.full_name(),
argument
))
}
#[cold]
fn unexpected_keyword_argument(&self, argument: PyArg<'_>) -> PyErr {
PyTypeError::new_err(format!(
"{} got an unexpected keyword argument '{}'",
self.full_name(),
argument.as_any()
))
}
#[cold]
fn positional_only_keyword_arguments(&self, parameter_names: &[&str]) -> PyErr {
let mut msg = format!(
"{} got some positional-only arguments passed as keyword arguments: ",
self.full_name()
);
push_parameter_list(&mut msg, parameter_names);
PyTypeError::new_err(msg)
}
#[cold]
fn missing_required_arguments(&self, argument_type: &str, parameter_names: &[&str]) -> PyErr {
let arguments = if parameter_names.len() == 1 {
"argument"
} else {
"arguments"
};
let mut msg = format!(
"{} missing {} required {} {}: ",
self.full_name(),
parameter_names.len(),
argument_type,
arguments,
);
push_parameter_list(&mut msg, parameter_names);
PyTypeError::new_err(msg)
}
#[cold]
fn missing_required_keyword_arguments(&self, keyword_outputs: &[Option<PyArg<'_>>]) -> PyErr {
debug_assert_eq!(self.keyword_only_parameters.len(), keyword_outputs.len());
let missing_keyword_only_arguments: Vec<_> = self
.keyword_only_parameters
.iter()
.zip(keyword_outputs)
.filter_map(|(keyword_desc, out)| {
if keyword_desc.required && out.is_none() {
Some(keyword_desc.name)
} else {
None
}
})
.collect();
debug_assert!(!missing_keyword_only_arguments.is_empty());
self.missing_required_arguments("keyword", &missing_keyword_only_arguments)
}
#[cold]
fn missing_required_positional_arguments(&self, output: &[Option<PyArg<'_>>]) -> PyErr {
let missing_positional_arguments: Vec<_> = self
.positional_parameter_names
.iter()
.take(self.required_positional_parameters)
.zip(output)
.filter_map(|(param, out)| if out.is_none() { Some(*param) } else { None })
.collect();
debug_assert!(!missing_positional_arguments.is_empty());
self.missing_required_arguments("positional", &missing_positional_arguments)
}
}
pub trait VarargsHandler<'py> {
type Varargs;
fn handle_varargs_fastcall(
py: Python<'py>,
varargs: &[Option<PyArg<'py>>],
function_description: &FunctionDescription,
) -> PyResult<Self::Varargs>;
fn handle_varargs_tuple(
args: &Bound<'py, PyTuple>,
function_description: &FunctionDescription,
) -> PyResult<Self::Varargs>;
}
pub struct NoVarargs;
impl<'py> VarargsHandler<'py> for NoVarargs {
type Varargs = ();
#[inline]
fn handle_varargs_fastcall(
_py: Python<'py>,
varargs: &[Option<PyArg<'py>>],
function_description: &FunctionDescription,
) -> PyResult<Self::Varargs> {
let extra_arguments = varargs.len();
if extra_arguments > 0 {
return Err(function_description.too_many_positional_arguments(
function_description.positional_parameter_names.len() + extra_arguments,
));
}
Ok(())
}
#[inline]
fn handle_varargs_tuple(
args: &Bound<'py, PyTuple>,
function_description: &FunctionDescription,
) -> PyResult<Self::Varargs> {
let positional_parameter_count = function_description.positional_parameter_names.len();
let provided_args_count = args.len();
if provided_args_count <= positional_parameter_count {
Ok(())
} else {
Err(function_description.too_many_positional_arguments(provided_args_count))
}
}
}
pub struct TupleVarargs;
impl<'py> VarargsHandler<'py> for TupleVarargs {
type Varargs = Bound<'py, PyTuple>;
#[inline]
fn handle_varargs_fastcall(
py: Python<'py>,
varargs: &[Option<PyArg<'py>>],
_function_description: &FunctionDescription,
) -> PyResult<Self::Varargs> {
Ok(PyTuple::new_bound(py, varargs))
}
#[inline]
fn handle_varargs_tuple(
args: &Bound<'py, PyTuple>,
function_description: &FunctionDescription,
) -> PyResult<Self::Varargs> {
let positional_parameters = function_description.positional_parameter_names.len();
Ok(args.get_slice(positional_parameters, args.len()))
}
}
pub trait VarkeywordsHandler<'py> {
type Varkeywords: Default;
fn handle_varkeyword(
varkeywords: &mut Self::Varkeywords,
name: PyArg<'py>,
value: PyArg<'py>,
function_description: &FunctionDescription,
) -> PyResult<()>;
}
pub struct NoVarkeywords;
impl<'py> VarkeywordsHandler<'py> for NoVarkeywords {
type Varkeywords = ();
#[inline]
fn handle_varkeyword(
_varkeywords: &mut Self::Varkeywords,
name: PyArg<'py>,
_value: PyArg<'py>,
function_description: &FunctionDescription,
) -> PyResult<()> {
Err(function_description.unexpected_keyword_argument(name))
}
}
pub struct DictVarkeywords;
impl<'py> VarkeywordsHandler<'py> for DictVarkeywords {
type Varkeywords = Option<Bound<'py, PyDict>>;
#[inline]
fn handle_varkeyword(
varkeywords: &mut Self::Varkeywords,
name: PyArg<'py>,
value: PyArg<'py>,
_function_description: &FunctionDescription,
) -> PyResult<()> {
varkeywords
.get_or_insert_with(|| PyDict::new_bound(name.py()))
.set_item(name, value)
}
}
fn push_parameter_list(msg: &mut String, parameter_names: &[&str]) {
let len = parameter_names.len();
for (i, parameter) in parameter_names.iter().enumerate() {
if i != 0 {
if len > 2 {
msg.push(',');
}
if i == len - 1 {
msg.push_str(" and ")
} else {
msg.push(' ')
}
}
msg.push('\'');
msg.push_str(parameter);
msg.push('\'');
}
}
#[cfg(test)]
mod tests {
use crate::types::{IntoPyDict, PyTuple};
use crate::Python;
use super::{push_parameter_list, FunctionDescription, NoVarargs, NoVarkeywords};
#[test]
fn unexpected_keyword_argument() {
let function_description = FunctionDescription {
cls_name: None,
func_name: "example",
positional_parameter_names: &[],
positional_only_parameters: 0,
required_positional_parameters: 0,
keyword_only_parameters: &[],
};
Python::with_gil(|py| {
let args = PyTuple::empty_bound(py);
let kwargs = [("foo", 0u8)].into_py_dict_bound(py);
let err = unsafe {
function_description
.extract_arguments_tuple_dict::<NoVarargs, NoVarkeywords>(
py,
args.as_ptr(),
kwargs.as_ptr(),
&mut [],
)
.unwrap_err()
};
assert_eq!(
err.to_string(),
"TypeError: example() got an unexpected keyword argument 'foo'"
);
})
}
#[test]
fn keyword_not_string() {
let function_description = FunctionDescription {
cls_name: None,
func_name: "example",
positional_parameter_names: &[],
positional_only_parameters: 0,
required_positional_parameters: 0,
keyword_only_parameters: &[],
};
Python::with_gil(|py| {
let args = PyTuple::empty_bound(py);
let kwargs = [(1u8, 1u8)].into_py_dict_bound(py);
let err = unsafe {
function_description
.extract_arguments_tuple_dict::<NoVarargs, NoVarkeywords>(
py,
args.as_ptr(),
kwargs.as_ptr(),
&mut [],
)
.unwrap_err()
};
assert_eq!(
err.to_string(),
"TypeError: example() got an unexpected keyword argument '1'"
);
})
}
#[test]
fn missing_required_arguments() {
let function_description = FunctionDescription {
cls_name: None,
func_name: "example",
positional_parameter_names: &["foo", "bar"],
positional_only_parameters: 0,
required_positional_parameters: 2,
keyword_only_parameters: &[],
};
Python::with_gil(|py| {
let args = PyTuple::empty_bound(py);
let mut output = [None, None];
let err = unsafe {
function_description.extract_arguments_tuple_dict::<NoVarargs, NoVarkeywords>(
py,
args.as_ptr(),
std::ptr::null_mut(),
&mut output,
)
}
.unwrap_err();
assert_eq!(
err.to_string(),
"TypeError: example() missing 2 required positional arguments: 'foo' and 'bar'"
);
})
}
#[test]
fn push_parameter_list_empty() {
let mut s = String::new();
push_parameter_list(&mut s, &[]);
assert_eq!(&s, "");
}
#[test]
fn push_parameter_list_one() {
let mut s = String::new();
push_parameter_list(&mut s, &["a"]);
assert_eq!(&s, "'a'");
}
#[test]
fn push_parameter_list_two() {
let mut s = String::new();
push_parameter_list(&mut s, &["a", "b"]);
assert_eq!(&s, "'a' and 'b'");
}
#[test]
fn push_parameter_list_three() {
let mut s = String::new();
push_parameter_list(&mut s, &["a", "b", "c"]);
assert_eq!(&s, "'a', 'b', and 'c'");
}
#[test]
fn push_parameter_list_four() {
let mut s = String::new();
push_parameter_list(&mut s, &["a", "b", "c", "d"]);
assert_eq!(&s, "'a', 'b', 'c', and 'd'");
}
}