def manageWork(self):
    """Parse every zipped article under ``self.path`` whose id is a known redirect.

    Loads the pickled collection of ids from
    ``SSD_HOME + "pickle/redirects_ids.obj"``, walks
    ``STATIC_HTML_DUMP_ARTICLES_DIR + self.path`` and calls
    ``self.parse_article(file_name, root)`` for each ``*.zip`` file whose
    second ``_``-separated name component is one of those ids.  An exception
    raised while parsing one article is printed and the walk continues.
    """
    # Binary mode is valid for every pickle protocol, and the context manager
    # closes the handle even if unpickling fails.  The original never closed
    # the file and shadowed the ``file`` builtin.
    # NOTE(review): pickle.load must only ever be pointed at trusted files.
    with open(SSD_HOME + "pickle/redirects_ids.obj", 'rb') as ids_file:
        redirect_ids = pickle.load(ids_file)

    for root, _dirs, files in os.walk(STATIC_HTML_DUMP_ARTICLES_DIR + self.path):
        for file_name in files:
            if not file_name.endswith(".zip"):
                continue
            parts = file_name.split('_')
            # int() promotes to long automatically on Python 2, and int/long
            # compare and hash equal, so the membership test is unchanged.
            if int(parts[1]) not in redirect_ids:
                continue
            try:
                self.parse_article(file_name, root)
            except Exception as e:
                print("FILENAME_FAIL:" + file_name)
                print(type(e))  # the exception class
                print(e)
                # ``message`` only exists on Python 2 exceptions; fall back to
                # an empty line instead of failing inside the handler.
                print(getattr(e, 'message', ''))
# Example source code for the Python pickle module
def _try_store_in_file(self, request, response):
    """Write the pickled session to its file and refresh the session-id cookie.

    Returns False, after calling ``self.save_session_id_cookie()``, when
    ``response.session_id`` is falsy, ``self._forget`` is truthy, or
    ``self._unchanged(response)`` is truthy.  Otherwise the session file is
    opened and exclusively locked if needed, the pickled session is written,
    the cookie is refreshed and True is returned.

    ``self._close(response)`` runs on every path, including the early return
    and errors.  ``request`` is not used by this method.
    """
    try:
        if (not response.session_id or self._forget
                or self._unchanged(response)):
            # self.clear_session_cookies()
            self.save_session_id_cookie()
            return False
        if response.session_new or not response.session_file:
            # Tests if the session sub-folder exists, if not, create it
            session_folder = os.path.dirname(response.session_filename)
            if not os.path.exists(session_folder):
                try:
                    os.mkdir(session_folder)
                except OSError:
                    # A concurrent request may have created the folder between
                    # the existence check and mkdir; only a real failure
                    # (folder still missing) should propagate.
                    if not os.path.isdir(session_folder):
                        raise
            response.session_file = recfile.open(response.session_filename, 'wb')
            portalocker.lock(response.session_file, portalocker.LOCK_EX)
            response.session_locked = True
        if response.session_file:
            # Reuse an already pickled session when the response carries one.
            session_pickled = response.session_pickled or pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
            response.session_file.write(session_pickled)
            response.session_file.truncate()
    finally:
        self._close(response)
    self.save_session_id_cookie()
    return True
def _try_store_in_file(self, request, response):
    """Write the pickled session to its file and refresh the session-id cookie.

    Returns False, after calling ``self.save_session_id_cookie()``, when
    ``response.session_id`` is falsy, ``self._forget`` is truthy, or
    ``self._unchanged(response)`` is truthy.  Otherwise the session file is
    opened and exclusively locked if needed, the pickled session is written,
    the cookie is refreshed and True is returned.

    ``self._close(response)`` runs on every path, including the early return
    and errors.  ``request`` is not used by this method.
    """
    try:
        if (not response.session_id or self._forget
                or self._unchanged(response)):
            # self.clear_session_cookies()
            self.save_session_id_cookie()
            return False
        if response.session_new or not response.session_file:
            # Tests if the session sub-folder exists, if not, create it
            session_folder = os.path.dirname(response.session_filename)
            if not os.path.exists(session_folder):
                try:
                    os.mkdir(session_folder)
                except OSError:
                    # A concurrent request may have created the folder between
                    # the existence check and mkdir; only a real failure
                    # (folder still missing) should propagate.
                    if not os.path.isdir(session_folder):
                        raise
            response.session_file = recfile.open(response.session_filename, 'wb')
            portalocker.lock(response.session_file, portalocker.LOCK_EX)
            response.session_locked = True
        if response.session_file:
            # Reuse an already pickled session when the response carries one.
            session_pickled = response.session_pickled or pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
            response.session_file.write(session_pickled)
            response.session_file.truncate()
    finally:
        self._close(response)
    self.save_session_id_cookie()
    return True
