refactor deque

This commit is contained in:
blueloveTH 2024-06-01 21:47:27 +08:00
parent 6aa210159a
commit 2f96712371
8 changed files with 196 additions and 959 deletions

View File

@ -9,7 +9,7 @@ pipeline = [
["config.h", "export.h", "_generated.h", "common.h", "memory.h", "vector.h", "str.h", "tuplelist.h", "namedict.h", "error.h", "any.h"],
["obj.h", "dict.h", "codeobject.h", "frame.h", "profiler.h"],
["gc.h", "vm.h", "ceval.h", "lexer.h", "expr.h", "compiler.h", "repl.h"],
["cffi.h", "bindings.h", "iter.h", "base64.h", "csv.h", "collections.h", "array2d.h", "dataclasses.h", "random.h", "linalg.h", "easing.h", "io.h", "modules.h"],
["cffi.h", "bindings.h", "iter.h", "base64.h", "csv.h", "array2d.h", "dataclasses.h", "random.h", "linalg.h", "easing.h", "io.h", "modules.h"],
["pocketpy.h", "pocketpy_c.h"]
]

View File

@ -1,8 +0,0 @@
#pragma once
#include "bindings.h"
namespace pkpy
{
void add_module_collections(VM *vm);
} // namespace pkpy

View File

@ -12,7 +12,6 @@
#include "vm.h"
#include "random.h"
#include "bindings.h"
#include "collections.h"
#include "csv.h"
#include "dataclasses.h"
#include "array2d.h"

View File

@ -1,5 +1,10 @@
def Counter(iterable):
a = {}
from __builtins import _enable_instance_dict
from typing import Generic, TypeVar, Iterable
T = TypeVar('T')
def Counter(iterable: Iterable[T]):
a: dict[T, int] = {}
for x in iterable:
if x in a:
a[x] += 1
@ -7,7 +12,6 @@ def Counter(iterable):
a[x] = 1
return a
from __builtins import _enable_instance_dict
class defaultdict(dict):
def __init__(self, default_factory, *args):
@ -25,3 +29,119 @@ class defaultdict(dict):
def copy(self):
return defaultdict(self.default_factory, self)
class deque(Generic[T]):
_data: list[T]
_head: int
_tail: int
_capacity: int
def __init__(self, iterable: Iterable[T] = None):
self._data = [None] * 8 # initial capacity
self._head = 0
self._tail = 0
self._capacity = len(self._data)
if iterable is not None:
self.extend(iterable)
def __resize_2x(self):
backup = list(self)
self._capacity *= 2
self._head = 0
self._tail = len(backup)
self._data.clear()
self._data.extend(backup)
self._data.extend([None] * (self._capacity - len(backup)))
def append(self, x: T):
self._data[self._tail] = x
self._tail = (self._tail + 1) % self._capacity
if (self._tail + 1) % self._capacity == self._head:
self.__resize_2x()
def appendleft(self, x: T):
self._head = (self._head - 1 + self._capacity) % self._capacity
self._data[self._head] = x
if (self._tail + 1) % self._capacity == self._head:
self.__resize_2x()
def copy(self):
return deque(self)
def count(self, x: T) -> int:
n = 0
for item in self:
if item == x:
n += 1
return n
def extend(self, iterable: Iterable[T]):
for x in iterable:
self.append(x)
def extendleft(self, iterable: Iterable[T]):
for x in iterable:
self.appendleft(x)
def pop(self) -> T:
if self._head == self._tail:
raise IndexError("pop from an empty deque")
self._tail = (self._tail - 1 + self._capacity) % self._capacity
return self._data[self._tail]
def popleft(self) -> T:
if self._head == self._tail:
raise IndexError("pop from an empty deque")
x = self._data[self._head]
self._head = (self._head + 1) % self._capacity
return x
def clear(self):
i = self._head
while i != self._tail:
self._data[i] = None
i = (i + 1) % self._capacity
self._head = 0
self._tail = 0
def rotate(self, n: int = 1):
if len(self) == 0:
return
if n > 0:
n = n % len(self)
for _ in range(n):
self.appendleft(self.pop())
elif n < 0:
n = -n % len(self)
for _ in range(n):
self.append(self.popleft())
def __len__(self) -> int:
return (self._tail - self._head + self._capacity) % self._capacity
def __contains__(self, x: object) -> bool:
for item in self:
if item == x:
return True
return False
def __iter__(self):
i = self._head
while i != self._tail:
yield self._data[i]
i = (i + 1) % self._capacity
def __eq__(self, other: object) -> bool:
if not isinstance(other, deque):
return False
if len(self) != len(other):
return False
for x, y in zip(self, other):
if x != y:
return False
return True
def __repr__(self) -> str:
return f"deque({list(self)!r})"

File diff suppressed because one or more lines are too long

View File

@ -1,548 +0,0 @@
#include "pocketpy/collections.h"
namespace pkpy
{
struct PyDequeIter // Iterator for the deque type
{
PyVar ref;
bool is_reversed;
std::deque<PyVar >::iterator begin, end, current;
std::deque<PyVar >::reverse_iterator rbegin, rend, rcurrent;
PyDequeIter(PyVar ref, std::deque<PyVar >::iterator begin, std::deque<PyVar >::iterator end)
: ref(ref), begin(begin), end(end), current(begin)
{
this->is_reversed = false;
}
PyDequeIter(PyVar ref, std::deque<PyVar >::reverse_iterator rbegin, std::deque<PyVar >::reverse_iterator rend)
: ref(ref), rbegin(rbegin), rend(rend), rcurrent(rbegin)
{
this->is_reversed = true;
}
void _gc_mark(VM* vm) const { vm->obj_gc_mark(ref); }
static void _register(VM *vm, PyObject* mod, PyObject* type);
};
void PyDequeIter::_register(VM *vm, PyObject* mod, PyObject* type)
{
vm->bind__iter__(type->as<Type>(), [](VM *vm, PyVar obj)
{ return obj; });
vm->bind__next__(type->as<Type>(), [](VM *vm, PyVar obj) -> unsigned
{
PyDequeIter& self = _CAST(PyDequeIter&, obj);
if(self.is_reversed){
if(self.rcurrent == self.rend) return 0;
vm->s_data.push(*self.rcurrent);
++self.rcurrent;
return 1;
}
else{
if(self.current == self.end) return 0;
vm->s_data.push(*self.current);
++self.current;
return 1;
} });
}
struct PyDeque
{
PyDeque(VM *vm, PyVar iterable, PyVar maxlen); // constructor
// PyDeque members
std::deque<PyVar > dequeItems;
int maxlen = -1; // -1 means unbounded
bool bounded = false; // if true, maxlen is not -1
void insertObj(bool front, bool back, int index, PyVar item); // insert at index, used purely for internal purposes: append, appendleft, insert methods
PyVar popObj(bool front, bool back, PyVar item, VM *vm); // pop at index, used purely for internal purposes: pop, popleft, remove methods
int findIndex(VM *vm, PyVar obj, int start, int stop); // find the index of the given object in the deque
// Special methods
static void _register(VM *vm, PyObject* mod, PyObject* type); // register the type
void _gc_mark(VM*) const; // needed for container types, mark all objects in the deque for gc
};
void PyDeque::_register(VM *vm, PyObject* mod, PyObject* type)
{
vm->bind(type, "__new__(cls, iterable=None, maxlen=None)",
[](VM *vm, ArgsView args)
{
Type cls_t = PK_OBJ_GET(Type, args[0]);
PyVar iterable = args[1];
PyVar maxlen = args[2];
return vm->new_object<PyDeque>(cls_t, vm, iterable, maxlen);
});
// gets the item at the given index, if index is negative, it will be treated as index + len(deque)
// if the index is out of range, IndexError will be thrown --> required for [] operator
vm->bind__getitem__(type->as<Type>(), [](VM *vm, PyVar _0, PyVar _1)
{
PyDeque &self = _CAST(PyDeque &, _0);
i64 index = CAST(i64, _1);
index = vm->normalized_index(index, self.dequeItems.size()); // error is handled by the vm->normalized_index
return self.dequeItems[index];
});
// sets the item at the given index, if index is negative, it will be treated as index + len(deque)
// if the index is out of range, IndexError will be thrown --> required for [] operator
vm->bind__setitem__(type->as<Type>(), [](VM *vm, PyVar _0, PyVar _1, PyVar _2)
{
PyDeque &self = _CAST(PyDeque&, _0);
i64 index = CAST(i64, _1);
index = vm->normalized_index(index, self.dequeItems.size()); // error is handled by the vm->normalized_index
self.dequeItems[index] = _2;
});
// erases the item at the given index, if index is negative, it will be treated as index + len(deque)
// if the index is out of range, IndexError will be thrown --> required for [] operator
vm->bind__delitem__(type->as<Type>(), [](VM *vm, PyVar _0, PyVar _1)
{
PyDeque &self = _CAST(PyDeque&, _0);
i64 index = CAST(i64, _1);
index = vm->normalized_index(index, self.dequeItems.size()); // error is handled by the vm->normalized_index
self.dequeItems.erase(self.dequeItems.begin() + index);
});
vm->bind__len__(type->as<Type>(), [](VM *vm, PyVar _0)
{
PyDeque &self = _CAST(PyDeque&, _0);
return (i64)self.dequeItems.size();
});
vm->bind__iter__(type->as<Type>(), [](VM *vm, PyVar _0)
{
PyDeque &self = _CAST(PyDeque &, _0);
return vm->new_user_object<PyDequeIter>(_0, self.dequeItems.begin(), self.dequeItems.end());
});
vm->bind__repr__(type->as<Type>(), [](VM *vm, PyVar _0) -> Str
{
if(vm->_repr_recursion_set.count(_0)) return "[...]";
const PyDeque &self = _CAST(PyDeque&, _0);
SStream ss;
ss << "deque([";
vm->_repr_recursion_set.insert(_0);
for (auto it = self.dequeItems.begin(); it != self.dequeItems.end(); ++it)
{
ss << vm->py_repr(*it);
if (it != self.dequeItems.end() - 1) ss << ", ";
}
vm->_repr_recursion_set.erase(_0);
self.bounded ? ss << "], maxlen=" << self.maxlen << ")" : ss << "])";
return ss.str();
});
// enables comparison between two deques, == and != are supported
vm->bind__eq__(type->as<Type>(), [](VM *vm, PyVar _0, PyVar _1)
{
const PyDeque &self = _CAST(PyDeque&, _0);
if(!vm->is_user_type<PyDeque>(_0)) return vm->NotImplemented;
const PyDeque &other = _CAST(PyDeque&, _1);
if (self.dequeItems.size() != other.dequeItems.size()) return vm->False;
for (int i = 0; i < self.dequeItems.size(); i++){
if (vm->py_ne(self.dequeItems[i], other.dequeItems[i])) return vm->False;
}
return vm->True;
});
// clear the deque
vm->bind(type, "clear(self) -> None",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
self.dequeItems.clear();
return vm->None;
});
// extend the deque with the given iterable
vm->bind(type, "extend(self, iterable) -> None",
[](VM *vm, ArgsView args)
{
auto _lock = vm->heap.gc_scope_lock(); // locking the heap
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar it = vm->py_iter(args[1]); // strong ref
PyVar obj = vm->py_next(it);
while (obj != vm->StopIteration)
{
self.insertObj(false, true, -1, obj);
obj = vm->py_next(it);
}
return vm->None;
});
// append at the end of the deque
vm->bind(type, "append(self, item) -> None",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar item = args[1];
self.insertObj(false, true, -1, item);
return vm->None;
});
// append at the beginning of the deque
vm->bind(type, "appendleft(self, item) -> None",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar item = args[1];
self.insertObj(true, false, -1, item);
return vm->None;
});
// pop from the end of the deque
vm->bind(type, "pop(self) -> PyObject",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
if (self.dequeItems.empty())
{
vm->IndexError("pop from an empty deque");
return vm->None;
}
return self.popObj(false, true, nullptr, vm);
});
// pop from the beginning of the deque
vm->bind(type, "popleft(self) -> PyObject",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
if (self.dequeItems.empty())
{
vm->IndexError("pop from an empty deque");
return vm->None;
}
return self.popObj(true, false, nullptr, vm);
});
// shallow copy of the deque
vm->bind(type, "copy(self) -> deque",
[](VM *vm, ArgsView args)
{
auto _lock = vm->heap.gc_scope_lock(); // locking the heap
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar newDequeObj = vm->new_user_object<PyDeque>(vm, vm->None, vm->None); // create the empty deque
PyDeque &newDeque = _CAST(PyDeque &, newDequeObj); // cast it to PyDeque so we can use its methods
for (auto it = self.dequeItems.begin(); it != self.dequeItems.end(); ++it)
newDeque.insertObj(false, true, -1, *it);
return newDequeObj;
});
// NEW: counts the number of occurrences of the given object in the deque
vm->bind(type, "count(self, obj) -> int",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar obj = args[1];
int cnt = 0, sz = self.dequeItems.size();
for (auto it = self.dequeItems.begin(); it != self.dequeItems.end(); ++it)
{
if (vm->py_eq((*it), obj))
cnt++;
if (sz != self.dequeItems.size())// mutating the deque during iteration is not allowed
vm->RuntimeError("deque mutated during iteration");
}
return VAR(cnt);
});
// NEW: extends the deque from the left
vm->bind(type, "extendleft(self, iterable) -> None",
[](VM *vm, ArgsView args)
{
auto _lock = vm->heap.gc_scope_lock();
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar it = vm->py_iter(args[1]); // strong ref
PyVar obj = vm->py_next(it);
while (obj != vm->StopIteration)
{
self.insertObj(true, false, -1, obj);
obj = vm->py_next(it);
}
return vm->None;
});
// NEW: returns the index of the given object in the deque
vm->bind(type, "index(self, obj, start=None, stop=None) -> int",
[](VM *vm, ArgsView args)
{
// Return the position of x in the deque (at or after index start and before index stop). Returns the first match or raises ValueError if not found.
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar obj = args[1];
int start = CAST_DEFAULT(int, args[2], 0);
int stop = CAST_DEFAULT(int, args[3], self.dequeItems.size());
int index = self.findIndex(vm, obj, start, stop);
if (index < 0) vm->ValueError(vm->py_repr(obj) + " is not in deque");
return VAR(index);
});
// NEW: returns the index of the given object in the deque
vm->bind(type, "__contains__(self, obj) -> bool",
[](VM *vm, ArgsView args)
{
// Return the position of x in the deque (at or after index start and before index stop). Returns the first match or raises ValueError if not found.
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar obj = args[1];
int start = 0, stop = self.dequeItems.size(); // default values
int index = self.findIndex(vm, obj, start, stop);
if (index != -1)
return VAR(true);
return VAR(false);
});
// NEW: inserts the given object at the given index
vm->bind(type, "insert(self, index, obj) -> None",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
int index = CAST(int, args[1]);
PyVar obj = args[2];
if (self.bounded && self.dequeItems.size() == self.maxlen)
vm->IndexError("deque already at its maximum size");
else
self.insertObj(false, false, index, obj); // this index shouldn't be fixed using vm->normalized_index, pass as is
return vm->None;
});
// NEW: removes the first occurrence of the given object from the deque
vm->bind(type, "remove(self, obj) -> None",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
PyVar obj = args[1];
PyVar removed = self.popObj(false, false, obj, vm);
if (removed == nullptr)
vm->ValueError(vm->py_repr(obj) + " is not in list");
return vm->None;
});
// NEW: reverses the deque
vm->bind(type, "reverse(self) -> None",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
if (self.dequeItems.empty() || self.dequeItems.size() == 1)
return vm->None; // handle trivial cases
int sz = self.dequeItems.size();
for (int i = 0; i < sz / 2; i++)
{
PyVar tmp = self.dequeItems[i];
self.dequeItems[i] = self.dequeItems[sz - i - 1]; // swapping
self.dequeItems[sz - i - 1] = tmp;
}
return vm->None;
});
// NEW: rotates the deque by n steps
vm->bind(type, "rotate(self, n=1) -> None",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
int n = CAST(int, args[1]);
if (n != 0 && !self.dequeItems.empty()) // trivial case
{
PyVar tmp; // holds the object to be rotated
int direction = n > 0 ? 1 : -1;
n = abs(n);
n = n % self.dequeItems.size(); // make sure n is in range
while (n--)
{
if (direction == 1)
{
tmp = self.dequeItems.back();
self.dequeItems.pop_back();
self.dequeItems.push_front(tmp);
}
else
{
tmp = self.dequeItems.front();
self.dequeItems.pop_front();
self.dequeItems.push_back(tmp);
}
}
}
return vm->None;
});
// NEW: getter and setter of property `maxlen`
vm->bind_property(
type, "maxlen: int",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
if (self.bounded)
return VAR(self.maxlen);
return vm->None;
},
[](VM *vm, ArgsView args)
{
vm->AttributeError("attribute 'maxlen' of 'collections.deque' objects is not writable");
return vm->None;
});
// NEW: support pickle
vm->bind(type, "__getnewargs__(self) -> tuple[list, int]",
[](VM *vm, ArgsView args)
{
PyDeque &self = _CAST(PyDeque &, args[0]);
Tuple ret(2);
List list;
for (PyVar obj : self.dequeItems)
{
list.push_back(obj);
}
ret[0] = VAR(std::move(list));
if (self.bounded)
ret[1] = VAR(self.maxlen);
else
ret[1] = vm->None;
return VAR(ret);
});
}
/// @brief initializes a new PyDeque object, actual initialization is done in __init__
PyDeque::PyDeque(VM *vm, PyVar iterable, PyVar maxlen)
{
if (maxlen != vm->None) // fix the maxlen first
{
int tmp = CAST(int, maxlen);
if (tmp < 0)
vm->ValueError("maxlen must be non-negative");
else
{
this->maxlen = tmp;
this->bounded = true;
}
}
else
{
this->bounded = false;
this->maxlen = -1;
}
if (iterable != vm->None)
{
this->dequeItems.clear(); // clear the deque
auto _lock = vm->heap.gc_scope_lock(); // locking the heap
PyVar it = vm->py_iter(iterable); // strong ref
PyVar obj = vm->py_next(it);
while (obj != vm->StopIteration)
{
this->insertObj(false, true, -1, obj);
obj = vm->py_next(it);
}
}
}
int PyDeque::findIndex(VM *vm, PyVar obj, int start, int stop)
{
// the following code is special purpose normalization for this method, taken from CPython: _collectionsmodule.c file
if (start < 0)
{
start = this->dequeItems.size() + start; // try to fix for negative indices
if (start < 0)
start = 0;
}
if (stop < 0)
{
stop = this->dequeItems.size() + stop; // try to fix for negative indices
if (stop < 0)
stop = 0;
}
if (stop > this->dequeItems.size())
stop = this->dequeItems.size();
if (start > stop)
start = stop; // end of normalization
PK_ASSERT(start >= 0 && start <= this->dequeItems.size() && stop >= 0 && stop <= this->dequeItems.size() && start <= stop); // sanity check
int loopSize = std::min((int)(this->dequeItems.size()), stop);
int sz = this->dequeItems.size();
for (int i = start; i < loopSize; i++)
{
if (vm->py_eq(this->dequeItems[i], obj))
return i;
if (sz != this->dequeItems.size())// mutating the deque during iteration is not allowed
vm->RuntimeError("deque mutated during iteration");
}
return -1;
}
/// @brief pops or removes an item from the deque
/// @param front if true, pop from the front of the deque
/// @param back if true, pop from the back of the deque
/// @param item if front and back is not set, remove the first occurrence of item from the deque
/// @param vm is needed for the py_eq
/// @return PyVar if front or back is set, this is a pop operation and we return a PyVar, if front and back are not set, this is a remove operation and we return the removed item or nullptr
PyVar PyDeque::popObj(bool front, bool back, PyVar item, VM *vm)
{
// error handling
if (front && back)
throw std::runtime_error("both front and back are set"); // this should never happen
if (front || back)
{
// front or back is set, we don't care about item, this is a pop operation and we return a PyVar
if (this->dequeItems.empty())
throw std::runtime_error("pop from an empty deque"); // shouldn't happen
PyVar obj;
if (front)
{
obj = this->dequeItems.front();
this->dequeItems.pop_front();
}
else
{
obj = this->dequeItems.back();
this->dequeItems.pop_back();
}
return obj;
}
else
{
// front and back are not set, we care about item, this is a remove operation and we return the removed item or nullptr
int sz = this->dequeItems.size();
for (auto it = this->dequeItems.begin(); it != this->dequeItems.end(); ++it)
{
bool found = vm->py_eq((*it), item);
if (sz != this->dequeItems.size()) // mutating the deque during iteration is not allowed
vm->IndexError("deque mutated during iteration");
if (found)
{
PyVar obj = *it; // keep a reference to the object for returning
this->dequeItems.erase(it);
return obj;
}
}
return nullptr; // not found
}
}
/// @brief inserts an item into the deque
/// @param front if true, insert at the front of the deque
/// @param back if true, insert at the back of the deque
/// @param index if front and back are not set, insert at the given index
/// @param item the item to insert
/// @return true if the item was inserted successfully, false if the deque is bounded and is already at its maximum size
void PyDeque::insertObj(bool front, bool back, int index, PyVar item) // assume index is not fixed using the vm->normalized_index
{
// error handling
if (front && back)
throw std::runtime_error("both front and back are set"); // this should never happen
if (front || back)
{
// front or back is set, we don't care about index
if (this->bounded)
{
if (this->maxlen == 0)
return; // bounded and maxlen is 0, so we can't append
else if (this->dequeItems.size() == this->maxlen)
{
if (front)
this->dequeItems.pop_back(); // remove the last item
else if (back)
this->dequeItems.pop_front(); // remove the first item
}
}
if (front)
this->dequeItems.emplace_front(item);
else if (back)
this->dequeItems.emplace_back(item);
}
else
{
// front and back are not set, we care about index
if (index < 0)
index = this->dequeItems.size() + index; // try fixing for negative indices
if (index < 0) // still negative means insert at the beginning
this->dequeItems.push_front(item);
else if (index >= this->dequeItems.size()) // still out of range means insert at the end
this->dequeItems.push_back(item);
else
this->dequeItems.insert((this->dequeItems.begin() + index), item); // insert at the given index
}
}
/// @brief marks the deque items for garbage collection
void PyDeque::_gc_mark(VM* vm) const
{
for (PyVar obj : this->dequeItems) vm->obj_gc_mark(obj);
}
/// @brief registers the PyDeque class
void add_module_collections(VM *vm)
{
PyObject* mod = vm->new_module("collections");
vm->register_user_class<PyDeque>(mod, "deque", VM::tp_object, true);
vm->register_user_class<PyDequeIter>(mod, "_deque_iter");
CodeObject_ code = vm->compile(kPythonLibs_collections, "collections.py", EXEC_MODE);
vm->_exec(code, mod);
}
} // namespace pkpypkpy

