Month: 八月 2018

Django 中使用多个数据库

有时候我们的表并不都在一个数据库中,需要使用多个数据库,django 支持配置并使用多个数据库。

定义多个数据库

首先,在 DATABASES 中定义需要使用的多个数据库:

DATABASES = {
    "default": {},
    "users": {
        "NAME": "user_data",
        "ENGINE": "django.db.backends.mysql",
        "USER": "mysql_user",
        "PASSWORD": "superS3cret"
    },
    "customers": {
        "NAME": "customer_data",
        "ENGINE": "django.db.backends.mysql",
        "USER": "mysql_cust",
        "PASSWORD": "veryPriv@ate"
    }
}

注意其中 default 是必须的,不过用不到的话,留空也行。

在使用 manage.py 的时候可以使用 --database=xxx 里指定数据库。

数据库路由

可以通过实现 Database Router 来让 django 自动选择应该使用的数据库。

DB router 需要实现下面四个方法,用来指定不同的 Model 对应的模型。

  1. db_for_read(model, **hints) 用来读取表时,查找对应的数据库。返回数据库配置名(DATABASES中定义的)
  2. db_for_write(model, **hints) 用来写入表时,查找对应的数据库。
  3. allow_relation
  4. allow_migrate

最后使用 DATABASE_ROUTERS 安装对应的路由:

DATABASE_ROUERS = ["path.to.router"]

rabbitmq 教程

更新:弃坑了,rabbitmq 在我这里总是崩溃,实在没法正常使用

评估了几款 Message Queue,发现还是 rabbitmq 比较简单一些,各种特性也支持地很好。网上好多教程说“rabbitmq 非常重量级,适合企业应用开发”,这些话可以说是人云亦云,瞎扯了。实际上 rabbitmq 采用 erlang 开发,不光性能强大,而且从操作和运维上来说都是非常轻量级的。

基础概念

rabbitmq 实现的是 AMQP 0.9.1 协议,其中重要概念有:

  • producer:生产者,生产消息
  • consumer:消费者,消费消息
  • routing-key: 每个消息中决定消息如何分发的参数
  • exchange:类似路由,消息实际发送给 exchange,可以指定几种不同的分发算法,然后用 routing-key 作为参数计算出该发送到哪个队列中,一个exchange 可以和一个或者多个 queue 绑定,exchange 有如下几种分发算法
    • direct,直接按照 routing-key 和 queue 名字匹配
    • fan-out,发送到所有绑定的 queue 中
    • topic,利用 routing-key 和 queue 的名字模式匹配
  • queue:缓冲消息,需要和 exchange 绑定
  • binding:指的是 exchange 和 queue 之间的绑定关系

安装

Ubuntu:

sudo apt-get install rabbitmq-server

Python 客户端 pika

pip install pika

基础使用

和其他一些队列不一样的是,rabbitmq 的队列需要显式创建,不能直接发消息过去生成。可以使用 sudo rabbitmqctl list_queues 命令查看已有的队列。

下面是实现一个生产者,多个消费者的关系,如图所示:

生产者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="hello", durable=True)  # 声明一个队列,rabbitmq 中的队列必须首先创建才能使用

# 发送消息需要指明发送到的 exchange,留空表示默认 exchange
# 默认的 exchange 会根据 routing-key 把消息发到对应的队列中
channel.basic_publish(exchange="",
                      routing_key="hello",
                      body="Hello World!",  # 消息体
                      properties=pika.BasicProperties(
                         delivery_mode = 2,  # AMQP 定义的,其中 1 代表不要持久化,2 代表需要持久化
                      ))
print(" [x] Sent "Hello World!"")

# 最后关闭链接
connection.close()

消费者

消费者通过注册处理函数,来消费消息,可以同时使用多个消费者消费同一个队列。

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()


channel.queue_declare(queue="hello", durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # 最多有一个消息没有 ack
channel.basic_consume(callback,
                      queue="hello",
                      no_ack=False)  # 默认情况加就是 False,也就是需要 ack

print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()

可以直接使用多个消费者来消费同一个队列,默认情况下 rabbitmq 采用了 round robin 的算法,也就是消息会依次发送给每一个消费者。

如果没有 ack 的话,rabbitmq 的内存最终可能会占满

使用其他的 exchange

rabbitmq 中默认的 exchange 是 direct exchange,也就是直接把收到的消息放到 routing key 对应的队列中。rabbitmq 还支持不少其他的类型,可以看文章开始的讨论。

下面的例子通过使用一个 fanout 类型的 exchange 来实现消息发送给所有消费者。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# 声明一个 fanout 类型的 exchange,名字为 logs
channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

message = " ".join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange="logs",
                      routing_key="",
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消费者

Exchange 需要和 queue 绑定才会发送消息,否则会直接丢掉。
queue 需要和 exchange 绑定之后才能够接收到消息,而所有的 queue 默认已经是和默认 exchange 绑定的,所以在上一个例子中并没有使用绑定。

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

# 声明一个临时的私有 queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 绑定 queue 到刚刚声明的 exchange
channel.queue_bind(exchange="logs",
                   queue=queue_name)

print(" [*] Waiting for logs. To exit press CTRL+C")

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

常见问题

在一个循环中发送消息,为什么有时候会提示 Channel Closed?

使用 BlockingConnection 需要手动管理心跳,如果超过心跳时间就会被关闭链接。常见的错误包括使用了 time.sleep 导致长时间没有 publish 消息,从而链接被关闭。

可以通过单独开一个心跳线程的方法,或者使用 connction.sleep。当然使用 connction.sleep 无法避免本身操作时长超过了心跳时间的情况。

channel 和 connection 的区别?

Connection 表示的是到 rabbitmq broker 的一个物理连接,一般一个程序使用一个链接,或者使用一个连接池,可以使用心跳来维护一个链接,理论上应该在多个线程之间分享,很遗憾 python 的客户端 pika 并不是线程安全的。

而channel 则应该是短时效的,在每个线程内部创建,不是线程安全的。

  1. https://stackoverflow.com/questions/18418936/rabbitmq-and-relationship-between-channel-and-connection
  2. https://www.rabbitmq.com/tutorials/amqp-concepts.html

如果客户端重启,之前的匿名队列会被删除吗?如果没有别删除,还能连接上之前的匿名队列吗?如果连不上是不是消息就都丢了?

To be answered

UI管理工具

在向队列中发消息的过程中,尤其是在学习或者排查错误的时候,可以通过 rabbitmq 的管理工具来查看当前消息队列中的消息。

首先,激活管理工具插件:

rabbitmq-plugins enable rabbitmq_management

然后添加用户

rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

然后可以打开:http://server-name:15672/ 查看,使用刚刚设置的密码登录

参考:

  1. http://www.rabbitmq.com/management.html
  2. https://www.rabbitmq.com/tutorials/tutorial-three-python.html
  3. https://github.com/pika/pika/issues/196

网络协议概述

网络每层的头部基本上就是添加上本层的地址,还有一些校验和控制位

运输层的 MAC 地址是点到点的,没传递一次就会把发送者和接受者用 ARP 转换,替换为经过的路由器的 MAC 地址。而 IP 层的地址和端口号是端到端,从发送到结束始终不变。除非经过上层协议改变了地址和端口号,比如代理服务器或者 NAPT 路由器。

交换机没有任何地址,只有端口的概念。内部有一个转发表,记录了物理端口和 MAC 的对应关系,通过自主学习来建立。每台主机上都有自己的高速 ARP 地址缓存和路由表。可以通过 ip nip r 命令查看。

VLAN 可以理解为逻辑上将一台交换机分割成数台虚拟交换机,且这些虚拟交换机互不相通。Vlan 是广播域,而通常两个广播域之间由路由器相连接,广播域之间来往的数据帧通过路由器中继。因此 Vlan 间的通信也需要路由器(或者三层交换机)提供中继服务,即“Vlan 间路由”。

图解一致性哈希

起源

比如你有 N 个 cache 服务器(后面简称 cache ),那么如何将一个对象 object 映射到 N 个 cache 上呢,你很可能会采用类似下面的通用方法计算 object 的 hash 值,然后均匀的映射到到 N 个 cache ;

hash(object) % N

一切都运行正常,再考虑如下的两种情况;

  1. 一个 cache 服务器 m down 掉了(在实际应用中必须要考虑这种情况),这样所有映射到 cache m 的对象都会失效,怎么办,需要把 cache m 从 cache 中移除,这时候 cache 是 N-1 台,映射公式变成了 hash(object) % (N-1)

  2. 由于访问加重,需要添加 cache ,这时候 cache 是 N+1 台,映射公式变成了 hash(object) % (N+1)

1 和 2 意味着什么?这意味着突然之间几乎所有的 cache 都失效了。对于服务器而言,这是一场灾难,洪水般的访问都会直接冲向后台服务器;

再来考虑第三个问题,由于硬件能力越来越强,你可能想让后面添加的节点多做点活,显然上面的 hash 算法也做不到。

有什么方法可以改变这个状况呢,这就是 consistent hashing…

一致性哈希

一致性哈希把哈希值想象成一个环,比如说在 0 ~ 2^32-1 这个范围内,然后将节点(名字、IP等)求哈希之后分不到环上。当有访问请求时,把请求信息求哈希之后,寻找小于该哈希值的下一个节点。

当有节点宕机的时候,请求会依次查找下一个节点,从而不让所有节点的缓存都失效。

当加入新节点的时候,只会影响一个区间内的请求,也不会影响其他区间。

如下图所示:

虚拟节点

以上虽然解决了大部分问题,但是还有三个问题:

  1. 节点有可能在分布不均匀。
  2. 当一个节点因为负载过重宕机以后,所有请求会落到下一台主机,这样就有可能使下一台主机也宕机,这就是雪崩问题。
  3. 不同主机处理能力不同如何配置不同的量。

这时候可以引入虚拟节点。原始的一致性哈希中,每个节点通过哈希之后在环上占有一个位置,可以通过对每个节点多次计算哈希来获得过个虚拟节点。

比如说,本来我们通过节点的 IP 来计算哈希

hash("10.1.1.1")  => n1
hash("10.1.1.2")  => n2
hash("10.1.1.3")  => n3

现在引入两倍的虚拟节点之后

hash("10.1.1.1-1")  => n1-1
hash("10.1.1.1-2")  => n1-2
hash("10.1.1.2-1")  => n2-1
hash("10.1.1.2-2")  => n2-2
hash("10.1.1.3-1")  => n3-1
hash("10.1.1.3-2")  => n3-2

如图所示

引入虚拟节点之后:

  1. 平衡性得到了直接改善
  2. 主机是交替出现的,所以当一个节点宕机后,所有流量会随机分配给剩余节点
  3. 可以给处理能力强的节点配置更多地虚拟节点。

最后,一致性哈希可以用跳表或者平衡二叉树来实现

参考文档

  1. https://blog.csdn.net/MBuger/article/details/76189561
  2. https://www.cnblogs.com/23lalala/p/3588553.html
  3. https://crossoverjie.top/2018/01/08/Consistent-Hash/