def doMap(self, i):
    """Run the map step for split ``i`` and write one region file per reduce task.

    Reads ``#split-<path>-<i>``: the first line is passed to ``self.Map`` as
    the key and the remainder as the value.  The split file is then deleted.
    For each ``r`` in ``range(self.reducetask)`` the items for which
    ``self.Partition(item) == r`` are pickled into ``#map-<path>-<i>-<r>``.

    Returns the list of ``(i, r)`` pairs, one per reduce task.
    """
    split_name = "#split-%s-%s" % (self.path, i)
    # ``with`` closes the handle even if reading raises; the original leaked it.
    with open(split_name, "r") as split_file:
        keyvalue = split_file.readline()
        value = split_file.read()
    os.unlink(split_name)

    keyvaluelist = self.Map(keyvalue, value)
    for r in range(0, self.reducetask):
        # print "map", i, "#map-%s-%s-%d" % (self.path, i, r)
        itemlist = [item for item in keyvaluelist if self.Partition(item) == r]
        # The file mode is kept as in the original so whatever reads these
        # region files elsewhere keeps seeing the same bytes.
        with open("#map-%s-%s-%d" % (self.path, i, r), "w+") as region_file:
            pickle.dump(itemlist, region_file)
    return [(i, r) for r in range(0, self.reducetask)]
# Get reduce regions from maptasks, sort by key, and apply Reduce for each key
def doReduce(self, i):
    """Run reduce task ``i`` over the region files written by every map task.

    For each ``m`` in ``range(self.maptask)`` the list pickled in
    ``#map-<path>-<m>-<i>`` is loaded, its items are grouped by ``item[0]``,
    and the region file is deleted.  ``self.Reduce(key, items)`` is then
    called for each key in sorted order and the list of results is pickled
    into ``#reduce-<path>-<i>``.

    Returns ``i``.
    """
    keys = {}
    for m in range(0, self.maptask):
        # print "reduce", i, "#map-%s-%s-%d" % (self.path, m, i)
        region_name = "#map-%s-%s-%d" % (self.path, m, i)
        # ``with`` closes the handle even if unpickling raises.
        with open(region_name, "r") as region_file:
            itemlist = pickle.load(region_file)
        for item in itemlist:
            # setdefault replaces the deprecated dict.has_key() branch.
            keys.setdefault(item[0], []).append(item)
        os.unlink(region_name)

    out = [self.Reduce(k, keys[k]) for k in sorted(keys)]
    with open("#reduce-%s-%d" % (self.path, i), "w+") as result_file:
        pickle.dump(out, result_file)
    return i
# The master.
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    If ``sys`` has no ``gettrace`` attribute, ``func`` is returned unchanged.
    Otherwise the returned wrapper saves the current trace function, calls
    ``sys.settrace(None)``, runs ``func`` and restores the saved trace
    function afterwards, even if ``func`` raises.
    """
    import functools  # local import: the file's import block is not visible here

    if not hasattr(sys, 'gettrace'):
        return func

    # functools.wraps keeps __doc__, __module__ and __dict__ as well as
    # __name__; the original copied only __name__.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        original_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(original_trace)
    return wrapper
# Return True if opcode code appears in the pickle, else False.
def test_long(self):
    # Round-trip long integers of various sizes through every protocol.
    # 256 bytes is where LONG4 begins.
    bit_widths = (1, 8, 8*254, 8*255, 8*256, 8*257)
    for proto in protocols:
        for nbits in bit_widths:
            nbase = long(1) << nbits
            for magnitude in (nbase - 1, nbase, nbase + 1):
                for n in (magnitude, -magnitude):
                    pickled = self.dumps(n, proto)
                    got = self.loads(pickled)
                    self.assertEqual(n, got)
    # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
    # bother with those.
    monster = long("deadbeeffeedface", 16)
    monster += monster << 1000000
    for n in (monster, -monster):
        pickled = self.dumps(n, 2)
        got = self.loads(pickled)
        self.assertEqual(n, got)
def test_singletons(self):
    # None, False and True must round-trip to the very same objects, and
    # each protocol must use the expected opcode for them.
    # Map (proto, singleton) to expected opcode.
    expected_opcode = {}
    for proto in (0, 1, 2):
        expected_opcode[proto, None] = pickle.NONE
    for proto in (0, 1):
        expected_opcode[proto, True] = pickle.INT
        expected_opcode[proto, False] = pickle.INT
    expected_opcode[2, True] = pickle.NEWTRUE
    expected_opcode[2, False] = pickle.NEWFALSE

    for proto in protocols:
        for singleton in (None, False, True):
            pickled = self.dumps(singleton, proto)
            restored = self.loads(pickled)
            self.assertTrue(singleton is restored,
                            (proto, singleton, pickled, restored))
            wanted = expected_opcode[proto, singleton]
            self.assertEqual(opcode_in_pickle(wanted, pickled), True)
def test_newobj_proxies(self):
    # NEWOBJ should use the __class__ rather than the raw type
    import weakref
    candidates = myclasses[:]
    # Cannot create weakproxies to these classes
    for unproxyable in (MyInt, MyLong, MyStr, MyTuple):
        candidates.remove(unproxyable)
    for proto in protocols:
        for cls in candidates:
            base = cls.__base__
            original = cls(cls.sample)
            original.foo = 42
            proxy = weakref.proxy(original)
            pickled = self.dumps(proxy, proto)
            restored = self.loads(pickled)
            # The restored object has the proxied object's type, not the
            # proxy's own type.
            self.assertEqual(type(restored), type(original))
            detail = (proto, cls, base, original, restored, type(restored))
            self.assertEqual(base(original), base(restored), detail)
            self.assertEqual(original.__dict__, restored.__dict__, detail)
# Register a type with copy_reg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_list_chunking(self):
    # Count APPENDS opcodes to check how lists are batched per protocol.
    small = range(10)  # too small to chunk
    for proto in protocols:
        pickled = self.dumps(small, proto)
        restored = self.loads(pickled)
        self.assertEqual(small, restored)
        appends_seen = count_opcode(pickle.APPENDS, pickled)
        self.assertEqual(appends_seen, proto > 0)

    large = range(2500)  # expect at least two chunks when proto > 0
    for proto in protocols:
        pickled = self.dumps(large, proto)
        restored = self.loads(pickled)
        self.assertEqual(large, restored)
        appends_seen = count_opcode(pickle.APPENDS, pickled)
        if proto != 0:
            self.assertTrue(appends_seen >= 2)
        else:
            self.assertEqual(appends_seen, 0)
def test_dict_chunking(self):
    # Count SETITEMS opcodes to check how dicts are batched per protocol.
    small = dict.fromkeys(range(10))  # too small to chunk
    for proto in protocols:
        pickled = self.dumps(small, proto)
        restored = self.loads(pickled)
        self.assertEqual(small, restored)
        setitems_seen = count_opcode(pickle.SETITEMS, pickled)
        self.assertEqual(setitems_seen, proto > 0)

    large = dict.fromkeys(range(2500))  # expect at least two chunks when proto > 0
    for proto in protocols:
        pickled = self.dumps(large, proto)
        restored = self.loads(pickled)
        self.assertEqual(large, restored)
        setitems_seen = count_opcode(pickle.SETITEMS, pickled)
        if proto != 0:
            self.assertTrue(setitems_seen >= 2)
        else:
            self.assertEqual(setitems_seen, 0)
def test_reduce_bad_iterator(self):
    # Issue4176: crash when 4th and 5th items of __reduce__()
    # are not iterators
    class C(object):
        def __reduce__(self):
            # 4th item is not an iterator
            return list, (), None, [], None

    class D(object):
        def __reduce__(self):
            # 5th item is not an iterator
            return dict, (), None, None, []

    # Protocol 0 in Python implementation is less strict and also accepts
    # iterables.
    for proto in protocols:
        for bad_cls in (C, D):
            try:
                self.dumps(bad_cls(), proto)
            except (AttributeError, pickle.PicklingError, cPickle.PicklingError):
                # These errors are tolerated; the dump just must not crash.
                pass
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    If ``sys`` has no ``gettrace`` attribute, ``func`` is returned unchanged.
    Otherwise the returned wrapper saves the current trace function, calls
    ``sys.settrace(None)``, runs ``func`` and restores the saved trace
    function afterwards, even if ``func`` raises.
    """
    import functools  # local import: the file's import block is not visible here

    if not hasattr(sys, 'gettrace'):
        return func

    # functools.wraps keeps __doc__, __module__ and __dict__ as well as
    # __name__; the original copied only __name__.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        original_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(original_trace)
    return wrapper
