Month: 十一月 2018

使用 Nomad 编排服务

2019-01-02 更新:相对于 Kubernetes 来说,Nomad 还是太简陋了,弃坑

Nomad 是 HashiCorp 出品的一个容器编排服务,相较于重量级的 Kubernetes 来说,Nomad 的特点在于

1. 轻量级,只有一个二进制文件。K8s 的安装可能就要花上半天,在国内还有万恶的防火墙问题。
2. 概念也比较清晰,专注于任务的调度的编排,而不像 Kubernetes 一样引入了各种五花八门的概念。
3. 除了编排容器之外,Nomad 还可以直接编排普通应用,使用 cgroups 安全运行应用。

# 安装

从官网下载二进制文件,复制到 /usr/local/bin 就好了,不再赘述

# 使用

“`
$ sudo nomad agent -dev

$ nomad node status
ID DC Name Class Drain Eligibility Status
171a583b dc1 nomad false eligible ready

$ nomad server members
Name Address Port Status Leader Protocol Build Datacenter Region
nomad.global 127.0.0.1 4648 alive true 2 0.7.0 dc1 global
“`

# Job

Nomad 的调度单元称作 Job,Job 分为了三种类型:

1. Batch,也就是一次批处理,程序运行之后就结束了。不过也可以通过 cron 字段指定任务定期运行
2. Service,程序是一个常驻内存的服务,如果退出之后,Nomad 会按照给定的策略重启
3. System,在每一个 Nomad 节点上都需要运行的服务

Job 可以使用 HCL 文件来定义,HCL 文件在语义上和 JSON 是等价的,只不过是省去了一些多余的引号逗号之类的。也可以使用 JSON 文件来定义。

## 创建一个新的 Job

创建一个空白的 job 文件

“`
$ nomad job init
Example job file written to example.nomad
“`

打开生成的 example.nomad 文件,我们看到生成了一大推配置,默认定义了一个 redis 服务器的 job。Job 中包含了 Group,Group 中包含了 Task,task 可以认为是我们最终需要运行服务的那个命令。比如这里就是定义了运行 redis:3.2 这个 docker 镜像。

“`
task “redis” {
# The “driver” parameter specifies the task driver that should be used to
# run the task.
driver = “docker”

# The “config” stanza specifies the driver configuration, which is passed
# directly to the driver to start the task. The details of configurations
# are specific to each driver, so please see specific driver
# documentation for more information.
config {
image = “redis:3.2”
port_map {
db = 6379
}
}
“`

我们可以运行一下这个 job

“`
-> % nomad job run example.nomad
==> Monitoring evaluation “4f5559e0”
Evaluation triggered by job “example”
Allocation “98959767” created: node “ecf9f7cd”, group “cache”
Evaluation within deployment: “e66e0957”
Evaluation status changed: “pending” -> “complete”
==> Evaluation “4f5559e0” finished with status “complete”
“`

然后查看一下 job 的运行状态:

“`
$ nomad status example

Allocations
ID Node ID Task Group Version Desired Status Created Modified
8ba85cef 171a583b cache 0 run running 5m ago 5m ago
“`

在最下面一行我们可以看到 Allocation 的状态。Allocation 可以理解为一个 Job 的一个实例化。

我们可以再查看这个 Alloc 的状态:

“`
$ nomad alloc status 8ba85cef

Recent Events:
Time Type Description
10/31/17 22:58:49 UTC Started Task started by client
10/31/17 22:58:40 UTC Driver Downloading image redis:3.2
10/31/17 22:58:40 UTC Task Setup Building Task Directory
10/31/17 22:58:40 UTC Received Task received by client
“`

查看 Alloc 的日志

“`
$ nomad alloc logs 8ba85cef redis
“`

## 修改 Job

比如说,我们可以把这个 Job 中 cache task group 需要运行的副本数量改为 3

“`
count = 3
“`

使用 nomad job plan 来 dry run 一下。

“`
$ nomad job plan example.nomad

+/- Job: “example”
+/- Task Group: “cache” (2 create, 1 in-place update)
+/- Count: “1” => “3” (forces create)
Task: “redis”

Job Modify Index: 7
To submit the job with version verification run:

nomad job run -check-index 7 example.nomad

“`

注意到其中返回了一个 check-index 这个是为了避免同时更改同一个 job 造成冲突。

“`
$ nomad job run -check-index 7 example.nomad
“`

# 集群

在生产环境中,我们当然应该使用集群模式,而不是单机。nomad 可以直接利用 consul 来实现 bootstrap 集群。

服务端配置:

“`
# /etc/nomad.d/server.hcl

data_dir = “/etc/nomad.d”

server {
enabled = true
bootstrap_expect = 3
}
“`

启动:

“`
$ nomad agent -config=/etc/nomad.d/server.hcl
“`

客户端配置:

“`
# /etc/nomad.d/client.hcl

datacenter = “dc1”
data_dir = “/etc/nomad.d”

client {
enabled = true
}
“`

启动:

“`
$ nomad agent -config=/etc/nomad.d/client.hcl
“`

Python Redis 客户端连接池解析

Python Redis 的客户端使用了链接池机制,通过复用链接可以减低服务器的压力并在失败时重试。连接池其实是一种很通用的机制,在实现客户端是是一个经常需要(或许其实不需要)重复发明的轮子。

Redis 客户端一共涉及到了三个类:

– Connection,表示一个到服务器的链接
– ConnectionPool,链接池
– Redis,使用连接池,并在失败时重试

# Connection 类解析

Connection 类主要负责建立和 Redis 服务器的一个 Socket 链接,并且沟通相关信息。下面的代码是 Connection 类和 socket 处理相关的代码。

“`
class Connection(object):

def __del__(self):
try:
self.disconnect()
except Exception:
pass

def connect(self):
“””
连接 Redis 服务器
“””
if self._sock:
return
try:
sock = self._connect()
except socket.timeout:
raise TimeoutError(“Timeout connecting to server”)
except socket.error:
e = sys.exc_info()[1]
raise ConnectionError(self._error_message(e))

self._sock = sock
try:
self.on_connect()
except RedisError:
# clean up after any error in on_connect
self.disconnect()
raise

# run any user callbacks. right now the only internal callback
# is for pubsub channel/pattern resubscription
for callback in self._connect_callbacks:
callback(self)

def _connect(self):
“””
建立链接的具体过程, 主要是 socket 操作
“””

def disconnect(self):
“””
关闭链接
“””
self._parser.on_disconnect()
if self._sock is None:
return
try:
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except socket.error:
pass
self._sock = None

def send_packed_command(self, command):
if not self._sock:
self.connect()
。。。
# 发送命令到服务器
“`

可以看出,Connection 类主要是在 socket 上的一层薄薄封装。当然,这个 Connection 不是线程安全的。

# ConnectionPool 类解析

redis.py 的代码中 ConnectionPool 分了两个类,基类 ConnectionPool,还有一个子类 BlockingConnectionPool。这里我感到有些不解,既然只有一个子类,不知道为什么还要分成两个类呢?可能是开始时候规划了好几个子类,最后只实现了一个吧……

其中 BlockingConnection 类不只是线程安全的,还是进程安全的。

