Month: November 2018

Orchestrating Services with Nomad

Update 2019-01-02: Compared to Kubernetes, Nomad is still too bare-bones; I've abandoned it.

Nomad is a container orchestration service from HashiCorp. Compared with the heavyweight Kubernetes, Nomad stands out in a few ways:

  1. It is lightweight: just a single binary. Installing K8s can easily eat half a day, and inside China there is also the accursed firewall to deal with.
  2. Its concepts are clear: it focuses on scheduling and orchestrating tasks, instead of introducing the grab bag of concepts that Kubernetes does.
  3. Besides containers, Nomad can orchestrate plain applications directly, running them safely under cgroups.

Installation

Download the binary from the official site and copy it to /usr/local/bin; nothing more to say here.

Usage

$ sudo nomad agent -dev

$ nomad node status
ID        DC   Name   Class   Drain  Eligibility  Status
171a583b  dc1  nomad  <none>  false  eligible     ready

$ nomad server members
Name          Address    Port  Status  Leader  Protocol  Build  Datacenter  Region
nomad.global  127.0.0.1  4648  alive   true    2         0.7.0  dc1         global

Job

Nomad's unit of scheduling is called a Job, and Jobs come in three types:

  1. Batch: a one-off batch run; the program exits once it finishes. A cron field can also be used to run the task periodically.
  2. Service: a long-running service; if it exits, Nomad restarts it according to the given policy.
  3. System: a service that must run on every Nomad node.

Jobs can be defined in HCL files; HCL is semantically equivalent to JSON, just without some of the redundant quotes and commas. Jobs can also be defined directly in JSON.
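As a quick illustration of the JSON route, here is a minimal Python sketch that registers a JSON job through Nomad's HTTP API. This is an assumption-laden example: it presumes a local dev agent on the default HTTP port 4646, and the payload is a stripped-down job, so consult the API docs before relying on the exact fields.

import json
import urllib.request

# A tiny JSON job definition. Note that the HTTP API uses capitalized
# field names (ID, TaskGroups, ...) rather than the HCL spellings.
job = {
    "Job": {
        "ID": "example-json",
        "Name": "example-json",
        "Datacenters": ["dc1"],
        "Type": "service",
        "TaskGroups": [{
            "Name": "cache",
            "Tasks": [{
                "Name": "redis",
                "Driver": "docker",
                "Config": {"image": "redis:3.2"},
            }],
        }],
    }
}

# POST the job to the agent's job registration endpoint
req = urllib.request.Request(
    "http://127.0.0.1:4646/v1/jobs",
    data=json.dumps(job).encode(),
    headers={"Content-Type": "application/json"},
)
print(urllib.request.urlopen(req).read().decode())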

Creating a new Job

Generate a skeleton job file:

$ nomad job init
Example job file written to example.nomad

Open the generated example.nomad file and you'll see a big pile of configuration; by default it defines a job that runs a redis server. A Job contains Groups, a Group contains Tasks, and a task can be thought of as the command that finally runs our service. Here it runs the redis:3.2 Docker image.

task "redis" {
  # The "driver" parameter specifies the task driver that should be used to
  # run the task.
  driver = "docker"

  # The "config" stanza specifies the driver configuration, which is passed
  # directly to the driver to start the task. The details of configurations
  # are specific to each driver, so please see specific driver
  # documentation for more information.
  config {
    image = "redis:3.2"
    port_map {
      db = 6379
    }
  }
}

Let's run this job:

$ nomad job run example.nomad
==> Monitoring evaluation "4f5559e0"
    Evaluation triggered by job "example"
    Allocation "98959767" created: node "ecf9f7cd", group "cache"
    Evaluation within deployment: "e66e0957"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "4f5559e0" finished with status "complete"

Then check the job's status:

$ nomad status example
...
Allocations
ID        Node ID   Task Group  Version  Desired  Status   Created  Modified
8ba85cef  171a583b  cache       0        run      running  5m ago   5m ago

In the last line we can see the status of the Allocation. An Allocation can be thought of as one instantiation of a Job.

We can look at this allocation's status in more detail:

$ nomad alloc status 8ba85cef
...
Recent Events:
Time                   Type        Description
10/31/17 22:58:49 UTC  Started     Task started by client
10/31/17 22:58:40 UTC  Driver      Downloading image redis:3.2
10/31/17 22:58:40 UTC  Task Setup  Building Task Directory
10/31/17 22:58:40 UTC  Received    Task received by client

View the allocation's logs:

$ nomad alloc logs 8ba85cef redis

Modifying a Job

For example, we can change the number of replicas the cache task group should run to 3:

count = 3

Do a dry run with nomad job plan:

$ nomad job plan example.nomad

+/- Job: "example"
+/- Task Group: "cache" (2 create, 1 in-place update)
  +/- Count: "1" => "3" (forces create)
      Task: "redis"
...
Job Modify Index: 7
To submit the job with version verification run:

nomad job run -check-index 7 example.nomad
...

Note the check-index returned in the output: it exists to avoid conflicts when the same job is modified concurrently. The submission below fails if the job has changed since the plan was made.

$ nomad job run -check-index 7 example.nomad

Clustering

In production we should of course run in cluster mode rather than on a single machine. Nomad can bootstrap a cluster directly on top of Consul.

Server configuration:

# /etc/nomad.d/server.hcl

data_dir = "/etc/nomad.d"

server {
  enabled          = true
  bootstrap_expect = 3
}

Start the server:

$ nomad agent -config=/etc/nomad.d/server.hcl

Client configuration:

# /etc/nomad.d/client.hcl

datacenter = "dc1"
data_dir   = "/etc/nomad.d"

client {
  enabled = true
}

Start the client:

$ nomad agent -config=/etc/nomad.d/client.hcl

Dissecting the Python Redis Client's Connection Pool

The Python Redis client uses a connection pool: reusing connections reduces pressure on the server and makes retrying on failure possible. Connection pooling is really a very generic mechanism, a wheel that frequently has to be (or perhaps doesn't actually have to be) reinvented when writing a client; see the generic sketch after the list below.

The Redis client involves three classes in total:

  • Connection, representing one connection to the server
  • ConnectionPool, the connection pool
  • Redis, which uses the connection pool and retries on failure
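Before diving into redis.py itself, here is a minimal generic sketch of the idea (SimplePool is a made-up name, not part of redis.py): a bounded LIFO queue of connections, created lazily and reused on release.

import queue


class SimplePool:
    """A minimal generic connection pool."""

    def __init__(self, factory, size=10, timeout=5):
        self._factory = factory            # callable that creates one connection
        self._timeout = timeout
        self._pool = queue.LifoQueue(size)
        for _ in range(size):
            self._pool.put(None)           # placeholders: connections are created lazily

    def get(self):
        # blocks up to timeout seconds; raises queue.Empty when exhausted
        conn = self._pool.get(block=True, timeout=self._timeout)
        return conn if conn is not None else self._factory()

    def release(self, conn):
        self._pool.put_nowait(conn)        # hand the connection back for reuse

As we will see, this None-placeholders-in-a-LIFO-queue trick is exactly what BlockingConnectionPool does.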

The Connection class

The Connection class is mainly responsible for establishing a socket connection to the Redis server and exchanging data over it. The code below is the socket-handling part of Connection.

class Connection(object):

    def __del__(self):
        try:
            self.disconnect()
        except Exception:
            pass

    def connect(self):
        """
        Connect to the Redis server
        """
        if self._sock:
            return
        try:
            sock = self._connect()
        except socket.timeout:
            raise TimeoutError("Timeout connecting to server")
        except socket.error:
            e = sys.exc_info()[1]
            raise ConnectionError(self._error_message(e))

        self._sock = sock
        try:
            self.on_connect()
        except RedisError:
            # clean up after any error in on_connect
            self.disconnect()
            raise

        # run any user callbacks. right now the only internal callback
        # is for pubsub channel/pattern resubscription
        for callback in self._connect_callbacks:
            callback(self)

    def _connect(self):
        """
        The concrete connection setup; mostly socket operations
        """

    def disconnect(self):
        """
        Close the connection
        """
        self._parser.on_disconnect()
        if self._sock is None:
            return
        try:
            self._sock.shutdown(socket.SHUT_RDWR)
            self._sock.close()
        except socket.error:
            pass
        self._sock = None

    def send_packed_command(self, command):
        if not self._sock:
            self.connect()
        ...
        # send the command to the server

As you can see, Connection is mostly a thin wrapper around a socket. Naturally, a Connection is not thread-safe.
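For a feel of the API, driving a Connection by hand looks roughly like this (a sketch against redis-py internals, so treat the exact signatures as assumptions):

from redis.connection import Connection

conn = Connection(host="localhost", port=6379)
conn.connect()                 # establish the socket
conn.send_command("PING")      # serialize and send one command
print(conn.read_response())    # roughly b'PONG'
conn.disconnect()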

The ConnectionPool classes

In the redis.py code, ConnectionPool is split into two classes: the base class ConnectionPool and one subclass, BlockingConnectionPool. This puzzles me a little: with only one subclass, why split into two classes at all? Perhaps several subclasses were planned at the beginning and only one ended up being implemented…

The BlockingConnectionPool class is not only thread-safe but also process-safe.

class ConnectionPool(object):
    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):
        max_connections = max_connections or 2 ** 31
        if not isinstance(max_connections, (int, long)) or max_connections < 0:
            raise ValueError('"max_connections" must be a positive integer')

        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections

        self.reset()  # call reset() to initialize a few attributes

    def reset(self):
        self.pid = os.getpid()  # process safety is implemented with this pid check
        self._created_connections = 0
        self._available_connections = []  # available connections live in a plain list
        self._in_use_connections = set()
        self._check_lock = threading.Lock()

    def _checkpid(self):
        # if this pool was inherited through a fork, drop the inherited connections
        if self.pid != os.getpid():
            with self._check_lock:
                if self.pid == os.getpid():
                    # another thread has already done the check; just return
                    return
                self.disconnect()
                self.reset()

    def get_connection(self, command_name, *keys, **options):
        # take a connection out of the pool; note it is popped, so each connection has only one user at a time
        self._checkpid()
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        return connection

    def make_connection(self):
        # create a brand-new connection
        if self._created_connections >= self.max_connections:
            raise ConnectionError("Too many connections")
        self._created_connections += 1
        return self.connection_class(**self.connection_kwargs)

    def release(self, connection):
        # after using a connection, release() must be called explicitly to return it to the pool
        self._checkpid()
        if connection.pid != self.pid:
            return
        self._in_use_connections.remove(connection)
        self._available_connections.append(connection)

    def disconnect(self):
        # disconnect all connections
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        for connection in all_conns:
            connection.disconnect()


class BlockingConnectionPool(ConnectionPool):
    """
    This connection pool implementation is thread-safe
    """
    def __init__(self, max_connections=50, timeout=20,
                 connection_class=Connection, queue_class=LifoQueue,
                 **connection_kwargs):

        self.queue_class = queue_class  # a queue is used to hold the connections
        self.timeout = timeout  # adds timeout support
        super(BlockingConnectionPool, self).__init__(
            connection_class=connection_class,
            max_connections=max_connections,
            **connection_kwargs)

    def reset(self):
        self.pid = os.getpid()
        self._check_lock = threading.Lock()

        # first fill the queue with None placeholders; they are used later and are the key trick here
        self.pool = self.queue_class(self.max_connections)
        while True:
            try:
                self.pool.put_nowait(None)
            except Full:
                break

        # Keep a list of actual connection instances so that we can
        # disconnect them later.
        self._connections = []

    def make_connection(self):
        # create a connection; much like the base-class version, except the
        # connection is tracked in a list and there is no count check (the
        # queue already caps the total)
        connection = self.connection_class(**self.connection_kwargs)
        self._connections.append(connection)
        return connection

    def get_connection(self, command_name, *keys, **options):
        """
        Get a connection, waiting at most timeout seconds.

        If the value we pop turns out to be None, we create a new connection. Because
        we use a LIFO queue, i.e. a stack, we preferentially get connections that have
        already been created rather than the None placeholders that were put in first.
        In other words, connections are only created when actually needed, so the
        number of connections grows on demand.
        """
        # make sure the process has not changed
        self._checkpid()

        # try to get a connection; if that fails within timeout, raise ConnectionError
        connection = None
        try:
            connection = self.pool.get(block=True, timeout=self.timeout)
        except Empty:
            # note that this error is not caught by the Redis class; callers must handle it themselves
            raise ConnectionError("No connection available.")

        # we got a placeholder, so no live connection was available; create a new one
        if connection is None:
            connection = self.make_connection()

        return connection

    def release(self, connection):
        # return the connection to the pool
        self._checkpid()
        if connection.pid != self.pid:
            return

        # Put the connection back into the pool.
        try:
            self.pool.put_nowait(connection)
        except Full:
            # perhaps the pool has been reset() after a fork? regardless,
            # we don"t want this connection
            pass

    def disconnect(self):
        # disconnect all of the connections
        for connection in self._connections:
            connection.disconnect()
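To actually get the blocking behavior, pass a BlockingConnectionPool to the client explicitly; a small sketch (host and port are placeholders):

import redis

# wait up to 5 seconds for a free connection instead of failing immediately
pool = redis.BlockingConnectionPool(max_connections=10, timeout=5,
                                    host="localhost", port=6379)
client = redis.Redis(connection_pool=pool)
client.ping()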

The redis.Redis class

The Redis class uses a ConnectionPool; if one is not created explicitly, it creates a connection pool automatically. So every time you use Redis, you are already going through a connection pool.

class Redis:
    def __init__(self, ...):
        if not connection_pool:
            connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool
    def execute_command(self, *args, **options):
        # every command execution calls this method
        pool = self.connection_pool
        command_name = args[0]
        # pop a connection from the pool
        connection = pool.get_connection(command_name, **options)
        try:
            # try the redis call
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            # on a connection problem, close the bad connection; the next time it is used it will reconnect
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # try the redis call once more
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        finally:
            # no matter what, return the connection to the pool
            pool.release(connection)
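You can also construct the pool yourself and share it between clients, which is handy when you want exactly one pool per process; a minimal sketch:

import redis

# one pool shared by every Redis instance in the process
pool = redis.ConnectionPool(host="localhost", port=6379, db=0)

r = redis.Redis(connection_pool=pool)
r.set("greeting", "hello")
print(r.get("greeting"))  # b'hello'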

Understanding the HAR Format

HAR (HTTP Archive) is a common format for recording HTTP requests and responses. At heart, a HAR file is just a JSON file.
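Because it is plain JSON, a few lines of Python suffice to read one (example.har is a placeholder filename):

import json

with open("example.har") as f:
    har = json.load(f)

# print method, URL, and status code of every recorded request
for entry in har["log"]["entries"]:
    req, resp = entry["request"], entry["response"]
    print(req["method"], req["url"], resp["status"])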

A HAR file can contain the following records:

  • log
    • creator
    • browser
    • pages
      • pageTimings
    • entries
      • request
        • queryString
        • postData
        • params
      • response
        • cookies
        • headers
        • content
      • cache
      • timings

log

This is the root field of a HAR file; every other field is nested under it.

{
    "log": {
        "version" : "1.2",
        "creator" : {},
        "browser" : {},
        "pages": [],
        "entries": [],
        "comment": ""
    }
}

creator

"creator": {
    "name": "Firebug",
    "version": "1.6",
    "comment": ""
}

browser

Exactly the same structure as creator.

pages

"pages": [
    {
        "startedDateTime": "2009-04-16T12:07:25.123+01:00",
        "id": "page_0",
        "title": "Test Page",
        "pageTimings": {...},
        "comment": ""
    }
]

pageTimings

"pageTimings": {
    "onContentLoad": 1720,
    "onLoad": 2500,
    "comment": ""
}

entries

"entries": [
    {
        "pageref": "page_0",
        "startedDateTime": "2009-04-16T12:07:23.596Z",
        "time": 50,
        "request": {...},
        "response": {...},
        "cache": {...},
        "timings": {},
        "serverIPAddress": "10.0.0.1",
        "connection": "52492",
        "comment": ""
    }
]

request

"request": {
    "method": "GET",
    "url": "http://www.example.com/path/?param=value",
    "httpVersion": "HTTP/1.1",
    "cookies": [],
    "headers": [],
    "queryString" : [],
    "postData" : {},
    "headersSize" : 150,
    "bodySize" : 0,
    "comment" : ""
}

queryString

"queryString": [
    {
        "name": "param1",
        "value": "value1",
        "comment": ""
    },
    {
        "name": "param1",
        "value": "value1",
        "comment": ""
    }
]

postData

"postData": {
    "mimeType": "multipart/form-data",
    "params": [],
    "text" : "plain posted data",
    "comment": ""
}

params

"params": [
    {
        "name": "paramName",
        "value": "paramValue",
        "fileName": "example.pdf",
        "contentType": "application/pdf",
        "comment": ""
    }
]

response

"response": {
    "status": 200,
    "statusText": "OK",
    "httpVersion": "HTTP/1.1",
    "cookies": [],
    "headers": [],
    "content": {},
    "redirectURL": "",
    "headersSize" : 160,
    "bodySize" : 850,
    "comment" : ""
 }

content

"content": {
    "size": 33,
    "compression": 0,
    "mimeType": "text/html; charset=utf-8",
    "text": "\n",
    "comment": ""
}

cookies

"cookies": [
    {
        "name": "TestCookie",
        "value": "Cookie Value",
        "path": "/",
        "domain": "www.janodvarko.cz",
        "expires": "2009-07-24T19:20:30.123+02:00",
        "httpOnly": false,
        "secure": false,
        "comment": ""
    }
]

headers

"headers": [
    {
        "name": "Accept-Encoding",
        "value": "gzip,deflate",
        "comment": ""
    },
    {
        "name": "Accept-Language",
        "value": "en-us,en;q=0.5",
        "comment": ""
    }
]

cache

"cache": {
    "beforeRequest": {},
    "afterRequest": {},
    "comment": ""
}

beforeRequest

"beforeRequest": {
    "expires": "2009-04-16T15:50:36",
    "lastAccess": "2009-16-02T15:50:34",
    "eTag": "",
    "hitCount": 0,
    "comment": ""
}

timings

"timings": {
    "blocked": 0,
    "dns": -1,
    "connect": 15,
    "send": 20,
    "wait": 38,
    "receive": 12,
    "ssl": -1,
    "comment": ""
}

References

  1. http://www.softwareishard.com/blog/har-12-spec/#response

Partitioning and Mounting a Disk on Linux

Partitioning

parted -s -a optimal /dev/sdb mklabel gpt -- mkpart primary ext4 1 -1s

Creating the filesystem

mkfs.ext4 /dev/sdb1

Checking the result

parted -l

Copying the data

First mount the new partition at a temporary mount point:

mount /dev/sdb1 /mnt

Copy the old data over:

# rsync -av /home/* /mnt/
OR
# cp -aR /home/* /mnt/

Verifying the data

diff -r /home /mnt

Removing the old data

rm -rf /home/*
umount /mnt

Mounting

mount /dev/sdb1 /home

Writing it into fstab

blkid /dev/sdb1

/dev/sdb1: UUID="e087e709-20f9-42a4-a4dc-d74544c490a6" TYPE="ext4" PARTLABEL="primary" PARTUUID="52d77e5c-0b20-4a68-ada4-881851b2ca99"

Add the following line to /etc/fstab:

UUID=e087e709-20f9-42a4-a4dc-d74544c490a6   /home   ext4   defaults   0   2

Each column means the following:


  • UUID – specifies the block device; you can alternatively use the device file /dev/sdb1.
  • /home – this is the mount point.
  • ext4 – describes the filesystem type on the device/partition.
  • defaults – mount options (here this value means rw, suid, dev, exec, auto, nouser, and async).
  • 0 – used by the dump tool; 0 means don't dump if the filesystem is not present.
  • 2 – used by the fsck tool to determine the filesystem check order; this value means check this device after the root filesystem.
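As a quick sanity check of those six columns, they can be read back with a throwaway Python sketch (not a robust fstab parser):

FIELDS = ["device", "mountpoint", "fstype", "options", "dump", "fsck_order"]

with open("/etc/fstab") as f:
    for line in f:
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        print(dict(zip(FIELDS, line.split())))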

Resizing a partition

First open the corresponding disk with parted:

tiger@iZ8vbe91kz7sqlvkjdu8p6Z:~$ sudo parted
GNU Parted 3.2
Using /dev/vda
Welcome to GNU Parted! Type "help" to view a list of commands.
(parted) select /dev/vdc
Using /dev/vdc
(parted) resizepart
Partition number? 1
Warning: Partition /dev/vdc1 is being used. Are you sure you want to continue?
Yes/No? yes
End?  [107GB]? 1100G
(parted) print
Model: Virtio Block Device (virtblk)
Disk /dev/vdc: 1100GB
Sector size (logical/physical): 512B/512B
Partition Table: msdos
Disk Flags:

Number  Start   End     Size    Type     File system  Flags
 1      1049kB  1100GB  1100GB  primary  ext4

Then use resize2fs to grow the filesystem to fill the enlarged partition:

resize2fs /dev/vdc1

References

  1. https://www.tecmint.com/move-home-directory-to-new-partition-disk-in-linux/
  2. https://www.tecmint.com/parted-command-to-create-resize-rescue-linux-disk-partitions/