# Return True if opcode code appears in the pickle, else False.
def test_long(self):
    # Round-trip long integers of various sizes through every protocol.
    # 256 bytes is where LONG4 begins.
    bit_widths = (1, 8, 8*254, 8*255, 8*256, 8*257)
    for proto in protocols:
        for nbits in bit_widths:
            nbase = long(1) << nbits
            for magnitude in (nbase - 1, nbase, nbase + 1):
                for n in (magnitude, -magnitude):
                    pickled = self.dumps(n, proto)
                    got = self.loads(pickled)
                    self.assertEqual(n, got)
    # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
    # bother with those.
    monster = long("deadbeeffeedface", 16)
    monster += monster << 1000000
    for n in (monster, -monster):
        pickled = self.dumps(n, 2)
        got = self.loads(pickled)
        self.assertEqual(n, got)
def test_singletons(self):
    # None, False and True must round-trip to the very same objects, and
    # each protocol must use the expected opcode for them.
    # Map (proto, singleton) to expected opcode.
    expected_opcode = {}
    for proto in (0, 1, 2):
        expected_opcode[proto, None] = pickle.NONE
    for proto in (0, 1):
        expected_opcode[proto, True] = pickle.INT
        expected_opcode[proto, False] = pickle.INT
    expected_opcode[2, True] = pickle.NEWTRUE
    expected_opcode[2, False] = pickle.NEWFALSE

    for proto in protocols:
        for singleton in (None, False, True):
            pickled = self.dumps(singleton, proto)
            restored = self.loads(pickled)
            self.assertTrue(singleton is restored,
                            (proto, singleton, pickled, restored))
            wanted = expected_opcode[proto, singleton]
            self.assertEqual(opcode_in_pickle(wanted, pickled), True)
def test_newobj_proxies(self):
    # NEWOBJ should use the __class__ rather than the raw type
    import weakref
    candidates = myclasses[:]
    # Cannot create weakproxies to these classes
    for unproxyable in (MyInt, MyLong, MyStr, MyTuple):
        candidates.remove(unproxyable)
    for proto in protocols:
        for cls in candidates:
            base = cls.__base__
            original = cls(cls.sample)
            original.foo = 42
            proxy = weakref.proxy(original)
            pickled = self.dumps(proxy, proto)
            restored = self.loads(pickled)
            # The restored object has the proxied object's type, not the
            # proxy's own type.
            self.assertEqual(type(restored), type(original))
            detail = (proto, cls, base, original, restored, type(restored))
            self.assertEqual(base(original), base(restored), detail)
            self.assertEqual(original.__dict__, restored.__dict__, detail)
