数据库

Boto3 访问 S3 的基本用法

以前我以为文档坑爹只有一种方式,那就是没有文档。最近在用 boto3, 才让我认识到了文档的另一种坑爹方式:屁话太多。

具体来说是这样的:Boto3 中有两种 API, 低级和高级。其中低级 API 是和 AWS 的 HTTP 接口一一对应的,
通过 boto3.client(“xxx”) 暴露。高级接口是面向对象的,更加易于使用,通过 boto3.resource(“xxx”) 暴露,
美中不足是不一定覆盖了所有的 API.

坑爹的 AWS 文档中,经常混用 resource 和 client 两套接口,也没有任何提示,文档的首页除了简单的提了一句有两套 API 外再没有单独的介绍了。
在没写这篇文章之前,我的脑子里都是乱的,总觉得 S3(Simple Storage Service) 的这个狗屁接口哪里配得上 Simple 这个词,
一会儿是 listobject, 一会儿是 listobject_v2 的。相反,高级 API 是很简单易用的,然而这样一个简单的 API 被深深地埋在了一大堆的低级 API 中,
网上的文章也是一会儿 boto3.client, 一会儿 boto3.resource. 除了有人特意提问两者的区别,很难看到有人说这俩到底是啥。

吐槽完毕。

最近总是用到 S3, 在这里记录一下 Boto3 的简单用法了。Boto3 是整个 AWS 的 SDK, 而不只是包括 S3. 还可以用来访问 SQS, EC2 等等。

如果没有特殊需求的话,建议使用高级 API. 本文一下就记录一些 boto3.resource("s3") 的例子。

import boto3

import boto3

session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION_NAME,
)

# s3 = boto3.resource("s3")
s3 = session.resouce("s3")

# 创建一个 bucket
bucket = s3.create_bucket(Bucket="my-bucket")

# 获得所有的 bucket, boto 会自动处理 API 的翻页等信息。
for bucket in s3.buckets.all():
    print(bucket.name)

# 过滤 bucket, 同样返回一个 bucket_iterator
s3.buckets.fitler()

# 生成一个 Bucket 资源对象
bucket = s3.Bucket("my-bucket")
bucket.name  # bucket 的名字
bucket.delete()  # 删除 bucket

# 删除一些对象
bucket.delete_objects(
    Delete={
        'Objects': [
            {
                'Key': 'string',
                'VersionId': 'string'
            },
        ],
        'Quiet': True|False
    },
)
# 返回结果
{
    'Deleted': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'DeleteMarker': True|False,
            'DeleteMarkerVersionId': 'string'
        },
    ],
    'RequestCharged': 'requester',
    'Errors': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'Code': 'string',
            'Message': 'string'
        },
    ]
}

# 下载文件
bucket.download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None)

# 下载到文件对象,可能会自动开启多线程下载
with open('filename', 'wb') as data:
    bucket.download_fileobj('mykey', data)

# 上传文件
object = bucket.put_object(Body=b"data"|file, ContentMD5="", Key="xxx")

# 这个方法会自动开启多线程上传
with open('filename', 'rb') as f:
    bucket.upload_fileobj(f, 'mykey')

# 列出所有对象
bucket.objects.all()

# 过滤并返回对象
objects = bucket.objects.filter(
    Delimiter='string',
    EncodingType='url',
    Marker='string',
    MaxKeys=123,
    Prefix='string',
    RequestPayer='requester',
    ExpectedBucketOwner='string'
)

# 创建一个对象
obj = bucket.Object("xxx")
# 或者
obj = s3.Object("my-bucket", "key")

obj.bucket_name
obj.key

# 删除对象
obj.delete()
# 下载对象
obj.download_file(path)
# 自动多线程下载
with open('filename', 'wb') as data:
    obj.download_fileobj(data)
# 获取文件内容
rsp = obj.get()
body = rsp["Body"].read()  # 文件内容
obj.put(Body=b"xxx"|file, ContentMD5="")

# 上传文件
obj.upload_file(filename)
# 自动多线程上传
obj.upload_fileobj(fileobj)

如果想进一步了解,建议直接点开参考文献 2, 阅读下 resouce 相关接口的文档,其他的低级 client 接口完全可以不看。

参考

  1. https://stackoverflow.com/questions/42809096/difference-in-boto3-between-resource-client-and-session
  2. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource

MySQL 内部原理面试常考题

  • InnoDB 有行级别的锁,而 MyISAM 只能锁定到表级别。
  • InnoDB 有更好的故障恢复机制。
  • InnoDB 实现了事务、外键和关系限制,MyISAM 没有。

总的来说,引用完整性和事物才是数据库的本质,所以说:“MyISAM is a file system that understands SQL. There’s no comparison. If you want a database engine with MySQL, use InnoDB.”

聚簇索引

MyISAM 没有使用聚簇索引,InnoDB 使用了聚簇索引。

四种隔离界别

  1. 读未提交 Read Uncommitted(在本次事务中可以读到其他事务中没有提交的数据 – 脏数据)
  2. 读已提交 Read Committed (只能读到其他事务提交过的数据。如果在当前事务中,其他事务有提交,则两次读取结果不同)
  3. 可重复读 Repeatable Read(MySQL 默认,保证了事务中每次读取结果都相同,而不管其他事物是否已经提交。会出现幻读)
  4. 序列化 Serializable(隔离级别中最严格的,开启一个 serializable 事务,那么其他事务对数据表的写操作都会被挂起)

  5. 读未提交:别人修改数据的事务尚未提交,在我的事务中也能读到。

  6. 读已提交:别人修改数据的事务已经提交,在我的事务中才能读到。
  7. 可重复读:别人修改数据的事务已经提交,在我的事务中也读不到。
  8. 串行:我的事务尚未提交,别人就别想改数据。

聚簇索引

InnoDB 使用聚簇索引,聚簇索引按照主键的顺序在磁盘上。MyISAM 不使用聚簇索引,行按照插入顺序在磁盘上。

聚簇索引的优势在于按照主键范围读取,而劣势在于主键中插入可能造成性能问题。

参考文献

  1. https://learnku.com/articles/13849/understanding-four-isolation-levels-in-mysql#2000d4
  2. https://dba.stackexchange.com/questions/1/what-are-the-main-differences-between-innodb-and-myisam
  3. https://jeremystein.com/journal/innodb-versus-myisam-no-comparison/

使用 partition by 查找并删除 MySQL 数据库中重复的行

在创建 MySQL 数据表的时候,经常会忘记给某个字段添加 unique 索引,但是等到想添加的时候又已经有了重复数据,这时候就需要删除重复数据。

准备数据

本文使用如下的数据作为演示:

CREATE TABLE contacts (
    id INT PRIMARY KEY AUTO_INCREMENT,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(255) NOT NULL
);

INSERT INTO contacts (first_name,last_name,email)
VALUES ("Carine ","Schmitt","[email protected]"),
       ("Jean","King","[email protected]"),
       ("Peter","Ferguson","[email protected]"),
       ("Janine ","Labrune","[email protected]"),
       ("Jonas ","Bergulfsen","[email protected]"),
       ("Janine ","Labrune","[email protected]"),
       ("Susan","Nelson","[email protected]"),
       ("Zbyszek ","Piestrzeniewicz","[email protected]"),
       ("Roland","Keitel","[email protected]"),
       ("Julie","Murphy","[email protected]"),
       ("Kwai","Lee","[email protected]"),
       ("Jean","King","[email protected]"),
       ("Susan","Nelson","[email protected]"),
       ("Roland","Keitel","[email protected]"),
       ("Roland","Keitel","[email protected]");

注意其中有一行重复了三次。输入完成后,数据如图所示:

file

查找重复的行

使用 group by 和 having

假设我们要通过 email 字段来查找重复值。通过使用 group by 和 having 子句可以查找到哪些行是重复的。

SELECT
    email,
    COUNT(email)
FROM
    contacts
GROUP BY email
HAVING COUNT(email) > 1;

file

Having 就类似于 Group by 之后的 where 子句。但是这个语句还是很难解决我们的问题,我们只知道发生重复的第一行了,而不知道哪些行是重复的。这时候可以使用 partition by 语句。

使用 partition by 找出所有的重复行

需要注意的是,partition by 只有在 MySQL 8.0 之后才支持,而现在常用的是 5.6 和 5.7 版本。

Partition 语句又叫做窗口聚合语句,也就是说他会把同一个值的行聚合成一个窗口,但是和 Group by 语句不同的是,窗口内的每一个行并没有被压缩成一行,具体说 Partition by 的语法是:

window_function_name(expression)
    OVER (
        [partition_defintion]
        [order_definition]
        [frame_definition]
    )

删除重复的行

删除的方法有很多种,这里介绍两种。

References

  1. https://www.mysqltutorial.org/mysql-window-functions/
    2.

Redis 与 Pika scan 性能对比

Redis 是后端常用的键值数据库。Pika 是 360 出品的一款与 Redis 协议几乎兼容的数据库。与 Redis 不同的是,Pika 基于硬盘,使用 RocksDB 作为引擎,从容量上来说,比基于内存的 Redis 大了不少,而且在性能上也能满足一般需求。

我们知道,在 Redis 中,keys * 这个操作仅限于在本地调试使用,千万不能用于线上,因为这会遍历整个数据库,可能引起数据库长时间无响应,甚至崩溃。在线上服务器,如果想要查找某个模式的键,可以使用 scan 命令。比如说要查找 user: 前缀的所有键,可以使用 scan 0 user:* 命令。然而如果服务器上的键非常多的话,虽然不会卡死服务器了,但是这个过程依然会很漫长。

Redis 是使用 hash table 实现的,所以 scan 命令其实也是遍历所有键,拿到每个键再做过滤,而不能直接读取符合对应 pattern 的键。我们使用下面的代码来验证一下 redis scan 的性能。

from redis import Redis
from uuid import uuid4
import time

def gen(r):
    for i in range(10000000):
        r.set(str(uuid4()), 1)
    r.set("user:1", "bar")

def scan(r):
    start = time.time()
    for key in r.scan_iter("user:*"):
        print("user=%s" % r.get(key).decode())
        duration = time.time() - start
        print("duration for finding user is %.3f" % duration)
    duration = time.time() - start
    print("duration for full scan is %.3f" % duration)

if __name__ == "__main__":
    import sys
    port = int(sys.argv[1])
    r = Redis(port=port)
    gen(r)
    scan(r)

首先插入一千万个随机数据,然后从中查找我们的目标数据。结果如下:

-> % python3 rb.py 6379
user=bar
duration for finding user is 80.145
duration for full scan is 180.936

和我们的预期基本是相符的,也就是说 Redis 是首先遍历然后再做过滤的。

接下来我们对 Pika 做相同的实验,Pika 默认使用 9221 端口,我们只需要把端口换一下就好了:

-> % python3 rb.py 9221
user=bar
duration for finding user is 0.002
duration for full scan is 0.003

结果是令人震惊的!Pika 几乎在瞬间就完成了遍历。原因在于 Pika 使用了 RocksDB,而 RocksDB 支持 Range 操作。RocksDB 中的数据都是有序的,所以查找起来就不需要 O(n) 了,只需要二分查找,也就是 O(logN) 即可。

Python Redis 客户端连接池解析

Python Redis 的客户端使用了链接池机制,通过复用链接可以减低服务器的压力并在失败时重试。链接池其实是一种很通用的机制,在实现客户端时是一个经常需要(或许其实不需要)重复发明的轮子。

Redis 客户端一共涉及到了三个类:

  • Connection,表示一个到服务器的链接
  • ConnectionPool,链接池
  • Redis,使用连接池,并在失败时重试

Connection 类解析

Connection 类主要负责建立和 Redis 服务器的一个 Socket 链接,并且沟通相关信息。下面的代码是 Connection 类和 socket 处理相关的代码。

class Connection(object):
    def __del__(self):
        try:
            self.disconnect()
        except Exception:
            pass

    def connect(self):
        """
        连接 Redis 服务器
        """
        if self._sock:
            return
        try:
            sock = self._connect()
        except socket.timeout:
            raise TimeoutError("Timeout connecting to server")
        except socket.error:
            e = sys.exc_info()[1]
            raise ConnectionError(self._error_message(e))

        self._sock = sock
        try:
            self.on_connect()
        except RedisError:
            # clean up after any error in on_connect
            self.disconnect()
            raise

        # run any user callbacks. right now the only internal callback
        # is for pubsub channel/pattern resubscription
        for callback in self._connect_callbacks:
            callback(self)

    def _connect(self):
        """
        建立链接的具体过程,主要是 socket 操作
        """

    def disconnect(self):
        """
        关闭链接
        """
        self._parser.on_disconnect()
        if self._sock is None:
            return
        try:
            self._sock.shutdown(socket.SHUT_RDWR)
            self._sock.close()
        except socket.error:
            pass
        self._sock = None

    def send_packed_command(self, command):
        if not self._sock:
            self.connect()
        
        # 发送命令到服务器

可以看出,Connection 类主要是在 socket 上的一层薄薄封装。当然,这个 Connection 不是线程安全的。

ConnectionPool 类解析

redis.py 的代码中 ConnectionPool 分了两个类,基类 ConnectionPool,还有一个子类 BlockingConnectionPool。
这里我感到有些不解,既然只有一个子类,不知道为什么还要分成两个类呢?可能是开始时候规划了好几个子类,最后只实现了一个吧……
其中 ConnectionPool 类不会阻塞,而 Blocking ConnectionPool 则限定了链接的数量。

其中 BlockingConnection 类不只是线程安全的,还是进程安全的。

class ConnectionPool(object):
    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):
        max_connections = max_connections or 2 ** 31
        if not isinstance(max_connections, (int, long)) or max_connections < 0:
            raise ValueError(""max_connections" must be a positive integer")

        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections

        self.reset()  # 调用 reset 初始化一些属性

    def reset(self):
        self.pid = os.getpid()  # 通过 pid 检查实现进程安全
        self._created_connections = 0
        self._available_connections = []  # 直接使用一个 list 来存放连接
        self._in_use_connections = set()
        self._check_lock = threading.Lock()

    def _checkpid(self):
        # 如果当前的 connection 是 fork 来的,直接关闭链接
        if self.pid != os.getpid():
            with self._check_lock:
                if self.pid == os.getpid():
                    # 另一个线程已经检查了,直接返回
                    return
                self.disconnect()
                self.reset()

    def get_connection(self, command_name, *keys, **options):
        # 从连接池中取一个连接,注意这里是弹出,也就是同一个链接只有一个用户使用
        self._checkpid()
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        return connection

    def make_connection(self):
        # 创建一个新的连接
        if self._created_connections >= self.max_connections:
            raise ConnectionError("Too many connections")
        self._created_connections += 1
        return self.connection_class(**self.connection_kwargs)

    def release(self, connection):
        # 使用完毕连接后需要显式调用 release 把连接归还到连接池中。
        self._checkpid()
        if connection.pid != self.pid:
            return
        self._in_use_connections.remove(connection)
        self._available_connections.append(connection)

    def disconnect(self):
        # 断开所有连接
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        for connection in all_conns:
            connection.disconnect()

class BlockingConnectionPool(ConnectionPool):
    """
    这个连接池的实现是线程安全的
    """
    def __init__(self, max_connections=50, timeout=20,
                 connection_class=Connection, queue_class=LifoQueue,
                 **connection_kwargs):

        self.queue_class = queue_class  # 使用一个队列来存放连接
        self.timeout = timeout  # 增加了超时功能
        super(BlockingConnectionPool, self).__init__(
            connection_class=connection_class,
            max_connections=max_connections,
            **connection_kwargs)

    def reset(self):
        self.pid = os.getpid()
        self._check_lock = threading.Lock()

        # 首先在队列中填满 None,后面会用到,这里很关键
        self.pool = self.queue_class(self.max_connections)
        while True:
            try:
                self.pool.put_nowait(None)
            except Full:
                break

        # Keep a list of actual connection instances so that we can
        # disconnect them later.
        self._connections = []

    def make_connection(self):
        # 创建一个链接,貌似和上面的函数没有什么区别。
        connection = self.connection_class(**self.connection_kwargs)
        self._connections.append(connection)
        return connection

    def get_connection(self, command_name, *keys, **options):
        """
        获取一个新的连接,最长等待 timeout 秒

        如果我们读取到的新连接是 None 的话,就会创建一个新的连接。因为我们使用的是 LIFO 队列,也就是栈,
        所以我们优先得到的是已经创建的链接,而不是最开始放进去的 None。也就是我们只有在需要的时候才会创建
        新的连接,也就是说连接数量是按需增长的。
        """
        # 确保没有更换进程
        self._checkpid()

        # 尝试获取一个连接,如果在 timeout 时间内失败的话,抛出 ConnectionError
        connection = None
        try:
            connection = self.pool.get(block=True, timeout=self.timeout)
        except Empty:
            # 需要注意的是这个错误并不会被 redis 捕获,需要用户自己处理
            raise ConnectionError("No connection available.")

        # 如果真的没有连接可用了,直接创建一个新的连接
        if connection is None:
            connection = self.make_connection()

        return connection

    def release(self, connection):
        # 释放连接到连接池
        self._checkpid()
        if connection.pid != self.pid:
            return

        # Put the connection back into the pool.
        try:
            self.pool.put_nowait(connection)
        except Full:
            # perhaps the pool has been reset() after a fork? regardless,
            # we don"t want this connection
            pass

    def disconnect(self):
        # 释放所有的连接
        for connection in self._connections:
            connection.disconnect()

redis.Redis 类解析

Redis 类中使用了 ConnectionPool,如果没有显式创建的话,会自动创建一个线程池。所以每次你在使用 Redis 的时候,其实已经在使用线程池了。

class Redis:
    def __init__(self...):
        if not connection_pool:
            connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool
    def execute_command(self, *args, **options):
        # 执行每条命令都会调用该方法
        pool = self.connection_pool
        command_name = args[0]
        # 弹出一个连接
        connection = pool.get_connection(command_name, **options)
        try:
            # 尝试调用 redis
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            # 如果是连接问题,关闭有问题的连接,下面再次使用这个连接的时候会重新连接。
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # 再次尝试调用 redis
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        finally:
            # 不管怎样都要把这个连接归还到连接池
            pool.release(connection)

MySQL 性能小技巧和在 Django 中的应用

对于 delete update insert 等语句一定要使用 limit 子句限制影响的行数,避免一次更改特别多的行,造成数据库假死

while (1) {
    // 每次只做 1000 
    mysql_query("DELETE FROM logs WHERE log_date <= "2009-11-01" LIMIT 1000");
    if (mysql_affected_rows() == 0) {
        // 没得可删了退出
        break;
    }
    // 每次都要休息一会儿
    usleep(50000);
}

2. 垂直分割

把不会用作索引的,或者是过长的字段可以考虑使用其他存储引擎,比如 rocksdb 等。

3. IPv4 地址可以存为 uint32

使用 uint32 存储 IP 地址不光可以节省空间,而且可以按区间查询。

4. 避免 select *

从数据库里读出越多的数据,那么查询就会变得越慢。并且,如果你的数据库服务器和应用服务器是两台独立的服务器的话,这还会增加网络传输的负载。

所以,你应该养成一个需要什么就取什么的好的习惯。

不要使用:

SELECT * FROM user WHERE user_id = 1

使用:

SELECT username FROM user WHERE user_id = 1

在 django 中,可以使用 only

books = Book.objects.filter(author="Jim").only("book_name")

5. 当只要一行数据时使用 LIMIT 1

当你查询表的有些时候,你已经知道结果只会有一条结果,但因为你可能需要去 fetch 游标,或是你也许会去检查返回的记录数。

在这种情况下,加上 LIMIT 1 可以增加性能。这样一样,MySQL 数据库引擎会在找到一条数据后停止搜索,而不是继续往后查少下一条符合记录的数据。

下面的示例,只是为了找一下是否有“中国”的用户,很明显,后面的会比前面的更有效率。(请注意,第一条中是 Select *,第二条是 Select 1)

SELECT * FROM user WHERE country = "China"
SELECT 1 FROM user WHERE country = "China" LIMIT 1

在 Django 中可以使用 [:1] 来添加 limit 1

6. EXPLAIN 你的 SELECT 查询

使用 EXPLAIN 关键字可以让你知道 MySQL 是如何处理你的 SQL 语句的。这可以帮你分析你的查询语句或是表结构的性能瓶颈。

7. 尽量让查询能 fit 进内存中

参考:

  1. https://coolshell.cn/articles/1846.html

MySQL 中的 wait_timeout 是做什么的?

Mysql 中默认的 waittimeout 和 interactivetimeout 的值是八小时,也就是一个连接(交互式和非交互式的)只有在 8 小时没有活动之后才会被关闭掉。对于互联网公司来说,这个值实在太大了,一个库可能被很多脚本和服务访问,可能只是一个简短的查询就不需要数据库了,如果每个查询都占据了 8 小时的时间,那么 mysql 很快连接数就会满了,报出 too many connections 错误。

mysql 默认的连接数可以修改 max_connections 参数,但是一个服务器能支撑的连接数显然是由硬件决定的。

设置 waittimeout 过短可能会造成一些问题,如果在 django 中两次查询的之间时间大于 waittimeout,就会报 (2006, ‘MySQL server has gone away’)。django 官方给的建议是:

  1. 当你的脚本不需要使用数据库的时候,主动关闭连接,比如在 django 中使用 from django.db import connection; connection.close()
  2. 增大 wait_timeout 的值

不过 django 默认 CONNMAXAGE 是 0,也就是在查询数据库之后会立即关闭链接,理论上应该不会报这个错误。但是这样不能复用链接,会造成对数据压力很大。

CONNMAXAGE 应该小于数据库本身的最大连接时间 wait_timeout,否则应用程序可能会获取到连接超时的数据库连接,这时会出现 MySQL server has gone away 的报错。

可以在 settings.py 中动态地获取并填充这个值,然后写到 CONNMAXAGE 中

理论上这样就不会再报错了,但是难免数据库重启或者什么时候会报错,总是使用 closeoldconnections 还是很烦。

有一种思路是在检测到和数据库链接断开的时候,自动重连,但是这样会破坏 django.db.atomic,但是可以实现一种不同的 backend。可以参考这两个:

  1. https://github.com/django/django/pull/2740/commits/38f58aa4d751ad83f1dc76d5b945a1036239584f

  2. https://github.com/django/django/pull/2454/commits/36b8bf870cab183b7ad63c0d8e7e8c02e314a053#diff-f8a587a973ef4c3a94d7550a5b85342c

还有一种解决思路是使用 connection pooling,我们可以使用 sqlalchemy 的 连接池作为 django 连接数据库的工具。参考这里, 不过这种方法比较 hack。

参考

  1. https://code.djangoproject.com/ticket/21597#no2
  2. https://github.com/django/django/commit/2ee21d9f0d9eaed0494f3b9cd4b5bc9beffffae5
  3. https://stackoverflow.com/questions/1125504/django-persistent-database-connection
  4. django 优化
  5. https://docs.djangoproject.com/en/2.1/ref/databases/#persistent-connections
  6. 如何设置 max_age

redis 常见问题

主要参考这篇文章:https://mp.weixin.qq.com/s/vS8IMgBIrfGpZYNUwtXrPQ

1. 集合操作避免范围过大

使用 sortedset、set、list、hash 等集合类的 O(N) 操作时要评估当前元素个数的规模以及将来的增长规模,对于短期就可能变为大集合的 key,要预估 O(N) 操作的元素数量,避免全量操作,可以使用 HSCAN、SSCAN、ZSCAN 进行渐进操作。集合元素数量过大在使用过程中会影响 redis 的实际性能,元素个数建议尽量不要超过 5000,元素数量过大可考虑拆分成多个 key 进行处理。

2. 合理使用过期时间

如果 key 没有设置超时时间,会导致一直占用内存。对于可以预估使用生命周期的 key 应当设置合理的过期时间或在最后一次操作时进行清理,避免垃圾数据残留 redis。redis 不是垃圾桶。

3. 利用批量操作命令

假设要给一个集合导入 5000 个元素:

方案 1:直接使用 redis 的 HSET 逐个设置

for _ in 0..5000
    HSET hash, k,v

结果:失败。redis ops 飙升,同时接口响应超时

方案 2:改用 redis 的 HMSET 一次将所有元素设置到 hash 中

map<k, v> = 50000 个元素
HMSET hash map

结果:失败。出现 redis 慢日志

方案 3:依然使用 HMSET,只是每次设置 500 个,循环 100 次

map_chunk<k, v> = 500 个元素
for i in 0..100
    HMSET hash map_chunk[i]

结果:成功

MSET/HMSET 等都支持一次输入多个 key,LPUSH/RPUSH/SADD 等命令都支持一次输入多个 value, 也要注意每次操作数量不要过多,建议控制在 500 个以内

4. 合理设置值的大小

String 类型尽量控制在 10KB 以内。虽然 redis 对单个 key 可以缓存的对象长度能够支持的很大,但是实际使用场合一定要合理拆分过大的缓存项,1k 基本是 redis 性能的一个拐点。当缓存项超过 10k、100k、1m 性能下降会特别明显。关于吞吐量与数据大小的关系可见下面官方网站提供的示意图。

在局域网环境下只要传输的包不超过一个 MTU(以太网下大约 1500 bytes),那么对于 10、100、1000 bytes 不同包大小的处理吞吐能力实际结果差不多。

5. 禁用一些命令

keys、monitor、flushall、flushdb 应当通过 redis 的 rename 机制禁掉命令,若没有禁用,开发人员要谨慎使用。其中 flushall、flushdb 会清空 redis 数据;keys 命令可能会引起慢日志;monitor 命令在开启的情况下会降低 redis 的吞吐量,根据压测结果大概会降低 redis50% 的吞吐量,越多客户端开启该命令,吞吐量下降会越多。

keys 和 monitor 在一些必要的情况下还是有助于排查线上问题的,建议可在重命名后在必要情况下由 redis 相关负责人员在 redis 备机使用,monitor 命令可借助 redis-faina 等脚本工具进行辅助分析,能更快排查线上 ops 飙升等问题。

6. 避免大量 key 同时过期

如果大量的 key 过期时间设置得过于集中,到过期的时间点,redis 可能会出现短暂的卡顿现象。一般需要在时间上加一个随机值,使得过期时间分散一些。

7. Redis 如何做持久化

bgsave 做镜像全量持久化,aof 做增量持久化。因为 bgsave 会耗费较长时间,不够实时,在停机的时候回导致大量丢失数据,所以需要 aof 来配合使用。在 redis 实例重启时,优先使用 aof 来回复内存状态,如果没有 aof 日志,就会使用 rdb 来恢复。

如果 aof 文件过大会导致恢复时间过长,不过 redis 会定期做 aof 重写,压缩 aof 文件日志大小。在 redis 4.0 之后还有了混合持久化的功能,将 bgsave 的全量和 aof 的增量做了融合处理,这样既保证了回复的效率有兼容了数据的安全性。

为了避免断电时后丢失数据,还可以设置 aof 日志的 sync 属性,极端情况下,可以每次写入都执行,不过会对性能有影响,一般每秒一次就可以。

8. 保存失败

redis 报错:Can’t save in background: fork: Cannot allocate memory。

原因是 redis 在后台保存的时候会直接 fork 一下,然后保存。由于数据库过大,就会 fork 失败,但是实际上由于 copy-on-write 机制的存在,并不会产生问题。所以可以直接更改系统的配置,允许 fork。

/etc/sysctl.conf 文件修改如下:

vm.overcommit_memory=1

然后重新加载:

sysctl -p /etc/sysctl.conf

参考资料:

  1. https://stackoverflow.com/questions/11752544/redis-bgsave-failed-because-fork-cannot-allocate-memory

redis 中如何给集合中的元素设置 TTL

我们知道在 redis 中可以给每个 key 设置过期时间(TTL),但是没法为每个集合中的每一个元素设置过期时间,可以使用 zset 来实现这个效果。

直接上代码吧,Python 版的。

class RedisSet:

    def __init__(self, key):
        self.client = redis.StrictRedis()
        self.key = key

    def add(self, val, ttl=60):
        now = time.time()
        # 把过期时间作为 score 添加到 zset 中
        self.client.zadd(self.key, now + ttl, val)
        # 删除已经过期的元素
        self.client.zremrangebyscore(self, now, "+inf")

    def getall(self):
        # 只读取还没有过期的元素
        return self.client.zrangebyscore(self.key, "-inf", time.time())

参考

  1. https://github.com/antirez/redis/issues/135