扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
Python中,multiprocessing库中Pool类代表进程池,其对象有imap()和imap_unordered()方法。
在林周等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供成都网站制作、成都网站设计 网站设计制作按需定制开发,公司网站建设,企业网站建设,品牌网站制作,全网营销推广,成都外贸网站建设,林周网站建设费用合理。
两者都用于对大量数据遍历多进程计算,返回一个迭代器(multiprocessing.pool.IMapIterator)。
imap返回结果顺序和输入相同,imap_unordered则为不保证顺序。
经过测试,发现Python多进程和imap()的一些特性:
1、 iter = pool.imap(fn, data) 一旦生成,无论使不使用iter,多进程计算都会开始。
计算结果会缓存在内存中,所以要注意内存用尽的问题。
2、fn,即执行函数,不可以是局部对象(不能嵌套在其他函数里),否则会报错:
AttributeError: Can't pickle local object 'fn_outer.locals.fn'
3、使用进程池map数据时,如果每次的运算量很小,最后的效率还不如单进程。这时多进程切换造成的开销已大于多进程计算提升的效率。
这时,可以将输入数据集分段,每次map,计算一段。具体分段多大时获得最佳效率,需要实际测试。
4、注意,Pool使用完毕后必须关闭,否则进程不会退出。
有两种写法,推荐第2种:
注意 ,第二种中,必须在with的块内使用iter。
本节判断列表排序的函数名格式为IsListSorted_XXX()。为简洁起见,除代码片段及其输出外,一律以_XXX()指代。
2.1 guess
def IsListSorted_guess(lst):
listLen = len(lst) if listLen = 1: return True
#由首个元素和末尾元素猜测可能的排序规则
if lst[0] == lst[-1]: #列表元素相同
for elem in lst: if elem != lst[0]: return False
elif lst[0] lst[-1]: #列表元素升序
for i, elem in enumerate(lst[1:]): if elem lst[i]: return False
else: #列表元素降序
for i, elem in enumerate(lst[1:]): if elem lst[i]: return False
return True
_guess()是最通用的实现,几乎与语言无关。值得注意的是,该函数内会猜测给定列表可能的排序规则,因此无需外部调用者指明排序规则。
2.2 sorted
def IsListSorted_sorted(lst):
return sorted(lst) == lst or sorted(lst, reverse=True) == lst
_sorted()使用Python内置函数sorted()。由于sorted()会对未排序的列表排序,_sorted()函数主要适用于已排序列表。
若想判断列表未排序后再对其排序,不如直接调用列表的sort()方法,因为该方法内部会判断列表是否排序。对于已排序列表,该方法的时间复杂度为线性阶O(n)——判断为O(n)而排序为O(nlgn)。
2.3 for-loop
def IsListSorted_forloop(lst, key=lambda x, y: x = y):
for i, elem in enumerate(lst[1:]): #注意,enumerate默认迭代下标从0开始
if not key(lst[i], elem): #if elem lst[i]更快,但通用性差
return False
return True
无论列表是否已排序,本函数的时间复杂度均为线性阶O(n)。注意,参数key表明缺省的排序规则为升序。
2.4 all
def IsListSorted_allenumk(lst, key=lambda x, y: x = y):
return all(key(lst[i], elem) for i, elem in enumerate(lst[1:]))import operatordef IsListSorted_allenumo(lst, oCmp=operator.le):
return all(oCmp(lst[i], elem) for i, elem in enumerate(lst[1:]))def IsListSorted_allenumd(lst):
return all((lst[i] = elem) for i, elem in enumerate(lst[1:]))def IsListSorted_allxran(lst, key=lambda x,y: x = y):
return all(key(lst[i],lst[i+1]) for i in xrange(len(lst)-1))def IsListSorted_allzip(lst, key=lambda x,y: x = y):
from itertools import izip #Python 3中zip返回生成器(generator),而izip被废弃
return all(key(a, b) for (a, b) in izip(lst[:-1],lst[1:]))
lambda表达式与operator运算符速度相当,前者简单灵活,后者略为高效(实测并不一定)。但两者速度均不如列表元素直接比较(可能存在调用开销)。亦即,_allenumd()快于_allenumo()快于_allenumk()。
若使用lambda表达式指示排序规则,更改规则时只需要改变x和y之间的比较运算符;若使用operator模块指示排序规则,更改规则时需要改变对象比较方法。具体地,lt(x, y)等效于x y,le(x, y)等效于x = y,eq(x, y)等效于x == y,ne(x, y)等效于x != y,gt(x, y)等效于x y,ge(x, y)等效于x = y。例如,_allenumo()函数若要严格升序可设置oCmp=operator.lt。
此外,由all()函数的帮助信息可知,_allenumk()其实是_forloop()的等效形式。
2.5 numpy
def IsListSorted_numpy(arr, key=lambda dif: dif = 0):
import numpy try: if arr.dtype.kind == 'u': #无符号整数数组执行np.diff时存在underflow风险
arr = numpy.int64(lst) except AttributeError: pass #无dtype属性,非数组
return (key(numpy.diff(arr))).all() #numpy.diff(x)返回相邻数组元素的差值构成的数组
NumPy是用于科学计算的Python基础包,可存储和处理大型矩阵。它包含一个强大的N维数组对象,比Python自身的嵌套列表结构(nested list structure)高效得多。第三节的实测数据表明,_numpy()处理大型列表时性能非常出色。
在Windows系统中可通过pip install numpy命令安装NumPy包,不建议登录官网下载文件自行安装。
2.6 reduce
def IsListSorted_reduce(iterable, key=lambda x, y: x = y):
cmpFunc = lambda x, y: y if key(x, y) else float('inf') return reduce(cmpFunc, iterable, .0) float('inf')
reduce实现是all实现的变体。累加器(accumulator)中仅存储最后一个检查的列表元素,或者Infinity(若任一元素小于前个元素值)。
前面2.1~2.5小节涉及下标操作的函数适用于列表等可迭代对象(Iterable)。对于通用迭代器(Iterator)对象,即可以作用于next()函数或方法的对象,可使用_reduce()及后面除_rand()外各小节的函数。迭代器的计算是惰性的,只有在需要返回下一个数据时才会计算,以避免不必要的计算。而且,迭代器方式无需像列表那样切片为两个迭代对象。
2.7 imap
def IsListSorted_itermap(iterable, key=lambda x, y: x = y):
from itertools import imap, tee
a, b = tee(iterable) #为单个iterable创建两个独立的iterator
next(b, None) return all(imap(key, a, b))
2.8 izip
def IsListSorted_iterzip(iterable, key=lambda x, y: x = y):
from itertools import tee, izip
a, b = tee(iterable) next(b, None) return all(key(x, y) for x, y in izip(a, b))def pairwise(iterable):
from itertools import tee, izip
a, b = tee(iterable) next(b, None) return izip(a, b) #"s - (s0,s1), (s1,s2), (s2, s3), ..."def IsListSorted_iterzipf(iterable, key=lambda x, y: x = y):
return all(key(a, b) for a, b in pairwise(iterable))
第三节的实测数据表明,虽然存在外部函数调用,_iterzipf()却比_iterzip()略为高效。
2.9 fast
def IsListSorted_fastd(lst):
it = iter(lst) try:
prev = it.next() except StopIteration: return True
for cur in it: if prev cur: return False
prev = cur return Truedef IsListSorted_fastk(lst, key=lambda x, y: x = y):
it = iter(lst) try:
prev = it.next() except StopIteration: return True
for cur in it: if not key(prev, cur): return False
prev = cur return True
_fastd()和_fastk()是Stack Overflow网站回答里据称执行最快的。实测数据表明,在列表未排序时,它们的性能表现确实优异。
2.10 random
import randomdef IsListSorted_rand(lst, randNum=3, randLen=100):
listLen = len(lst) if listLen = 1: return True
#由首个元素和末尾元素猜测可能的排序规则
if lst[0] lst[-1]: #列表元素升序
key = lambda dif: dif = 0
else: #列表元素降序或相等
key = lambda dif: dif = 0
threshold, sortedFlag = 10000, True
import numpy if listLen = threshold or listLen = randLen*2 or not randNum: return (key(numpy.diff(numpy.array(lst)))).all() from random import sample for i in range(randNum):
sortedRandList = sorted(sample(xrange(listLen), randLen))
flag = (key(numpy.diff(numpy.array([lst[x] for x in sortedRandList])))).all()
sortedFlag = sortedFlag and flag return sortedFlag
_rand()借助随机采样降低运算规模,并融入其他判断函数的优点。例如,猜测列表可能的排序规则,并在随机采样不适合时使用相对快速的判断方式,如NumPy。
通过line_profiler分析可知,第20行和第21行与randLen有关,但两者耗时接近。因此randLen应小于listLen的一半,以抵消sorted开销。除内部限制外,用户可以调节随机序列个数和长度,如定制单个但较长的序列。
注意,_rand()不适用于存在微量异常数据的长列表。因为这些数据很可能被随机采样遗漏,从而影响判断结果的准确性。
想用python做一个很简单的接收邮件的功能,只看python的官方doc()真的很不好懂,经过google之,探索之,稍微总结一下:
要使用imap接收邮件,当然要导入imaplib拉.
import imaplib
然后按常规的,建立链接→登录
conn = imaplib.IMAP4("imap.xxx.com",143)
conn.login("userName","password")
然后我想查看收件箱的邮件,咋办呢?要先选择一个目录,收件箱默认名称是"INBOX",IMAP是支持创建文件夹,查看其它文件夹的,如果是自己新建的文件夹,那么名称一般会是"INBOX.新建文件夹",不同的邮箱可能表示方式不一样,如果你不知道的话,那运行conn.list()查看所有的文件夹.
conn.select("INBOX")
选择后,然后查看文件夹,注意,IMAP的查看其实是一个搜索的过程,IMAP的原始命令是search all(大概的),在python里这么用:
type, data = conn.search(None, 'ALL')
然后返回的是这个收件箱里所有邮件的编号,按接收时间升序排列,最后的表示最近.
search这个很鬼麻烦,因为官方文档里没讲这个函数的第二个参数怎么用,于是找了下,可以填的命令有:
于是如果我想找Essh邮件的话,使用
type, data = conn.search(None, '(SUBJECT "Essh")')
里面要用一个括号,代表是一个查询条件,可以同时指定多个查询条件,例如FROM xxxx SUBJECT "aaa",注意,命令要用括号罩住(痛苦的尝试)
search第一个参数是charset的意思,填None表示用默认ASCII,
data里获取到的是一个只有一个字符串元素的数组,包含很多数字,用空格隔开
['1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103'
于是想获取最后一封的做法是:
msgList = data[0].split()
last = msgList[len(msgList) - 1]
然后把那个邮件获取回来,用fetch函数
例子:
conn.fetch(last, '(RFC822.SIZE BODY[HEADER.FIELDS (SUBJECT)])')
但是返回的是一串MIME编码的东东,看不懂,如果能像eml那一操作一封邮件就好了.
方法是有的,用email库.
import email
然后以RFC822获取邮件格式,再用email.message_from_string转换为message对象.就可以当message操作了,()
type,data=connect.fetch(msgList[len(msgList)-1],'(RFC822)')
msg=email.message_from_string(data[0][1])
content=msg.get_payload(decode=True)
最后content得到就是邮件的内容了
您好,
imap_append : 附加字符串到指定的邮箱中。
imap_base64 : 解 BASE64 编码。
imap_body : 读信的内文。
imap_check : 返回邮箱信息。
: 关闭 IMAP 链接。
imap_createmailbox : 建立新的信箱。
imap_delete : 标记欲删除邮件。
imap_deletemailbox : 删除既有信箱。
imap_expunge : 删除已标记的邮件。
imap_fetchbody : 从信件内文取出指定部分。
imap_fetchstructure : 获取某信件的结构信息。
imap_header : 获取某信件的标头信息。
imap_headers : 获取全部信件的标头信息。
imap_listmailbox : 获取邮箱列示。
imap_listsubscribed : 获取订阅邮箱列示。
imap_mail_copy : 复制指定信件到它处邮箱。
imap_mail_move : 移动指定信件到它处邮箱。
imap_num_msg : 取得信件数。
imap_num_recent : 取得新进信件数。
imap_open : 打开 IMAP 链接。
imap_ping : 检查 IMAP 是否连接。
imap_renamemailbox : 更改邮箱名字。
imap_reopen : 重开 IMAP 链接。
imap_subscribe : 订阅邮箱。
imap_undelete : 取消删除邮件标记。
imap_unsubscribe : 取消订阅邮箱。
imap_qprint : 将 qp 编码转成八位。
imap_8bit : 将八位转成 qp 编码。
imap_binary : 将八位转成 base64 编码。
imap_scanmailbox : 寻找信件有无特定字符串。
imap_mailboxmsginfo : 取得目前邮箱的信息。
imap_rfc822_write_address : 电子邮件位址标准化。
imap_rfc822_parse_adrlist : 解析电子邮件位址。
imap_setflag_full : 配置信件标志。
imap_clearflag_full : 清除信件标志。
imap_sort : 将信件标头排序。
imap_fetchheader : 取得原始标头。
imap_uid : 取得信件 UID。
imap_getmailboxes : 取得全部信件详细信息。
imap_getsubscribed : 列出所有订阅邮箱。
imap_msgno : 列出 UID 的连续信件。
imap_search : 搜寻指定标准的信件。
imap_last_error : 最后的错误信息。
imap_errors : 所有的错误信息。
imap_alerts : 所有的警告信息。
imap_status : 目前的状态信息。
当我们有一个很长很长的任务队列(mission_list)和阈值对应的一个处理函数(missionFunction)时,我们一般采用如下的方式进行处理:
但是,如果这任务列表很长很长,处理函数很复杂(占用cpu)时,单核往往需要很长的时间进行处理,此时,Multiprocess便可以极大的提高我们程序的运行速度,相关内容请借鉴 multiprocessing --- 基于进程的并行 — Python 3.10.4 文档。
以上这种场景下,推荐大家采用最简单的进程池+map的方法进行处理,标准的写法, chunksize要借鉴官方的说法,最好大一点 :
但是!!!! 如果我们的任务列表非常的长,这会导致多进程还没跑起来之前,内存已经撑爆了,任务自然没法完成,此时我们有几种办法进行优化:
进程的启动方法有三种,可参考官方文档:
[图片上传失败...(image-48cd3c-1650511153989)]
在linux环境下,使用forkserver可以节省很多的内存空间, 因为进程启动的是一个服务,不会把主进程的数据全部复制
采用imap会极大的节省空间,它返回的是一个迭代器,也就是结果列表:
但注意,以上写法中,你写的结果迭代部分必须写在with下面。或者采用另一种写法:
还有最后一种,当你的mission list实在太大了,导致你在生成 mission list的时候已经把内存撑爆了,这个时候就得优化 mission_list了,如果你的mission_list是通过一个for循环生成的,你可以使用yield字段,将其封装为一个迭代器,传入进程池:
这样子,我们就封装好了mission_list,它是一个可迭代对象,在取数据的时候才会将数据拉到内存
我在项目中结合了后两种方法,原本256G的内存都不够用,但在修改后内存只占用了不到10G。希望能够帮助到你
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流