# Register a type with copy_reg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_list_chunking(self):
    # Count APPENDS opcodes to check how lists are batched per protocol.
    small = range(10)  # too small to chunk
    for proto in protocols:
        pickled = self.dumps(small, proto)
        restored = self.loads(pickled)
        self.assertEqual(small, restored)
        appends_seen = count_opcode(pickle.APPENDS, pickled)
        self.assertEqual(appends_seen, proto > 0)

    large = range(2500)  # expect at least two chunks when proto > 0
    for proto in protocols:
        pickled = self.dumps(large, proto)
        restored = self.loads(pickled)
        self.assertEqual(large, restored)
        appends_seen = count_opcode(pickle.APPENDS, pickled)
        if proto != 0:
            self.assertTrue(appends_seen >= 2)
        else:
            self.assertEqual(appends_seen, 0)
def test_dict_chunking(self):
    # Count SETITEMS opcodes to check how dicts are batched per protocol.
    small = dict.fromkeys(range(10))  # too small to chunk
    for proto in protocols:
        pickled = self.dumps(small, proto)
        restored = self.loads(pickled)
        self.assertEqual(small, restored)
        setitems_seen = count_opcode(pickle.SETITEMS, pickled)
        self.assertEqual(setitems_seen, proto > 0)

    large = dict.fromkeys(range(2500))  # expect at least two chunks when proto > 0
    for proto in protocols:
        pickled = self.dumps(large, proto)
        restored = self.loads(pickled)
        self.assertEqual(large, restored)
        setitems_seen = count_opcode(pickle.SETITEMS, pickled)
        if proto != 0:
            self.assertTrue(setitems_seen >= 2)
        else:
            self.assertEqual(setitems_seen, 0)
def test_reduce_bad_iterator(self):
    # Issue4176: crash when 4th and 5th items of __reduce__()
    # are not iterators
    class C(object):
        def __reduce__(self):
            # 4th item is not an iterator
            return list, (), None, [], None

    class D(object):
        def __reduce__(self):
            # 5th item is not an iterator
            return dict, (), None, None, []

    # Protocol 0 in Python implementation is less strict and also accepts
    # iterables.
    for proto in protocols:
        for bad_cls in (C, D):
            try:
                self.dumps(bad_cls(), proto)
            except (AttributeError, pickle.PicklingError, cPickle.PicklingError):
                # These errors are tolerated; the dump just must not crash.
                pass
def __init__(self, datadir, loadFunc):
    """Load ``<datadir>/index.pkl`` and collect the file names it refers to.

    Sets ``self.datadir`` (``datadir`` without one trailing ``/``),
    ``self.idx`` (the unpickled index), ``self.fnames`` (the unique base
    names found in the index values) and ``self.loadFunc``.

    Raises:
        ValueError: if ``<datadir>/index.pkl`` does not exist.
    """
    # endswith() also copes with an empty string, where datadir[-1] raised
    # IndexError.
    self.datadir = datadir[:-1] if datadir.endswith('/') else datadir
    idxf = datadir + '/index.pkl'
    if not os.path.exists(idxf):
        raise ValueError('Index pickle not found')
    # The context manager closes the index file; the original left the
    # handle open.
    # NOTE(review): pickle.load must only ever be pointed at trusted files.
    with open(idxf, 'rb') as idx_file:
        idx = pickle.load(idx_file)

    try:
        integer_types = (long, int)
    except NameError:  # interpreters without a separate ``long`` type
        integer_types = (int,)

    fnames = set()
    for k in idx:
        for fname in idx[k]:
            ### Backward compat with earlier versions of this code:
            if isinstance(fname, integer_types):
                fname = '%d.ogg' % (fname)
            if fname is None:
                continue
            fname = fname.split('/')[-1]
            ###
            fnames.add(fname)
    self.idx = idx
    self.fnames = list(fnames)
    self.loadFunc = loadFunc