“`
class ConnectionPool(object):
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs):
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() # 调用 reset 初始化一些属性 def reset(self): self.pid = os.getpid() # 通过 pid 检查实现进程安全 self._created_connections = 0 self._available_connections = [] # 直接使用一个 list 来存放连接 self._in_use_connections = set() self._check_lock = threading.Lock() def _checkpid(self): # 如果当前的 connection 是 fork 来的,直接关闭链接 if self.pid != os.getpid(): with self._check_lock: if self.pid == os.getpid(): # 另一个线程已经检查了,直接返回 return self.disconnect() self.reset() def get_connection(self, command_name, *keys, **options): # 从连接池中取一个连接,注意这里是弹出,也就是同一个链接只有一个用户使用 self._checkpid() try: connection = self._available_connections.pop() except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) return connection def make_connection(self): # 创建一个新的连接 if self._created_connections >= self.max_connections:
raise ConnectionError(“Too many connections”)
self._created_connections += 1
return self.connection_class(**self.connection_kwargs)

def release(self, connection):
# 使用完毕连接后需要显式调用 release 把连接归还到连接池中。
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)

def disconnect(self):
# 断开所有连接
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()

class BlockingConnectionPool(ConnectionPool):
“””
这个连接池的实现是线程安全的
“””
def __init__(self, max_connections=50, timeout=20,
connection_class=Connection, queue_class=LifoQueue,
**connection_kwargs):

self.queue_class = queue_class # 使用一个队列来存放连接
self.timeout = timeout # 增加了超时功能
super(BlockingConnectionPool, self).__init__(
connection_class=connection_class,
max_connections=max_connections,
**connection_kwargs)

def reset(self):
self.pid = os.getpid()
self._check_lock = threading.Lock()

# 首先在队列中填满 None,后面会用到,这里很关键
self.pool = self.queue_class(self.max_connections)
while True:
try:
self.pool.put_nowait(None)
except Full:
break

# Keep a list of actual connection instances so that we can
# disconnect them later.
self._connections = []

def make_connection(self):
# 创建一个链接,貌似和上面的函数没有什么区别。。
connection = self.connection_class(**self.connection_kwargs)
self._connections.append(connection)
return connection

def get_connection(self, command_name, *keys, **options):
“””
获取一个新的连接,最长等待 timeout 秒

如果我们读取到的新连接是 None 的话,就会创建一个新的连接。因为我们使用的是 LIFO 队列,也就是栈,
所以我们优先得到的是已经创建的链接,而不是最开始放进去的 None。也就是我们只有在需要的时候才会创建
新的连接,也就是说连接数量是按需增长的。
“””
# 确保没有更换进程
self._checkpid()

# 尝试获取一个连接,如果在 timeout 时间内失败的话,抛出 ConnectionError
connection = None
try:
connection = self.pool.get(block=True, timeout=self.timeout)
except Empty:
# 需要注意的是这个错误并不会被 redis 捕获,需要用户自己处理
raise ConnectionError(“No connection available.”)

# 如果真的没有连接可用了,直接创建一个新的连接
if connection is None:
connection = self.make_connection()

return connection

def release(self, connection):
# 释放连接到连接池
self._checkpid()
if connection.pid != self.pid:
return

# Put the connection back into the pool.
try:
self.pool.put_nowait(connection)
except Full:
# perhaps the pool has been reset() after a fork? regardless,
# we don’t want this connection
pass

def disconnect(self):
# 释放所有的连接
for connection in self._connections:
connection.disconnect()
“`

# redis.Redis 类解析

Redis 类中使用了 ConnectionPool,如果没有显式创建的话,会自动创建一个线程池。所以每次你在使用 Redis 的时候,其实已经在使用线程池了。

“`
class Redis:
def __init__(self…):
if not connection_pool:
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
def execute_command(self, *args, **options):
# 执行每条命令都会调用该方法
pool = self.connection_pool
command_name = args[0]
# 弹出一个连接
connection = pool.get_connection(command_name, **options)
try:
# 尝试调用 redis
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
# 如果是连接问题,关闭有问题的连接,下面再次使用这个连接的时候会重新连接。
connection.disconnect()
if not connection.retry_on_timeout and isinstance(e, TimeoutError):
raise
# 再次尝试调用 redis
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
# 不管怎样都要把这个连接归还到连接池
pool.release(connection)
“`

HAR 格式解析

HAR(HTTP Archive) 文件是一种常见的用来保存 HTTP 请求和响应的格式。本质上,HAR 文件其实就是一个 JSON 文件。

每一个 HAR Entry 都可以有以下记录存在:

– log
– creator
– browser
– pages
– pageTimings
– entries
– request
– queryString
– postData
– params
– response
– cookies
– headers
– content
– cache
– timings

# log

