Web 后端

flask 全家桶学习笔记(未完待续)

看到标题有的同学可能就问了,flask 是一个微框架,哪儿来的全家桶啊。其实作为一个框架来说,除非你提供的只有静态页面,那么肯定要和数据库打交道的,肯定是要有后台登录管理以及提供 API 等等一大堆常规工作要做的,这时候就需要各种全家桶组件了,那么这篇文章里介绍的就是 flask + peewee + login + admin + uwsgi 等等一系列的工具。

hello world

使用 blueprint

flask 登录与管理组件

flask-login

https://flask-login.readthedocs.io/en/latest/

flask admin

https://github.com/flask-admin/flask-admin/blob/master/examples/peewee/app.py

Application Factory Pattern

在前面的例子中,我们都直接在模块中 app = Flask(__name__) 了,这样做实际上是有问题的。官方推荐使用 app factory pattern。

app factory pattern 其实也很简单,就是把 app 的创建包装在了 create_app 函数中,这样做的好处主要有两点:

方便多环境部署

直接导入 app 的话,已经初始化了,无法再更改 app 的配置

from example import app

如果把 app 的创建包装在一个函数中,可以在创建 app 的时候传递不同的参数,可以区分开发测试等不同环境。

def create_app(**kwargs):
    app = Flask(**kwargs)
    return app

from example import create_app
app = create_app(DB_CONN="production")

方便依赖管理

默认情况下,代码可能是这样的,所有的代码都得依赖 app.py

# app.py
app = Flask(__name__)
db = SQLAlchemy(app)

# models.py
from example import db

class User(db.Model):
    pass

使用了 app factory pattern 之后,每个模块都可以不依赖 app.py,而是使用自己的 blueprint

def create_app():
    app = Flask(__name__)
    from example.models import db
    db.init_app(app)
    return app

# models.py
db = SQLAlchemy()

class User(db.Model):
    pass

swagger+flasgger

swagger 是一套定义 API 的工具,可以实现 API 的文档化和可交互。flasgger 是 flask 的一个插件,可以实现在注释中使用 swagger 语法。

swagger 本身是一套工具,但是后来被社区发展成了 OpenAPI 规范。最新版本是 OpenAPI 3.0,而现在用的最多的是 swagger 2.0。我们这里

完整的例子

https://github.com/coleifer/peewee/blob/master/examples/twitter/app.py

参考文献

  1. https://blog.csdn.net/u010466329/article/details/78522992
  2. https://blog.csdn.net/qq_21794823/article/details/78194164
  3. http://www.manongjc.com/article/48448.html
  4. https://juejin.im/post/5964ce816fb9a06bb21abb23
  5. https://www.cnblogs.com/whitewolf/p/4686154.html
  6. 为什么要使用 APP Factory Pattern

uwsgi

uwsgi 的使用和性能优化配置

假设我们编写了如下的 flask 应用,要用 uwsgi 部署,希望性能越高越好,那么下面是一份还说得过去的配置。

from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello():
    return "world"

if __name__ == "__main__":
    app.run()

对应的 uwsgi 配置

[uwsgi]
wsgi-file=app.py  # 应用的主文件
callable=app  # 应用中的 flask 实例
chdir=/opt/app  # chdir 到给定目录
env= XXX=XXX  # 额外的环境变量

# 以下三者任选其一
http=0.0.0.0:5000  # 如果直接暴露 uwsgi 的话用这个
http-socekt=0.0.0.0:5001  # 如果用nginx反向代理的话,用这个
socket=:3031  # 在 3031 使用 uwsgi 协议,nginx 中使用 uwsgi_pass 更高效

chmod-socket = 664

pidfile=xxx  # pid 文件路径
venv=xxx  # 虚拟环境路径
logto = /var/log/www.log

# 并发设置
workers = 2  # 一般为 CPU 核数 * 2
threads = 2  # 线程比进程开销更小一点。如果没有使用 threads 那么 thread 直接不工作的,必须使用 enable_threads。
max-requests = 100000  # 处理过多少个请求后重启进程,目的是防止内存泄露
master = true  # 使用 max-requests 必须采用这个选项
listen = 65536  # 每个进程排队的请求数量,默认为 100 太小了。并发数 = procsses * threads * listen
buffer-size = 65536  # header 的 buffer 大小,默认是 4 k
thunder-lock = true  # 避免惊群效应
uid=www-data
gid=www-data
harakiri=30  # 所有进程在 30s 没有响应后傻屌
log-slow=3000  # 记录满于 3000 毫秒的请求
# lazy-apps  # 不使用 prefork,而是在需要时才启动进程

# 监控设置
stats = 127.0.0.1:9191  # 可以使用 uwsgi top 监控
python-autoreload=1  # 自动重载,开发时非常方便

# 静态文件
check-static = /var/static  # 尝试从该目录下加载静态文件
static-map = /static=/var/static  # 把对应目录映射
route = /static/(.*)\.png static:/var/www/images/pngs/$1/highres.png  # 使用高级路由模式
offload-threads = 10  # 让 uwsgi 启动额外的进程处理

参考

  1. https://blog.zengrong.net/post/2568.html
  2. https://stackoverflow.com/questions/34255044/why-use-uwsgi-max-requests-option/34255744
  3. https://blog.csdn.net/apple9005/article/details/76232852
  4. https://mhl.xyz/Python/uwsgi.html
  5. https://stackoverflow.com/questions/34824487/when-is-thunder-lock-beneficial

云时代的个人存储搭建

昨天想用 iPad 上的 GoodReader 看一本书,但是从 iCloud 同步的时候出了些问题,进度始终为零。由于国内糟糕的网络环境,这种同步失败的问题时有发生。虽然可以直接通过 WiFi 把书从电脑上传过来,但是因为偶尔需要在另一个 iPad 上查看,为了同步进度,还是最终决定还是自己搭建一套云存储设施。

# ftp 与 webdav

ftp 协议有诸多问题,现在用的已经很少了。WebDav 协议基于 HTTP,相比 FTP 有不少有点,可以参见文章1。另外不少开源的网盘客户端也支持 webdav。NextCloud 支持 webdav,后面会讲到

# sftp 和 sshfs

sftp 则和 ftp 是完全独立的两个东西,虽然最终目的是一样的。好比海豚和鲨鱼都是在海里的生物,但是一个是哺乳动物,而一个是鱼类。sftp 基于 ssh 协议。

sshfs 相比 sftp 则更近了一步,通过 sftp 把远程的文件系统直接映射到本地,从而无缝衔接。

## 搭建

sftp 直接基于 linux 的用户和文件权限系统。

### 添加相应的用户和分组,以用户名 sftp,分组名 ftpaccess 为例。

“`
% sudo groupadd ftpaccess
% sudo useradd -m sftp -g ftpaccess -s /usr/sbin/nologin
% sudo passwd sftp # 更改密码
% sudo mkdir /var/sftp
% sudo chown root /var/sftp # 这一步非常坑,切记不可省略,后面讲为什么
% sudo mkdir -p /var/sftp/files
% sudo chown sftp:ftpaccess /var/sftp/files
“`

### 修改 /etc/ssh/sshd_config 文件

注释掉这一行 `Subsystem sftp /usr/lib/openssh/sftp-server`

然后在文件的结尾添加

“`
Subsystem sftp internal-sftp
Match group ftpaccess
ChrootDirectory /var/sftp # 这里可以随便指定你想要的顶级目录
X11Forwarding no
AllowTcpForwarding no
ForceCommand internal-sftp
PasswordAuthentication yes
“`

ssh 的安全配置要求 ChrootDirectory 本身必须是 root 所有的,所以登录都的根目录我们是不可写的,但是可以在新建的目录中读写。

### 重启 ssh 服务

“`
% sudo systemctl restart ssh
“`

可以使用客户端链接啦~ 如果需要使用 publickey 登录的话,只需要像普通的用户一样,把文件传到 ~sftp 的对应目录就可以了。

### 使用 sshfs mount 到本地

“`
% brew install sshfs
% brew cask install osxfuse
% sshfs -o allow_other,defer_permissions -o volname=sftp_files sftp@your.example.com:/files $HOME/sftp_files
“`

![](https://ws3.sinaimg.cn/large/006tKfTcly1g09iu47ttpj30i207cabm.jpg)

# nextcloud

未完待续

# 参考资料
1. https://stackoverflow.com/questions/11216884/which-file-access-is-the-best-webdav-or-ftp
2. [SSHFS](https://github.com/osxfuse/osxfuse/wiki/SSHFS)
3. [搭建 sftp 服务器](https://askubuntu.com/questions/420652/how-to-setup-a-restricted-sftp-server-on-ubuntu)
4. [sftp 的一个坑](https://www.minstrel.org.uk/papers/sftp/builtin/)

使用 cProfile 和火焰图调优 Python 程序性能

本来想坐下来写篇 2018 年的总结,仔细想想这一年发生的事情太多了,还是写篇技术文章吧。

前几天调试程序,发现 QPS 总是卡在 20 左右上不去。开始以为是 IO 问题,就多开了些并发,然并卵,这才想到可能是 CPU 的问题。看了看监控,发现程序某一步的延迟在 400ms 左右,而且这一步是 CPU 密集的。当时开了 4 台双核的机器:(1s / 400ms) * 2 * 4 = 20 啊。看来需要优化下这一步的代码了,那么第一步就是找到可以优化的地方。

![](https://ws2.sinaimg.cn/large/006tNbRwly1fyqb4f5jhij30yc0pwtej.jpg)

测量程序的性能之前并没有实际做过,Google 了一番,感觉标准库的 cProfile 似乎值得一试。

要测量的代码逻辑也很简单,使用 lxml 解析 HTML,然后提取一些字段出来,这些基本都是调用的 C 库了,解析的算法也不在 Python 中。看起来似乎没有什么能改进的地方,不管怎样,还是先跑一下吧。

cProfile 有多种调用方法,可以直接从命令行调用:

“`
python -m cProfile -s tottime your_program.py
“`

其中的 `-s` 的意思是 sort。常用的 sort 类型有两个:

1. tottime,指的是函数本身的运行时间,扣除了子函数的运行时间
2. cumtime,指的是函数的累计运行时间,包含了子函数的运行时间

要获得对程序性能的全面理解,经常需要两个指标都看一下。

不过在这里,我们并不能直接使用命令行方式调用,因为我的代码中还需要一些比较繁重的配置加载,如果把这部分时间算进去了,多少有些干扰,那么我们也可以直接在代码中调用 cProfile。

使用 cProfile 的代码如下:

“`
import cProfile, pstats, io

pr = cProfile.Profile()
pr.enable()

extractor.extract(crawl_doc=doc, composition=PageComposition.row, rule=rule)

pr.disable()
s = io.StringIO()
sortby = “cumtime” # 仅适用于 3.6, 3.7 把这里改成常量了
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
ps.print_stats()
print(s.getvalue())
“`
把需要 profile 的代码放到 pr.enable 和 pr.disable 中间就好了。注意这里我们使用了 cumtime 排序,也就是累计运行时间。

结果如下:

![](https://ws2.sinaimg.cn/large/006tNbRwly1fyqc3j79mvj31o80lanbu.jpg)

我们可以看到总的运行时间是 200ms,而其中红框内的部分就占了 100ms! 如果能够优化调的话,性能一下子就能提高一倍。红框内的代码是做什么的呢?我们知道解析一个 html 文档,第一步是建立 DOM 树,通常情况下,我们可能会从其中抽取一些链接。在网页中,链接不一定是绝对路径,也可能是 `/images/2018-12-31-xxx.jpg` 这样的相对路径。lxml 库帮我们做了一个贴心的默认值,那就是在构造 DOM 树的时候,根据传入的 url 来吧页面中的所有 url 都重写成绝对路径。看起来这是个很贴心的功能,但是在这里却成了性能瓶颈。为什么很耗时呢?大概是因为需要遍历整个 DOM 树,重写所有的链接吧。这显然不是我们需要的,我们只需要把抽取之后的链接还原成绝对路径就好了,而不是事先把所有链接都重写一遍。所以在这里我们直接去掉这个功能就好了。

修改代码之后,再次运行 profile 脚本,时长变成了 100ms:

![](https://ws2.sinaimg.cn/large/006tNbRwly1fyqcamiop3j31ly08sjwy.jpg)

这时候我们接着看,程序中下一个比较大头的时间占用:jsonfinder 和 json decode/encode。

jsonfinder 是一个有意思的库,它自动从 HTML 中查找 json 字符串并尝试解析,但是有时候也不太准。经常为了找到特定的值,还是需要使用正则。那么对于这个可有可无的功能,性能有这么差,还是删掉好了。

通过删代码,现在性能已经是原来的四倍了。

这时候发现代码里面有正则还挺花费时间的,不过还好,暂时先不管了。

![](https://ws2.sinaimg.cn/large/006tNbRwly1fyqcja5bj5j31oi0kydva.jpg)

刚刚都是只运行了一遍,测量结果难免有随机性,必定有失偏颇,实际上应该使用多个测试用例,成千上万次的跑,才能得到一个比较准确地结果。

上面这个小步骤基本没有什么可以优化的了,下面我们把优化目标扩大一点,并把次数先定为100.

下面这种图是按照 tottime 来排序的:

![](https://ws4.sinaimg.cn/large/006tNbRwly1fyqclx98l2j31lg0gu7ei.jpg)

注意其中最耗时的步骤是 parseUnicodeDoc,也就是建树了,这是符合预期的,然而旁边的 ncalls 一栏却不太对劲了。我们明明只运行了 100 次,为什么这个函数调用了 300 次呢?显然代码中有重复建树的地方,也就是有隐藏的 bug。这种地方不经过 profile 很难浮现出来,因为程序本身的逻辑是对的,只是比较耗时而已。

![](https://ws3.sinaimg.cn/large/006tNbRwly1fyqcrdtifcj31lw0j2woi.jpg)

优化之后,终于变成了 100. 从 cProfile 的表格现在已经看不出什么结果来了,下一步我们开始使用火焰图,可视化往往能让我们更容易注视到性能瓶颈。(为什么不一开始就用火焰图呢?因为我以为很麻烦。。实际很简单)

Python 中有一个第三方包(见参考文献)可以直接从 cProfile 的结果生成火焰图:

1. 在原有的代码中加上一句: `pr.dump_stats(“pipeline.prof”)`
2. 调用该工具:`flameprof pipeline.prof > pipeline.svg`

然后打开 SVG 文件就可以了:

![](https://ws1.sinaimg.cn/large/006tNbRwly1fyqcvj3ah8j30ys0jm42r.jpg)

其中火焰的宽度代表了运行的时长,我们现在的优化目标就是这些耗时比较长的步骤。

可以看大其中 mysql 的访问占了绝对的大头,按理说跑100次的话,不应该每次都花费时间在建立连接上啊,这里一定有问题。经过排查发现在某处链接是使用了 `close_old_connections` 来保证不会抛出数据库断开的异常,这还是在头条带来的习惯。。close_old_connections 的功能是关闭已经失效的链接,看来我的理解还是有误的。先把这块删掉,最终解决应该是这块放到一个队列里,统一存入数据库。

去掉之后:

![](https://ws4.sinaimg.cn/large/006tNbRwly1fyqd1728dwj30xd0cvwgn.jpg)

现在的大头又变成 lxml 的,又动了优化它的心思,lxml 是 libxml2 的一个 Python binding,查了下应该是最快的 html parser 了,这块真的没有什么优化空间。盯了一会儿,眼睛最终看到了一个小角落:

![](https://ws1.sinaimg.cn/large/006tNbRwly1fyqd351qyvj309v04omxm.jpg)

一个正则匹配居然占用了 8% 的运行时间,太不像话了。老早之前就听说 Python 的标准库正则性能不行,现在才发现原来是真的挺差劲的。Python 标准库的 re 模块采用的是 PCRE 的处理方式,而采用 NFA 的处理方式的正则要快很多,这块还需要再看一下。不过眼下倒是可以直接换一个库来解决。regex 模块是 re 模块的一个 drop-in replacement.

`pip install regex` and `import regex as re`,就搞定了

![](https://ws2.sinaimg.cn/large/006tNbRwly1fyqd8orglej30xi0dh410.jpg)

可以看到正则那块直接消失了。提升还是很大的。时间不早了,当天的优化就到此结束了。上线之后,积压一下子就下去了:

![](https://ws2.sinaimg.cn/large/006tNbRwly1fyqda1lv19j30sw0jmt9v.jpg)

# 后记

要想调试的时候方便,在写代码的时候就要注意,尽量使自己的代码 mock-friendly 一点。如果需要引入外部的数据库、服务、API等等各种资源,最好有一个开关或者选项能够不加载外部资源,或者至少能够很方便地 mock 这些外部服务,这样方便对每一个小单元进行 profile。

总有人吐槽 Python 的性能低下,但是 Python 本来就不是做计算任务的呀,Python 是一门胶水语言,是用来写业务逻辑的,而不是用来写CPU密集的算法的。事实上复杂的解析一般都会用 C++ 这种硬核语言来写了,比如 numpy TensorFlow lxml。大多数程序员一天 90% 的工作除了和产品经理撕逼以外,也就是在写 CRUD,也就是调用这些包。所以瓶颈一般在 IO 上而不在 CPU 上,而解决 IO 的瓶颈手段就多了,Python 中至少有 多进程、多线程、AsyncIO、Gevent 等多种方法。不过方法多其实也是一个弊端,这几种方法可以说是基本互不兼容,对各种第三方库的支持也参差不齐。而 Go 在这方面就做地很好了,语言直接内置了 go 关键字,甚至都不支持多线程。所有的库都是支持一个统一的并发模型,对于使用者来说更简单。

Zen of Python 中有一句:There should be one way — preferably only one way — to do a thing. 这点上 Python 本身没有做到,反倒是 Go 实践地非常好。

扯远了,程序的瓶颈其实不外乎CPU、内存和 IO 三个方面,而 cProfile 和火焰图是判断 CPU 瓶颈的一把利器。

后面还发现了一些性能瓶颈,也列在这里:

1. yaml 的反序列化时间过长。解决方法是添加了一个 Expiring LRU Cache,不要每次都去加载,当然牺牲的是一点点内存,以及当规则变更时会有一些延迟,不过都是可以接受的。之前早就听人说 Thrift 的序列化性能相比 Protobuf 太低,现在想想序列化和反序列化还真是一个很常见的性能瓶颈啊。

2. 存储使用了 360 的 pika,pika 可以理解为一个基于 rocksdb 的硬盘版 redis。最开始的时候没多想,随便找了台机器搭了起开,把上面的问题解决之后,pika 的延迟很快大了起来,机器的监控也显示 IO 基本被打满了。这时候才发现原来这台机器没有用 SSD,果断换了 SSD 问题基本解决了。如果再有问题可能就需要集群了。

性能这个问题其实是典型的木桶理论的场景,系统的整体性能是由最差的一块决定的。所以也是一个不断迭代的过程。

祝大家新年快乐~

# 参考文献

1. https://toucantoco.com/en/tech-blog/tech/python-performance-optimization
2. https://docs.python.org/3.6/library/profile.html
3. https://medium.com/build-smarter/blazing-fast-python-40a2b25b0495
4. https://swtch.com/~rsc/regexp/regexp1.html

Python Redis 客户端连接池解析

Python Redis 的客户端使用了链接池机制,通过复用链接可以减低服务器的压力并在失败时重试。连接池其实是一种很通用的机制,在实现客户端是是一个经常需要(或许其实不需要)重复发明的轮子。

Redis 客户端一共涉及到了三个类:

– Connection,表示一个到服务器的链接
– ConnectionPool,链接池
– Redis,使用连接池,并在失败时重试

# Connection 类解析

Connection 类主要负责建立和 Redis 服务器的一个 Socket 链接,并且沟通相关信息。下面的代码是 Connection 类和 socket 处理相关的代码。

“`
class Connection(object):

def __del__(self):
try:
self.disconnect()
except Exception:
pass

def connect(self):
“””
连接 Redis 服务器
“””
if self._sock:
return
try:
sock = self._connect()
except socket.timeout:
raise TimeoutError(“Timeout connecting to server”)
except socket.error:
e = sys.exc_info()[1]
raise ConnectionError(self._error_message(e))

self._sock = sock
try:
self.on_connect()
except RedisError:
# clean up after any error in on_connect
self.disconnect()
raise

# run any user callbacks. right now the only internal callback
# is for pubsub channel/pattern resubscription
for callback in self._connect_callbacks:
callback(self)

def _connect(self):
“””
建立链接的具体过程, 主要是 socket 操作
“””

def disconnect(self):
“””
关闭链接
“””
self._parser.on_disconnect()
if self._sock is None:
return
try:
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except socket.error:
pass
self._sock = None

def send_packed_command(self, command):
if not self._sock:
self.connect()
。。。
# 发送命令到服务器
“`

可以看出,Connection 类主要是在 socket 上的一层薄薄封装。当然,这个 Connection 不是线程安全的。

# ConnectionPool 类解析

redis.py 的代码中 ConnectionPool 分了两个类,基类 ConnectionPool,还有一个子类 BlockingConnectionPool。这里我感到有些不解,既然只有一个子类,不知道为什么还要分成两个类呢?可能是开始时候规划了好几个子类,最后只实现了一个吧……

其中 BlockingConnection 类不只是线程安全的,还是进程安全的。

“`
class ConnectionPool(object):
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs):
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() # 调用 reset 初始化一些属性 def reset(self): self.pid = os.getpid() # 通过 pid 检查实现进程安全 self._created_connections = 0 self._available_connections = [] # 直接使用一个 list 来存放连接 self._in_use_connections = set() self._check_lock = threading.Lock() def _checkpid(self): # 如果当前的 connection 是 fork 来的,直接关闭链接 if self.pid != os.getpid(): with self._check_lock: if self.pid == os.getpid(): # 另一个线程已经检查了,直接返回 return self.disconnect() self.reset() def get_connection(self, command_name, *keys, **options): # 从连接池中取一个连接,注意这里是弹出,也就是同一个链接只有一个用户使用 self._checkpid() try: connection = self._available_connections.pop() except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) return connection def make_connection(self): # 创建一个新的连接 if self._created_connections >= self.max_connections:
raise ConnectionError(“Too many connections”)
self._created_connections += 1
return self.connection_class(**self.connection_kwargs)

def release(self, connection):
# 使用完毕连接后需要显式调用 release 把连接归还到连接池中。
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)

def disconnect(self):
# 断开所有连接
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()

class BlockingConnectionPool(ConnectionPool):
“””
这个连接池的实现是线程安全的
“””
def __init__(self, max_connections=50, timeout=20,
connection_class=Connection, queue_class=LifoQueue,
**connection_kwargs):

self.queue_class = queue_class # 使用一个队列来存放连接
self.timeout = timeout # 增加了超时功能
super(BlockingConnectionPool, self).__init__(
connection_class=connection_class,
max_connections=max_connections,
**connection_kwargs)

def reset(self):
self.pid = os.getpid()
self._check_lock = threading.Lock()

# 首先在队列中填满 None,后面会用到,这里很关键
self.pool = self.queue_class(self.max_connections)
while True:
try:
self.pool.put_nowait(None)
except Full:
break

# Keep a list of actual connection instances so that we can
# disconnect them later.
self._connections = []

def make_connection(self):
# 创建一个链接,貌似和上面的函数没有什么区别。。
connection = self.connection_class(**self.connection_kwargs)
self._connections.append(connection)
return connection

def get_connection(self, command_name, *keys, **options):
“””
获取一个新的连接,最长等待 timeout 秒

如果我们读取到的新连接是 None 的话,就会创建一个新的连接。因为我们使用的是 LIFO 队列,也就是栈,
所以我们优先得到的是已经创建的链接,而不是最开始放进去的 None。也就是我们只有在需要的时候才会创建
新的连接,也就是说连接数量是按需增长的。
“””
# 确保没有更换进程
self._checkpid()

# 尝试获取一个连接,如果在 timeout 时间内失败的话,抛出 ConnectionError
connection = None
try:
connection = self.pool.get(block=True, timeout=self.timeout)
except Empty:
# 需要注意的是这个错误并不会被 redis 捕获,需要用户自己处理
raise ConnectionError(“No connection available.”)

# 如果真的没有连接可用了,直接创建一个新的连接
if connection is None:
connection = self.make_connection()

return connection

def release(self, connection):
# 释放连接到连接池
self._checkpid()
if connection.pid != self.pid:
return

# Put the connection back into the pool.
try:
self.pool.put_nowait(connection)
except Full:
# perhaps the pool has been reset() after a fork? regardless,
# we don’t want this connection
pass

def disconnect(self):
# 释放所有的连接
for connection in self._connections:
connection.disconnect()
“`

# redis.Redis 类解析

Redis 类中使用了 ConnectionPool,如果没有显式创建的话,会自动创建一个线程池。所以每次你在使用 Redis 的时候,其实已经在使用线程池了。

“`
class Redis:
def __init__(self…):
if not connection_pool:
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
def execute_command(self, *args, **options):
# 执行每条命令都会调用该方法
pool = self.connection_pool
command_name = args[0]
# 弹出一个连接
connection = pool.get_connection(command_name, **options)
try:
# 尝试调用 redis
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
# 如果是连接问题,关闭有问题的连接,下面再次使用这个连接的时候会重新连接。
connection.disconnect()
if not connection.retry_on_timeout and isinstance(e, TimeoutError):
raise
# 再次尝试调用 redis
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
# 不管怎样都要把这个连接归还到连接池
pool.release(connection)
“`

rabbitmq 教程

更新:弃坑了,rabbitmq 在我这里总是崩溃,实在没法正常使用

评估了几款 Message Queue,发现还是 rabbitmq 比较简单一些,各种特性也支持地很好。网上好多教程说“rabbitmq 非常重量级,适合企业应用开发”,这些话可以说是人云亦云,瞎扯了。实际上 rabbitmq 采用 erlang 开发,不光性能强大,而且从操作和运维上来说都是非常轻量级的。

# 基础概念

rabbitmq 实现的是 AMQP 0.9.1 协议,其中重要概念有:

* producer:生产者,生产消息
* consumer:消费者,消费消息
* routing-key: 每个消息中决定消息如何分发的参数
* exchange:类似路由,消息实际发送给 exchange,可以指定几种不同的分发算法,然后用 routing-key 作为参数计算出该发送到哪个队列中,一个exchange 可以和一个或者多个 queue 绑定,exchange 有如下几种分发算法
* direct,直接按照 routing-key 和 queue 名字匹配
* fan-out,发送到所有绑定的 queue 中
* topic,利用 routing-key 和 queue 的名字模式匹配
* queue:缓冲消息,需要和 exchange 绑定
* binding:指的是 exchange 和 queue 之间的绑定关系

# 安装

Ubuntu:

“`
sudo apt-get install rabbitmq-server
“`

Python 客户端 pika

“`
pip install pika
“`

# 基础使用

和其他一些队列不一样的是,rabbitmq 的队列需要显式创建,不能直接发消息过去生成。可以使用 `sudo rabbitmqctl list_queues` 命令查看已有的队列。

下面是实现一个生产者,多个消费者的关系,如图所示:

![](https://www.rabbitmq.com/img/tutorials/prefetch-count.png)

生产者

“`
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))
channel = connection.channel()
channel.queue_declare(queue=’hello’, durable=True) # 声明一个队列,rabbitmq 中的队列必须首先创建才能使用

# 发送消息需要指明发送到的 exchange,留空表示默认 exchange
# 默认的 exchange 会根据 routing-key 把消息发到对应的队列中
channel.basic_publish(exchange=”,
routing_key=’hello’,
body=’Hello World!’, # 消息体
properties=pika.BasicProperties(
delivery_mode = 2, # AMQP 定义的,其中 1 代表不要持久化,2 代表需要持久化
))
print(” [x] Sent ‘Hello World!'”)

# 最后关闭链接
connection.close()
“`

消费者

消费者通过注册处理函数,来消费消息,可以同时使用多个消费者消费同一个队列。

“`
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))
channel = connection.channel()

channel.queue_declare(queue=’hello’, durable=True)

def callback(ch, method, properties, body):
print(” [x] Received %r” % body)
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1) # 最多有一个消息没有 ack
channel.basic_consume(callback,
queue=’hello’,
no_ack=False) # 默认情况加就是 False,也就是需要 ack

print(‘ [*] Waiting for messages. To exit press CTRL+C’)
channel.start_consuming()

“`

可以直接使用多个消费者来消费同一个队列,默认情况下 rabbitmq 采用了 round robin 的算法,也就是消息会依次发送给每一个消费者。

如果没有 ack 的话,rabbitmq 的内存最终可能会占满

# 使用其他的 exchange

rabbitmq 中默认的 exchange 是 `direct` exchange,也就是直接把收到的消息放到 routing key 对应的队列中。rabbitmq 还支持不少其他的类型,可以看文章开始的讨论。

下面的例子通过使用一个 fanout 类型的 exchange 来实现消息发送给所有消费者。

![](https://www.rabbitmq.com/img/tutorials/python-three-overall.png)

“`
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))
channel = connection.channel()

# 声明一个 fanout 类型的 exchange,名字为 logs
channel.exchange_declare(exchange=’logs’,
exchange_type=’fanout’)

message = ‘ ‘.join(sys.argv[1:]) or “info: Hello World!”
channel.basic_publish(exchange=’logs’,
routing_key=”,
body=message)
print(” [x] Sent %r” % message)
connection.close()
“`

消费者

Exchange 需要和 queue 绑定才会发送消息,否则会直接丢掉。
queue 需要和 exchange 绑定之后才能够接收到消息,而所有的 queue 默认已经是和默认 exchange 绑定的,所以在上一个例子中并没有使用绑定。

“`
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))
channel = connection.channel()

channel.exchange_declare(exchange=’logs’,
exchange_type=’fanout’)

# 声明一个临时的私有 queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 绑定 queue 到刚刚声明的 exchange
channel.queue_bind(exchange=’logs’,
queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C’)

def callback(ch, method, properties, body):
print(” [x] %r” % body)

channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

channel.start_consuming()
“`

# 常见问题

## 在一个循环中发送消息,为什么有时候会提示 Channel Closed?

使用 BlockingConnection 需要手动管理心跳,如果超过心跳时间就会被关闭链接。常见的错误包括使用了 time.sleep 导致长时间没有 publish 消息,从而链接被关闭。

可以通过单独开一个心跳线程的方法,或者使用 [connction.sleep](https://github.com/pika/pika/commit/df6a31630c530559cc61df14c1f23813b870d80a)。当然使用 connction.sleep 无法避免本身操作时长超过了心跳时间的情况。

# channel 和 connection 的区别?

Connection 表示的是到 rabbitmq broker 的一个物理连接,一般一个程序使用一个链接,或者使用一个连接池,可以使用心跳来维护一个链接,理论上应该在多个线程之间分享,很遗憾 python 的客户端 pika 并不是线程安全的。

而channel 则应该是短时效的,在每个线程内部创建,不是线程安全的。

1. https://stackoverflow.com/questions/18418936/rabbitmq-and-relationship-between-channel-and-connection
2. https://www.rabbitmq.com/tutorials/amqp-concepts.html

## 如果客户端重启,之前的匿名队列会被删除吗?如果没有别删除,还能连接上之前的匿名队列吗?如果连不上是不是消息就都丢了?

To be answered

# UI管理工具

在向队列中发消息的过程中,尤其是在学习或者排查错误的时候,可以通过 rabbitmq 的管理工具来查看当前消息队列中的消息。

首先,激活管理工具插件:

“`
rabbitmq-plugins enable rabbitmq_management
“`

然后添加用户

“`
rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username “.*” “.*” “.*”
“`

然后可以打开:http://server-name:15672/ 查看,使用刚刚设置的密码登录

![](https://ws1.sinaimg.cn/large/0069RVTdly1fu228vp43hj31kw0v9jyg.jpg)

# 参考:

1. http://www.rabbitmq.com/management.html
2. https://www.rabbitmq.com/tutorials/tutorial-three-python.html
3. https://github.com/pika/pika/issues/196

在阿里云上为内网VPC搭建NAT出口服务器

对于大多数的内部服务来说,我们是不希望他们暴露在公网上的;而且服务之间通过公网通信效率也比较低。阿里云提供了虚拟私网的服务,我们可以把服务都部署在内网。但是与此同时如何让内网的服务器能够上网也就成了问题,毕竟还是经常需要`apt-get` 一下。

首先,不可能每个服务器都绑定一个弹性IP,贵且不说,这样和又把内部服务暴露在了公网。

其次,阿里云提供了专用的NAT服务器,但是太贵了!!!

其实 NAT 服务器也很简单啦,就是一个路由转发而已,利用 iptables 可以轻松实现。下面以一个例子来讲解一下。

首先说一下 NAT 的两种术语:SNAT 和 DNAT。SNAT的意思就是 source NAT,也就是我们访问其他网站,作为 TCP 链接的来源。而 DNAT 就是 destination NAT,也就是我们作为服务器,作为 TCP 链接的重点。在这里我们要实现的是内网上网,而不是内网提供服务,所以我们只需要 SNAT 就好了。

假设我们有三台服务器,在一个内网中,分别是:10.1.1.1, 10.1.1.2, 10.1.1.3。其中 10.1.1.1 绑定了外网IP可以上网。这里要说明的是:阿里云的弹性 IP 实际上是一个“伪IP”,也就是并没有真的绑定到我们的主机上,而是通过 SNAT 和 DNAT 的方式来模拟了绑定IP的行为。可以通过 `ip addr show` 命令验证一下,并没用弹性 IP 的任何信息。

阿里云内网必须建立一个虚拟交换机来连接各个主机,在后台我们可以配置这个主机的路由表。为了实现让 10.1.1.1 作为出口的功能,我们配置交换机的路由表,添加如下一行:

![](https://ws2.sinaimg.cn/large/006tNc79ly1ftbtme2354j319m0eo3zv.jpg)

把所有的流量都转发到 10.1.1.1

然后,在 10.1.1.1 上执行:

“`
echo 1 > /proc/sys/net/ipv4/ip_forward # 打开转发功能

# 所有来自10.0.0.0/8 的流量通过 eth0 发出
iptables -t nat -A POSTROUTING -s 10.0.0.0/8 -o eth0 -j MASQUERADE

iptables -A FORWARD -d 10.0.0.0/8 -j ACCEPT # 有人说需要这两句,但是亲测这两句不需要,但是也不知道什么意思
iptables -A FORWARD -s 10.0.0.0/8 -j ACCEPT
“`

这时候在 10.1.1.2 上就可以上网了

说在最后:自己搭建DNAT/SNAT只能单机,无法做到高可用,因为阿里云不给我们提供VIP。如果你考虑构建高可用的私有云,还是直接购买阿里云的负载均衡+NAT网关吧,它们分别对应DNAT和SNAT,但是可靠性更高。

参考:https://yuerblog.cc/2017/03/25/vpc-in-aliyun/

一篇简单的 Python gRPC 教程

# 安装

“`
pip install grpcio grpcio-tools protobuf googleapis-common-protos
“`

# IDL

grpc 使用 protobuf 来定义接口。按照 protobuf 的 [Style Guide](https://developers.google.com/protocol-buffers/docs/style) 的要求,service 和其中的方法都应该使用 CamelCase。

service 关键字定义一个服务,相当于一个接口。把下面的文件保存为 helloworld.proto

需要注意的是,grpc 中的方法只能接受一个参数,返回一个参数。

“`
// The greeter service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user’s name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloResponse {
string message = 1;
}
“`

## 生成 rpc 代码

“`
python -m grpc_tools.protoc –python_out=. –grpc_python_out=. helloworld.proto
“`

生成了两个文件:

– helloworld_pb2,包含了 protobuf 中结构的定义
– helloworld_pb2_grpc, 包含了 protobuf grpc 接口的定义

## 实现 rpc 服务

“`
from current.futures import ThreadPoolExecutor
from helloworld_pb2 import HelloRepsonse
from helloworld_pb2_grpc import GreeterServicer, add_GreeterServicer_to_server

class Greeter(GreeterServicer):

def SayHello(self, request, context):
return HelloResponse(message=’Hello, %s!’ % request.name)

def SayHelloAgain(self, request, context):
return HelloResponse(message=’Hello again, %s!’ % request.name)

def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port(‘[::]:50051’)
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
“`

## 客户端调用

“`
import grpc
from helloworld_pb2 import HelloRequest
from helloworld_pb2_grpc import GreeterStub

def run():
# NOTE(gRPC Python Team): .close() is possible on a channel and should be
# used in circumstances in which the with statement does not fit the needs
# of the code.
with grpc.insecure_channel(‘localhost:50051′) as channel:
stub = GreeterStub(channel)
response = stub.SayHello(HelloRequest(name=’you’))
print(“Greeter client received: ” + response.message)
“`

# 高级话题

stream

未完待续

# 参考

1. [A simplified guide to gRPC in Python](

序列化协议的选择 json vs msgpack vs thrift vs protobuf

当我们的程序需要保存一些对象到硬盘上供下次运行时使用,或者需要和其他程序交换数据
的时候,需要把对象用某种方式编程二进制字符串然后保存到硬盘上或者发送出去,这种方
法我们一般称作序列化。序列化有很多不同的方法, 一般考虑三个方面:

1. 速度,序列化和反序列化的速度越快越好
2. 体积,序列化之后的文件体积越小越好
3. 跨语言,序列化能够支持的语言越多越好

下面考察几种序列化的方法

1. 语言内置的序列化。比如 Python 的 pickle,显然这种协议只能在一种语言内部使用,
而且对于Python来说,甚至不同版本的 pickle 协议都是不兼容的。
2. json / xml。这两个都可以把对象序列化成人类可读的字符串的形式,但是序列化后之
后体积都变大不少,而且性能也不好,适合于简单的场景。另外一点就是 json 不能定
义 schema(接口规范),*在大型项目中 schema 是必须的*。
3. msgpack 序列化之后的体积也比较紧致,但是同样不能定义 schema。
3. 专门的序列化库。比如 protobuf/thrift。这些库都支持多个语言,需要预先定义
schema, 并且把对象序列化成二进制的模式,性能也都不错,所以我们重点关注一下。

考虑到需要定义接口规范,所以我们只考虑 thrift 和 protobuf 两种

Thrift 的缺点:

– 不支持 uint64。
– 查过一些文档之后,发现 thrift 的性能差于 pb。

所以先淘汰了 thrift。我们选择 protobuf

## 编译步骤放在哪里?

protobuf 和 thrift 两个的用法都是先定义 IDL(接口)文件,然后由编译器编译生成对应的语言
的代码。对于 C++ 这样的编译语言来说问题不大,我们可以把 IDL 编译的过程放到
makefile 里面去,但是对于 Python 这种没有编译的动态语言就尴尬了。具体来说,IDL
文件是需要提交到代码仓库的,但是生成的 Python 代码需不需要呢?

1. 不提交,在运行之前多一个编译步骤,不过可以把编译这一步写到 dockerfile 里面
2. 提交,这样会造成提交的代码冗余,相当于把二进制文件提交到了仓库

所以我还是倾向于只向代码库中提交 `*.proto` 或者 `*.thrift` 源文件,而不提交编译过后的文件。

# Protobuf

## 基本语法

protobuf 现在有两个主流版本,显然 proto2 要被逐渐废弃,本文使用的是 proto3。

“`
syntax = “proto3”;
package foo.bar;
import “myproject/other_protos.proto”;

message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 4;
}

message SearchResponse {
repeated Result result = 1;
}

message Result {
string url = 1;
string title = 2;
repeated string snippets = 3;
}
“`

上面的结构和 C 语言的 struct 定义很像。

1. message 关键字用于声明一个结构,后面加结构的名字
2. protobuf 3 不支持默认值。
3. 类型。protobuf 中定义的标量类型有 double/float/int32(64)/uint32(64)/bool/string/bytes.
其中 bytes 用来表示任意的二进制字符串
4. 序号,每个字段后面的数字表示的是序号。protobuf 用这个序号来进行高效编码,需要
注意的是,如果要增添字段不能复用已有的序号。
6. 枚举。可以使用 enum 关键字定义枚举。枚举可以定义在 message 的外面或者里面
7. 在一个文件中可以定义多个 message。像是 enum 一样,message 也可以嵌套在另一个
message中。比如可以把上面的 Result 嵌套在 SearchResponse 中。不过这时候再引用
Result,需要使用 SearchResponse.Result
8. message 中可以使用另一个 message 作为类型。
9. 使用 import 语句来引入其他的 proto 文件。这样就可以直接使用引入
10. package 语句用来声明定义的 message 所处的命名空间(namespace)

## 编译

### 在 Python 中使用

ParseFromString: 从字符串中解析protobuf对象. 虽然这个方法名字中包含了string,但是实际上使用的是 bytes。

“`
r = SearchResponse()
r.ParseFromString(data)
“`

SerializeToString: 序列化成字符串。同样使用 bytes。

“`
data = r.SerializeToString()
“`

属性可以直接访问和设置,如果属性名或者类型出错会抛出异常。

repeated 类型的基础类型属性可以像一个数组一样访问,map 类型可以像字典一样访问。但是赋值必须通过 append 和 extend 赋值,而不能直接赋值.

repeated 类型的 message 类型不能使用 append,而必须使用 add 或者 extend 方法。这样可以确保 message 类型被拷贝进去。

# REF

1. https://tech.meituan.com/serialization_vs_deserialization.html
2. https://my.oschina.net/fir01/blog/468123
3. http://colobu.com/2015/01/07/Protobuf-language-guide/
4. https://developers.google.com/protocol-buffers/docs/pythontutorial
5. [在 Python 中使用 ProtoBuf](https://blog.csdn.net/losophy/article/details/17006573)
6. https://developers.google.com/protocol-buffers/docs/reference/python-generated

后端工具和算法集

总结一下后端常用的工具和蕴含的算法

TODO: 应该把 GitHub 5000 star 以上的项目都看一下

– 前端
– 框架
– vue

– 数据库

– mysql(btree)
– postgresql
– redis
– rocksdb/leveldb
– elasticsearch
– memcached

– 消息队列

– kafka
– redis stream
– rabbitmq
– amqp 协议

– RPC 和序列化

– protobuf/grpc
– thrift
– msgpack
– envoy
– service mesh 的思想

– 负载均衡

– 四层和七层负载均衡的区别
– nginx
– LVS/IPVS
– 一致性哈希

– 部署和容器化

– docker
– cgroups
– kubernetes
– borg 论文

– CI
– jenkins

– web 框架

– django(MVC 模式)
– flask

– 日志收集

分布式系统中需要日志可能分布在不同的机器上,要想查找一条错误日志,可能需要 ssh 到不同的机器上,非常浪费时间和精力。可以把日志收集到一个统一的存储中,方便检索查看

– filebeat
– sentry

– 监控

时序数据库

– OpenTSDB
– influxdb
– prometheus

前端面板

– grafana

– 高并发厂家服务

– 计数服务
– [计数系统架构实践一次搞定](http://zhuanlan.51cto.com/art/201706/542217.htm)
– [微博计数器的设计](http://blog.cydu.net/weidesign/2012/09/09/weibo-counter-service-design-2/)
– [instagram 使用 redis 计数](https://instagram-engineering.com/storing-hundreds-of-millions-of-simple-key-value-pairs-in-redis-1091ae80f74c)
– 限流服务
– token bucket

– 搜索引擎相关

– simhash
– tf-idf

– 统一登录

– CAS,[了解CAS协议](https://blog.csdn.net/csdnxingyuntian/article/details/54970102)
– kerbos

– 操作系统

– 内存换页算法
– OPT
– FIFO
– LRU
– CLOCK

– 限流

– 锁

– 分布式锁
– 自旋锁
– 乐观锁、悲观锁