View File

@ -1639,6 +1639,7 @@ void VM::__post_init_builtin_types(){
_lazy_modules["cmath"] = kPythonLibs_cmath;
_lazy_modules["itertools"] = kPythonLibs_itertools;
_lazy_modules["operator"] = kPythonLibs_operator;
_lazy_modules["collections"] = kPythonLibs_collections;
try{
// initialize dummy func_decl for exec/eval
@ -1665,7 +1666,6 @@ void VM::__post_init_builtin_types(){
add_module_dataclasses(this);
add_module_linalg(this);
add_module_easing(this);
add_module_collections(this);
add_module_array2d(this);
add_module_line_profiler(this);
add_module_enum(this);

View File

@ -30,31 +30,56 @@ assert q == deque([1, 2])
def assertEqual(a, b):
assert a == b
if a == b:
return
print(a)
print(b)
raise AssertionError
def assertNotEqual(a, b):
assert a != b
if a != b:
return
print(a)
print(b)
raise AssertionError
def printFailed(function_name, *args, **kwargs):
print("X Failed Tests for {} for args: {} {}".format(str(function_name), str(args), str(kwargs)))
BIG = 100000
BIG = 10000
def fail():
raise SyntaxError
yield 1
d = deque()
assertEqual(len(d), 0)
assertEqual(list(d), [])
d = deque(range(6))
assertEqual(list(d), list(range(6)))
d = deque(range(7)) # [0, 1, 2, 3, 4, 5, 6]
# print(d._data, d._head, d._tail, d._capacity)
assertEqual(list(d), list(range(7)))
d = deque(range(8))
assertEqual(list(d), list(range(8)))
d = deque(range(9))
assertEqual(list(d), list(range(9)))
d = deque(range(-5125, -5000))
# d.__init__(range(200)) # not supported
d = deque(range(200))
for i in range(200, 400):
d.append(i)
assertEqual(len(d), 400)
assertEqual(list(d), list(range(400)))
for i in reversed(range(-200, 0)):
d.appendleft(i)
assertEqual(list(d), list(range(-200, 400)))
assertEqual(len(d), 600)
assertEqual(list(d), list(range(-200, 400)))
left = [d.popleft() for i in range(250)]
assertEqual(left, list(range(-200, 50)))
@ -65,74 +90,6 @@ right.reverse()
assertEqual(right, list(range(150, 400)))
assertEqual(list(d), list(range(50, 150)))
####### TEST maxlen###############
try:
dq = deque()
dq.maxlen = -1
printFailed("deque.maxlen", -1)
exit(1)
except AttributeError:
pass
try:
dq = deque()
dq.maxlen = -2
printFailed("deque.maxlen", -2)
exit(1)
except AttributeError:
pass
it = iter(range(10))
d = deque(it, maxlen=3)
assertEqual(list(it), [])
assertEqual(repr(d), 'deque([7, 8, 9], maxlen=3)')
assertEqual(list(d), [7, 8, 9])
assertEqual(d, deque(range(10), 3))
d.append(10)
assertEqual(list(d), [8, 9, 10])
d.appendleft(7)
assertEqual(list(d), [7, 8, 9])
d.extend([10, 11])
assertEqual(list(d), [9, 10, 11])
d.extendleft([8, 7])
assertEqual(list(d), [7, 8, 9])
d = deque(range(200), maxlen=10)
d.append(d)
assertEqual(repr(d)[-30:], ', 198, 199, [...]], maxlen=10)')
d = deque(range(10), maxlen=None)
assertEqual(repr(d), 'deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])')
####### TEST maxlen = 0###############
it = iter(range(100))
deque(it, maxlen=0)
assertEqual(list(it), [])
it = iter(range(100))
d = deque(maxlen=0)
d.extend(it)
assertEqual(list(it), [])
it = iter(range(100))
d = deque(maxlen=0)
d.extendleft(it)
assertEqual(list(it), [])
####### TEST maxlen attribute #############
assertEqual(deque().maxlen, None)
assertEqual(deque('abc').maxlen, None)
assertEqual(deque('abc', maxlen=4).maxlen, 4)
assertEqual(deque('abc', maxlen=2).maxlen, 2)
assertEqual(deque('abc', maxlen=0).maxlen, 0)
try:
d = deque('abc')
d.maxlen = 10
printFailed("deque.maxlen", 10)
exit(1)
except AttributeError:
pass
######### TEST count()#################
for s in ('', 'abracadabra', 'simsalabim'*500+'abc'):
s = list(s)
@ -178,29 +135,21 @@ except ArithmeticError:
pass
class MutatingCompare:
def __eq__(self, other):
d.pop()
return True
# class MutatingCompare:
# def __eq__(self, other):
# d.pop()
# return True
# m = MutatingCompare()
# d = deque([1, 2, 3, m, 4, 5])
# m.d = d
m = MutatingCompare()
d = deque([1, 2, 3, m, 4, 5])
m.d = d
try:
d.count(3)
printFailed("deque.count", "MutatingCompare()")
exit(1)
except RuntimeError:
pass
d = deque([None]*16)
for i in range(len(d)):
d.rotate(-1)
d.rotate(1)
assertEqual(d.count(1), 0)
assertEqual(d.count(None), 16)
# try:
# d.count(3)
# printFailed("deque.count", "MutatingCompare()")
# exit(1)
# except RuntimeError:
# pass
#### TEST comparisons == #####
@ -234,25 +183,25 @@ for i in range(n):
assertEqual((n+1) not in d, True)
class MutateCmp:
def __init__(self, deque, result):
self.deque = deque
self.result = result
# class MutateCmp:
# def __init__(self, deque, result):
# self.deque = deque
# self.result = result
def __eq__(self, other):
self.deque.clear()
return self.result
# def __eq__(self, other):
# self.deque.clear()
# return self.result
# # Test detection of mutation during iteration
d = deque(range(n))
d[n//2] = MutateCmp(d, False)
try:
n in d
printFailed("deque.__contains__", n)
exit(1)
except RuntimeError:
pass
# # # Test detection of mutation during iteration
# d = deque(range(n))
# d[n//2] = MutateCmp(d, False)
# try:
# n in d
# printFailed("deque.__contains__", n)
# exit(1)
# except RuntimeError:
# pass
class BadCmp:
@ -262,7 +211,7 @@ class BadCmp:
# # Test detection of comparison exceptions
d = deque(range(n))
d[n//2] = BadCmp()
d.append(BadCmp())
try:
n in d
printFailed("deque.__contains__", n)
@ -270,33 +219,6 @@ try:
except RuntimeError:
pass
##### test_contains_count_stop_crashes#####
class A:
def __eq__(self, other):
d.clear()
return NotImplemented
d = deque([A(), A()])
try:
_ = 3 in d
printFailed("deque.__contains__", 3)
exit(1)
except RuntimeError:
pass
d = deque([A(), A()])
try:
_ = d.count(3)
printFailed("deque.count", 3)
exit(1)
except RuntimeError:
pass
######## TEST extend()################
@ -335,203 +257,6 @@ try:
except SyntaxError:
pass
##### TEST get_item ################
n = 200
d = deque(range(n))
l = list(range(n))
for i in range(n):
d.popleft()
l.pop(0)
if random.random() < 0.5:
d.append(i)
l.append(i)
for j in range(1-len(l), len(l)):
assert d[j] == l[j]
d = deque('superman')
assertEqual(d[0], 's')
assertEqual(d[-1], 'n')
d = deque()
try:
d.__getitem__(0)
printFailed("deque.__getitem__", 0)
exit(1)
except IndexError:
pass
try:
d.__getitem__(-1)
printFailed("deque.__getitem__", -1)
exit(1)
except IndexError:
pass
######### TEST index()###############
for n in 1, 2, 30, 40, 200:
d = deque(range(n))
for i in range(n):
assertEqual(d.index(i), i)
try:
d.index(n+1)
printFailed("deque.index", n+1)
exit(1)
except ValueError:
pass
# Test detection of mutation during iteration
d = deque(range(n))
d[n//2] = MutateCmp(d, False)
try:
d.index(n)
printFailed("deque.index", n)
exit(1)
except RuntimeError:
pass
# Test detection of comparison exceptions
d = deque(range(n))
d[n//2] = BadCmp()
try:
d.index(n)
printFailed("deque.index", n)
exit(1)
except RuntimeError:
pass
# Test start and stop arguments behavior matches list.index()
# COMMENT: Current List behavior doesn't support start and stop arguments, so this test is not supported
# elements = 'ABCDEFGHI'
# nonelement = 'Z'
# d = deque(elements * 2)
# s = list(elements * 2)
# for start in range(-5 - len(s)*2, 5 + len(s) * 2):
# for stop in range(-5 - len(s)*2, 5 + len(s) * 2):
# for element in elements + 'Z':
# try:
# print(element, start, stop)
# target = s.index(element, start, stop)
# except ValueError:
# try:
# d.index(element, start, stop)
# print("X Failed Tests!")
# exit(1)
# except ValueError:
# continue
# # with assertRaises(ValueError):
# # d.index(element, start, stop)
# assertEqual(d.index(element, start, stop), target)
# Test large start argument
d = deque(range(0, 10000, 10))
for step in range(100):
i = d.index(8500, 700)
assertEqual(d[i], 8500)
# Repeat test with a different internal offset
d.rotate()
########### test_index_bug_24913#############
d = deque('A' * 3)
try:
d.index('A', 1, 0)
printFailed("deque.index", 'A', 1, 0)
exit(1)
except ValueError:
pass
########### test_insert#############
# Test to make sure insert behaves like lists
elements = 'ABCDEFGHI'
for i in range(-5 - len(elements)*2, 5 + len(elements) * 2):
d = deque('ABCDEFGHI')
s = list('ABCDEFGHI')
d.insert(i, 'Z')
s.insert(i, 'Z')
assertEqual(list(d), s)
########### test_insert_bug_26194#############
data = 'ABC'
d = deque(data, maxlen=len(data))
try:
d.insert(0, 'Z')
printFailed("deque.insert", 0, 'Z')
exit(1)
except IndexError:
pass
elements = 'ABCDEFGHI'
for i in range(-len(elements), len(elements)):
d = deque(elements, maxlen=len(elements)+1)
d.insert(i, 'Z')
if i >= 0:
assertEqual(d[i], 'Z')
else:
assertEqual(d[i-1], 'Z')
######### test set_item #############
n = 200
d = deque(range(n))
for i in range(n):
d[i] = 10 * i
assertEqual(list(d), [10*i for i in range(n)])
l = list(d)
for i in range(1-n, 0, -1):
d[i] = 7*i
l[i] = 7*i
assertEqual(list(d), l)
########## test del_item #############
n = 500 # O(n**2) test, don't make this too big
d = deque(range(n))
try:
d.__delitem__(-n-1)
printFailed("deque.__delitem__", -n-1)
exit(1)
except IndexError:
pass
try:
d.__delitem__(n)
printFailed("deque.__delitem__", n)
exit(1)
except IndexError:
pass
for i in range(n):
assertEqual(len(d), n-i)
j = random.randint(0, len(d)-1)
val = d[j]
assertEqual(val in d, True)
del d[j]
assertEqual(val in d, False)
assertEqual(len(d), 0)
######### test reverse()###############
n = 500 # O(n**2) test, don't make this too big
data = [random.random() for i in range(n)]
for i in range(n):
d = deque(data[:i])
r = d.reverse()
assertEqual(list(d), list(reversed(data[:i])))
assertEqual(r, None)
d.reverse()
assertEqual(list(d), data[:i])
try:
d.reverse(1)
printFailed("deque.reverse", 1)
exit(1)
except TypeError:
pass
############ test rotate#############
s = tuple('abcde')
@ -550,6 +275,7 @@ assertEqual(tuple(d), s)
for i in range(n*3):
d = deque(s)
e = deque(d)
# print(i, d, e)
d.rotate(i) # check vs. rot(1) n times
for j in range(i):
e.rotate(1)
@ -643,54 +369,20 @@ assertEqual(list(d), [])
d.clear() # clear an empty deque
assertEqual(list(d), [])
############# test remove#############
d = deque('abcdefghcij')
d.remove('c')
assertEqual(d, deque('abdefghcij'))
d.remove('c')
assertEqual(d, deque('abdefghij'))
try:
d.remove('c')
printFailed("deque.remove", "c")
exit(1)
except ValueError:
pass
assertEqual(d, deque('abdefghij'))
# Handle comparison errors
d = deque(['a', 'b', BadCmp(), 'c'])
e = deque(d)
try:
d.remove('c')
printFailed("deque.remove", "c")
exit(1)
except RuntimeError:
pass
for x, y in zip(d, e):
# verify that original order and values are retained.
assertEqual(x is y, True)
# Handle evil mutator
for match in (True, False):
d = deque(['ab'])
d.extend([MutateCmp(d, match), 'c'])
try:
d.remove('c')
printFailed("deque.remove", "c")
exit(1)
except IndexError:
pass
assertEqual(d, deque())
########### test repr#############
d = deque(range(200))
e = eval(repr(d))
assertEqual(list(d), list(e))
d.append(d)
assertEqual(repr(d)[-20:], '7, 198, 199, [...]])')
d.append(None)
assertEqual(repr(d)[-19:], '7, 198, 199, None])')
######### test init #############
@ -794,24 +486,12 @@ assertEqual(list(d), list(e))
########## test pickle #############
for d in deque(range(200)), deque(range(200), 100):
for i in range(5 + 1):
s = pickle.dumps(d)
e = pickle.loads(s)
assertNotEqual(id(e), id(d))
assertEqual(list(e), list(d))
assertEqual(e.maxlen, d.maxlen)
######## test pickle recursive ########
# the following doesn't work because the pickle module doesn't
# for d in deque('abc'), deque('abc', 3):
# d.append(d)
# for i in range(5 + 1):
# e = pickle.loads(pickle.dumps(d))
# assertNotEqual(id(e), id(d))
# assertEqual(id(e[-1]), id(e))
# assertEqual(e.maxlen, d.maxlen)
d = deque(range(200))
for _ in range(5 + 1):
s = pickle.dumps(d)
e = pickle.loads(s)
assertNotEqual(id(e), id(d))
assertEqual(list(e), list(d))
### test copy ########
@ -828,12 +508,6 @@ assertEqual(list(d), list(e))
for s in ('abcd', range(2000)):
assertEqual(list(reversed(deque(s))), list(reversed(s)))
# probably not supported
# klass = type(reversed(deque()))
# for s in ('abcd', range(2000)):
# assertEqual(list(klass(deque(s))), list(reversed(s)))
d = deque()
for i in range(100):
d.append(1)