这个是一个 HAR 文件的根字段,其他字段都是该字段的子字段

“`
{
“log”: {
“version” : “1.2”,
“creator” : {},
“browser” : {},
“pages”: [],
“entries”: [],
“comment”: “”
}
}
“`

# creator

“`
“creator”: {
“name”: “Firebug”,
“version”: “1.6”,
“comment”: “”
}
“`

# browser

同 creator 结构完全一样

# pages

“`
“pages”: [
{
“startedDateTime”: “2009-04-16T12:07:25.123+01:00”,
“id”: “page_0”,
“title”: “Test Page”,
“pageTimings”: {…},
“comment”: “”
}
]
“`

## pageTimings

“`
“pageTimings”: {
“onContentLoad”: 1720,
“onLoad”: 2500,
“comment”: “”
}
“`

# entries

“`
“entries”: [
{
“pageref”: “page_0”,
“startedDateTime”: “2009-04-16T12:07:23.596Z”,
“time”: 50,
“request”: {…},
“response”: {…},
“cache”: {…},
“timings”: {},
“serverIPAddress”: “10.0.0.1”,
“connection”: “52492”,
“comment”: “”
}
]
“`

## request

“`
“request”: {
“method”: “GET”,
“url”: “http://www.example.com/path/?param=value”,
“httpVersion”: “HTTP/1.1”,
“cookies”: [],
“headers”: [],
“queryString” : [],
“postData” : {},
“headersSize” : 150,
“bodySize” : 0,
“comment” : “”
}
“`

### queryString

“`
“queryString”: [
{
“name”: “param1”,
“value”: “value1”,
“comment”: “”
},
{
“name”: “param1”,
“value”: “value1”,
“comment”: “”
}
]
“`

### postData

“`
“postData”: {
“mimeType”: “multipart/form-data”,
“params”: [],
“text” : “plain posted data”,
“comment”: “”
}
“`

#### params

“`
“params”: [
{
“name”: “paramName”,
“value”: “paramValue”,
“fileName”: “example.pdf”,
“contentType”: “application/pdf”,
“comment”: “”
}
]
“`

## response

“`
“response”: {
“status”: 200,
“statusText”: “OK”,
“httpVersion”: “HTTP/1.1”,
“cookies”: [],
“headers”: [],
“content”: {},
“redirectURL”: “”,
“headersSize” : 160,
“bodySize” : 850,
“comment” : “”
}
“`

### content

“`
“content”: {
“size”: 33,
“compression”: 0,
“mimeType”: “text/html; charset=utf-8”,
“text”: “\n”,
“comment”: “”
}
“`

### cookies

“`
“cookies”: [
{
“name”: “TestCookie”,
“value”: “Cookie Value”,
“path”: “/”,
“domain”: “www.janodvarko.cz”,
“expires”: “2009-07-24T19:20:30.123+02:00”,
“httpOnly”: false,
“secure”: false,
“comment”: “”
}
]
“`

### headers

“`
“headers”: [
{
“name”: “Accept-Encoding”,
“value”: “gzip,deflate”,
“comment”: “”
},
{
“name”: “Accept-Language”,
“value”: “en-us,en;q=0.5”,
“comment”: “”
}
]
“`

## cache

“`
“cache”: {
“beforeRequest”: {},
“afterRequest”: {},
“comment”: “”
}
“`

### beforeRequest

“`
“beforeRequest”: {
“expires”: “2009-04-16T15:50:36”,
“lastAccess”: “2009-16-02T15:50:34”,
“eTag”: “”,
“hitCount”: 0,
“comment”: “”
}
“`

## timings

“`
“timings”: {
“blocked”: 0,
“dns”: -1,
“connect”: 15,
“send”: 20,
“wait”: 38,
“receive”: 12,
“ssl”: -1,
“comment”: “”
}
“`

# 参考资料

1. http://www.softwareishard.com/blog/har-12-spec/#response

Python 环境变量的一个坑

Python 中可以使用 os.environ 操作环境变量,前几天看到了其他几个函数 os.getenv 和 os.putenv。然而 os.putenv 是一个大坑,os.putenv 之后,在后面的 os.getenv 中并不能读出来。囧

# 参考

1. https://mail.python.org/pipermail/python-list/2013-June/650294.html

网页更新与重抓策略

我们知道网页总是会更新的。在大规模的网络爬取中,一个很重要的问题是重抓策略,也就是在什么时候去重新访问同一个网页已获得更新。要获得这个问题的解,需要满足如下两个条件:

1. 尽可能地少访问,以减少自身和对方站点的资源占用
2. 尽可能快的更新,以便获得最新结果

这两个条件几乎是对立的,所以我们必须找到一种算法,并获得一个尽可能优的折衷。

可以使用泊松过程:https://stackoverflow.com/questions/10331738/strategy-for-how-to-crawl-index-frequently-updated-webpages

如何调试 Python 的 Core Dump

如果需要记录 Core Dump 的原因,首先需要使用 faulthandler 参数启动 Python

“`
python -X faulthandler main.py
“`

出 core 之后,可以使用 gdb 调试

“`
gdb python core
“`

参考

1. https://stackoverflow.com/questions/2663841/python-tracing-a-segmentation-fault/2664232#2664232

Linux 下分区并挂载磁盘

# 分区

“`
parted -s -a optimal /dev/sda mklabel gpt — mkpart primary ext4 1 -1s
“`

# 创建文件系统

“`
mkfs.ext4 /dev/sda1
“`

# 查看分区结果

“`
parted -l
“`

# 复制数据

首先挂载到临时分区

“`
mount /dev/sdb1 /mnt
“`

把之前的数据考进去

“`
# rsync -av /home/* /mnt/
OR
# cp -aR /home/* /mnt/
“`

校验数据

“`
diff -r /home /mnt
“`

删除老数据

“`
rm -rf /home/*
“`

“`
umount /mnt
“`

# 挂载

“`
mount /dev/sdb1 /home
“`

写入到 fstab 中

“`
blkid /dev/sdb1

/dev/sdb1: UUID=”e087e709-20f9-42a4-a4dc-d74544c490a6″ TYPE=”ext4″ PARTLABEL=”primary” PARTUUID=”52d77e5c-0b20-4a68-ada4-881851b2ca99″
“`

在 /etc/fstab 中增加

“`
UUID=e087e709-20f9-42a4-a4dc-d74544c490a6 /home ext4 defaults 0 2
“`

每一列的含义如下

“`

UUID – specifies the block device, you can alternatively use the device file /dev/sdb1.
/home – this is the mount point.
etx4 – describes the filesystem type on the device/partition.
defaults – mount options, (here this value means rw, suid, dev, exec, auto, nouser, and async).
0 – used by dump tool, 0 meaning don’t dump if filesystem is not present.
2 – used by fsck tool for discovering filesystem check order, this value means check this device after root filesystem.
“`

# 调整分区大小

首先使用 parted 打开对应的磁盘

“`
tiger@iZ8vbe91kz7sqlvkjdu8p6Z:~$ sudo parted
GNU Parted 3.2
Using /dev/vda
Welcome to GNU Parted! Type ‘help’ to view a list of commands.
(parted) select /dev/vdc
Using /dev/vdc
(parted) resizepart
Partition number? 1
Warning: Partition /dev/vdc1 is being used. Are you sure you want to continue?
Yes/No? yes
End? [107GB]? 1100G
(parted) print
Model: Virtio Block Device (virtblk)
Disk /dev/vdc: 1100GB
Sector size (logical/physical): 512B/512B
Partition Table: msdos
Disk Flags:

Number Start End Size Type File system Flags
1 1049kB 1100GB 1100GB primary ext4
“`

然后使用 resize2fs 重新调整分区大小

“`
resize2fs /dev/vdb1
“`

# 参考

1. https://www.tecmint.com/move-home-directory-to-new-partition-disk-in-linux/
2. https://www.tecmint.com/parted-command-to-create-resize-rescue-linux-disk-partitions/