十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
花了一天时间用python为服务写了个压力测试。很简单,多线程向服务器发请求。但写完之后发现如果中途想停下来,按Ctrl+C达不到效果,自然想到要用信号处理函数捕捉信号,使线程都停下来,问题解决的方法请往下看:
成都创新互联公司自2013年起,是专业互联网技术服务公司,拥有项目做网站、成都网站制作网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元仁和做网站,已为上家服务,为仁和各地企业和个人服务,联系电话:18980820575
复制代码代码如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.start()
上面是一个模拟程序,并不真正向服务发送请求,而代之以在一千万以内,每个线程每隔并发数个(cc个)打印一个整数。很明显,当所有线程都完成自己的任务后,进程会正常退出。但如果我们中途想退出(试想一个压力测试程序,在中途已经发现了问题,需要停止测试),该肿么办?你当然可以用ps查找到进程号,然后kill -9杀掉,但这样太繁琐了,捕捉Ctrl+C是最自然的想法。上面示例程序中已经捕捉了这个信号,并修改全局变量is_exit,线程中会检测这个变量,及时退出。
但事实上这个程序并不work,当你按下Ctrl+C时,程序照常运行,并无任何响应。网上搜了一些资料,明白是python的子线程如果不是daemon的话,主线程是不能响应任何中断的。但设为daemon后主线程会随之退出,接着整个进程很快就退出了,所以还需要在主线程中检测各个子线程的状态,直到所有子线程退出后自己才退出,因此上例29行之后的代码可以修改为:
复制代码代码如下:
threads=[]
for i in range(cc):
t = threading.Thread(target=doStress, args=(i, cc))
t.setDaemon(True)
threads.append(t)
t.start()
for i in range(cc):
threads[i].join()
重新试一下,问题依然没有解决,进程还是没有响应Ctrl+C,这是因为join()函数同样会waiting在一个锁上,使主线程无法捕获信号。因此继续修改,调用线程的isAlive()函数判断线程是否完成:
复制代码代码如下:
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
这样修改后,程序完全按照预想运行了:可以顺利的打印每个线程应该打印的所有数字,也可以中途用Ctrl+C终结整个进程。完整的代码如下:
复制代码代码如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
if is_exit:
print "receive a signal to exit, thread[%d] stop."%i
else:
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
threads = []
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.setDaemon(True)
threads.append(t)
t.start()
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
其实,如果用python写一个服务,也需要这样,因为负责服务的那个线程是永远在那里接收请求的,不会退出,而如果你想用Ctrl+C杀死整个服务,跟上面的压力测试程序是一个道理。总结一下,python多线程中要响应Ctrl+C的信号以杀死整个进程,需要:
1.把所有子线程设为Daemon;
2.使用isAlive()函数判断所有子线程是否完成,而不是在主线程中用join()函数等待完成;
3.写一个响应Ctrl+C信号的函数,修改全局变量,使得各子线程能够检测到,并正常退出。
FFT (Fast Fourier Transform, 快速傅里叶变换) 是离散傅里叶变换的快速算法,也是数字信号处理技术中经常会提到的一个概念。用快速傅里叶变换能将时域的数字信号转换为频域信号,转换为频域信号后我们可以很方便地分析出信号的频率成分。
当我们把双频信号FFT示例中的 fft_size 的值改为 2**12 时,这时,基频为 16Hz,不能被 1kHz整除,所以 1kHz 处发生了频谱泄露,而它能被 4kHz 整除,所以 4kHz 可以很好地被采样。
由于波形的前后不是连续的,出现波形跳变,而跳变处有着非常广泛的频谱,因此FFT的结果中出现了频谱泄漏。
为了减小FFT所截取的数据段前后的跳变,可以对数据先乘以一个窗函数,使得其前后数据能平滑过渡。常用的hanning窗函数的定义如下:
50Hz 正弦波与hann窗函数乘积之后的重复波形如下:
我们对频谱泄漏示例中的1kHz 和 4kHz 信号进行了 hann 窗函数处理,可以看出能量更加集中在 1kHz 和 4kHz,在一定程度上抑制了频谱泄漏。
以 1kHz 三角波为例,我们知道三角波信号中含有丰富的频率信息,它的傅里叶级数展开为:
当数字信号的频率随时间变化时,我们称之为扫频信号。以频率随时间线性变化的扫频信号为例,其数学形式如下:
其频率随时间线性变化,当我们在 [0,1] 的时间窗口对其进行采样时,其频率范围为 0~5kHz。当时间是连续时,扫频信号的频率也是连续的。但是在实际的处理中,是离散的点采样,因此时间是不连续的,这就使扫频信号的快速傅里叶变换问题退化为多点频信号快速傅里叶变换问题。其快速傅里叶变换得到的频谱图如下所示:
以 50Hz 正弦信号相位调制到 1kHz 的信号为例,其信号形式如下:
它的时域波形,频率响应和相位响应如下图所示:
以扫频信号为例,当我们要探究FFT中的能量守恒时,我们要回归到信号最初的形式:
在了解了Linux的信号基础之 后,Python标准库中的signal包就很容易学习和理解。signal包负责在Python程序内部处理信号,典型的操作包括预设信号处理函数,暂 停并等待信号,以及定时发出SIGALRM等。要注意,signal包主要是针对UNIX平台(比如Linux, MAC OS),而Windows内核中由于对信号机制的支持不充分,所以在Windows上的Python不能发挥信号系统的功能。
信号(signal)-- 进程之间通讯的方式,是一种软件中断。一个进程一旦接收到信号就会打断原来的程序执行流程来处理信号。
定义信号名
signal包定义了各个信号名及其对应的整数,比如:
import signal
print(signal.SIGABRT)
print(signal.SIG_DFL)
Python所用的信号名与Linux一致,可以通过$ man 7 signal 查询
预设信号处理函数
signal包的核心是使用signal.signal()函数来预设(register)信号处理函数,如下所示:
singnal.signal(signalnum, handler)
signalnum为某个信号,handler为该信号的处理函数。我们在信号基础里提到,进程可以无视信号,可以采取默认操作,还可以自定义操作。当handler为signal.SIG_IGN时,信号被无视(ignore)。当handler为singal.SIG_DFL,进程采取默认操作(default)。当handler为一个函数名时,进程采取函数中定义的操作。
import signal
# Define signal handler function
def myHandler(signum, frame):
print('I received: ', signum)
# register signal.SIGTSTP's handler
signal.signal(signal.SIGTSTP, myHandler)
signal.pause()
print('End of Signal Demo')
# 有问题待测试
在主程序中,我们首先使用signal.signal()函数来预设信号处理函数。然后我们执行signal.pause()来让该进程暂停以等待信号, 以等待信号。当信号SIGUSR1被传递给该进程时,进程从暂停中恢复,并根据预设,执行SIGTSTP的信号处理函数myHandler()。 myHandler的两个参数一个用来识别信号(signum),另一个用来获得信号发生时,进程栈的状况(stack frame)。这两个参数都是由signal.singnal()函数来传递的。
上面的程序可以保存在一个文件中(比如test.py)。我们使用如下方法运行:
$python test.py
以便让进程运行。当程序运行到signal.pause()的时候,进程暂停并等待信号。此时,通过按下CTRL+Z向该进程发送SIGTSTP信号。我们可以看到,进程执行了myHandle()函数, 随后返回主程序,继续执行。(当然,也可以用$ps查询process ID, 再使用$kill来发出信号。)
(进程并不一定要使用signal.pause()暂停以等待信号,它也可以在进行工作中接受信号,比如将上面的signal.pause()改为一个需要长时间工作的循环。)
我们可以根据自己的需要更改myHandler()中的操作,以针对不同的信号实现个性化的处理。
定时发出SIGALRM信号
一个有用的函数是signal.alarm(),它被用于在一定时间之后,向进程自身发送SIGALRM信号:
import signal
# Define signal handler function
def myHandler(signum, frame):
print("Now, it's the time")
exit()
# register signal.SIGALRM's handler
signal.signal(signal.SIGALRM, myHandler)
signal.alarm(5)
while True:
print('not yet')
我们这里用了一个无限循环以便让进程持续运行。在signal.alarm()执行5秒之后,进程将向自己发出SIGALRM信号,随后,信号处理函数myHandler开始执行。
发送信号
signal包的核心是设置信号处理函数。除了signal.alarm()向自身发送信号之外,并没有其他发送信号的功能。但在os包中,有类似于linux的kill命令的函数,分别为
os.kill(pid, sid)
os.killpg(pgid, sid)
分别向进程和进程组(见Linux进程关系)发送信号。sid为信号所对应的整数或者singal.SIG*。
实际上signal, pause,kill和alarm都是Linux应用编程中常见的C库函数,在这里,我们只不过是用Python语言来实现了一下。实际上,Python 的解释器是使用C语言来编写的,所以有此相似性也并不意外。此外,在Python 3.4中,signal包被增强,信号阻塞等功能被加入到该包中。我们暂时不深入到该包中。
总结
signal.SIG*
signal.signal()
signal.pause()
signal.alarm()
数字信号处理是把信号用数字或符号表示成序列,通过计算机或通用(专用)信号处理设备,用数值计算方法进行各种处理,达到提取有用信息便于应用的目的。例如:滤波、检测、变换、增强、估计、识别、参数提取、频谱分析等。
一般地讲,数字信号处理涉及三个步骤:
⑴模数转换(A/D转换):把模拟信号变成数字信号,是一个对自变量和幅值同时进行离散化的过程,基本的理论保证是采样定理。
⑵数字信号处理(DSP):包括变换域分析(如频域变换)、数字滤波、识别、合成等。
⑶数模转换(D/A转换):把经过处理的数字信号还原为模拟信号。通常,这一步并不是必须的。 作为DSP的成功例子有很多,如医用CT断层成像扫描仪的发明。它是利用生物体的各个部位对X射线吸收率不同的现象,并利用各个方向扫描的投影数据再构造出检测体剖面图的仪器。这种仪器中fft(快速傅里叶变换)起到了快速计算的作用。以后相继研制出的还有:采用正电子的CT机和基于核磁共振的CT机等仪器,它们为医学领域作出了很大的贡献。
信号处理的目的是:削弱信号中的多余内容;滤出混杂的噪声和干扰;或者将信号变换成容易处理、传输、分析与识别的形式,以便后续的其它处理。 下面的示意图说明了信号处理的概念。
在日常项目中,我们经常会使用python从字符串中提取我们想要的信息,以下是各种提取信息方法的总结。
格式: str[beg:end:step]
描述: 字符串[开始索引:结束索引:步长]切取字符串为开始索引到结束索引-1内的字符串步长不指定时步长为1
举例:
print(str[::2]) //::这里表示整个字符串,每两个位置提取一个
print(str[1:3]) //提取第2个到第3个
print(str[2::]) //截取2 - 末尾的字符
本小节介绍了,处理字符串经常用到的一些函数方法。
语法: str.find(str, beg=0, end=len(string))
描述: Python find() 方法检测字符串中是否包含子字符串 str ,如果指定 beg(开始) 和 end(结束) 范围,则检查是否包含在指定范围内,如果包含子字符串返回开始的索引值,否则返回-1。
语法: str.split(str="", num=string.count(str)).
描述: Python split() 通过指定分隔符对字符串进行切片,如果参数 num 有指定值,则分隔 num+1 个子字符串.返回分割后的字符串列表,该方法可以讲字符串转化为列表处理。
另外的: str.splitlines([keepends])按照行('\r', '\r\n', \n')分隔,返回一个包含各行作为元素的列表,如果参数 keepends 为 False,不包含换行符,如果为 True,则保留换行符。
语法: str.partition(str)
描述: partition() 方法用来根据指定的分隔符将字符串进行分割。如果字符串包含指定的分隔符,则返回一个3元的元组,第一个为分隔符左边的子串,第二个为分隔符本身,第三个为分隔符右边的子串。
语法: str.replace(old, new, max)
描述: Python replace() 方法把字符串中的 old(旧字符串) 替换成 new(新字符串),如果指定第三个参数max,则替换不超过 max 次。
语法: str.strip([chars]);
描述: Python strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。:该方法只能删除开头或是结尾的字符,不能删除中间部分的字符。
语法: str.join(sequence)
描述: Python join() 方法用于将序列中的元素以指定的字符连接生成一个新的字符串。
上述方法还有其变形,如str.rfind(),这代表从字符串右边开始处理,正常是从左边开始处理。下表是其它常用的python字符串自带函数方法。
正则表达式是一个特殊的字符序列,它能帮助你方便的检查一个字符串是否与某种模式匹配。本小节主要介绍Python中常用的正则表达式处理函数和正则表达式的书写规则。
re 模块使 Python 语言拥有全部的正则表达式功能。所以在python中使用正则表达式处理函数需要import re
语法: re.search(pattern, string, flags=0)
描述: re.search 扫描整个字符串并返回第一个成功的匹配。匹配成功re.search方法返回一个匹配的对象,否则返回None。
语法: re.sub(pattern, repl, string, count=0, flags=0)
描述: Python 的 re 模块提供了re.sub用于替换字符串中的匹配项。
语法: pattern.findall(string, pos, endpos)
描述: 在字符串中找到正则表达式所匹配的所有子串,并返回一个列表,如果没有找到匹配的,则返回空列表。注意: match 和 search 是匹配一次 findall 匹配所有。
模式字符串使用特殊的语法来表示一个正则表达式: