Author: 逸飞

面试刷题总结(六)-栈和队列

栈本身是一种特别简单的数据结构,这里就不多说了,主要记录一些以前不会的题目。

单调栈

单调栈就是满足单调性的栈。比如递增的栈,如果新的元素比栈顶的元素小的话,就要一直弹出,直到找到满足条件的栈顶,然后再入栈。

单调队列

单调队列使用两个队列组成。其中一个是普通的队列,另一个是维护大小顺序的队列。这里的队列不是严格的队列,而是可以从尾部弹出的双端队列。

from collections import deque

class MonoQueue:
    def __init__(self):
        self.q = deque()  # 实际储存数据
        self.m = deque()  # 维护单调关系,队首元素总是最大值

    def push(self, x):
        self.q.append(x)
        while len(self.m) > 0 and self.m[-1] < x:
            self.m.pop()
        self.m.append(x)

    def pop(self):
        x = self.q.popleft()
        if self.m[0] == x:
            self.m.popleft()
        return x

    def __len__(self):
        return len(self.q)

    def top(self):
        return self.m[0]

例题

单调队列的题目

  • LC84. Largest Rectangle in Histogram
  • LC239. Sliding Window Maximum
  • LC739. Daily Temperatures
  • LC862. Shortest Subarray with Sum at Least K
  • LC901. Online Stock Span
  • LC907. Sum of Subarray Minimums

POJ3250 Bad Hair Day

有 N 头牛从左到右排成一排,每头牛有一个高度 h[i],设左数第 i 头牛与「它右边第一头高度 >= h[i]」的牛之间有 c[i] 头牛,试求 sum(c[i])。

这道题使用单调栈做。

def sum_distance(H):
    ans = 0
    stack = []
    for h in H:
        # 单调递减的栈
        while stack and stack[-1] < h:
            stack.pop()
        ans += len(stack)
        stack.append(h)
    return ans

参考资料

  1. https://oi-wiki.org/ds/monotonous-stack/
  2. https://www.hankcs.com/program/algorithm/poj-3250-bad-hair-day.html
  3. https://oi-wiki.org/ds/monotonous-queue/
  4. https://oi.men.ci/monotone-queue-notes/

前缀和

前缀和这个概念可以说是「会者不难,难者不会」,概念本身是很简单的,但是如果不知道的话,好多题就两眼一抹黑了。前缀和即为数组从 0 到当前位置的和,类似的概念还有后缀和,前缀积,后缀积等。

presum[0] = A[0]
presum[i] = presum[i-1] + A[i]

例题

求数组中绝对值最小的子数组

利用前缀和的性质:A[i] + A[i+1] + ... + A[j] = presum[j] - presum[i-1] 前缀和排序,取差值最小值。

有一道题,求两个时间之间的差值,其实是类似的。当差值直接累加计算不好求的时候可以转换为前缀和的差。

把一个数组从中间 p 位置分开,使得 a[0]+…+a[p-1] 与 a[p]+a[p+1]+…+a[n-1] 差值最小?

也就是使得:presum - (total - presum) = 2 * presum - total 最小。

参考资料

  1. https://www.cnblogs.com/AndyJee/p/4474073.html

面试刷题总结(十)- 指针类题目与窗口

链表的指针题目

一般使用 dummy head 的话,循环条件是 while p.next, 而不使用的话,则是 while p.

递归反转列表

int reverseList(ListNode node) {
    if (node == null || node.next == null) {
        return node;
    }
    ListNode head = reverseList(node.next);
    node.next.next = node;
    node.next = null;
    return head;
}

都比较简单,主要是细节的使用。

  • LeetCode 24
  • LeetCode 25
  • LeetCode 206

参考资料

  1. https://mp.weixin.qq.com/s/YFe9Z35CE4QWPDmPNitd4w

使用 Prometheus 监控应用数据

Prometheus 是使用 Go 语言开发的一个监控工具和时序数据库,它的实现参考了 Borgmon。监控系统大体来说分两种模式,push 和 pull。push 模式就是应用程序主动把监控数据推送到监控服务,pull 模式就是监控服务来主动拉取应用的数据。Prometheus 采用的是 Pull 模式。

对于自己编写的应用,可以使用 prometheus 的 sdk 来自己提供 metrics,对于开源的软件,可以使用对应的 exporter .

监控系统基础原则

  • 尽量简单,不要上来就想搞个大新闻,喧宾夺主
  • 告警也尽量简单,只发需要处理的告警
  • 简单的架构就是最好的架构,业务系统都挂了,监控也不能挂。

不要想着把所有的数据都显示到监控上,太多了反倒是让人失去了重点。想象一下最容易出错的情况,以及在这种情况下你应该怎么用监控来排错。

  • 一个控制台不要有超过 5 个图。
  • 每个图上不要有超过五条线。当然堆栈图和饼形图除外。
  • 当使用提供的模板时,避免在右手边的表格里有多过 20-30 个条目。

系统的每个部分都应该有一个监控,至少让你大概知道这个系统现在的情况如何。

也不要想着把特别复杂的业务数据画到监控系统上,监控是监控,业务是业务,不能相互替代。

Prometheus 的指标类型

Prometheus 常用的有四种类型:计数器 (counter), 刻度 (gauge), 直方图 (histogram), 摘要 (summary).

计数器只增不减,用来记录一件事情发生了多少次,可以使用 rate(some_counter[interval])(具体含义后面会说到)来计算一件事情的速率。Counter 类型主要是为了 rate 而存在的,即计算速率,单纯的 Counter 计数意义不大,因为 Counter 一旦重置,总计数就没有意义了。rate 会自动处理 Counter 重置的问题,Counter 的任何减少也会被视为 Counter 重置。

Gauges 可以被设定,可以增高,可以减小。用来记录状态,比如正在进行的请求的数量,空闲内存数,温度等。对于 gauge 值,不要使用 rate. 可以使用 max_over_time 等来处理 gauge 数据。

Histogram 和 Summary 都是采样观测量,典型的比如请求的时间和相应的体积等等。他们记录观测的数量和观测的所有值,允许你计算平均值等。Histogram 实际上是一个复合值,由三部分组成,一部分是观测到的值,存在不同的 bucket 中,而 bucket 的大小则由用户指定,默认情况下是观测一个网页请求的延迟。观测的数量(也就是_count) 变量是一个 counter 类型的值,观测的和(也就是_sum) 变量也类似一个 counter, 当观测值没有负数的时候。显然响应时间和响应体积都是正的。

Histogram 类型数据最常用的函数是 histogram_quantile 了,可以用来计算 P95,P99 等数据。

如果有一个观测值叫做 http_request_duration_seconds, 那么要计算刚过去的 5 分钟内的平均时长可以这样算:

# 先不用理解,后边会讲到
rate(http_request_duration_seconds_sum[5m]) / rate(http_request_duration_seconds_count[5m])

Histogram 在服务端计算,Summary 在客户端计算并且不能被重新计算。如果可能的话,最好使用 Histogram, 不要使用 summary.

另外,Prometheus 支持 labels, 也就是标签,这样就可以很好地查询过滤指标,而不需要创建很多的指标了。

输出指标到 Prometheus

这里以 Python 为例。

pip install prometheus_client

Counter

from prometheus_client import Counter

# 按照 Prometheus 的最佳实践, counter 类型的数据后缀是 _total
# prometheus 客户端会智能处理 _total 后缀,在后台总是有 _total 后缀的
c = Counter("http_request_failures_total", "http 请求出错计数")
c.inc()  # 默认是 1
c.inc(2)  # 也可以指定数字

Counter 还有一个方便的属性,叫做 count_exceptions, 可以用作装饰器或者 with 语句中。

@c.count_exceptions()
def f():
    pass

with c.count_exceptions():
    pass

with c.count_exceptions(ValueError):
    pass

Gauge

from prometheus_client import Gauge

g = Gauge("cpu_usage", "CPU 使用率")
g.inc()
g.dec(10)
g.set(4.2)

Gauge 也有一些方便的辅助函数,比如说 track_inprogress 用来记录正在执行的数量。

 g.set_to_current_time()

 # Increment when entered, decrement when exited.
@g.track_inprogress()
def f():
  pass

with g.track_inprogress():
  pass

也可以给 gauge 设定一个回调函数来取值:

d = Gauge('data_objects', 'Number of objects')
my_dict = {}
d.set_function(lambda: len(my_dict))

Histogram

值得注意的是,histogram 默认定义的 buckets 大小是为了正常的网页请求设计的,也就是围绕着一秒的一些数据。如果我们需要观测一些其他的值,那么需要重新定义 buckets 的大小。

一般来说,buckets 是呈指数分布的,中间值为最常见的典型值,这样可以更好地拟合实际的分布(幂次分布)。因为 buckets 是以 label 的形式实现的,所以 buckets 最好也不要超过十个。

from prometheus_client import Histogram

h = Histogram()
h.observe(4.7)

@h.time()
def f():
  pass

with h.time():
  pass

# 默认的 buckets[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
h = Histogram(buckets=[1, 10, 100])

标签导出

如果要导出标签的话,需要使用 labels 方法

from prometheus_client import Counter
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
c.labels(method='get', endpoint='/').inc()
c.labels(method='post', endpoint='/submit').inc()

HTTP 服务器

前面我们提到 Prometheus 是采用的拉模型,那么从哪儿拉数据呢?需要我们的程序开启一个 http 的服务器,这样 Prometheus 才能来拉取数据。

如果实在普通的脚本里面,可以这样:

from prometheus_client import start_http_server

start_http_server(8000)

如果本身就是个 web 服务器,那么直接 mount 导一个路径就好了。不过实际上这是不可用的,因为生产中的服务器都是多进程的,而 Prometheus 的 Python 客户端不支持多进程。

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from prometheus_client import make_wsgi_app

# Create my app
app = Flask(__name__)

# Add prometheus wsgi middleware to route /metrics requests
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {
    '/metrics': make_wsgi_app()
})

很遗憾的是, Prometheus 的 Python 客户端对于多进程的支持不好.

使用 PromQL 查询指标

数据类型

在 Prometheus 中有四种数据类型,分别是:数字,字符串,直接向量 (instant vector) 和区间向量 (range vector).

数字和字符串就不用说了,重点说一下后两个向量。直接向量其实就是指标,比如说 http_request_count, 他就是一个一维的时间向量。而区间向量其实是二维的,在每一个时间点都是一个向量。

那么怎么生成区间向量呢?使用 [] 操作符。比如说 http_requests_total[5m], 表示在每个时间点,该时间点过去五分钟的时间序列,也就是二维的。那么区间向量有什么用呢?答案很简单:给 rate 函数使用。

比如说,我们常见的计算网页 qps 的函数:rate(http_requests_toal[5m]), 意思就是,在每个时间点都取前五分钟的统计数据计算访问速率,实际上这不就是求导么,而 5m 就是其中 dx 的取值。但是和微分不一样的是,dx 肯定不是越小越好,因为 Prometheus 抓取数据有间隔,所以显然不能小于抓取间隔,一般取抓取间隔的 4 倍左右,5m 就是个很好的值。采样周期 5m 如果设置的大一些,图像就会更平滑,如果小一些就会更精确。

官方建议将 Rate 计算的范围向量的时间至少设为抓取间隔的四倍。这将确保即使抓取速度缓慢,且发生了一次抓取故障,也始终可以使用两个样本。此类问题在实践中经常出现,因此保持这种弹性非常重要。例如,对于 1 分钟的抓取间隔,您可以使用 4 分钟的 Rate 计算,但是通常将其四舍五入为 5 分钟。

查询语法

使用 {} 来过滤指标, 大概相当于 SQL 中的 where 子句。除了 = 之外,还有 !==~(正则) 和 !~(不匹配)

<metric name>{<label name>=<label value>, ...}

比如:

api_http_requests_total{method="POST", handler="/messages"}

如果要查询历史数据可以使用 offset xx 来查询。比如下面这条表示比过去一个小时的 gc fraction 还要大 1.5 倍的数据。

go_memstats_gc_cpu_fraction > 1.5 * (go_memstats_gc_cpu_fraction offset 1h)

使用 by 关键字可以聚合字段:

# sum+rate 其实是求和的意思(求导再积分), 然后按照 instance 聚合
sum(rate(node_network_receive_bytes_total[5m])) by (instance)

常用函数

使用 Dashboard 展示指标

可视化界面

Prometheus 自带了在 /graph 下有一个 expression browser, 可以绘制一些简单的图形,除此之外还是建议使用 grafana.

数据采集配置

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: prometheus
    static_configs:
      - targets: ["localhost:9090"]

对每一个 job 都会自动生成一些指标:

up{job="<job-name>", instance="<instance-id>"}: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
scrape_duration_seconds{job="<job-name>", instance="<instance-id>"}: duration of the scrape.
scrape_samples_post_metric_relabeling{job="<job-name>", instance="<instance-id>"}: the number of samples remaining after metric relabeling was applied.
scrape_samples_scraped{job="<job-name>", instance="<instance-id>"}: the number of samples the target exposed.
scrape_series_added{job="<job-name>", instance="<instance-id>"}: the approximate number of new series in this scrape.

其中的 up 指标可以用来监控目标服务是否正常运行

报警

Prometheus 使用 AlertManager 做告警。
可以使用 predict_linear 等函数基于预测的做一些报警。

如何选择监控指标

首先问自己一个问题:当我的程序出了问题的时候,我需要哪些数据来 debug 呢?

Google SRE Book 中提出了四个黄金原则:延迟、流量、错误数、饱和度(需要排队无法提供服务的时间)。实际使用中对于资源可以使用 USE 指标,对于在线服务可以使用 RED 指标。

  • USE 指标:Utilization、Saturation、Errors。如 Cadvisor 数据
  • RED 指标:Rate、Errors、Duration。如 Apiserver 性能指标

被监控服务的类型

就监控而言,服务大概可以分为三类:在线服务,离线处理 和 跑批任务。

在线服务

此类系统的关键指标在于 QPS, 错误率和延迟。正在进行中的请求的数量也有用。

在线服务系统在客户端和服务端都应该做监控。如果两遍有不同的行为,那么这个对调试是很有意义的。如果一个服务有很多客户端,也不可能让服务监控每个客户端,所以客户端肯定需要依赖自己的数据。

当你按照 query 开始或结束统计数量一定要使用一致的标准。推荐使用结束来作为标准,因为比较容易实现,而且能统计错误和延迟。

离线系统

对每一个 stage, 记录进入的 item 的数量,有多少在处理中,上次你处理某个东西的时间,多少 item 被发送出去。如果你采用的是批处理,也应该记录进出的批的数量。

更好的方法是通过系统发送一个心跳包:一些带着时间戳的 dummy item 通过整个系统。每个 stage 都输出他看到的最近的时间戳,这样你就知道一个 item 需要多长时间才能经过整个系统了。

批操作

关键指标是上次成功操作的时间。
This should generally be at least enough time for 2 full runs of the batch job. For a job that runs every 4 hours and takes an hour, 10 hours would be a reasonable threshold. If you cannot withstand a single run failing, run the job more frequently, as a single failure should not require human intervention.

对于其他的子系统而言,可以选择如下指标

第三方库

如果一个库会访问进程外的资源,比如网络硬盘等等,至少要记录下所有的访问次数,错误和延迟。

Depending on how heavy the library is, track internal errors and latency within the library itself, and any general statistics you think may be useful.

日志

As a general rule, for every line of logging code you should also have a counter that is incremented. If you find an interesting log message, you want to be able to see how often it has been happening and for how long.
一个比较通用的规则,对于每一条日志,应该有一个计数器。如果你发现了一条有有意思的信息,你肯定想知道这件事

错误

Failures should be handled similarly to logging. Every time there is a failure, a counter should be incremented. Unlike logging, the error may also bubble up to a more general error counter depending on how your code is structured.

线程池

对于所有的线程池来说,核心指标是排队的请求的数量,正在使用的线程的数量,总线程的数量,已经处理的任务的数量和处理任务花费的时间,以及任务排队花费的时间。

缓存

缓存核心指标是总的查询数,命中数,总的延迟以及缓存所对应的线上系统的查询数量,错误率,延迟。

合理使用标签

