In [ ]:
# 列表推导式
[i for i in range(10) if i % 2 == 0]
In [ ]:
seq = ['one','two','three']
In [ ]:
i = iter('abc')
In [ ]:
def power(values):
for value in values:
print('powering %s' %(value))
yield value
def adder(values):
for value in values:
print('adding to %s' %(value))
if value % 2 == 0:
yield value + 3
else:
yield value + 2
elements = [1,4,7,9,12,19]
res = adder(power(elements))
In [ ]:
res.__next__()
In [ ]:
import datetime
utc = datetime.datetime.utcnow()
now = datetime.datetime.now()
'utc:{}, now:{}'.format(utc, now)
In [ ]:
import datetime
import pytz
def utcnow():
return datetime.datetime.now(tz=pytz.utc)
print(utcnow())
print(utcnow().isoformat())
print(utcnow().isoformat() < utcnow())
In [ ]:
import iso8601
iso8601.parse_date(utcnow().isoformat())
In [ ]:
iso8601.parse_date(utcnow().isoformat()) < utcnow()
In [ ]:
utcnow()
In [ ]:
"""
函数被注册并存储在一个字段里,以便后续可以根据函数名字提取函数
"""
_functions = {}
def register(f):
global _functions
_functions[f.__name__] = f
return f
@register
def foo():
return 'bar'
In [ ]:
"""
使用functools模块中update_wrapper函数解决:新函数缺少很多原函数的属性,
如文档字符串和名字
"""
def a_decoration(func): # 函数中返回函数
def wrap_func():
print('before func... doing something...')
func()
print('after func... doing something...')
return wrap_func
@a_decoration
def a_func():
print('this is func...')
a_func()
print(a_func.__name__) # 这个时候会暴露被封装的方法
In [ ]:
from functools import wraps
def a_decoration(func): # 函数中返回函数
@wraps(func)
def wrap_func():
print('before func... doing something...')
func()
print('after func... doing something...')
return wrap_func
@a_decoration
def a_func():
print('this is func...')
a_func()
print(a_func.__name__)
In [ ]:
"""
使用inspect.getcallargs,返回一个将参数名字和值作为键值对的字典
"""
from functools import wraps
import inspect
def a_decoration(func): # 函数中返回函数
@wraps(func)
def wrap_func(*args, **kwargs):
func_args = inspect.getcallargs(func, *args, **kwargs)
print(func_args)
if func_args.get('username') != 'admin':
raise Exception("This user is not allowed to get food")
return func(*args, **kwargs)
return wrap_func
@a_decoration
def a_func(username, type='chocolate'):
print(type + " nom nom nom!")
a_func('admin','cake')
In [ ]:
class Pizza(object):
def __init__(self, size):
self.size = size
def get_size(self):
return self.size
# 可以向方法传入该类的任意实例,还可以传入任何对象
# 只要它包含方法期望的属性
# 每次调用类的一个方法都要对该类进行引用
# 所以python通过将类的方法绑定给实例为我们完成后续工作
# 换句话说,可以通过任何pizza访问get_size方法,
# 进一步说,python会自动将对象本身传给方法的self参数
one = Pizza(100)
Pizza.get_size(one)
m = one.get_size
# 一旦有了对绑定方法的引用则无需保持对Pizza对象的引用
# 如果有了对方法的引用,但是想知道它被绑定到了哪个对象
# 可以查看方法的__self__属性
print(m.__self__)
print(m == m.__self__.get_size)
In [ ]:
# 使用abc实现抽象方法
"""
为了解决Python2&3的兼容问题,需要引入six模块,
该模块中有一个针对类的装饰器 @six.add_metaclass(MetaClass)
可以为两个版本的Python类方便地添加metaclass
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Pizza(object):
@abc.abstractmethod
def get_radius(self):
"""Method that should do something."""
pass
Pizza()
In [ ]:
"""
通过抽象方法使用super()
在抽象方法中会有实现代码
在Pizza中get_ingredients为累方法,并不会强迫其子类也将其定义为类方法
将其定义为静态方法也是一样,没有办法强迫子类将抽象方法实现为某种特定类型的方法
"""
import six
class Pizza(object):
default_ingredients = ['cheese']
@classmethod
@abc.abstractmethod
def get_ingredients(cls):
"""Method that should do something."""
return cls.default_ingredients
class DietPizza(Pizza):
def get_ingredients(self):
return super(DietPizza, self).get_ingredients()
dp = DietPizza().get_ingredients()
dp
In [ ]:
def parent():
return object
class A(parent()):
pass
A.mro() # 返回方法解析顺序用于解析属性
In [ ]:
"""
super()函数实际上是一个构造器
每次调用都会实例化一个super对象
接受一个或两个参数,第一个参数是一个类,第二个参数是子类或第一个参数的一个实例
构造器返回的对象就像是第一个参数的父类的一个代理,它有自己的__getattribute__
方法去遍历MRO列表汇中的类并返回第一个满足条件的属性
"""
class A(object):
bar = 42
def foo(self):
print("func foo...")
pass
class B(object):
bar = 0
class C(A, B):
xyz = 'abc'
C.mro()
In [ ]:
super(C, C()).bar
In [ ]:
super(C, C()).foo
In [ ]:
super(B).__self__
In [ ]:
super(B,B()).__self__
In [ ]:
super(C) # 对象未绑定,所有不能通过它访问类属性
In [ ]:
"""
super类通过某种方式实现了描述符协议(也就是__get__),
这种方式能够让未绑定的super对象像类属性一样有用
"""
class D(C):
sup = super(C)
D().sup
In [ ]:
D().sup.foo
In [ ]:
D().sup.bar
In [ ]:
super(C, C())
In [ ]:
class B(A):
def foo(self):
super().foo()
one_b = B()
one_b.foo()
In [ ]:
class ObjectCreator(object):
pass
my_object = ObjectCreator()
print(my_object)
In [ ]:
my_object.x = 10
In [ ]:
def func():
print("func...")
In [ ]:
print(hasattr(my_object, 'x'))
In [ ]:
print(type(ObjectCreator))
In [ ]:
print(type(ObjectCreator()))
In [ ]:
class Test:
pass
Test()
In [ ]:
Test2 = type("Test2",(),{}) # 定了一个Test2类
Test2() # 创建了一个Test2类的实例对象
In [ ]:
Foo = type('Foo', (), {'bar':True})
In [ ]:
class Foo(object):
bar = True
In [ ]:
Foo = type('Foo',(),{'bar':True})
def echo_bar(self):
print(">>:bar:", self.bar)
In [ ]:
FooChild = type('FooChild',(Foo,),{'echo_bar':echo_bar})
hasattr(Foo, 'echo_bar')
In [ ]:
hasattr(FooChild, 'echo_bar')
In [ ]:
my_foo = FooChild()
if hasattr(FooChild, 'echo_bar'):
my_foo.echo_bar()
In [ ]:
# 添加静态方法
@staticmethod
def testStatic():
print('static method...')
FooChild = type('FooChild',(Foo,),
{'echo_bar':echo_bar,'testStatic':testStatic})
In [ ]:
foochild = FooChild()
foochild.testStatic()
In [ ]:
foochild.testStatic
In [ ]:
foochild.echo_bar()
In [ ]:
# 添加类方法
@classmethod
def testClass(cls):
print(cls.bar)
Foochild = type('FooChild', (Foo,),
{'echo_bar':echo_bar,
'testStatic':testStatic,
'testClass':testClass})
In [ ]:
foochild = Foochild()
foochild.testClass()
In [ ]:
foochild.__class__.__class__
In [ ]:
def upper_attr(future_class_name, future_class_parents,future_class_attr):
# 遍历属性字典,把不是__开头的属性名字变为大写
newAttr = {}
for name,value in future_class_attr.items():
if not name.startswith("__"):
newAttr[name.upper()] = value
# 调用type来创建一个类
return type(future_class_name, future_class_parents, newAttr)
In [ ]:
# python3
class Foo(object, metaclass=upper_attr):
bar = 'bip'
# # python2
# class Foo(object):
# __metaclass__ = upper_attr
# bar = 'bip'
In [ ]:
print(hasattr(Foo, 'BAR'))
In [ ]:
f = Foo()
print(f.BAR)
In [ ]:
# 方法一:创建一个生成器,直接使用()
G = (x*2 for x in range(5))
G
In [ ]:
for x in G:
print(x)
In [ ]:
# 方法二: 使用yield
def fib(times):
n = 0
a, b = 0, 1
while n < times:
yield b
a, b = b, a+b
n+=1
In [ ]:
F = fib(5)
In [ ]:
for n in fib(5):
print(n)
In [ ]:
g = fib(5)
while True:
try:
x = next(g)
print('value: %d' %x)
except StopIteration as e:
print("生成器返回值:%s"%e.value)
break
In [ ]:
def gen():
i = 0
while i<5:
# 使用yield时,gen函数作用暂时卜村,返回i的值;
# temp接受下次c.send('python'),
# send发送过来的值,c.next()等价c.send(None)
temp = yield i
print(temp)
i+=1
f = gen()
In [ ]:
next(f) # 使用next函数
In [ ]:
f.__next__() # 使用__next__()方法
In [ ]:
f.send('haha')
In [ ]:
from collections import Iterator
isinstance(f, Iterator)
In [ ]:
from collections import Iterable
isinstance(f, Iterable)
In [ ]:
# 修饰器修饰类
class Test():
def __call__(self):
print('call me')
In [ ]:
# 类装饰器
"""
1.当用Test来装作装饰器对test函数进行装饰的时候,首先会创建Test的实例对象
并且会把test这个函数名当作参数传递到__init__方法中
即在__init__方法中的func变量指向了test函数体
2.test函数相当于指向了用Test创建出来的实例对象
3.当在使用test() 进行调用时,就相当于让这个对象(),因此会调用这个对象的__call__方法
4.为了能够在__call__ 方法中调用原来的test指向的函数体,所以在__init__ 方法中赋值给
self.__func, 才有了self.__func = func, 从而在调用__call__ 方法中能够调用test
"""
class Test(object):
def __init__(self, func):
print('>>:初始化')
print('func name is %s' %func.__name__)
self.__func = func
def __call__(self):
print(">>:装饰器中的功能")
self.__func()
@Test
def test():
print('>>:test')
In [ ]:
test()
In [ ]:
# import 导入模块
import sys
sys.path
In [ ]:
sys.path.append('/')
sys.path.insert(0, '/Downloads') # 可以确保先搜索这个路径
sys.path.append('/Users/jasonlu/Desktop/Dev/GitHub/codehub/python')
In [ ]:
sys.path
In [ ]:
from imp import *
import reload_test
reload_test.test()
In [ ]:
reload_test.test()
In [ ]:
reload(reload_test)
In [ ]:
reload_test.test()
In [ ]:
import time
for i in range(1,101):
print("\r%.2f%%"%i, end='')
time.sleep(0.01)
print("")
In [ ]:
import copy
a = [11,22,33]
b = copy.copy(a)
In [ ]:
id(a)
In [ ]:
id(b)
In [ ]:
a.append(44)
In [ ]:
a
In [ ]:
b
In [ ]:
a2 = (11,22,33)
b2 = copy.copy(a2)
In [ ]:
id(a2)
In [ ]:
id(b2)
In [ ]:
D = dict(name="zhangsan", age=27)
Co = D.copy()
In [ ]:
id(D)
In [ ]:
id(Co)
In [ ]:
D['gender'] = 'man'
In [ ]:
D
In [ ]:
Co
In [ ]:
# 使用property升级getter和setter方法
class Money(object):
def __init__(self):
self.__money = 0
def get_money(self):
return self.__money
def set_money(self, value):
if isinstance(value, int):
self.__money = value
else:
print("error:不是整形数字")
money = property(get_money, set_money)
In [ ]:
a = Money()
In [ ]:
a.money
In [ ]:
a.money = 100
In [ ]:
a.money
In [ ]:
a.get_money()
In [ ]:
# 使用property取代getter和setter方法
class Money(object):
def __init__(self):
self.__money = 0
@property
def money(self):
return self.__money
@money.setter
def money(self, value):
if isinstance(value, int):
self.__money = value
else:
print("error:不是整型数字")
In [ ]:
a = Money()
In [ ]:
a.money
In [ ]:
a.money = 100
In [ ]:
a.money
In [ ]:
int_a = -6
id(int_a)
In [ ]:
int_b = -6
id(int_b)
In [ ]:
a = "hello world"
In [ ]:
b = "hello world"
In [ ]:
id(a)
In [ ]:
id(b)
In [ ]:
c = "helloworld"
In [ ]:
d = "helloworld"
In [ ]:
id(c)
In [ ]:
id(d)
In [ ]:
import sys
str_a = 'helloworld'
sys.getrefcount(str_a)
In [ ]:
import gc
class ClassA():
pass
# def __del__(self):
# print('object born,id:%s'%str(hex(id(self))))
gc.set_debug(gc.DEBUG_LEAK)
a = ClassA()
b = ClassA()
a.next = b
b.prev = a
In [ ]:
gc.collect()
In [ ]:
del a
In [ ]:
del b
In [ ]:
gc.collect()
In [ ]:
class Person:
pass
dir(Person)
In [ ]:
import socket
hostname = 'www.baidu.com'
addr = socket.gethostbyname(hostname)
print('The IP address of {} is {}'.format(hostname, addr))
In [ ]:
import socket
hostname = 'www.python.org'
addr = socket.gethostbyname(hostname)
print('The IP address of {} is {}'.format(hostname, addr))
In [ ]:
gc.disable()
class Person(object):
def __getattribute__(self, obj):
print('---test---')
if obj.startswith('a'):
return 'haha'
else:
return self.test
def test(self):
print('heihei')
In [ ]:
ret = filter(None, "She")
ret.__next__()
In [ ]:
ret = filter(lambda x: x%2, [1,2,3,4])
In [ ]:
while True:
try:
print(ret.__next__())
except StopIteration as e:
print(">>::%s"%e)
break
In [ ]:
help(filter)
In [ ]:
from functools import reduce
help(reduce)
In [ ]:
help(sorted)
In [ ]:
# python3.5
import functools
dir(functools)
help(functools.partial)
In [ ]:
gc.disable()
import functools
def showarg(*args, **kw):
print(args)
print(kw)
p1 = functools.partial(showarg)
p1()
p1(4,5,6)
p1(a='python',b='itcast')
In [ ]:
p2 = functools.partial(showarg, a=3, b='linux')
In [ ]:
p2(1,2)
In [ ]:
p2(a='python', b='itcast')
In [ ]:
import hashlib
In [ ]:
import hashlib
import datetime
KEY_VALUE = 'Itcast'
now = datetime.datetime.now()
m = hashlib.md5()
str = '%s%s'%(KEY_VALUE, now.strftime("%Y%m%d"))
m.update(str.encode('utf-8'))
value = m.hexdigest()
print(value)
In [ ]:
# Debugging
import pdb
def make_bread():
pdb.set_trace()
return "I don't have time"
print(make_bread())
In [ ]:
from time import sleep
def sing():
for i in range(3):
print("正在唱歌...%d"%i)
sleep(1)
def dance():
for i in range(3):
print('正在跳舞...%d'%i)
sleep(1)
sing()
dance()
In [ ]:
# 程序执行到os.fork()时,操作系统会创建一个新的进程(子进程),
# 然后复制父进程的所有信息到子进程中
# 然后父进程和子进程都会从fork()函数中得到了一个返回值,
# 在子进程中这个值一定是0,而父进程中是子进程的id号
import os
pid = os.fork()
if pid == 0:
print('哈哈1')
else:
print('哈哈2')
In [ ]:
import os
x = 0
rpid = os.fork()
if rpid < 0:
print('fork调用失败...')
elif rpid == 0:
print('我是子进程(%s), 我的父进程是(%s)'%(os.getpid(), os.getppid()))
x += 1
else:
print('我是父进程(%s), 我的子进程是(%s)'%(os.getpid(), rpid))
print('父子进程都可以执行这里的代码')
In [ ]:
# 多进程修改全局变量
import os
import time
num = 0
pid = os.fork()
if pid == 0:
num += 1
print('哈哈1---num=%d'%num)
else:
time.sleep(1)
num += 1
print('哈哈2---num=%d'%num)
In [ ]:
import os
import time
pid = os.fork()
if pid == 0:
print('哈哈1')
else:
print('哈哈2')
pid = os.fork()
if pid == 0:
print('哈哈3')
else:
print('哈哈4')
time.sleep(1)
In [ ]:
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('子进程运行中,name=%s, pid=%d...'%(name, os.getpid()))
print('父进程 %d.'%os.getpid())
p = Process(target=run_proc, args=('test',))
print('子进程将要执行')
p.start()
p.join()
print('子进程已结束')
In [ ]:
from multiprocessing import Process
import os
from time import sleep
def run_proc(name, age, **kwargs):
for i in range(10):
print('子进程运行中, name=%s, age=%d, pid=%d...'%(name, age, os.getpid()))
print(kwargs)
sleep(0.5)
print('父进程 %d.' %(os.getpid()))
p = Process(target=run_proc, args=('test', 18), kwargs={'m': 20})
print('子进程将要执行')
p.start()
sleep(1)
p.terminate()
p.join()
print('子进程已结束')
In [ ]:
# 进程的创建-Process子类
from multiprocessing import Process
import time
import os
def run_proc(name, age, **kwargs):
for i in range(200000000):pass
for i in range(3):
print('子进程运行中, name=%s, age=%d, pid=%d...'%(name, age, os.getpid()))
print(kwargs)
class Process_Class(Process):
def __init__(self, p):
Process.__init__(self)
self.p = p
# 重写了Process类的run()方法
def run(self):
print('子进程(%s) 开始执行,父进程(%s)'%(os.getpid(), os.getppid()))
t_start = time.time()
self.p.start()
self.p.join()
# for i in range(100000000):pass
# time.sleep(self.interval)
t_stop = time.time()
print('(%s)执行结束,耗时%0.2f秒'%(os.getpid(), t_stop-t_start))
t_start = time.time()
print('当前程序进程(%s)'%(os.getpid()))
p0 = Process(target=run_proc,args=('test', 18), kwargs={'m': 20})
p0.start()
p0.join()
# p1 = Process_Class(p0)
# p1.start()
# p1.join()
t_stop = time.time()
print("(%s)执行结束,耗时%0.2f"%(os.getpid(), t_stop-t_start))
In [13]:
# 进程池Pool
import threading
from multiprocessing import Pool
import os
import time
import random
g_num = 0
t_lock = threading.Lock()
def worker(msg):
global g_num
# g_num += 1
# print('---thread=%d, num=%d---' %(id(g_num), g_num))
if t_lock.acquire():
g_num += 1
print('---thread=%d, num=%d---' %(id(g_num), g_num))
t_lock.release()
# t_start = time.time()
# print("%s开始执行, 进程号为%d"%(msg, os.getpid()))
# print('---num=%d---' %num)
# time.sleep(random.random()*2)
# t_stop = time.time()
# print(msg, '执行完毕, 耗时%0.2f'%(t_stop-t_start))
po = Pool(3)
for i in range(0, 10):
po.apply_async(worker, (i,)) # 非阻塞方式
# po.apply(worker, (i,)) 阻塞方式
print("---start---")
po.close()
po.join()
print("---end---")
print('end num=%d, id=%d' %(g_num, id(g_num)))
In [3]:
from flask.ext.sqlalchemy import SQLAlchemy
db = SQLAlchemy()
In [ ]:
tasks = [
{
'id': 1,
'title': u'Buy groceries',
'description': u'Milk, Cheese, Pizza, Fruit, Tylenol',
'done': False
},
{
'id': 2,
'title': u'Learn Python',
'description': u'Need to find a good Python tutorial on the web',
'done': False
}
]
task_id = 3
task = filter(lambda t: t['id'] == task_id, tasks)
try:
info = task.__next__()
except StopIteration as e:
print('StopIteration ...')
In [ ]:
# Queue的使用,实现多进程之间的数据传递
from multiprocessing import Queue
q = Queue(3)
q.put("消息1")
q.put("消息2")
print(q.full())
q.put('消息3')
# 推荐方式
if not q.full():
q.put_nowait("消息4")
while True:
if not q.empty():
print(q.get_nowait())
else:
print("queue is empty....")
break
# if not q.empty():
# for i in range(q.qsize()):
# print(q.get_nowait())
In [ ]:
# Queue实例
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码
def write(q, v):
print(v)
for value in ['A','B','C']:
print('Put %s to queue'%value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码
def read(q):
while True:
if not q.empty():
value = q.get(True)
print("Get %s from queue."%value)
time.sleep(random.random())
else:
break
# 父进程创建Queue, 并传给各个子进程
q = Queue()
pw = Process(target=write, args=(q,1,))
pr = Process(target=read, args=(q,))
pw.start()
pw.join()
pr.start()
pr.join()
print('所有数据都写入并且读完...')
In [ ]:
# 进程池中的Queue
# 进程池中的进程如何通信
# 修改Queue为Manager
from multiprocessing import Manager, Pool
import os, time, random
def reader(q):
print('reader启动(%s), 父进程为(%s)'%(os.getpid(), os.getppid()))
while True:
if not q.empty():
print('reader从Queue获取到消息:%s'%(q.get(True)))
else:
break
def writer(q):
print('write启动(%s), 父进程为(%s)'%(os.getpid(), os.getppid()))
for i in 'dongGe':
q.put(i)
print('(%s) start'%(os.getpid()))
q = Manager().Queue() # 使用Manager中的Queue来初始化
po = Pool()
# 使用阻塞模式创建进程,这样就不需要在reader中使用死循环了,可以让writer写完
po.apply(writer, (q,))
po.apply(reader, (q,))
po.close()
po.join()
print('(%s) 线程结束...'%(os.getpid()))
In [ ]:
# 多线程-threading
import threading
import time
def say_sorry():
print('我错了,我能吃饭了吗?')
time.sleep(1)
for i in range(5):
t = threading.Thread(target=say_sorry)
t.start() # 启动线程,即让线程开始执行
In [ ]:
# 主线程会等待所有的子线程结束后才结束
import threading
from time import sleep, ctime
def sing():
for i in range(3):
print('正在唱歌...%d'%i)
sleep(1)
def dance():
for i in range(3):
print('正在跳舞...%d'%i)
sleep(1)
print('---开始---:%s'%ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
# 查看进程数量
# while True:
# length = len(threading.enumerate())
# print('当前运行的线程数为: %d'%length)
# if length <= 1:
# break
# sleep(0.5)
# sleep(5)
print('---结束---:%s'%ctime())
In [ ]:
# 线程执行代码的封装
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm " + self.name + ' @' + str(i)
print(msg)
t = MyThread()
t.start()
# 覆写run方法,通过start方法启动线程,然后由python虚拟机进行调度
In [14]:
# 线程的执行顺序
import threading
import time, os
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm " + self.name + ' @' + str(i)
print(msg)
def test():
for i in range(5):
t = MyThread()
t.start()
test()
# 多线程程序的执行顺序是不确定的,
# 当执行到sleep语句时,线程将被阻塞(blocked)
# 到sleep结束后,线程进入就绪(Runnable)状态,等待调度
# 线程调度将自行选择一个线程执行
# 上面代码中只能保证每个线程都运行完整个run函数,但是线程的启动顺序,
# run函数中每次循环的执行顺序都不能确定
In [21]:
from threading import Thread
import time
g_num = 100
def work1():
global g_num
for i in range(3):
g_num += 1
print("---in work1, g_num is %d---" % g_num)
def work2():
global g_num
for i in range(3):
g_num += 1
print("---in work2, g_num is %d---" % g_num)
print('---线程创建之前g_num is %d---' % g_num)
t1 = Thread(target=work1)
t1.start()
# time.sleep(1)
t2 = Thread(target=work2)
t2.start()
In [19]:
# 列表当作实参传递到线程中
from threading import Thread
import time
def work1(nums):
nums.append(44)
print('---in work1---', nums)
def work2(nums):
time.sleep(1)
print('---in work2---', nums)
g_nums = [11,22,33]
t1 = Thread(target=work1, args=(g_nums,))
t1.start()
t2 = Thread(target=work2, args=(g_nums,))
t2.start()
In [ ]:
# 非全局变量是否要加锁
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num, sleepTime):
threading.Thread.__init__(self)
self.num = num
self.sleepTime = sleepTime
def run(self):
self.num += 1
time.sleep(self.sleepTime)
print("线程(%s), num=%d" % (self.name, self.num))
mutex = threading.Lock()
t1 = MyThread(100, 5)
t1.start()
t2 = MyThread(200, 1)
t2.start()
t3 = MyThread(100, 5)
t3.start()
In [ ]:
import threading
from time import sleep
def test(sleepTime):
num = 1
sleep(sleepTime)
num += 1
print('---(%s)--num=%d' % (threading.current_thread(), num))
t1 = threading.Thread(target=test, args=(5,))
t2 = threading.Thread(target=test, args=(1,))
t1.start()
t2.start()
In [ ]:
# 全局变量加锁
import threading
import time
# 全局变量
num = 0
def task(num):
print("线程==%d" % (num))
class MyThread(threading.Thread):
def __init__(self, num, sleepTime):
threading.Thread.__init__(self)
self.sleepTime = sleepTime
def __init(self, target):
threading.Thread.__init__(self, target=target)
def run(self):
if mutex.acquire():
global num
num += 1
time.sleep(self.sleepTime)
print("线程(%s), num=%d" % (self.name, num))
mutex.release()
mutex = threading.Lock()
t1 = MyThread(100, 5)
t1.start()
t2 = MyThread(200, 1)
t2.start()
t3 = MyThread(100, 5)
t3.start()
t4 = MyThread(target=task, args=(num,))
t4.start()
In [ ]:
import threading
import time
from queue import Queue
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count = count + 1
msg = '生产产品' + str(count)
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消费了' + queue.get()
print(msg)
time.sleep(0.5)
queue = Queue()
for i in range(500):
queue.put('初始产品' + str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(3):
c = Consumer()
c.start()
In [33]:
# ThreadLocal
import threading
local_school = threading.local()
def process_student():
std = local_school.student
local_school.num += 1
print("hello, %s (in %s) num=%d" % (std, threading.current_thread(), local_school.num))
def process_thread(name):
local_school.student = name
local_school.num = random.randrange(5, 10)
print('old num=%d' %(local_school.num))
process_student()
t1 = threading.Thread(target=process_thread, args=('dongGe',))
t2 = threading.Thread(target=process_thread, args=('老王',))
t1.start()
t2.start()
t1.join()
t2.join()
print(t1.__getattribute__())
In [4]:
# 异步
from multiprocessing import Pool
import time
import os
def test():
print('---进程池中的进程---pid=%d, ppid=%d' % (os.getpid(), os.getppid()))
for i in range(3):
print('---%d---' %i)
time.sleep(1)
return 'hahaha'
def test2(args):
print('---callback func---pid=%d' % os.getpid())
print('---callback func---args=%s' % args)
pool = Pool(3)
pool.apply_async(func=test, callback=test2)
time.sleep(3)
print('---主进程---pid=%d---' % os.getpid())
In [81]:
import threading
import time
from threading import Thread, Lock
# 工厂头信息
class HeaderFactory():
def __init__(self):
self.list_user_agent = [
# For Android
"Mozilla/5.0 (Linux; Android 4.1.1; Nexus 7 Build/JRO03D) \
AppleWebKit/535.19 (KHTML, like Gecko) \
Chrome/18.0.1025.166 Safari/535.19",
"Mozilla/5.0 (Linux; U; Android 4.0.4; en-gb; GT-I9300 Build/IMM76D) \
AppleWebKit/534.30 (KHTML, like Gecko) \
Version/4.0 Mobile Safari/534.30",
"Mozilla/5.0 (Linux; U; Android 2.2; en-gb; GT-P1000 Build/FROYO) \
AppleWebKit/533.1 (KHTML, like Gecko) \
Version/4.0 Mobile Safari/533.1",
# For Firefox
"Mozilla/5.0 (Windows NT 6.2; WOW64; rv:21.0) \
Gecko/20100101 Firefox/21.0",
"Mozilla/5.0 (Android; Mobile; rv:14.0) \
Gecko/14.0 Firefox/14.0",
# For chrome
"Mozilla/5.0 (Windows NT 6.2; WOW64) \
AppleWebKit/537.36 (KHTML, like Gecko) \
Chrome/27.0.1453.94 Safari/537.36",
"Mozilla/5.0 (Linux; Android 4.0.4; Galaxy Nexus Build/IMM76B) \
AppleWebKit/535.19 (KHTML, like Gecko) \
Chrome/18.0.1025.133 Mobile Safari/535.19",
# For iOS
"Mozilla/5.0 (iPad; CPU OS 5_0 like Mac OS X) \
AppleWebKit/534.46 (KHTML, like Gecko) \
Version/5.1 Mobile/9A334 Safari/7534.48.3",
"Mozilla/5.0 (iPod; U; CPU like Mac OS X; en) \
AppleWebKit/420.1 (KHTML, like Gecko) \
Version/3.0 Mobile/3A101a Safari/419.3",
"Mozilla/5.0 (iPhone; CPU iPhone OS 10_3 like Mac OS X) \
AppleWebKit/602.1.50 (KHTML, like Gecko) \
CriOS/56.0.2924.75 Mobile/14E5239e Safari/602.1"
]
pass
def get_random_user_agent(self):
import random
int_random = random.randint(0,len(self.list_user_agent)-1)
return self.list_user_agent[int_random]
"""
利用property装饰器,让方法能以属性方式的调用
"""
@property
def header_info(self):
header_default = [
("Accept", "text/html,application/xhtml+xml,\
application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8"),
("Accept-Language", "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7"),
("Connection", "keep-alive"),
("referer", ""),
("Accept-Encoding", "utf-8")
]
header_default.append(('User-Agent', self.get_random_user_agent()))
return header_default
class ManagerGrapIpHtml(object):
# 单利模式的锁
_instance_lock = Lock()
def __init__(cls, *args, **kwargs):
if not hasattr(ManagerGrapIpHtml, '_instance'):
with ManagerGrapIpHtml._instance_lock:
if not hasattr(ManagerGrapIpHtml, '_instance'):
ManagerGrapIpHtml._instance = object.__new__(cls)
return ManagerGrapIpHtml._instance
@classmethod
def grab_ip_html(cls, page):
"""
抓去IP网页内容
"""
import urllib.request
url = "http://www.xicidaili.com/wn/"
headers = HeaderFactory().header_info
opener = urllib.request.build_opener()
opener.addheaders = headers
cur_page = page
url_html_ip = url + str(cur_page)
print(">>:开始抓取url:%s" % (url_html_ip))
str_html_ip = 'ip_{}.html'.format(cur_page)
data = opener.open(url_html_ip).read()
fhandle = open(str_html_ip , 'wb')
fhandle.write(data)
fhandle.close()
print('>>:finished save ip html....ip_{}.html'.format(cur_page))
return str_html_ip
# page_num = end_page - start_page + 1
# print(page_num)
# for i in range(page_num):
# cur_page = i + start_page
# url_html_ip = url + str(cur_page)
# str_html_ip = 'ip_{}.html'.format(cur_page)
# data = opener.open(url_html_ip).read()
# fhandle = open(str_html_ip , 'wb')
# fhandle.write(data)
# fhandle.close()
# print('>>:finished save ip html....ip_{}.html'.format(cur_page))
# list_ip_html.append(str_html_ip)
# return list_ip_html
url = "http://www.xicidaili.com/wn/"
for i in range(5):
t = ManagerGrapIpHtml.grab_ip_html(page=i+10)
print(t)
# class GrabIpHtmlThread(threading.Thread):
# def __init__(self, url):
# threading.Thread.__init__(self)
# self.url = url
# def run(self):
# print(">>:%s do task" %(self.name))
# print(self.url)
# def test():
# for i in range(5):
# t = GrabIpHtmlThread('http://%d' % i)
# t1.start()
# test()
In [96]:
url = "http://www.xicidaili.com/wn/1"
import re
ret = re.split('/',url)
print(type(ret))
print(ret[-1])
In [9]:
t = [1,2,3,4]
for (i, str_html) in t:
print(" " , str_html)
In [1]:
import sys
sys.path.append('/Users/jasonlu/Desktop/Dev/GitHub/codehub/python'
'/线程与进程/Processes/socket_preview')
In [5]:
import server
In [ ]: