异度部落格

学习是一种生活态度。

0%

生产者-消费者问题是系统进程里面的一个经典问题,这里用 Python 简单模拟一下。

#!/usr/bin/env python
#生产者-消费者问题
import threading
from random import randint
from time import sleep, ctime
from Queue import Queue
#创建MyThread子类
class MyThread(threading.Thread):
def __init__(self, func, args, name = ''):
threading.Thread.__init__(self)
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
print'starting', self.name, 'at:', ctime()
self.res = apply(self.func, self.args)
print self.name, 'finished at:', ctime()
#生产者与消费者问题
def writeQueue(queue):
queue.put('Anything', 1)
print "Producing object for Queue. Size now", queue.qsize()
def readQueue(queue):
val = queue.get(1)
print 'Consumed object from Queue. Size now', queue.qsize()
def produce(queue, loops):
for i in range(loops):
writeQueue(queue)
sleep(randint(1,3))
def consume(queue, loops):
for i in range(loops):
readQueue(queue)
sleep(randint(2,5))
def main():
funcs = [produce, consume]
nfuncs = range(len(funcs))
nloops = randint(2, 5)
queue = Queue(32)
threads = []
for i in nfuncs:
t = MyThread(funcs[i], (queue, nloops),funcs[i].__name__)
threads.append(t)
for i in nfuncs:
threads[i].start()
for i in nfuncs:
threads[i].join()
print 'All Done!!'
if __name__ == '__main__':
main()

Python 的文件遍历主要使用的 os 这个模块,这里为了方便显示同时使用了一个图形化的库 Tkinter。Tkinter 是 Python 自带的一个 GUI 开发库,虽然没有 PyQt 或 wxPython 那么强大,但是基本的使用绝对足够了。

#!/usr/bin/env python
#文件遍历GUI
import os
from time import sleep
from Tkinter import *
class DirList(object):
def __init__(self, initDir = None):
self.top = Tk()
self.label = Label(self.top, text = 'Directory Lister')
self.label.pack()

self.cwd = StringVar(self.top)
self.dirLabel = Label(self.top, fg = 'blue',
font = ('Helvetica', 12, 'bold'))
self.dirLabel.pack()
self.dirFrame = Frame(self.top)
self.dirScrollbar = Scrollbar(self.dirFrame)
self.dirScrollbar.pack(side = RIGHT, fill = Y)
self.dirListbox = Listbox(self.dirFrame, height = 15,width = 50,
yscrollcommand = self.dirScrollbar.set)
self.dirListbox.bind('<Double-l>', self.setDirAndShow)
self.dirScrollbar.config(command = self.dirListbox.yview)
self.dirListbox.pack(side = LEFT, fill = BOTH)
self.dirFrame.pack()
self.dirEntry = Entry(self.top, width = 50,
textvariable = self.cwd)
self.dirEntry.bind('<Return>', self.showList)
self.dirEntry.pack()
self.buttonFrame = Frame(self.top)
self.clearButton = Button(self.buttonFrame, text = 'Clear',
command = self.clearDir,
activeforeground = 'white',
activebackground = 'blue')
self.listButton = Button(self.buttonFrame, text = 'List Directory',
command = self.showList,
activeforeground = 'white',
activebackground = 'green')
self.quitButton = Button(self.buttonFrame, text = 'Quit',
command = self.top.quit,
activeforeground = 'white',
activebackground = 'red')
self.clearButton.pack(side = LEFT)
self.listButton.pack(side = LEFT)
self.quitButton.pack(side = LEFT)
self.buttonFrame.pack()
if initDir:
self.cwd.set(os.curdir)
self.showList()

#清空列表
def clearDir(self, event = None):
self.cwd.set('')
#设置目录并显示
def setDirAndShow(self, event = None):
self.lastDir = self.cwd.get()
self.dirListbox.cofig(selectbackground = 'red')
check = self.dirListbox.get(self.dirListbox.curselection())
if not check:
check = os.curdir
self.cwd.set(check)
self.showList()
#显示列表
def showList(self, event = None):
error = ''
tmp = self.cwd.get()
if not tmp:
tmp = os.curdir
if not os .path.exists(tmp):
error = tmp + ': no such file'
elif not os.path.isdir(tmp):
error = tmp + ':not a directory'
if error:
self.cwd.set(error)
self.top.update()
sleep(2)
if not (hasattr(self, 'last') and self.lastDir):
self.lastDir = os.curdir
self.cwd.set(self.lastDir)
self.dirListbox.config(selectbackground = 'LightSkyBlue')
self.top.update()
return
self.cwd.set('Fetching Directory contents...')
self.top.update()
dirlist = os.listdir(tmp)
os.chdir(tmp)
self.dirLabel.config(text = os.getcwd())
self.dirListbox.delete(0, END)
self.dirListbox.insert(END, os.curdir)
self.dirListbox.insert(END, os.pardir)
for eachFile in dirlist:
self.dirListbox.insert(END, eachFile)
self.cwd.set(os.curdir)
self.dirListbox.config(selectbackground = 'LightSkyBlue')
#主函数
def main():
dirList = DirList(os.curdir)
dirList.top.mainloop()
dirList.top.destroy()
if __name__ == '__main__':
main()

这里的程序并不复杂,下面主要对随机数据的生成部分进行分析

def randNumGen(self) :
numStr = '0123456789'
randNum = ''
for i in range(4) :
n = choice(numStr)
randNum += n
numStr = numStr.replace(n, '')
self.sysNum = randNum

这里由于数字是不重复的,所以可以先定义好一个 0-9 的数字串,然后用 choice 函数随机选出一个,之后删除这个数字,再次 choice,直到选出 4 位数字为止。

下面是完整的代码:

#!/usr/bin/env python
#猜数字游戏
from string import digits
from random import randint, choice
class GuessNumber(object) :
#构造器
def __init__(self) :
self.count = 0 #用于统计猜测次数
self.usrNums = [] #用户所猜数字
self.sysNum = '' #系统生成数字
self.results = [] #猜测结果
self.isWinner = False
#打印规则
def printRules(self) :
print """由系统随机生成不重复的四位数字,用户猜,之后系统进行提示。
A代表数字正确位置正确,B代表数字正确位置错误。
如正确答案为 5234,而猜的人猜 5346,则是 1A2B,其中有一个5的位置对了
,记为1A;3和4这两个数字对了,而位置没对,因此记为 2B,合起来就是 1A2B。
   接着猜的人再根据出题者的几A几B继续猜,直到猜中(即 4A0B)为止"""
#随机数字生成
def randNumGen(self) :
numStr = '0123456789'
randNum = ''
for i in range(4) :
n = choice(numStr)
randNum += n
numStr = numStr.replace(n, '')
self.sysNum = randNum
#print 'Debug >> SysNum::', self.sysNum
#数字输入
def numInput(self) :
self.count += 1
while True:
num = raw_input('输入您想要猜的数字:')
print 'Debug >> num::',num
if len(num) < 4 :
print '***错误: 位数小于四位,请重新输入'
continue
elif len(num) > 4:
print '***错误: 位数大于四位,请重新输入'
continue
if not self.isAllNumber(num) :
print '***错误: 您输入了非数字字符,请重新输入'
continue
elif self.hasSameDigit(num) :
print '***错误: 您输入了含有相同数字的数字串,请重新输入'
continue
else :
self.usrNums.append(num)
return num
#判断是否都是数字
def isAllNumber(self, num) :
for ch in num :
if ch not in digits :
return False
return True

#判断是否有相同的字符
def hasSameDigit(self, num) :
for i in range(len(num)) :
pos = num[i + 1 :].find(num[i])
if pos >= 0 :
return True
return False
#判断
def numJudge(self, sysNum, usrNum) :
countA = 0
countB = 0
for i in range(4) :
if usrNum[i] in sysNum :
if i == sysNum.find(usrNum[i]) :
countA += 1
else :
countB += 1
result = '%dA%dB' % (countA, countB)
self.results.append(result)
if countA == 4 :
self.isWinner = True
#显示迄今为止所有猜测结果
def showResults(self, usrNums, results) :
print '-' * 20
for i in range(self.count) :
print '(%d)/t%s/t%s' % (i + 1, usrNums[i], results[i])
print '-' * 20
if self.isWinner :
print 'Total: %d times' % self.count
print 'Congratulations! Your are winner !!'
#运行
def run(self) :
self.printRules()
self.randNumGen()
while not self.isWinner:
num = self.numInput()
self.numJudge(self.sysNum, num)
self.showResults(self.usrNums, self.results)
#主函数
def main() :
guessNumber = GuessNumber()
guessNumber.run()
if __name__ == '__main__' :
main()