比如说,不要创建 httpresponse500total 和 httpresponse403total 这种指标,创建一个 httpresponsetotal 指标,然后使用不同的状态码作为标签。然后你就可以把整个 metric 作为一个规则和图表。

但是也不要滥用标签,前往不要用 IP 或者 email 这种信息来做标签,因为他们可能是无限的。这时候就不应该用监控系统了,可能需要一些 OLAP 的分析工具了。

总的来说,把 metrics 的秩 (cardinality) 控制在 10 以下。整个系统要控制超过 10 的 metric 的数量。绝大多数的查询不应该有标签。

为了避免秩过高的监控数据,可以添加如下的报警规则:

# 统计每个指标的时间序列数,超出 10000 的报警
count by (__name__)({__name__=~".+"}) > 10000

“坏指标”报警出来之后,就可以用 metricrelabelconfig 的 drop 操作删掉有问题的 label(比如 userId、email 这些一看就是问题户)

如果不确定的话,首先别用标签,有了真实的 use case 再添加。

参考

  1. Should I run prometheus in a Docker?
  2. Logs and metrics and graphs, oh my!
  3. developerWorks 上的入门文档
  4. https://blog.frognew.com/2017/05/prometheus-intro.html
  5. https://github.com/yolossn/Prometheus-Basics
  6. https://mp.weixin.qq.com/s/sr8AxTMZTjUoe1XYrbRgyw
  7. https://zhuanlan.zhihu.com/p/24811652
  8. https://mp.weixin.qq.com/s?__biz=MzI4NTA1MDEwNg==&mid=2650782456&idx=1&sn=654615ca4199514687ae8ec65444dec9
  9. https://medium.com/@valyala/promql-tutorial-for-beginners-9ab455142085
  10. https://github.com/prometheus/client_python
  11. http://www.xuyasong.com/?p=1717
  12. https://www.section.io/blog/prometheus-querying/
  13. https://github.com/danielfm/prometheus-for-developers#monitoring-uptime

JavaScript 可视化库调研

核心关注指标:

  1. 好看吗?
  2. 支持多少数据,性能如何,内存占用如何?
  3. 开发活跃度
  4. 能否交互
  5. 是否支持 react
  6. 渲染后端是什么,基于 SVG 还是 canvas 还是 HTML?
  7. License,GPL 的不能要
  8. 支持绘制图形的种类

综合

  1. Vega Vega 很全面,几乎包括了所有的图形样式。
  2. nivo 基于 react 和 d3。支持的图形不少。
  3. echarts 百度出品,国内用的比较多,但是感觉有点丑。据说 bug 也比较多。

统计常用图

绝大多数的图还是画折线图这些的,大部分的库也是做这个的。

  1. recharts 基于 React 和 D3.js。使用 SVG,只支持 line chart,bar chart 这些比较常见的。
  2. reactviz 基于 react,Uber 出品,也是常见的统计图
  3. chartist 亮点是有动画,没有依赖,体积特别小。支持的图比较少
  4. nvd3 这个看起来确实不错,支持的图表类型一般,基于 d3.js。
  5. chart.js 支持的数量也比较少,主要是 line chart 和 bar chart. 这个可能是标星最多的了。
  6. xkcd 风格的图表

以下为不推荐的库:

  1. apexchart 似乎是 fusion chart 的一个开源版本。https://github.com/apexcharts/apexcharts.js
  2. uvcharts.js 开发很不活跃,才 200 个星星
  3. victory 没看出有什么特别吸引人的。
  4. chartbuilder https://github.com/Quartz/Chartbuilder。好几年没有更新了。而且比较丑。
  5. c3.js https://c3js.org/examples.html。基于 D3, 貌似图比较少。
  6. toast https://ui.toast.com/。韩国的一个东西,还包含了日历。

图(Graph)

这里的图指的是计算机科学上的图,也就是由节点和边构成的结构。

  1. sigma 用于绘制 graph 的 http://sigmajs.org/
  2. cytoscape https://js.cytoscape.org/

金融

  1. Lightweight Charts。比较小巧,适合绘制金融数据。
  2. http://dygraphs.com/gallery/。这个貌似只是画线条图的
  3. uplot。特点是非常小,不支持任何交互。主要是画时间序列的。
  4. dc.js。特点是支持 crossfilter。特别好看,不过支持的图不多。

其他

  1. plotly, plotly 是一个 Python 和 JavaScript 的绘图库。
  2. https://github.com/antvis/g2plot
  3. https://github.com/antvis/G2

关机了 cron job 怎么办,开机后还会再执行吗?

在回答标题的问题之前,我们先来看下 Cron 的实现。

Cron 是 *nix 系统中常见的有一个 daemon,用于定时执行任务。cron 的实现非常简单,以最常用的 vixie cron 为例,大概分为三步:

  1. 每分钟读取 crontab 配置
  2. 计算需要执行的任务
  3. 执行任务,主进程执行或者开启一个 worker 进程执行

Cron 的实现每次都是重新加载 crontab,哪怕计算出来下次可执行时间是 30 分钟之后,也不会说 sleep(30),这样做是为了能够在每次 crontab 变更的时候及时更新。

我们可以查看 vixie cron 的源码确认一下:

/* first-time loading of tasks */
load_database(&database);
/* run tasks set to be carried out after the system rebooted */
run_reboot_jobs(&database);
/* make TargetTime the start of the next minute */
cron_sync();
while (true) {
    /* carry out tasks, then go to sleep until the TargetTime adjusted to take into account the time spent on the tasks */
    cron_sleep(); // 在这里调用了 do_command,也就是实际执行任务
    /* reread configuration */
    load_database(&database);
    /* collect tasks for given minute */
    cron_tick(&database);
/* reset TargetTime to the start of the next minute */
    TargetTime += 60;
}

do_command 函数在 fork 之后子进程中实际执行需要执行的任务,实际上在 worker 中还会进行一次 fork,以便 setuid 变成 session leader,这里就不再赘述了:

switch (fork()) {
case -1:
    /*could not execute fork */
    break;
case 0:
    /* child process: just in case let’s try to acquire the main lock again */
    acquire_daemonlock(1);
    /* move on to deriving the job process */
    child_process(e, u);
    /* once it has completed, the child process shuts down */
    _exit(OK_EXIT);
    break;
default:
    /* parent process continues working */
    break;
}

cron 是没有运行记录的,并且每次都会重新加载 crontab,所以总体来说 cron 是一个无状态的服务。

在大多数情况下,这种简单的机制是非常高效且稳健的,但是考虑到一些复杂的场景也会有一些问题,包括本文标题中的问题:

  1. 如果某个任务在下次触发的时候,上次运行还没有结束怎么办?

    这个问题其实也就是也就是并发的任务是多少。如果定义并发为 1,也就是同一个任务只能执行一个实例,那么当任务运行时间超过间隔的时候,可能会造成延迟,但是好处是不会超过系统负载。如果定义并发为 n,那么可能会有多个实例同时运行,也有可能会超过系统负载。总之,这个行为是未定义的,完全看 cron 的实现是怎么来的。

  2. 当系统关机的时候有任务需要触发,开机后 cron 还会补充执行么?

    比如说,有个任务是「每天凌晨 3 点清理系统垃圾」,如果三点的时候恰好停电了,那么当系统重启之后还会执行这个任务吗?遗憾的是,因为 cron 是不记录任务执行的记录的,所以这个功能更不可实现了。要实现这个功能就需要记录上次任务执行时间,要有 job id,也就是要有执行日志。

  3. 如果错过了好多次执行,那么补充执行的时候需要执行多少次呢?

    这个问题是上一个问题的一个衍生。还是举清理垃圾的例子,比如说系统停机五天,那么开机后实际上不用触发五次,只需要清理一次就可以了。

Unix 上传统的 cron daemon 没有考虑以上三个问题,也就是说错过就错过了,不会再执行。为了解决这个问题,又一个辅助工具被开发出来了——anacron, ana 是 anachronistic(时间错误) 的缩写。anacron 通过文件的时间戳来追踪任务的上次运行时间。具体的细节就不展开了,可以参考文章后面的参考文献。

总之,如果只有 cron,那么不会执行错过的任务,但是配合上 anacron,还是有机会执行错过的任务的。

定时执行任务是一个普遍存在的需求,除了在系统层面以外,多种不同的软件中都实现了,我们可以认为他们是广义的 cron。这些广义的 cron 大多考虑了这些问题,下面以 apscheduler 和 kubernetes 为例说明一下。

apscheduler

apscheduler 是 Python 的一个库,用于周期性地触发单个任务调度,实际上我们完全可以用 apscheduler 来实现一个自己的 cron。

apscheduler 中的几个概念:

  • triggers,触发的计算引擎,apscheduler 除了支持 cron 之外,还支持 date 和 interval 两种;
  • job store,用于记录每次的运行结果,上次运行时间等,这样当有错过的任务时才能知道需要补充执行多少次。默认是记在内存里,不过也支持 redis, mongo, mysql;
  • executor,执行任务的 worker,常用的有 ThreadPoolExecutor 和 ProcessPoolExecutor, 也就是线城池和进程池;
  • scheduler, 把以上几个概念串联起来做调度。

apscheduler 的使用也非常简单,直接看函数名大概就知道了。

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
# scheduler.add_executor('processpool')  # 使用进程池,默认是线程池
# scheduler.add_jobstore("redis")  # 使用 redis 作为 job store, 默认是内存

scheduler.add_job(
    myfunc,  # 要执行的函数
    trigger='cron',  # 触发机制
    id='my_job_id',  # job_id
    args=[],   # 执行函数的参数
    kwargs={},  # 执行函数的字典参数
    )
scheduler.remove_job('my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
scheduler.reschedule_job("my_job_id")  # 感觉叫 modify_job 更好一点。所有属性都可以改,除了 ID

scheduler.start()
scheduler.pause()
scheduler.resume()
scheduler.shutdown()

apscheduler 如何处理上面的三个问题

  1. 可以通过 max_instances 参数设置最大执行的实例个数;
  2. 可以通过 misfire_grace_time 参数设置错过的任务的捞回时间,也就是在如果错过的时间不超过该值,就补充触发一次;
  3. 可以通过 coalesce 参数设置当需要执行多次的时候是否合并为执行一次。

另外需要注意的一点是,apscheduler 并没有像传统的 vixie cron 一样每分钟都会唤醒一次,而是会休眠到最近的可执行任务需要触发的时候。同时为了能在休眠期间增加任务,每次调用 add_job 的时候会直接唤醒 scheduler。

在计算下次可运行时间的时候,apscheduler 会维护一个按照下次触发时间排序的队列,插入新任务会采用二分查找位置插入(不过我感觉用堆好一点啊……)。当使用其他的外部 job store 的时候则会利用这些数据库的不同机制,比如 redis 中就会使用 zset。

apscheduler 还支持添加 event listener 获取 job 的运行信息:

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

K8S 中的 cron job

在 kubernetes 中,除了 deployment 以外,我们也可以构建一次性或者定时运行的 job。定时任务也是按照 crontab 的格式来定义的。

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"  # cron format
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            args:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

在 K8S 中,我们可以通过 .spec.concurrencyPolicy 来控制最多有多少个实例运行。K8S 建议每个 cron job 最好是幂等的,以免并发执行造成不可预料的结果。可选参数为:

  • Allow(default),允许
  • Forbid, 不允许
  • Replace,干掉原来的,执行新的

当任务执行失败的时候,K8S 的行为非常令人迷惑,如果 .spec.startingDeadlineSeconds 没有设置的话,那么任务重试 100 次失败之后就彻底放弃了……WTF……关于这个具体实现不再赘述,可以参考后面的链接 9.

在现代的分布式系统中,除了定时任务之外,更重要的是不同的任务之间的执行次序和依赖关系,在后面的文章中,会介绍一下 airflow, luigi, argo 等工具的使用和实现。敬请期待。

PS. K8S 官方文档写得真是太烂了,典型的 over engineering。

参考资料

  1. https://serverfault.com/questions/52335/job-scheduling-using-crontab-what-will-happen-when-computer-is-shutdown-during
  2. https://apscheduler.readthedocs.io/en/latest/userguide.html
  3. https://badootech.badoo.com/cron-in-linux-history-use-and-structure-70d938569b40
  4. https://askubuntu.com/questions/848610/confused-about-relationship-between-cron-and-anacron
  5. https://www.digitalocean.com/community/tutorials/how-to-schedule-routine-tasks-with-cron-and-anacron-on-a-vps
  6. http://xiaorui.cc/archives/4228
  7. https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/
  8. https://medium.com/@hengfeng/what-does-kubernetes-cronjobs-startingdeadlineseconds-exactly-mean-cc2117f9795f
  9. https://stackoverflow.com/questions/51065538/what-does-kubernetes-cronjobs-startingdeadlineseconds-exactly-mean

亚马逊公司文化学习

Day 1

在头条的时候,公司经常强调“Day 1”意识,也就是把每天当做公司创业的第一天。开始以为是原创的,后来发现原来是从亚马逊学过来的——Jeff Bezos 在给股东的信中经常提到 “Always Day 1”。那么 Day 1 究竟是什么意思呢?

Leadership Principles

亚马逊还有一项重要的公司文化,那就是 Leadership Principles。这个规则如此重要,以至于他们会在面试的时候针对这里面的 14 个 Principles 提出问题。需要注意的是,虽然叫做 Leadership,但是这个是对每一个员工的要求,也就是每一个人都应该把自己当做领导者,而不是一颗螺丝钉。我想这个隐含的条件对许多公司来说都做不到吧?

原文有中文翻译,但是我认为翻译过于“雅”而没有做到“信”,好多信息都做不到忠于原文,所以我这里又自己翻译了一遍:

https://www.amazon.jobs/en/principles

偏执于客户

领导者从客户出发并回归于客户。他们努力工作来赢得和维护客户的信任。尽管领导者也会留意竞争对手的动态,他们更偏执于客户。

主人翁意识

领导者是主人翁。他们从长远出发思考,不会为了短期结果牺牲长远利益。他们从整个公司,而不只是自己团队的角度行动。他们从不会说:“这不是我的工作”。

发明并简化

领导者期待并且要求他们的团队能够创新和发明,并且总会找到能够简化的方法。他们了解外部,从各处寻找新的主意,也不会被“不是我发明的”限制住(也就是说会积极采纳别人的优秀观点和方法,而不是重新造轮子)。当我们研究新东西的时候,我们接受很可能长时间被误解这种情形。

能够做对,并且经常做对

领导者经常是对的。他们有很好的判断力和直觉。他们寻求不同的理念,也会质疑自己的理念。

Learn and Be Curious 学习而且有好奇心

Leaders are never done learning and always seek to improve themselves. They are curious about new possibilities and act to explore them.
领导者从不停止学习而且总是寻求提升自己。他们对新的可能性充满好奇并且会用实际行动去探索。

Hire and Develop the Best 招到并且培育最优秀的人

Leaders raise the performance bar with every hire and promotion. They recognize exceptional talent, and willingly move them throughout the organization. Leaders develop leaders and take seriously their role in coaching others. We work on behalf of our people to invent mechanisms for development like Career Choice.
领导者能把每次招聘和晋升的标准提高。他们能够发现不同常人的天才,并且愿意在整个组织中让他们轮岗。领导者培养领导者并且很认真地为其他人做教练。亚马逊提供 Career Choice 等服务来让大家能够提高自己(Career Choice 是亚马逊提供的培训服务,可以报销 95%)。

Insist on the Highest Standards
坚持最高标准
Leaders have relentlessly high standards — many people may think these standards are unreasonably high. Leaders are continually raising the bar and drive their teams to deliver high quality products, services, and processes. Leaders ensure that defects do not get sent down the line and that problems are fixed so they stay fixed.
领导者坚持高标准——以至于很多人觉得这些太高了。领导者持续提高标准并且驱动着他们的团队产出高质量的产品服务和流程。领导者确保不会像下游输送残次品,问题解决了就是解决了。

Think Big 想搞个大新闻

Thinking small is a self-fulfilling prophecy. Leaders create and communicate a bold direction that inspires results. They think differently and look around corners for ways to serve customers.
想得很小只是一个自我实现的预言。领导者创造并传达一个宽广并且多产的领域。他们的思路异于常人,并且总是在找能够服务与客户的边边角角。

Bias for Action 崇尚行动

Speed matters in business. Many decisions and actions are reversible and do not need extensive study. We value calculated risk taking. 
速度在商业世界中很重要。许多决定和行动都是可以撤销的,所以不需要过度深入研究。我们赞赏深思熟虑过的冒险行动。

Frugality 节俭

Accomplish more with less. Constraints breed resourcefulness, self-sufficiency, and invention. There are no extra points for growing headcount, budget size, or fixed expense.
用更少做到更多。捉襟见肘孕育组织多谋、自给自足和创新。增加人员编制、预算和固定支出并不会有额外收益。

Earn Trust 赢得信任

Leaders listen attentively, speak candidly, and treat others respectfully. They are vocally self-critical, even when doing so is awkward or embarrassing. Leaders do not believe their or their team’s body odor smells of perfume. They benchmark themselves and their teams against the best.
领导者认真倾听,坦率直言,尊重他人。即便会非常尴尬,他们也用于自我批评。领导者并不总是觉得自己一定是对的。他们以最高水平要求自己和团队。

Dive Deep 深入

Leaders operate at all levels, stay connected to the details, audit frequently, and are skeptical when metrics and anecdote differ. No task is beneath them.
领导者参与所有层级,他们了解细节,经常审计,当指标和描述有出入的时候会提出怀疑。没有任何事实能瞒过他们。

Have Backbone; Disagree and Commit 有脊梁;服从大局

Leaders are obligated to respectfully challenge decisions when they disagree, even when doing so is uncomfortable or exhausting. Leaders have conviction and are tenacious. They do not compromise for the sake of social cohesion. Once a decision is determined, they commit wholly.
当有不同意见的时候,领导者有责任不卑不亢地提出,尽管这么做可能会让人不舒服或者筋疲力尽。领导者应该有定力。他们不因为面子而折衷。一旦做出了决定,他们全力实现。

Deliver Results 输出结果

Leaders focus on the key inputs for their business and deliver them with the right quality and in a timely fashion. Despite setbacks, they rise to the occasion and never settle.
领导者聚焦于他们业务的关键输入,并且能够按时以正确的质量输出结果。尽管会有挫折,他们会迎难而上并且绝不妥协。

参考

  1. Amazon Letters to Shareholders
  2. Amazon Leadership Principles

面试刷题总结(三) – 深度优先搜索与回溯

回溯(backtrack)是一种系统性地搜索问题解答的方法。为了实现回溯,首先需要为问题定义一个解空间(solution space),这个空间必须至少包含问题的一个解(可能是最优的)。深度优先搜索方法本质上是对解空间的深度优先遍历,也就是说要做到「心中有树」。

回溯问题往往使用同一套代码可以解决一个问题的好几个变种。所以我们解决问题的时候应该首先尝试解决最基础的形式:判定是否有解或者有几个解,然后再去具体求解路径。

本质上来说,回溯就是一种暴力解法。

解题模板

核心就是 for 循环里面的递归,在递归调用之前「做选择」,在递归调用之后「撤销选择」。下面的 if 不可行 其实就是剪枝的过程。给定的函数往往不一定满足回溯的函数签名,添加一个 helper 函数就好了。track 就是路径的意思,所以回溯(backtrack)本身就是路径退回的意思。

需要注意的是,剪枝虽然能够提高程序运行速度,但是对于时间复杂度往往没有改善。回溯算法的时间复杂度往往是 O(2^n)

选择列表  # 题目给定
ans = []
def backtrack( track , pos ):
    """
    track: 当前路径
    pos: 当前位置,或者是剩下的未访问元素
    """
    if 满足结束条件 :  # base case
        ans.add( track )
        return

    # 通过考虑 pos,可以让选择列表小一点
    for 选择 in 选择列表 [pos] :
        if 不可行 :
            continue
        做选择  # track -> new_track
        backtrack( new_track , 选择 )
        撤销选择  # new_track -> track

我们可以看到回溯的空间复杂度实际上就是路径的最大长度。ans 参数完全可以写到 backtrack 函数中一直传递,但是本来 backtrack 函数的参数就够多了,我还是建议把他作为一个全局变量或者闭包外的变量,这样清晰一些。

另外值得注意的一点是,我们传递了 pos 这个变量,实际上从这里很容易转变成动态规划了。实际上这里的 pos 就是动态规划中遍历数组的索引。

改变函数签名与参数传递

正如上面所说的,回溯算法往往是需要定义一个 helper 函数的。

回溯最好把结果变量作为一个全局变量和输入变量都定义在外边,这样方便一些,不然就得一路传递下去,容易出问题。backtrack 函数接收两个参数:

  1. 当前路径
  2. 当前遍历位置

这里其实是更广义的 result in parameter 还是 result in return value 的问题。最好给用户用的函数是 result in return value 的,自己的内部实现可以是 result in parameter 的。另外也可以用闭包实现 result in parameter 的效果,也就是说不传递参数,直接从 closure 中读取。其实也可以用一个全局变量来做,都是类似的效果。

—存疑—
固定传递的参数也包括了向下传递还是向上返回。具体来说有以下几种:

  1. 向下传递额外参数,如边界等,这个参数可能是随着调用在变的。
  2. 向上返回额外参数,也就是后续遍历才能够使用到返回的值。
    —存疑结束—

例题

对于 Python 来说,一般没必要使用全局变量,可以使用内部函数访问闭包内的变量。

LeetCode 17

这道题可以说是最最简单的一道 dfs 题目了。

参考

  1. https://www.dailycodingproblem.com/blog/an-introduction-to-backtracking/
  2. https://labuladong.gitbook.io/algo/di-ling-zhang-bi-du-xi-lie/hui-su-suan-fa-xiang-jie-xiu-ding-ban
  3. https://www.cnblogs.com/hustcat/archive/2008/04/09/1144645.html
  4. https://www.1point3acres.com/bbs/thread-583166-1-1.html
  5. https://zhuanlan.zhihu.com/p/92782083

面试刷题总结(二) – 分治

分解 -> 解决子问题 -> 合并

分治的时间复杂度

正如上一节所说的,分治问题在递归过程中实际上是遍历了状态空间树,所以时间复杂度的计算也需要根据树的高度和每一层的时间复杂度来计算。

recursion-tree

T(n) = a T(n/b) + f(n), if f(n) ∈ O(n^d), d >=0
T(n)∈ { 
    O(n^d), a < b^d;
    O(n^d logn), a = b^d;
    O(n^(logba), a > b^d;
}

通过 O(n) 的时间,把 n 的问题,变为了两个 n/2 的问题,复杂度是多少?

T(n) = 2T(n/2) + O(n)
    = 2 * 2T(n/4) + O(n) + O(n)
    = n + nlogn
    ≈ O(NlogN)

通过 O(1) 的时间,把 n 的问题,变成了两个 n/2 的问题,复杂度是多少?

T(n) = 2T(n/2) + O(1)
    = 2 * 2T(n/4) + O(1) + O(1)
    = n + (1 + 2 + 4 +…+ n)
    ≈ n + 2n
    ≈ O(n)

对于 O(nlogn) 的算法 T(n) = 2T(n/2) + O(n)

证明一般采用数学归纳法,反证法

改变函数签名与参数传递

参见递归中的说明

例题

LeetCode 241

class Solution:
    def diffWaysToCompute(self, s: str) -> List[int]:
        if s.isdigit():
            return [int(s)]
        ans = []
        for i in range(len(s)):
            if s[i] in ("+", "-", "*"):
                left = self.diffWaysToCompute(s[:i])
                right = self.diffWaysToCompute(s[i+1:])
                for l in left:
                    for r in right:
                        if s[i] == "+":
                            ans.append(l+r)
                        elif s[i] == "-":
                            ans.append(l-r)
                        else:
                            ans.append(l*r)
        return ans

参考资料

  1. https://mp.weixin.qq.com/s?__biz=MzUyNjQxNjYyMg==&mid=2247487045&idx=3&sn=e9f67f1fd33649c60478638c1d6cc2d9
  2. https://oi-wiki.org/basic/divide-and-conquer/

使用 partition by 查找并删除 MySQL 数据库中重复的行

在创建 MySQL 数据表的时候,经常会忘记给某个字段添加 unique 索引,但是等到想添加的时候又已经有了重复数据,这时候就需要删除重复数据。

准备数据

本文使用如下的数据作为演示:

CREATE TABLE contacts (
    id INT PRIMARY KEY AUTO_INCREMENT,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(255) NOT NULL
);

INSERT INTO contacts (first_name,last_name,email)
VALUES ("Carine ","Schmitt","carine.schmitt@verizon.net"),
       ("Jean","King","jean.king@me.com"),
       ("Peter","Ferguson","peter.ferguson@google.com"),
       ("Janine ","Labrune","janine.labrune@aol.com"),
       ("Jonas ","Bergulfsen","jonas.bergulfsen@mac.com"),
       ("Janine ","Labrune","janine.labrune@aol.com"),
       ("Susan","Nelson","susan.nelson@comcast.net"),
       ("Zbyszek ","Piestrzeniewicz","zbyszek.piestrzeniewicz@att.net"),
       ("Roland","Keitel","roland.keitel@yahoo.com"),
       ("Julie","Murphy","julie.murphy@yahoo.com"),
       ("Kwai","Lee","kwai.lee@google.com"),
       ("Jean","King","jean.king@me.com"),
       ("Susan","Nelson","susan.nelson@comcast.net"),
       ("Roland","Keitel","roland.keitel@yahoo.com"),
       ("Roland","Keitel","roland.keitel@yahoo.com");

注意其中有一行重复了三次。输入完成后,数据如图所示:

file

查找重复的行

使用 group by 和 having

假设我们要通过 email 字段来查找重复值。通过使用 group by 和 having 子句可以查找到哪些行是重复的。

SELECT
    email,
    COUNT(email)
FROM
    contacts
GROUP BY email
HAVING COUNT(email) > 1;

file

Having 就类似于 Group by 之后的 where 子句。但是这个语句还是很难解决我们的问题,我们只知道发生重复的第一行了,而不知道哪些行是重复的。这时候可以使用 partition by 语句。

使用 partition by 找出所有的重复行

需要注意的是,partition by 只有在 MySQL 8.0 之后才支持,而现在常用的是 5.6 和 5.7 版本。

Partition 语句又叫做窗口聚合语句,也就是说他会把同一个值的行聚合成一个窗口,但是和 Group by 语句不同的是,窗口内的每一个行并没有被压缩成一行,具体说 Partition by 的语法是:

window_function_name(expression)
    OVER (
        [partition_defintion]
        [order_definition]
        [frame_definition]
    )

删除重复的行

删除的方法有很多种,这里介绍两种。

References

  1. https://www.mysqltutorial.org/mysql-window-functions/
    2.