查看: 2245|回复: 3

Python multiprocessing pickling error

[复制链接]

10

主题

72

帖子

180

积分

注册会员

Rank: 2

积分
180
发表于 2018-9-18 17:53:40 | 显示全部楼层 |阅读模式
很抱歉,我无法用更简单的示例重现错误,而且我的代码太复杂而无法发布。如果我在IPython shell而不是常规python中运行程序,那么事情就会很顺利。
我查看了以前关于这个问题的一些注意事项。它们都是由在类函数中定义的pool to call函数引起的。但对我来说情况并非如此。
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我将不胜感激任何帮助。
回复

使用道具 举报

11

主题

80

帖子

199

积分

注册会员

Rank: 2

积分
199
发表于 2018-9-18 17:54:27 | 显示全部楼层
如果函数在模块的顶层定义,则它们只能被选中。
这段代码:
import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
产生的错误几乎与您发布的错误相同:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
问题是这些pool方法都使用a queue.Queue将任务传递给工作进程。经过的所有东西queue.Queue必须是可挑选的,并且foo.work不可挑选,因为它没有在模块的顶层定义。
它可以通过在顶层定义一个函数来修复,该函数调用foo.work():
def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))
请注意,这foo是可选择的,因为它Foo是在顶层定义的并且 foo.__dict__是可选择的。
回复

使用道具 举报

9

主题

74

帖子

185

积分

注册会员

Rank: 2

积分
185
发表于 2018-9-18 17:55:00 | 显示全部楼层
我会用pathos.multiprocesssing,而不是multiprocessing。 pathos.multiprocessing是一个multiprocessing使用的分支dill。dill可以在python中序列化几乎所有内容,因此您可以并行发送更多内容。该pathos叉也有直接与多个参数的函数的工作,因为你需要为类方法的能力。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
在这里获取pathos(如果你愿意的话dill):https: //github.com/uqfoundation
回复

使用道具 举报

11

主题

63

帖子

159

积分

注册会员

Rank: 2

积分
159
发表于 2018-9-18 17:56:22 | 显示全部楼层
正如其他人所说multiprocessing,只能将Python对象转移到其他的工作进程。如果您无法按照unutbu的描述重新组织代码,则可以使用dills扩展的pickling / unpickling功能来传输数据(尤其是代码数据),如下所示。
此解决方案仅需要安装dill而不需要其他库pathos:
  1. import os
  2. from multiprocessing import Pool

  3. import dill


  4. def run_dill_encoded(payload):
  5.     fun, args = dill.loads(payload)
  6.     return fun(*args)


  7. def apply_async(pool, fun, args):
  8.     payload = dill.dumps((fun, args))
  9.     return pool.apply_async(run_dill_encoded, (payload,))


  10. if __name__ == "__main__":

  11.     pool = Pool(processes=5)

  12.     # asyn execution of lambda
  13.     jobs = []
  14.     for i in range(10):
  15.         job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
  16.         jobs.append(job)

  17.     for job in jobs:
  18.         print job.get()
  19.     print

  20.     # async execution of static method

  21.     class O(object):

  22.         @staticmethod
  23.         def calc():
  24.             return os.getpid()

  25.     jobs = []
  26.     for i in range(10):
  27.         job = apply_async(pool, O.calc, ())
  28.         jobs.append(job)

  29.     for job in jobs:
  30.         print job.get()
复制代码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表