这里定义了一个 UDPServer 和 UDPClient。这里创建一个 TCP 服务程序,服务器会把客户发送过来的字符串加上一个时间戳,然后显示,并返回客户端。

UDPServer.py

#!/usr/bin/env python
from socket import *
from time import ctime
#创建一个UDP客户端
HOST = ''
PORT = 20001
BUFSIZE = 1024
ADDR = (HOST, PORT)
udpSerSock = socket(AF_INET, SOCK_DGRAM)
udpSerSockbind(ADDR)
while True:
print 'waiting for message...'
data, addr = udpSerSock.recvfrom(BUFSIZE)
udpSerSock.sendto('[%s] %s' % (ctime(), data), addr)
print'received from %s >> %s' % (addr, data)
udpSerSock.close()

UDPClient.py

#!/usr/bin/env python
from socket import *
HOST = 'localhost'
PORT = 20001
BUFSIZE = 1024
ADDR = (HOST, PORT)
udpClientSock = socket(AF_INET, SOCK_DGRAM)
while True:
data = raw_input('Enter the message you want to send >')
if not data:
break
udpClientSock.sendto(data, ADDR)
data, ADDR = udpClientSock.recvfrom(BUFSIZE)
if not data:
break
print data
udpClientSock.close()

Python 里面的 FTP 连接,主要依赖 ftplib 这个模块,具体请看帮助文档。

#!/usr/bin/env python
#FTP下载程序
import ftplib
import os
import socket
HOST = 'ftp.mozilla.org'
DIR = 'pub/mozilla.org/webtools'
FILE = 'bugzilla-LATEST.tar.gz'
def ftpDownload() :
try:
f = ftplib.FTP(HOST)
except (socket.error, socket.gaierror), e:
print 'ERROR: cannot connect "%s"' % HOST
return
print '>>Connect to host "%s"' % HOST
try:
f.login()
except ftplib.error_perm:
print 'ERROR: cannot login anonymously'
f.quit()
return
print '>>Logged in as "anonymous"'
try:
f.cwd(DIR)
except ftplib.error_perm:
print 'ERROR: cannot go to "%s"' % DIR
f.quit()
return
print '>>Go to "%s"' % DIR
try:
f.retrbinary('RETR %s' % FILE,
open(FILE, 'wb').write)
except ftplib.error_perm:
print 'ERROR: cannot read file "%s"' % FILE
os.unlink(FILE)
else:
print '>>Download "%s"' % FILE
f.quit()
return
def main() :
ftpDownload()
if __name__ == '__main__':
main()

所谓的网络爬虫就是利用程序抓取想要的网页或者数据。

下面对程序中所使用模块进行简单分析:

网络方面涉及 Python 的三个模块 htmllib,urllib,urlparse。
1)htmllib这个模块定义了一个可以担当在超文本标记语言(HTML)中解析文本格式文件的基类。该类不直接与 I/O 有关--它必须被提供字符串格式的输入,并且调用一个"格式设置"对象的方法来产生输出。该 HTMLParser 类被设计用来作为其他类增加功能性的基类,并且允许它的多数方法被扩展或者重载。该 HTMLParser 实现支持 HTML 2.0(描述在 RFC1866 中)语言。
2)urllib模块提供的上层接口,使我们可以像读取本地文件一样读取 www 和 ftp 上的数据。通过简单的函数调用,URL 所定位的资源就可以被你作为输入使用到你的程序中。如果再配以 re(正则表达式)模块,那么你就能够下载 Web 页面、提取信息、自动创建你所寻找的东西。urlretrieve 方法直接将远程数据下载到本地。参数 filename 指定了保存到本地的路径(如果未指定该参数,urllib 会生成一个临时文件来保存数据);
3)urlparse模块使我们能够轻松地把 URL 分解成元件(urlparse),之后,还能将这些元件重新组装成一个 URL(urljoin)。当我们处理 HTML 文档的时候,这项功能是非常方便的。

系统方面涉及 Python 的模块比较简单,主要是常用的sysos两个模块。主要用于文件夹的创建,文件路径的定位和判断等常用功能。这里不再介绍深入介绍。

输入输出方面使用了cStringIO模块。StringIO 的行为与 file 对象非常像,但它不是针对磁盘上文件,而是一个内存中的"文件",我们可以将操作磁盘文件那样来操作 StringIO。

formatter模块主要用于格式化输出。这里的格式化输出不仅仅是"格式化"输出而已。它可以将 HTML 解析器的标签和数据流转换为适合输出设备的事件流( event stream ),从 而 写将事件流相应的输出到设备上。

这个程序所使用的 Python 解释器版本为 2.6.5

下面是代码部分 NetCrawl.py

#!/usr/bin/env python
#网络爬虫程序
from sys import argv
from os import makedirs, unlink, sep
from os.path import dirname, exists, isdir, splitext
from string import replace, find, lower
from htmllib import HTMLParser
from urllib import urlretrieve
from urlparse import urlparse, urljoin
from formatter import DumbWriter, AbstractFormatter
from cStringIO import StringIO
class Downloader(object) :
#构造器
def __init__(self, url) :
self.url = url
self.file = self.filename(url)
#分析获取文件名
def filename(self, url, defFile = 'index.htm') :
parsedUrl = urlparse(url, 'http:', 0)
path = parsedUrl[1] + parsedUrl[2]
ext = splitext(path)
if ext[1] == '': #使用默认文件名
if path[-1] == '/':
path += defFile
else:
path += '/' + defFile
localDir = dirname(path)
if sep != '/':
localDir = replace(localDir, '/', sep)
if not isdir(localDir):
if exists(localDir): #文件存在删除
unlink(localDir)
makedirs(localDir)
return path
#下载页面
def download(self) :
try :
retval = urlretrieve(self.url, self.file)
except IOError:
retval = ('***ERROR: invalid URL "%s"' % self.url)
return retval
#分析保存链接
def parseAndGetLinks(self) :
self.parser = HTMLParser(AbstractFormatter( /
DumbWriter(StringIO())))
self.parser.feed(open(self.file).read())
self.parser.close()
return self.parser.anchorlist
class NetCrawler(object):
count = 0 #计数器
#构造器
def __init__(self, url) :
self.queue = [url]
self.seen = []
self.dom = urlparse(url)[1]
#获取页面
def getPage(self, url) :
dl = Downloader(url)
retval = dl.download()
if retval[0] == '*' :
print retval, '...skipping parse'
return
NetCrawler.count += 1
print '/n(', NetCrawler.count, ')'
print 'Url: ', url
print 'File: ', retval[0]
self.seen.append(url)
links = dl.parseAndGetLinks()
for eachLink in links :
if eachLink[ : 4] != 'http' and /
find(eachLink, '://') == -1:
eachLink = urljoin(url, eachLink)
print '*',eachLink
if find(lower(eachLink), 'mailto:') != -1:
print '... discarded, mailto link'
continue
if eachLink not in self.seen:
if find(eachLink, self.dom) == -1 :
print '... discarded, not in domain'
else :
if eachLink not in self.queue :
self.queue.append(eachLink)
print '... new, added to queue'
else :
print '... dirscarded, already in queue'
else :
print '... discarded, already processed'
#处理队列中的链接
def run(self) :
while self.queue :
url = self.queue.pop()
self.getPage(url)
#主函数
def main() :
#url = 'http://www.hao123.com/haoserver/index.htm'
if len(argv) > 1 :
url = argv[1]
else :
try :
url = raw_input('Enter starting URL: ')
expect (KeyboardInterrupt, EOFError) :
url = ''
if not url :
return
netCrawler = NetCrawler(url)
netCrawler.run()

if __name__ == '__main__' :
main()

这里的程序只是简单的抓取网页,如果要抓取指定网页可以加上也正则表达式(模块)来进行处理

这里主要使用了一个 random 随机模块中的 randint 和 choice。Python 的随机模块还是很强大的。

#!/usr/bin/env python
#随机数据生成
from random import randint,choice
from string import lowercase
from sys import maxint
from time import ctime
def randDataGenerate () :
doms = ('sina.com', '163.com', 'cctv.cn', 'yahoo.com.cn', 'csdb.net')

for i in range(randint(5, 20)) :
#随机日期生成
dateInt = randint(0, maxint - 1)
dateStr = ctime(dateInt)
#随机E-Mail地址生成
email = ''
for j in range(randint(4, 7)) :
email += choice(lowercase)
email += '@' + choice(doms)
#随机数生成
num = randint(0, 1000)
print '%s >> %s :: %d' % (dateStr, email, num)
def main() :
randDataGenerate()
if __name__ == '__main__' :
main()

