Python

requests cookies 为空的一个坑

有时候,requests 返回的 cookies 会为空,原因是链接发生了 301/302 跳转,而 cookies 是跟着第一个响应返回的,第二个响应没有返回 Set-Cookie header。所以直接读取 r.cookies 是空的,而在 session.cookies 中是有数据的。

解决方法是直接读 s.cookies。

“`
s = requests.Session()
r = s.get(‘http://httpbin.org/cookies/set?foo=bar’)
cookies = requests.utils.dict_from_cookiejar(s.cookies)
s.cookies.clear()
“`

不过需要注意的是如果在多线程环境中使用 session 需要注意锁的问题,建议把 session 设置成 thread local 的类型。

Python 中的 GC(垃圾回收)

# 引用计数(reference counting)

CPython 中默认使用的垃圾回收算法是 Reference Counting。也就是对每个元素标记有多少个其他元素引用了它,当引用数降到零的时候就删除。

1. 当对象增加一个引用,比如赋值给变量,属性或者传入一个方法,引用计数执行加1运算。
2. 当对象减少一个引用,比如变量离开作用域,属性被赋值为另一个对象引用,属性所在的对象被回收或者之前传入参数的方法返回,引用计数执行减1操作。
3. 当引用计数变为0,代表该对象不被引用,可以标记成垃圾进行回收。

为了解决循环引用的问题,CPython 使用了 Cyclic GC,遍历所有的环,并且把每一个元素的引用减一,来检测每一个引用环是不是循环应用。

![](https://ws2.sinaimg.cn/large/006tKfTcly1ftg5mu2087j30we0i6gv5.jpg)

# 标记删除(Mark and Sweep)

1. 从某一个已知的还活着的对象开始,便利对象,如果经过了某个对象就认为是活着的
2. 如果没有被标记的就删除

避免了循环引用的问题

![](https://ws1.sinaimg.cn/large/006tKfTcly1ftf9x2kejlj30wo0ic11s.jpg)

实际的处理过程

![](https://ws4.sinaimg.cn/large/006tKfTcly1ftfa55k2e7j30wi0ick3k.jpg)

![](https://ws3.sinaimg.cn/large/006tKfTcly1ftfa6amfmmj30wc0iedr8.jpg)

Pluggable
Generational
Incremental

参考资料:

1. https://www.youtube.com/watch?v=iHVs_HkjdmI
2. https://droidyue.com/blog/2015/06/05/how-garbage-collector-handles-circular-references/
3. https://www.cnblogs.com/Xjng/p/5128269.html
4. https://foofish.net/python-gc.html

在 Python 中优雅地处理 SIGTERM 信号

昨天写了一个服务,在本地运行很好,使用 Ctrl-C 结束运行之后会清理资源,然后取消注册,然而放到 Docker 中跑之后发现结束之后资源没有释放。查了查发现原来是下面是几个因素造成的:

1. Ctrl-C 发送的是 SIGINT 信号,Python 会转化成 KeyboardInterrupt 异常,而我的资源是在 finally 释放资源,所以使用 Ctrl-C 可以优雅地退出
2. Python 中对其他的信号(比如 SIGTERM、SIGHUP)都不会处理,而是直接退出
3. Docker 在推出的时候默认发送的是 SIGTERM 信号

所以在 docker stop 的时候服务并不能优雅的推出。

# 解决方法

使用 atexit 模块是不可以的,atexit 不会处理 SIGTERM。需要使用 signal 模块来,在网上找到了一份源码。这个代码注册了一个 SIGTERM 的 handler,把 SIGTERM 转换成正常的 `sys.exit` 调用,当运行 `sys.exit` 的时候会运行 finally 子句中的语句。

“`
# Author: Giampaolo Rodola’
# License: MIT

from __future__ import with_statement
import contextlib
import signal
import sys

def _sigterm_handler(signum, frame):
sys.exit(0)
_sigterm_handler.__enter_ctx__ = False

@contextlib.contextmanager
def handle_exit(callback=None, append=False):
“””A context manager which properly handles SIGTERM and SIGINT
(KeyboardInterrupt) signals, registering a function which is
guaranteed to be called after signals are received.
Also, it makes sure to execute previously registered signal
handlers as well (if any).

>>> app = App()
>>> with handle_exit(app.stop):
… app.start()

>>>

If append == False raise RuntimeError if there’s already a handler
registered for SIGTERM, otherwise both new and old handlers are
executed in this order.
“””
old_handler = signal.signal(signal.SIGTERM, _sigterm_handler)
if (old_handler != signal.SIG_DFL) and (old_handler != _sigterm_handler):
if not append:
raise RuntimeError(“there is already a handler registered for ”
“SIGTERM: %r” % old_handler)

def handler(signum, frame):
try:
_sigterm_handler(signum, frame)
finally:
old_handler(signum, frame)
signal.signal(signal.SIGTERM, handler)

if _sigterm_handler.__enter_ctx__:
raise RuntimeError(“can’t use nested contexts”)
_sigterm_handler.__enter_ctx__ = True

try:
yield
except KeyboardInterrupt:
pass
except SystemExit, err:
# code != 0 refers to an application error (e.g. explicit
# sys.exit(‘some error’) call).
# We don’t want that to pass silently.
# Nevertheless, the ‘finally’ clause below will always
# be executed.
if err.code != 0:
raise
finally:
_sigterm_handler.__enter_ctx__ = False
if callback is not None:
callback()

if __name__ == ‘__main__’:
# ===============================================================
# — test suite
# ===============================================================

import unittest
import os

class TestOnExit(unittest.TestCase):

def setUp(self):
# reset signal handlers
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.flag = None

def tearDown(self):
# make sure we exited the ctx manager
self.assertTrue(self.flag is not None)

def test_base(self):
with handle_exit():
pass
self.flag = True

def test_callback(self):
callback = []
with handle_exit(lambda: callback.append(None)):
pass
self.flag = True
self.assertEqual(callback, [None])

def test_kinterrupt(self):
with handle_exit():
raise KeyboardInterrupt
self.flag = True

def test_sigterm(self):
with handle_exit():
os.kill(os.getpid(), signal.SIGTERM)
self.flag = True

def test_sigint(self):
with handle_exit():
os.kill(os.getpid(), signal.SIGINT)
self.flag = True

def test_sigterm_old(self):
# make sure the old handler gets executed
queue = []
signal.signal(signal.SIGTERM, lambda s, f: queue.append(‘old’))
with handle_exit(lambda: queue.append(‘new’), append=True):
os.kill(os.getpid(), signal.SIGTERM)
self.flag = True
self.assertEqual(queue, [‘old’, ‘new’])

def test_sigint_old(self):
# make sure the old handler gets executed
queue = []
signal.signal(signal.SIGINT, lambda s, f: queue.append(‘old’))
with handle_exit(lambda: queue.append(‘new’), append=True):
os.kill(os.getpid(), signal.SIGINT)
self.flag = True
self.assertEqual(queue, [‘old’, ‘new’])

def test_no_append(self):
# make sure we can’t use the context manager if there’s
# already a handler registered for SIGTERM
signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
try:
with handle_exit(lambda: self.flag.append(None)):
pass
except RuntimeError:
pass
else:
self.fail(“exception not raised”)
finally:
self.flag = True

def test_nested_context(self):
self.flag = True
try:
with handle_exit():
with handle_exit():
pass
except RuntimeError:
pass
else:
self.fail(“exception not raised”)

unittest.main()
“`

参考:

1. docker 退出信号:https://www.ctl.io/developers/blog/post/gracefully-stopping-docker-containers/
2. finally 中的语句并不总会执行:https://stackoverflow.com/questions/49262379/does-finally-always-execute-in-python
3. Python 不处理 SIGTERM 信号 https://stackoverflow.com/questions/9930576/python-what-is-the-default-handling-of-sigterm
4. sys.exit 也会运行 finally 中的语句 https://stackoverflow.com/questions/7709411/why-finally-block-is-executing-after-calling-sys-exit0-in-except-block

父进程退出后如何退出子进程

我们知道当子进程推出的时候,父进程会收到 SIGCHLD 信号,从而可以采取相应的操作。但是当父进程退出的时候,系统会把子进程的父进程更改为pid=0的 init 进程,而且子进程不会收到任何信号。而我们经常想在父进程退出的时候,让子进程也推出。在 Python 中可以有如下几种做法。

# 设置子进程为 daemon

这里的 daemon 和系统的守护进程没有任何关系,是 quit_when_parent_dies 的意思。也就是当父进程退出的时候,会自动尝试关闭 daemon=True 的子进程。

“`
p = multiprocessing.Process(target=foo)
p.daemon = True
p.start()
“`

[官方文档](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.daemon)

# 在子进程中设置 PDEATHSIG

在 Linux 中,进程可以要求内核在父进程退出的时候给自己发信号。使用系统调用 prctl。

“`
prctl(PR_SET_PDEATHSIG, SIGHUP);
“`

在 Python中也有[对应的包 python-prctl](https://github.com/seveas/python-prctl),可以在**子进程**中这样使用,这样在父进程挂掉的时候,子进程就会收到 SIGHUP 信号:

“`
# apt-get install build-essential libcap-dev
# pip install python-prctl

import signal
import prctl

prctl.set_pdeathsig(signal.SIGHUP)
“`

缺点:只支持 linux

# 父进程在终止的时候回收子进程

可以使用 atexit.register 在主进程中注册代码:

“`
# pip install psutil

import psutil
import atexit
import os
import signal

@atexit.register
def kill_children():
print(‘quitting, press Ctrl-C to force quit’)
current_process = psutil.Process()
children = current_process.children(recursive=True)
for child in children:
print(‘Child pid is {}’.format(child.pid))
os.kill(child.pid, signal.SIGTERM)
“`

使用 atexit 在[收到 SIGTERM 的时候并不能触发](http://yifei.me/note/558),所以最好使用 signal 注册到主进程对应的信号上。

缺点是当使用 kill -9 或者使用 os._exit 的时候不会调用这些函数。

Python 操作 ssh

“`
import paramiko
ip=’server ip’
port=22
username=’username’
password=’password’
cmd=’some useful command’
ssh=paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip,port,username,password)
stdin,stdout,stderr=ssh.exec_command(cmd)
outlines=stdout.readlines()
resp=”.join(outlines)
print(resp)
“`

Python 转义 html 实体字符

在网页中经常出现 <, &, &0x0026; 这些特殊字符,这是 html 实体字符转义,用于防止 XSS 攻击。Python3 标准库中包含了 html.entities 模块,可以用于转义和反转义这些字符。

html.entities.entitydefs 中包含了名称到符号的映射:比如{'amp': '&'}
html.entities.name2codepoint 中包含了名称到数字的映射:比如 {'amp': 0x0026}
html.entities.codepoint2name 中包含了数字到名称的映射:比如 {0x0026: 'amp'}

Python 中的 queue 模块

在Python中多进程或者多线程之间通信可以使用队列,标准库中实现了一个线程安全的库 queue.Queue,和进程安全的库 multiprocessing.Queue

There are 3 kind of queues: `Queue LifoQueue HeapQueue

“`
q = Queue(size)

get(block=True) return a object
get_nowait()
put(item, block=True) put a object

qsize()
empty()
full()

task_done() # indicate one item process finished raise ValueError
join() # wait until all item processed

Queue.Empty
Queue.Full
“`

How to iterate a queue?

by using `iter(q, None)`, note that you have to put a sentinel value manually

Queue vs multiprocessing.Queue

despite their similar api, their implementation is completely different

http://stackoverflow.com/questions/925100

如果在一个线程使用了异步代码,那么所有的操作都必须使用异步操作,但是并不是所有的操作都需要或者能够使用异步操作。

在异步线程和同步线程之间分享数据需要使用一个共用的queue

如果需要把异步操作分布式不熟使用,在异步的事件循环之间分享数据也需要使用一个queue

Python functools 中有用的一些函数

functools

partial(fn, *args, **kwargs)

# lru_cache

Decorator to wrap a function with a memoizing callable that saves up to the maxsize most recent calls. It can save time when an expensive or I/O bound function is periodically called with the same arguments.
Since a dictionary is used to cache results, the positional and keyword arguments to the function must be hashable.
If maxsize is set to None, the LRU feature is disabled and the cache can grow without bound. The LRU feature performs best when maxsize is a power-of-two.
If typed is set to true, function arguments of different types will be cached separately. For example, f(3) and f(3.0) will be treated as distinct calls with distinct results.
To help measure the effectiveness of the cache and tune the maxsize parameter, the wrapped function is instrumented with a cache_info() function that returns a named tuple showing hits, misses, maxsize and currsize. In a multi-threaded environment, the hits and misses are approximate.
The decorator also provides a cache_clear() function for clearing or invalidating the cache.

# singledispatch

To define a generic function, decorate it with the @singledispatch decorator. Note that the dispatch happens on the type of the first argument, create your function accordingly

“`
@singledispatch
def fun()
pass

@fun.register(int)
def fun_int()
pass
“`

to get the dispatched func. use fun.dispatch(type)

Python 的内置类型和函数

# 内置类型

Python的内置类型按照类来分,包括了 numerics, sequences, mappings, files, classes, instances 和 exceptions
。对用户自定义类型的实例,如果__nonzero__或者__len__返回是 0 或 False 会被认为是假。

## 注意浅拷贝

“`
>>> lists = [[]] * 3
>>> lists
[[], [], []]
>>> lists[0].append(3)
>>> lists
[[3], [3], [3]]
“`

## 字典的一些方法

“`
keys()/values()
iterkeys()/itervalues()
items()/iteritems()
update()
get(key, default)
“`

可能会抛出

read/readline/readlines
write/writelines

## 字符串方法

“`
str.capitalize() 首字母大写
center/ljust/rjust(width, filled_char) 扩充并使得原字符串居中
decode(codecsm, how_to_handle_erroe) throws UnicodeError
encode(codecs, how_to_handle_error) throws UnicodeError
startswith(string or tuple)
endswith(seq, start, end)
find(seq, start, end)
isalnum/isalpha/isdigit/islower/isupper/isspace
lower()/upper()/title()
strip/lstrip/rstrip(chars)
partition(seq) 返回一个三元组前半部分,seq,后半部分
replace(old, new, count)
split(seq, count)
splitlines(keepends)
zfill(width) 左边填零
“`

# 内置函数

## 函数式编程对应的内置函数

“`
all
any
callable
filter
iter
map
next
reduce(fn, iter, init)
reload
“`

## 操作属性的函数

“`
delattr
dir return a object’s attributes
getattr(object, name, default) when default supplied, no exception thrown
hasattr
globals
locals
“`

## 内置数学库

“`
compile
complex
bin Convert an integer number to a binary string
abs
divmod
enumerate
eval
execfile
file
hex
max/min
oct
pow
“`

## 类型

“`
frozenset
bool
dict
float note float(‘NaN’), float(‘-inf’)
int(x, base=10)
list
long
object
“`

* bytearray可以认为是一个可变的 string
* frozenset是一个immutable, hashable的 set

## 其他函数

“`
format
enumerate
id
input/raw_inoput
isinstance(object, class/class_tuple)
insubclass(class, class)
len
open
print
range/xrange(start, stop, step)
“`