def _try_store_in_file(self, request, response):
    """Write the pickled session to its file and refresh the session-id cookie.

    Returns None immediately when ``self._enabled`` is falsy.  Returns False
    when ``response.session_id`` is falsy or ``self._unchanged(response)`` is
    truthy.  Otherwise the session file is opened and exclusively locked if
    needed, the pickled session is written, the cookie is refreshed and True
    is returned.

    Once past the ``_enabled`` check, ``self._close(response)`` runs on every
    path, including the early ``False`` return and errors.  ``request`` is
    not used by this method.
    """
    if not self._enabled:
        return
    try:
        if (not response.session_id or self._unchanged(response)):
            # self.save_session_id_cookie() # CHECK THIS
            return False
        if response.session_new or not response.session_file:
            # Tests if the session sub-folder exists, if not, create it
            session_folder = os.path.dirname(response.session_filename)
            if not os.path.exists(session_folder):
                try:
                    os.mkdir(session_folder)
                except OSError:
                    # A concurrent request may have created the folder between
                    # the existence check and mkdir; only a real failure
                    # (folder still missing) should propagate.
                    if not os.path.isdir(session_folder):
                        raise
            response.session_file = recfile.open(response.session_filename, 'wb')
            portalocker.lock(response.session_file, portalocker.LOCK_EX)
            response.session_locked = True
        if response.session_file:
            # Reuse an already pickled session when the response carries one.
            session_pickled = response.session_pickled or pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
            response.session_file.write(session_pickled)
            response.session_file.truncate()
    finally:
        self._close(response)
    self.save_session_id_cookie()
    return True
def count_opcode(code, pickle):
    """Return how many opcodes yielded by ``pickletools.genops(pickle)`` have ``code``."""
    return sum(1 for op, _arg, _pos in pickletools.genops(pickle)
               if op.code == code)
# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state. Code
# should do this:
#
# e = ExtensionSaver(extension_code)
# try:
# fiddle w/ the extension registry's stuff for extension_code
# finally:
# e.restore()
def test_proto(self):
    # Pickling None must give the exact expected bytes for each protocol,
    # and loading a pickle that claims a too-new protocol must fail.
    none_pickle = pickle.NONE + pickle.STOP
    for proto in protocols:
        if proto >= 2:
            expected = pickle.PROTO + chr(proto) + none_pickle
        else:
            expected = none_pickle
        pickled = self.dumps(None, proto)
        self.assertEqual(pickled, expected)

    oob = protocols[-1] + 1  # a future protocol
    badpickle = pickle.PROTO + chr(oob) + none_pickle
    try:
        self.loads(badpickle)
    except ValueError as detail:
        self.assertTrue(str(detail).startswith(
            "unsupported pickle protocol"))
    else:
        self.fail("expected bad protocol number to raise ValueError")
def test_singletons(self):
    # None, False and True must round-trip to the very same objects, and
    # each protocol must use the expected opcode for them.
    # Map (proto, singleton) to expected opcode.
    expected_opcode = {}
    for proto in (0, 1, 2):
        expected_opcode[proto, None] = pickle.NONE
    for proto in (0, 1):
        expected_opcode[proto, True] = pickle.INT
        expected_opcode[proto, False] = pickle.INT
    expected_opcode[2, True] = pickle.NEWTRUE
    expected_opcode[2, False] = pickle.NEWFALSE

    for proto in protocols:
        for singleton in (None, False, True):
            pickled = self.dumps(singleton, proto)
            restored = self.loads(pickled)
            self.assertTrue(singleton is restored,
                            (proto, singleton, pickled, restored))
            wanted = expected_opcode[proto, singleton]
            self.assertEqual(opcode_in_pickle(wanted, pickled), True)
def test_dict_chunking(self):
    # Count SETITEMS opcodes to check how dicts are batched per protocol.
    small = dict.fromkeys(range(10))  # too small to chunk
    for proto in protocols:
        pickled = self.dumps(small, proto)
        restored = self.loads(pickled)
        self.assertEqual(small, restored)
        setitems_seen = count_opcode(pickle.SETITEMS, pickled)
        self.assertEqual(setitems_seen, proto > 0)

    large = dict.fromkeys(range(2500))  # expect at least two chunks when proto > 0
    for proto in protocols:
        pickled = self.dumps(large, proto)
        restored = self.loads(pickled)
        self.assertEqual(large, restored)
        setitems_seen = count_opcode(pickle.SETITEMS, pickled)
        if proto != 0:
            self.assertTrue(setitems_seen >= 2)
        else:
            self.assertEqual(setitems_seen, 0)