这里自行定义了一个 MyTime 的类,继承于系统类 object。在 Python 里面默认的情况的下都要继承于这个类。类里面对init,straddiadd函数进行了重载。其实严格上讲不能叫重载,因为 Python 不支持重载,确切说应该叫覆盖。里面我还企图对init进行再次重载,显然不允许的,放在那边做个比较。

#!/usr/bin/env python
class MyTime(object) :
'MyTime - operate hours, minutes and seconds'
def __init__(self, h, m, s) :
'MyTime - Constructor'
self.hour = h;
self.min = m;
self.sec = s;

#def __init__(self, seconds) :
# 'MyTime - Constructor'
# self.hour = seconds / 3600
# seconds = seconds % 3600
# self.min = seconds /60
# self.sec = seconds % 60

def __str__(self) : #显示函数重载
'MyTime - string representation'
return '%d : %d : %d' % (self.hour, self.min, self.sec)
__repr__ = __str__
def __add__(self, time) : #加法重载
'MyTime - overloading the addition operator'
h = m = s = 0
t = self.sec + time.sec
s = t % 60
m += t /60
t = self.min + time.min + m
m = t % 60
h = t / 60
t = self.hour + time.hour + h
h = t % 24
return self.__class__(h, m, s)

def __iadd__(self, time) : #自增重载
'MyTime - overloading the in-place addition operator'
t = self.sec + time.sec
self.sec = t % 60
self.min += t / 60
t = self.min + time.min
self.min = t % 60
self.hour += t /60
t = self.hour + time.hour
self.hour = t % 24
return self

#测试部分
t1 = MyTime(12,34,57)
t2 = MyTime(8,6,56)
print 't1 >> ', t1
print 't2 >> ', t2
print 't1 + t2 >> ', t1 + t2
t1 += t2
print 't1 += t2 >> ', t1
#测试结果
#t1 >> 12 : 34 : 57
#t2 >> 8 : 6 : 56
#t1 + t2 >> 20 : 41 : 53
#t1 += t2 >> 20 : 41 : 53

这里代码很简单,实现了一个随机序列迭代器

#!/usr/bin/env python
#随机迭代器
from random import choice
class RandSeqIterator(object) :
def __init__(self, seq) :
self.data = seq
def __iter__(self) :
return self
def next(self) :
return choice(self.data)
#Program Test
for eachItem in RandSeqIterator(('AA', 'BB', 'CC', 'DD', 'End')) :
print eachItem
if eachItem == 'End' :
break

这里定义了一个 TCPServer 和 TCPClient。这里创建一个 TCP 服务程序,服务器会把客户发送过来的字符串加上一个时间戳,然后显示,并返回客户端。主要后面无论如何都要记得 close()关上连接,虽然基本上不会执行那一句。

TCPServer.py

#!/usr/bin/env python
#创建一个TCP服务程序,这个程序会把客户发送过来的字符串加上一个时间戳,然后显示,并返回客户端
from socket import *
from time import ctime
HOST = ''
PORT = 20000
BUFSIZE = 1024 #1KB
ADDR = (HOST, PORT)
tcpSerSock = socket(AF_INET, SOCK_STREAM)
tcpSerSock.bind(ADDR)
tcpSerSock.listen(5)
while True:
print 'waiting for connection...'
tcpClientSock,clientAddr = tcpSerSock.accept()
print '...connected from :', clientAddr
while True:
data = tcpClientSock.recv(BUFSIZE)
if not data:
break
print '[%s] %s' % (ctime(), data)
tcpClientSock.send('[%s] %s' % (ctime(), data))
tcpClientSock.close()
tcpSerSock.close()

TCPClient.py

#!/usr/bin/env python
#创建一个TCP客户端
from socket import *
HOST = 'localhost'
PORT = 20000
BUFSIZE = 1024
ADDR = (HOST, PORT)
tcpClientSock = socket(AF_INET, SOCK_STREAM)
tcpClientSock.connect(ADDR)
while True:
data = raw_input('Enter a string your want to send >')
if not data:
break
tcpClientSock.send(data)
data = tcpClientSock.recv(BUFSIZE)
if not data:
break
print data
tcpClientSock.close()