pocketpy/python/pickle.py
2023-09-10 03:22:01 +08:00

181 lines
4.7 KiB
Python

import json
import builtins
_BASIC_TYPES = [int, float, str, bool, type(None)]
_MOD_T_SEP = "@"
def _find_class(path: str):
if _MOD_T_SEP not in path:
return builtins.__dict__[path]
modpath, name = path.split(_MOD_T_SEP)
return __import__(modpath).__dict__[name]
def _find__new__(cls):
while cls is not None:
d = cls.__dict__
if "__new__" in d:
return d["__new__"]
cls = cls.__base__
assert False
class _Pickler:
def __init__(self, obj) -> None:
self.obj = obj
self.raw_memo = {} # id -> int
self.memo = [] # int -> object
def _type_id(self, o: type):
assert type(o) is type
name = o.__name__
mod = o.__module__
if mod is not None:
name = mod.__path__ + _MOD_T_SEP + name
return name
def wrap(self, o):
if type(o) in _BASIC_TYPES:
return o
if type(o) is type:
return ["type", self._type_id(o)]
index = self.raw_memo.get(id(o), None)
if index is not None:
return [index]
ret = []
index = len(self.memo)
self.memo.append(ret)
self.raw_memo[id(o)] = index
if type(o) is tuple:
ret.append("tuple")
ret.append([self.wrap(i) for i in o])
return [index]
if type(o) is bytes:
ret.append("bytes")
ret.append([o[j] for j in range(len(o))])
return [index]
if type(o) is list:
ret.append("list")
ret.append([self.wrap(i) for i in o])
return [index]
if type(o) is dict:
ret.append("dict")
ret.append([[self.wrap(k), self.wrap(v)] for k,v in o.items()])
return [index]
_0 = self._type_id(type(o))
if hasattr(o, "__getnewargs__"):
_1 = o.__getnewargs__() # an iterable
_1 = [self.wrap(i) for i in _1]
else:
_1 = None
if o.__dict__ is None:
_2 = None
else:
_2 = {}
for k,v in o.__dict__.items():
_2[k] = self.wrap(v)
ret.append(_0)
ret.append(_1)
ret.append(_2)
return [index]
def run_pipe(self):
o = self.wrap(self.obj)
return [o, self.memo]
class _Unpickler:
def __init__(self, obj, memo: list) -> None:
self.obj = obj
self.memo = memo
self._unwrapped = [None] * len(memo)
def tag(self, index, o):
assert self._unwrapped[index] is None
self._unwrapped[index] = o
def unwrap(self, o, index=None):
if type(o) in _BASIC_TYPES:
return o
assert type(o) is list
if o[0] == "type":
return _find_class(o[1])
# reference
if type(o[0]) is int:
assert index is None # index should be None
index = o[0]
if self._unwrapped[index] is None:
o = self.memo[index]
assert type(o) is list
assert type(o[0]) is str
self.unwrap(o, index)
assert self._unwrapped[index] is not None
return self._unwrapped[index]
# concrete reference type
if o[0] == "tuple":
ret = tuple([self.unwrap(i) for i in o[1]])
self.tag(index, ret)
return ret
if o[0] == "bytes":
ret = bytes(o[1])
self.tag(index, ret)
return ret
if o[0] == "list":
ret = []
self.tag(index, ret)
for i in o[1]:
ret.append(self.unwrap(i))
return ret
if o[0] == "dict":
ret = {}
self.tag(index, ret)
for k,v in o[1]:
ret[self.unwrap(k)] = self.unwrap(v)
return ret
# generic object
cls, newargs, state = o
cls = _find_class(o[0])
# create uninitialized instance
new_f = _find__new__(cls)
if newargs is not None:
newargs = [self.unwrap(i) for i in newargs]
inst = new_f(cls, *newargs)
else:
inst = new_f(cls)
self.tag(index, inst)
# restore state
if state is not None:
for k,v in state.items():
setattr(inst, k, self.unwrap(v))
return inst
def run_pipe(self):
return self.unwrap(self.obj)
def _wrap(o):
return _Pickler(o).run_pipe()
def _unwrap(packed: list):
return _Unpickler(*packed).run_pipe()
def dumps(o) -> bytes:
o = _wrap(o)
return json.dumps(o).encode()
def loads(b) -> object:
assert type(b) is bytes
o = json.loads(b.decode())
return _unwrap(o)