def test_reduce_bad_iterator(self):
    # Issue4176: crash when 4th and 5th items of __reduce__()
    # are not iterators
    class C(object):
        def __reduce__(self):
            # 4th item is not an iterator
            return list, (), None, [], None

    class D(object):
        def __reduce__(self):
            # 5th item is not an iterator
            return dict, (), None, None, []

    # Protocol 0 is less strict and also accepts iterables.
    for proto in protocols:
        for bad_cls in (C, D):
            try:
                self.dumps(bad_cls(), proto)
            except (AttributeError, pickle.PickleError, cPickle.PickleError):
                # These errors are tolerated; the dump just must not crash.
                pass
def _try_store_in_file(self, request, response):
    """Write the pickled session to its file and refresh the session-id cookie.

    Returns False, after calling ``self.save_session_id_cookie()``, when
    ``response.session_id`` is falsy, ``self._forget`` is truthy, or
    ``self._unchanged(response)`` is truthy.  Otherwise the session file is
    opened and exclusively locked if needed, the pickled session is written,
    the cookie is refreshed and True is returned.

    ``self._close(response)`` runs on every path, including the early return
    and errors.  ``request`` is not used by this method.
    """
    try:
        if (not response.session_id or self._forget
                or self._unchanged(response)):
            # self.clear_session_cookies()
            self.save_session_id_cookie()
            return False
        if response.session_new or not response.session_file:
            # Tests if the session sub-folder exists, if not, create it
            session_folder = os.path.dirname(response.session_filename)
            if not os.path.exists(session_folder):
                try:
                    os.mkdir(session_folder)
                except OSError:
                    # A concurrent request may have created the folder between
                    # the existence check and mkdir; only a real failure
                    # (folder still missing) should propagate.
                    if not os.path.isdir(session_folder):
                        raise
            response.session_file = recfile.open(response.session_filename, 'wb')
            portalocker.lock(response.session_file, portalocker.LOCK_EX)
            response.session_locked = True
        if response.session_file:
            # Reuse an already pickled session when the response carries one.
            session_pickled = response.session_pickled or pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
            response.session_file.write(session_pickled)
            response.session_file.truncate()
    finally:
        self._close(response)
    self.save_session_id_cookie()
    return True
def count_opcode(code, pickle):
    """Return how many opcodes yielded by ``pickletools.genops(pickle)`` have ``code``."""
    return sum(1 for op, _arg, _pos in pickletools.genops(pickle)
               if op.code == code)
# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state. Code
# should do this:
#
# e = ExtensionSaver(extension_code)
# try:
# fiddle w/ the extension registry's stuff for extension_code
# finally:
# e.restore()
def test_proto(self):
    # Pickling None must give the exact expected bytes for each protocol,
    # and loading a pickle that claims a too-new protocol must fail.
    none_pickle = pickle.NONE + pickle.STOP
    for proto in protocols:
        if proto >= 2:
            expected = pickle.PROTO + chr(proto) + none_pickle
        else:
            expected = none_pickle
        pickled = self.dumps(None, proto)
        self.assertEqual(pickled, expected)

    oob = protocols[-1] + 1  # a future protocol
    badpickle = pickle.PROTO + chr(oob) + none_pickle
    try:
        self.loads(badpickle)
    except ValueError as detail:
        self.assertTrue(str(detail).startswith(
            "unsupported pickle protocol"))
    else:
        self.fail("expected bad protocol number to raise ValueError")
def test_singletons(self):
    # None, False and True must round-trip to the very same objects, and
    # each protocol must use the expected opcode for them.
    # Map (proto, singleton) to expected opcode.
    expected_opcode = {}
    for proto in (0, 1, 2):
        expected_opcode[proto, None] = pickle.NONE
    for proto in (0, 1):
        expected_opcode[proto, True] = pickle.INT
        expected_opcode[proto, False] = pickle.INT
    expected_opcode[2, True] = pickle.NEWTRUE
    expected_opcode[2, False] = pickle.NEWFALSE

    for proto in protocols:
        for singleton in (None, False, True):
            pickled = self.dumps(singleton, proto)
            restored = self.loads(pickled)
            self.assertTrue(singleton is restored,
                            (proto, singleton, pickled, restored))
            wanted = expected_opcode[proto, singleton]
            self.assertEqual(opcode_in_pickle(wanted, pickled), True)