Ⅰ How to get process and thread status in Python

threading.active_count()
Return the number of Thread objects currently alive. The returned count is equal to the length of the list returned by enumerate().
active_count() returns the number of currently active threads; threading.enumerate() returns the list of those Thread objects.
I usually use it like this:

def getHeatsParallel(self):
    threads = []
    for i in range(0, self.threadCount):
        t = threading.Thread(target=self.SomeFunction, name=str(i))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
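If you do want to inspect the running threads while the workers execute, here is a minimal standalone sketch; the worker function, sleep interval and thread count are made up for illustration:

import threading
import time

def worker(interval):
    # placeholder workload; sleeps so the threads stay alive long enough to observe
    time.sleep(interval)

threads = [threading.Thread(target=worker, args=(2,), name=str(i)) for i in range(4)]
for t in threads:
    t.start()

# active_count() counts all live threads, including the main thread
print("live threads:", threading.active_count())
# enumerate() returns the corresponding Thread objects
for t in threading.enumerate():
    print(t.name, t.is_alive())

for t in threads:
    t.join()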

Ⅱ How to check from the parent thread whether a child thread is still running in Python

A child thread object has an is_alive() method; it returns the boolean True while the thread is running (and False once it has finished).
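A minimal sketch of the idea; the worker function and the polling interval below are made up for illustration:

import threading
import time

def worker():
    time.sleep(2)  # simulated work

child = threading.Thread(target=worker)
child.start()

# the parent thread can poll the child's status
while child.is_alive():
    print("child is still running...")
    time.sleep(0.5)
print("child has finished, is_alive() ->", child.is_alive())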

Ⅲ Is a Python Redis connection thread-safe?

Before using ConnectionPool, whenever I needed to connect to Redis I used the StrictRedis class. The source code describes this class as follows:

redis.StrictRedis: Implementation of the Redis protocol. This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol. Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server.
It is used like this:

r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
r.xxxx()

With the ConnectionPool class you can instead do the following:

pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx)
r = redis.Redis(connection_pool=pool)

Here Redis is a subclass of StrictRedis.
A brief analysis: in StrictRedis.__init__ the connection_pool parameter can be passed in; it corresponds to a ConnectionPool object:

class StrictRedis(object):
    ........
    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs=None, ssl_ca_certs=None):
        if not connection_pool:
            ..........
            connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool

When a StrictRedis instance runs a concrete command it calls execute_command. There you can see that the implementation takes a connection from the connection pool, executes the command, and releases the connection when done:

# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
    "Execute a command and return a parsed response"
    pool = self.connection_pool
    command_name = args[0]
    connection = pool.get_connection(command_name, **options)  # get a connection via ConnectionPool.get_connection
    try:
        connection.send_command(*args)  # execute the command, i.e. Connection.send_command
        return self.parse_response(connection, command_name, **options)
    except (ConnectionError, TimeoutError) as e:
        connection.disconnect()
        if not connection.retry_on_timeout and isinstance(e, TimeoutError):
            raise
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    finally:
        pool.release(connection)  # release the connection back via ConnectionPool.release

Now let's look at the ConnectionPool class:
class ConnectionPool(object):
    ...........
    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):  # constructor, called when the pool is created
        max_connections = max_connections or 2 ** 31
        if not isinstance(max_connections, (int, long)) or max_connections < 0:  # validate max_connections
            raise ValueError('"max_connections" must be a positive integer')
        self.connection_class = connection_class  # store the corresponding parameters
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections
        self.reset()  # reset performed when the ConnectionPool is initialized

    def reset(self):
        self.pid = os.getpid()
        self._created_connections = 0  # counter of connections created so far
        self._available_connections = []  # empty list that will hold the available connections
        self._in_use_connections = set()  # empty set that will hold the connections currently in use
        self._check_lock = threading.Lock()
    .......
    def get_connection(self, command_name, *keys, **options):  # get a connection from the pool
        "Get a connection from the pool"
        self._checkpid()
        try:
            # pop an available connection; on the first call _available_connections
            # is still an empty list, so make_connection is called instead
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)  # record the connection in the in-use set
        return connection

    def make_connection(self):  # called when _available_connections is empty
        "Create a new connection"
        if self._created_connections >= self.max_connections:  # enforce the limit; max_connections can be set at init
            raise ConnectionError("Too many connections")
        self._created_connections += 1  # bump the created-connections counter
        return self.connection_class(**self.connection_kwargs)  # return a live connection, Connection(**self.connection_kwargs) by default

    def release(self, connection):  # release the connection: it is not closed, it just stays in the pool
        "Releases the connection back to the pool"
        self._checkpid()
        if connection.pid != self.pid:
            return
        self._in_use_connections.remove(connection)  # remove it from the in-use set
        self._available_connections.append(connection)  # and append it to _available_connections

    def disconnect(self):  # close every connection held by the pool
        "Disconnects all connections in the pool"
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        for connection in all_conns:
            connection.disconnect()

execute_command ultimately calls Connection.send_command, and connections are closed by Connection.disconnect. The Connection class looks like this:

class Connection(object):
    "Manages TCP communication to and from a Redis server"
    def __del__(self):  # on object deletion, call disconnect to release the connection
        try:
            self.disconnect()
        except Exception:
            pass

The core connection-establishment method is implemented with the socket module:

def _connect(self):
    err = None
    for res in socket.getaddrinfo(self.host, self.port, 0,
                                  socket.SOCK_STREAM):
        family, socktype, proto, canonname, socket_address = res
        sock = None
        try:
            sock = socket.socket(family, socktype, proto)
            # TCP_NODELAY
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # TCP_KEEPALIVE
            if self.socket_keepalive:  # socket_keepalive defaults to False in the constructor, so connections are short-lived by default
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                for k, v in iteritems(self.socket_keepalive_options):
                    sock.setsockopt(socket.SOL_TCP, k, v)
            # set the socket_connect_timeout before we connect
            sock.settimeout(self.socket_connect_timeout)  # socket_connect_timeout defaults to None, i.e. a blocking connect
            # connect
            sock.connect(socket_address)
            # set the socket_timeout now that we're connected
            sock.settimeout(self.socket_timeout)  # socket_timeout defaults to None
            return sock
        except socket.error as _:
            err = _
            if sock is not None:
                sock.close()
    .....

The method that closes the connection:

def disconnect(self):
    "Disconnects from the Redis server"
    self._parser.on_disconnect()
    if self._sock is None:
        return
    try:
        self._sock.shutdown(socket.SHUT_RDWR)  # shutdown first, then close
        self._sock.close()
    except socket.error:
        pass
    self._sock = None

To summarize:
1) By default, every Redis instance constructs its own ConnectionPool. Each Redis access takes a connection from that pool and puts it back when the operation finishes (the connection is not closed). Alternatively, you can build a single shared ConnectionPool and pass it in when creating Redis instances; subsequent operations then draw connections from that shared pool and no extra pools are created (see the sketch below).
2) By default neither keepalive nor a timeout is set, so the connections established are blocking, short-lived connections.
3) Ignoring the underlying TCP details, the connections held by the pool are all torn down together in ConnectionPool.disconnect.
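As a minimal sketch of point 1), one ConnectionPool shared across threads; the host, port, db and key names below are placeholder values assuming a local Redis on the default port:

import threading
import redis

# one pool shared by the whole process (host/port/db are placeholders)
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)

def worker(n):
    # each thread builds a lightweight Redis wrapper around the shared pool;
    # every command checks a connection out of the pool and returns it afterwards
    r = redis.Redis(connection_pool=pool)
    r.set('key:%d' % n, n)
    print(r.get('key:%d' % n))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pool.disconnect()  # tear down all pooled connections at the end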

Ⅳ How to do multi-process programming in Python

1. Process
The class that creates a process: Process([group [, target [, name [, args [, kwargs]]]]]). target is the callable to run, args is its positional-argument tuple, kwargs is its keyword-argument dict, name is an alias, and group is effectively unused.
Methods: is_alive(), join([timeout]), run(), start(), terminate(). A Process is started with start().
Attributes: authkey, daemon (must be set before start()), exitcode (None while the process is running; -N means it was terminated by signal N), name, pid. A daemon process is terminated automatically when its parent terminates and cannot create child processes of its own; daemon must be set before start().

Example 1.1: create a function and run it as a single process

import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print "p.pid:", p.pid
    print "p.name:", p.name
    print "p.is_alive:", p.is_alive()

Result:
p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

Example 1.2: create functions and run them as multiple processes

import multiprocessing
import time

def worker_1(interval):
    print "worker_1"
    time.sleep(interval)
    print "end worker_1"

def worker_2(interval):
    print "worker_2"
    time.sleep(interval)
    print "end worker_2"

def worker_3(interval):
    print "worker_3"
    time.sleep(interval)
    print "end worker_3"

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker_1, args=(2,))
    p2 = multiprocessing.Process(target=worker_2, args=(3,))
    p3 = multiprocessing.Process(target=worker_3, args=(4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name:" + p.name + " p.id" + str(p.pid))
    print "END!!!!!!!!!!!!!!!!!"

Result:
The number of CPU is:4
child p.name:Process-3 p.id7992
child p.name:Process-2 p.id4204
child p.name:Process-1 p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

Example 1.3: define the process as a class

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

Note: when process p calls start(), run() is invoked automatically.
Result:
the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

Ⅴ Can processes be used in Python?

Preface: multiprocessing
Multithreading in Python is not true multithreading; to make full use of a multi-core CPU you usually need multiple processes. Python provides the very convenient multiprocessing package: you only need to define a function and Python takes care of everything else. With this package you can easily move from a single process to concurrent execution. multiprocessing supports child processes, communication and data sharing, and various forms of synchronization, providing components such as Process, Queue, Pipe and Lock.
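As a minimal sketch of two of those components (the function name and workload below are made up for illustration), a child process can hand results back to the parent through a Queue:

import multiprocessing

def square(numbers, out_queue):
    # child process: compute and push results onto the shared queue
    for n in numbers:
        out_queue.put((n, n * n))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=square, args=([1, 2, 3, 4], q))
    p.start()
    for _ in range(4):
        print(q.get())  # receive results from the child process
    p.join()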


1. Process

The class that creates a process: Process([group [, target [, name [, args [, kwargs]]]]]). target is the callable to run, args is its positional-argument tuple, kwargs is its keyword-argument dict, name is an alias, and group is effectively unused.
Methods: is_alive(), join([timeout]), run(), start(), terminate(). A Process is started with start().

Attributes: authkey, daemon (must be set before start()), exitcode (None while the process is running; -N means it was terminated by signal N), name, pid. A daemon process is terminated automatically when its parent terminates and cannot create child processes of its own; daemon must be set before start().


Ⅵ How to write a simple 12306 ticket-grabbing tool in Python

See if this works. Features: 1) fully automatically crawls the list of all stations; 2) crawls all trains between every pair of stations, filters out duplicates and saves them to file; files are saved as the crawl progresses; on an HTTP error the script stops rather than continuing. Script code:

# coding: utf-8
# imports the script relies on (not shown in the original snippet)
import json
import os
import datetime
import urllib2
from time import sleep

__author__ = 'watsy'

# city object
class cityObject(object):
    def __init__(self, abbr_pinyin="", full_pinyin="", chinaname="", shortCode=""):
        self.abbr_pinyin = abbr_pinyin
        self.full_piyin = full_pinyin
        self.chinaname = chinaname
        self.shortCode = shortCode

# train
class trainObject(object):
    def __init__(self, tid="", code="", start_city="", start_time="",
                 end_city="", end_time="", full_time=""):
        self.tid = tid
        self.code = code
        self.start_city = start_city
        self.start_time = start_time
        self.end_city = end_city
        self.end_time = end_time
        self.full_time = full_time

    def get_writestr(self):
        # return ("%s,%s,%s,%s,%s,%s,%s") % (self.tid, self.code, self.start_city.encode('utf-8'), self.start_time, self.end_city.encode('utf-8'), self.end_time, self.full_time)
        str_return = self.tid + ","
        str_return += self.code + ","
        str_return += self.start_city + ","
        str_return += self.start_time + ","
        str_return += self.end_city + ","
        str_return += self.end_time + ","
        str_return += self.full_time
        return str_return

# train list
class trainModel(list):
    def isExist(self, train):
        for sub_train in self:
            if sub_train.code == train.code:
                return True
        return False

    def save(self):
        train = self[-1]
        with open(("%s.txt") % (train.code), "w") as wf:
            print train.get_writestr()
            wf.write(train.get_writestr().encode('utf-8'))

# parse the cities
def parserCitys(data):
    parser_citys = []
    for original_city in data:
        if original_city and len(original_city) > 1:
            split_city = original_city.split('|')
            parser_city = cityObject(split_city[0], split_city[3], split_city[1], split_city[2])
            parser_citys.append(parser_city)
    print len(parser_citys)
    return parser_citys

# build the query URL
def getBookingTrainListUrl(start_code, end_code, day):
    strUrl = ("/otsquery/query/queryRemanentTicketAction.do?method=queryLeftTicket&")
    strUrl += ("orderRequest.train_date=%s&") % (day)
    strUrl += ("orderRequest.from_station_telecode=%s&") % (start_code)
    strUrl += ("orderRequest.to_station_telecode=%s&") % (end_code)
    strUrl += ("orderRequest.train_no=&trainPassType=QB&trainClass=QB%23D%23Z%23T%23K%23QT%23&includeStudent=00&seatTypeAndNum=&orderRequest.start_time_str=00%3A00--24%3A00")
    return strUrl

trains = trainModel()

# parse the list of bookable trains
def parser_booking_str(str_booking):
    json_book = json.loads(str_booking)
    datas = json_book['datas']
    if datas and len(datas) > 1:
        # print datas.replace("", "")
        trainlist = datas.replace("", "").split("\\n")
        for train_str in trainlist:
            train_str_list = train_str.split(',')
            if len(train_str_list) == 17:
                str_id_and_code = train_str_list[1]
                str_start_city_and_time = train_str_list[2]
                str_end_city_and_time = train_str_list[3]
                str_full_time = train_str_list[4]
                # print str_id_and_code
                str_id = str_id_and_code[13:25]
                str_code = str_id_and_code[131:-7]
                # print str_start_city_and_time
                if len(str_start_city_and_time) > 50:
                    str_start_city = str_start_city_and_time[43:-9]
                else:
                    str_start_city = str_start_city_and_time[0:-9]
                str_start_time = str_start_city_and_time[-5:]
                # print str_end_city_and_time
                if len(str_end_city_and_time) > 50:
                    str_end_city = str_end_city_and_time[42:-9]
                else:
                    str_end_city = str_end_city_and_time[0:-9]
                str_end_time = str_end_city_and_time[-5:]
                tobj = trainObject(str_id, str_code, str_start_city, str_start_time,
                                   str_end_city, str_end_time, str_full_time)
                if trains.isExist(tobj) == False:
                    trains.append(tobj)
                    trains.save()

# open the station list page
u = urllib2.urlopen("mon/station_name.js?version=1.40")
buffer = u.read()
u.close()
# get the raw list
buffer = buffer[20:-3]
unformatter_citys = buffer.split('@')
# get the cities
parser_citys = parserCitys(unformatter_citys)
city_length = len(parser_citys)
today = datetime.date.today()
torrow = datetime.timedelta(days=1)
today = today + torrow
day_str = ("%s-%02d-%02d") % (today.year, int(today.month), int(today.day))
print(day_str)
strPath = os.getcwd()
os.chdir("%s/train/" % strPath)
for i in range(1, city_length):
    for j in range(0, len(parser_citys) - i):
        try:
            print("[%d %d]" % (i, j))
            sleep(0.09)
            strurl = getBookingTrainListUrl(parser_citys[i].shortCode, parser_citys[j].shortCode, day_str)
            url_add_header = urllib2.Request(strurl)
            url_add_header.add_header('X-Requested-With', "XMLHttpRequest")
            url_add_header.add_header('Referer', "/otsquery/query/queryRemanentTicketAction.do?method=init")
            url_add_header.add_header('Content-Type', 'application/x-www-form-urlencoded')
            url_add_header.add_header('Connection', 'keep-alive')
            resp = urllib2.urlopen(url_add_header)
            urlread = resp.read()
            resp.close()
            parser_booking_str(urlread)
        except urllib2.HTTPError as err:
            print("error:[%s]url=[%s]") % (err, strurl)
            exit(1)
os.chdir(strPath)
print len(trains)

Ⅶ python threads can only be started once

RuntimeError: threads can only be started once means the thread you are trying to start has already been started; a Thread object cannot be started a second time. To run the work again, create a new Thread object.
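A minimal sketch that reproduces the error and shows the usual fix of creating a fresh Thread object (the task function is made up for illustration):

import threading

def task():
    print("working")

t = threading.Thread(target=task)
t.start()
t.join()

try:
    t.start()          # starting the same Thread object again
except RuntimeError as e:
    print(e)           # "threads can only be started once"

# the fix: build a new Thread object for each run
t2 = threading.Thread(target=task)
t2.start()
t2.join()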

Ⅷ How to check a thread's status in Python

is_alive():
Return whether the thread is alive.

http://docs.python.org/3/library/threading.html#threading.Thread.is_alive

Ⅸ Python multi-thread status

threading.active_count()


Return the number of Thread objects currently alive. The returned count is
equal to the length of the list returned by enumerate().

active_count() returns the number of currently active threads (the ones listed by threading.enumerate()).


I usually use it like this:

def getHeatsParallel(self):
    threads = []
    for i in range(0, self.threadCount):
        t = threading.Thread(target=self.SomeFunction, name=str(i))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

But I don't really care how many are active, because at most threadCount threads are ever created.

Ⅹ Regarding ending a thread in Python: how should the exit statement be written at the commented spot, sys.exit()?


def _exitCheckfunc():
    print "ok"
    try:
        while 1:
            alive = False
            if thread_.isAlive():
                alive = True
            if not alive:
                break
            time.sleep(1)
    # catch KeyboardInterrupt (Ctrl+C) so the timing statistics still run
    except KeyboardInterrupt, e:
        traceback.print_exc()
    print "consume time :", time.time() - start

threading._shutdown = _exitCheckfunc

In other words, write your own loop in the main thread to receive the Ctrl+C signal.
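A self-contained sketch of the same idea, with the worker thread and timing made explicit; thread_, start and the worker body below are stand-ins invented for illustration:

import threading
import time
import traceback

def worker():
    time.sleep(5)  # placeholder for real work

start = time.time()
thread_ = threading.Thread(target=worker)
thread_.start()

try:
    # the main thread loops until the worker finishes, so Ctrl+C is delivered here
    while thread_.is_alive():
        time.sleep(1)
except KeyboardInterrupt:
    traceback.print_exc()
print("consume time:", time.time() - start)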