计算机

面试刷题总结(五)- 贪心算法

天字第一条:永远不要首先尝试贪心算法,并尝试证明它是对的。而要使用动态规划推出一个算法,然后可以尝试简化到贪心算法。

尤其是在面试中,基本不会出现贪心算法,太 tricky 了。如果一个面试官问了一个题目,并且他只会用贪心算法解,那么这个面试官水平也一般,这样的公司不去也罢。

相比动态规划来说,贪心算法一般来说:

  1. 要求条件更严格,也就是贪心选择条件;
  2. 时间复杂度往往更低,能达到线性时间复杂度。

贪心选择性质:每一步都选择局部最优解,而局部最优解恰好是全局最优解。贪婪一般需要预排序,证明贪婪成立可以采用数学归纳法,或者证明不会有更好的方法

区间问题

合并、排序区间也是一个常见的问题。

例题

LeetCode 56

LeetCode 57 合并区间

这道题关键之处在于合并后,把剩余的区间直接加上来,这样就不用考虑不少特殊情况了。

class Solution:
    def insert(self, intervals, newInterval):
        ans = []
        start, end = newInterval
        remainder = 0
        for x, y in intervals:
            if start <= y:
                if end < x:
                    break  # 找到了结尾了
                start = min(start, x)
                end = max(end, y)
            else:
                ans.append([x, y])
            remainder += 1
        ans.append([start, end])
        ans += intervals[remainder:]
        return ans

会议室 II

LeetCode 矩形重叠问题

参考资料

  1. 面试不会考贪心算法

刷 LeetCode 的经验

最近断断续续面试了三个月,也拿了一些大厂(美团百度阿里)的 offer, 分享下刷题经验

  • 一定要使用自己最擅长的语言,不要想其他的,不要引入额外的复杂度;
  • 复习基础算法,如排序,二分查找,树的遍历等,同时做一些基础题;
  • 简单的递归,比如树的一些题目,是最容易引导你进入刷题状态的;
  • 递归是几乎所有有技巧的题目的基础,栈是递归、树是递归、回溯也是递归、动归更是递归;
  • 除了递归之外,剩下的基本是考察基础操作的题目,比如螺旋打印矩阵等等,一定要注意细节;
  • 每道题 10 分钟完全没思路就看答案;
  • 如果连续有 5 道题目看答案了,去补习知识,不要再做了;
  • 代码超过 20 行了,一般情况下说明写的有问题,直接看答案(确实有个别非常繁琐的题目,面试一般不考);
  • 开始的时候按分类刷,不要按顺序刷;
  • 如果遇到完全没有思路的题目,不要悲伤,不要觉得自己是个傻瓜。秉承「闻过则喜」的态度,试想如果是面试时候恰好考到这道题目呢?
  • 刷过了过几天又忘了也没关系,根据艾宾浩斯记忆曲线,前几天是忘得最快的时候,多看几遍;
  • 当然,死记硬背是没用的,一定要理解题目;
  • 做题的时候身边要有白纸,随时在纸上画下示意图,不要空想;
  • 做了几道同类题之后,总结做题模板,比如说 backtracking 的做题模板;
  • 后期刷熟了,不要分类刷,随机刷;
  • 已有的题库就当做例题,反复看,彻底学透。每周参加周赛,做新题,作为模拟面试。

最后一点,刷 LeetCode 其实就像是大学里的期末考试,如果你两周复习时间可以搞定一门期末考试,那么两周时间也完全能搞定 LeetCode。要做到「战略上藐视,战术上重视」。

按照考察的方面,题目大概分成两类:

  1. 递归;
  2. 其他题目。

对于递归题目,拿到题目首先要思考:

  1. 怎么才能把题目的输入规模缩小,简化为规模更小的问题
  2. 另一方面基础情形是什么

然后再考虑选用哪种实现方式,是回溯还是动归。

特别注意的地方

  • 不要沉迷于研究语言特性。

    就像上面说的,使用自己最熟悉,日常使用最多的语言。不要觉得 C++ 或者 Java 或者 Whatever 刷题最爽,或者你看的答案采用了什么语言,抓住主要矛盾,你是来刷题的,不是学习新语言的。我之前犯得错误就是边学 Rust 边刷题。

  • 不要沉迷于数论

    LeetCode 上有一些涉及到数论的题目,但是代码面试不是 ACM, 也不是你的大学期末考试。最多熟悉一下排列组合算法、全排列和欧几里得算法就行了,不要沉迷于数论。

  • 要总结自己的特殊问题

    说了这么多,都是共性的问题,但是每个人都会有自己特殊的问题。比如说我的问题就在于不太敢用字典,总觉得会让人感觉投机取巧,我一个工作四年的人,肯定是知道字典的,但是做算法题的时候总在潜意识里避免使用。实际上 LeetCode 第一题就用到了字典。

面试刷题总结(十)- 指针类题目与窗口

链表的指针题目

一般使用 dummy head 的话,循环条件是 while p.next, 而不使用的话,则是 while p.

递归反转列表

int reverseList(ListNode node) {
    if (node == null || node.next == null) {
        return node;
    }
    ListNode head = reverseList(node.next);
    node.next.next = node;
    node.next = null;
    return head;
}

都比较简单,主要是细节的使用。

  • LeetCode 24
  • LeetCode 25
  • LeetCode 206

参考资料

  1. https://mp.weixin.qq.com/s/YFe9Z35CE4QWPDmPNitd4w

前缀和

前缀和这个概念可以说是「会者不难,难者不会」,概念本身是很简单的,但是如果不知道的话,好多题就两眼一抹黑了。前缀和即为数组从 0 到当前位置的和,类似的概念还有后缀和,前缀积,后缀积等。

presum[0] = A[0]
presum[i] = presum[i-1] + A[i]

例题

求数组中绝对值最小的子数组

利用前缀和的性质:A[i] + A[i+1] + ... + A[j] = presum[j] - presum[i-1] 前缀和排序,取差值最小值。

有一道题,求两个时间之间的差值,其实是类似的。当差值直接累加计算不好求的时候可以转换为前缀和的差。

把一个数组从中间 p 位置分开,使得 a[0]+…+a[p-1] 与 a[p]+a[p+1]+…+a[n-1] 差值最小?

也就是使得:presum - (total - presum) = 2 * presum - total 最小。

参考资料

  1. https://www.cnblogs.com/AndyJee/p/4474073.html

面试刷题总结(六)-栈和队列

栈本身是一种特别简单的数据结构,这里就不多说了,主要记录一些以前不会的题目。

单调栈

单调栈就是满足单调性的栈。比如递增的栈,如果新的元素比栈顶的元素小的话,就要一直弹出,直到找到满足条件的栈顶,然后再入栈。

单调队列

单调队列使用两个队列组成。其中一个是普通的队列,另一个是维护大小顺序的队列。这里的队列不是严格的队列,而是可以从尾部弹出的双端队列。

from collections import deque

class MonoQueue:
    def __init__(self):
        self.q = deque()  # 实际储存数据
        self.m = deque()  # 维护单调关系,队首元素总是最大值

    def push(self, x):
        self.q.append(x)
        while len(self.m) > 0 and self.m[-1] < x:
            self.m.pop()
        self.m.append(x)

    def pop(self):
        x = self.q.popleft()
        if self.m[0] == x:
            self.m.popleft()
        return x

    def __len__(self):
        return len(self.q)

    def top(self):
        return self.m[0]

例题

单调队列的题目

  • LC84. Largest Rectangle in Histogram
  • LC239. Sliding Window Maximum
  • LC739. Daily Temperatures
  • LC862. Shortest Subarray with Sum at Least K
  • LC901. Online Stock Span
  • LC907. Sum of Subarray Minimums

POJ3250 Bad Hair Day

有 N 头牛从左到右排成一排,每头牛有一个高度 h[i],设左数第 i 头牛与「它右边第一头高度 >= h[i]」的牛之间有 c[i] 头牛,试求 sum(c[i])。

这道题使用单调栈做。

def sum_distance(H):
    ans = 0
    stack = []
    for h in H:
        # 单调递减的栈
        while stack and stack[-1] < h:
            stack.pop()
        ans += len(stack)
        stack.append(h)
    return ans

参考资料

  1. https://oi-wiki.org/ds/monotonous-stack/
  2. https://www.hankcs.com/program/algorithm/poj-3250-bad-hair-day.html
  3. https://oi-wiki.org/ds/monotonous-queue/
  4. https://oi.men.ci/monotone-queue-notes/

小技巧

时间复杂度需要优化

如果你的算法和预期的时间复杂度还差一些数量级,那么考虑下是不是某个操作可以转化成 hash 查表,这样这个操作就是 O(1) 了。

如何证明算法正确

一般使用归纳法。

UnionFind 算法

unionfind 算法是用于计算图中的节点连通性的算法。其中连同的意思是满足『自反』『对称』『传递』三个意思的连通。

class UnionFind:
    def __init__(self, count):
        self.count = count
        self.parents = list(range(count))  # 初始化时 parent 指针指向自己

    def union(self, p, q):
        """把 p, q 两个节点连通起来"""
        p_root = self.find(p)
        q_root = self.find(q)
        if p_root == q_root:
            return
        self.parents[p_root] = q_root
        self.count -= 1

    def find(self, p):
        """找到 p 节点的根节点"""
        while self.parents[p] != p:
            p = self.parents[p]
        return p

    def is_connected(p, q):
        p_root = self.find(p)
        q_root = self.find(q)
        return p_root == q_root

在上面的算法中,我们发现 union 和 is_connected 主要依赖于 find 的时间复杂度。而在树极端不平衡的情况下,是可能退化到 O(n) 的,所以不优化的前提下,时间复杂度是 O(n)。

优化1 – 保持树平衡

class UnionFind:
    def __init__(self, count):
        self.count = count
        self.parents = list(range(count))  # 初始化时 parent 指针指向自己
        self.sizes = [1] * count  # 记录每棵树的大小

    def union(self, p, q):
        """把 p, q 两个节点连通起来"""
        p_root = self.find(p)
        q_root = self.find(q)
        if p_root == q_root:
            return
        if self.sizes(p_root) > self.sizes(q_root):
            self.parents[q_root] = p_root
            self.sizes[p_root] += self.sizes[q_root]
        else:
            self.parents[p_root] = q_root
            self.sizes[q_root] += self.sizes[p_root]
        self.count -= 1

经过这个优化之后,时间复杂度就降到 O(logN) 了。

优化2 – 路径压缩

在调用 find 函数的时候,把

class UnionFind:
    def find(self, p):
        """找到 p 节点的根节点"""
        while self.parents[p] != p:
            # 神奇的路径压缩
            self.parents[p] = self.parents[self.parents[p]]
            p = self.parents[p]
        return p

Go 语言实现

type UnionFind struct {
    count int,
    parents []int,
    sizes []int
}

func NewUnionFind(n int) (*UnionFind) {
    parents := make([]int, n)
    sizes := make([]int, n)
    for i := 0; i < n; i++ {
        parents[i] = i
        sizes[i] = 1
    }
    return &UnionFind{n, parents, sizes)
}

func (uf *UnionFind) Union (p, q int) {
    pRoot := uf.Find(p)
    qRoot := uf.Find(q)
    if pRoot == qRoot {
        return
    }
    if uf.sizes[pRoot] < uf.sizes[qRoot] {
        uf.parents[pRoot] = qRoot
        uf.sizes[qRoot] += uf.sizes[pRoot]
    } else {
        uf.parents[qRoot] = pRoot
        uf.sizes[pRoot] += uf.sizes[qRoot]
    }
    uf.count -= 1
}

func (uf *UnionFind) Find(p int) int {
    while uf.parents[p] != p {
        uf.parents[p] = uf.parents[uf.parents[p]]
        p = uf.parents[p]
    }
    return p
}

func (uf *UnionFind) IsConnected(p, q int) bool {
    pRoot := uf.Find(p)
    qRoot := uf.Find(q)
    return pRoot == qRoot
}

LeetCode 题目

LeetCode 200 岛屿的数量

这个题就很简单了,简直是 UnionFind 的直接应用。

  1. Union-Find 并查集算法详解

关机了 cron job 怎么办,开机后还会再执行吗?

在回答标题的问题之前,我们先来看下 Cron 的实现。

Cron 是 *nix 系统中常见的有一个 daemon,用于定时执行任务。cron 的实现非常简单,以最常用的 vixie cron 为例,大概分为三步:

  1. 每分钟读取 crontab 配置
  2. 计算需要执行的任务
  3. 执行任务,主进程执行或者开启一个 worker 进程执行

Cron 的实现每次都是重新加载 crontab,哪怕计算出来下次可执行时间是 30 分钟之后,也不会说 sleep(30),这样做是为了能够在每次 crontab 变更的时候及时更新。

我们可以查看 vixie cron 的源码确认一下:

/* first-time loading of tasks */
load_database(&database);
/* run tasks set to be carried out after the system rebooted */
run_reboot_jobs(&database);
/* make TargetTime the start of the next minute */
cron_sync();
while (true) {
    /* carry out tasks, then go to sleep until the TargetTime adjusted to take into account the time spent on the tasks */
    cron_sleep(); // 在这里调用了 do_command,也就是实际执行任务
    /* reread configuration */
    load_database(&database);
    /* collect tasks for given minute */
    cron_tick(&database);
/* reset TargetTime to the start of the next minute */
    TargetTime += 60;
}

do_command 函数在 fork 之后子进程中实际执行需要执行的任务,实际上在 worker 中还会进行一次 fork,以便 setuid 变成 session leader,这里就不再赘述了:

switch (fork()) {
case -1:
    /*could not execute fork */
    break;
case 0:
    /* child process: just in case let’s try to acquire the main lock again */
    acquire_daemonlock(1);
    /* move on to deriving the job process */
    child_process(e, u);
    /* once it has completed, the child process shuts down */
    _exit(OK_EXIT);
    break;
default:
    /* parent process continues working */
    break;
}

cron 是没有运行记录的,并且每次都会重新加载 crontab,所以总体来说 cron 是一个无状态的服务。

在大多数情况下,这种简单的机制是非常高效且稳健的,但是考虑到一些复杂的场景也会有一些问题,包括本文标题中的问题:

  1. 如果某个任务在下次触发的时候,上次运行还没有结束怎么办?

    这个问题其实也就是也就是并发的任务是多少。如果定义并发为 1,也就是同一个任务只能执行一个实例,那么当任务运行时间超过间隔的时候,可能会造成延迟,但是好处是不会超过系统负载。如果定义并发为 n,那么可能会有多个实例同时运行,也有可能会超过系统负载。总之,这个行为是未定义的,完全看 cron 的实现是怎么来的。

  2. 当系统关机的时候有任务需要触发,开机后 cron 还会补充执行么?

    比如说,有个任务是「每天凌晨 3 点清理系统垃圾」,如果三点的时候恰好停电了,那么当系统重启之后还会执行这个任务吗?遗憾的是,因为 cron 是不记录任务执行的记录的,所以这个功能更不可实现了。要实现这个功能就需要记录上次任务执行时间,要有 job id,也就是要有执行日志。

  3. 如果错过了好多次执行,那么补充执行的时候需要执行多少次呢?

    这个问题是上一个问题的一个衍生。还是举清理垃圾的例子,比如说系统停机五天,那么开机后实际上不用触发五次,只需要清理一次就可以了。

Unix 上传统的 cron daemon 没有考虑以上三个问题,也就是说错过就错过了,不会再执行。为了解决这个问题,又一个辅助工具被开发出来了——anacron, ana 是 anachronistic(时间错误) 的缩写。anacron 通过文件的时间戳来追踪任务的上次运行时间。具体的细节就不展开了,可以参考文章后面的参考文献。

总之,如果只有 cron,那么不会执行错过的任务,但是配合上 anacron,还是有机会执行错过的任务的。

定时执行任务是一个普遍存在的需求,除了在系统层面以外,多种不同的软件中都实现了,我们可以认为他们是广义的 cron。这些广义的 cron 大多考虑了这些问题,下面以 apscheduler 和 kubernetes 为例说明一下。

apscheduler

apscheduler 是 Python 的一个库,用于周期性地触发单个任务调度,实际上我们完全可以用 apscheduler 来实现一个自己的 cron。

apscheduler 中的几个概念:

  • triggers,触发的计算引擎,apscheduler 除了支持 cron 之外,还支持 date 和 interval 两种;
  • job store,用于记录每次的运行结果,上次运行时间等,这样当有错过的任务时才能知道需要补充执行多少次。默认是记在内存里,不过也支持 redis, mongo, mysql;
  • executor,执行任务的 worker,常用的有 ThreadPoolExecutor 和 ProcessPoolExecutor, 也就是线城池和进程池;
  • scheduler, 把以上几个概念串联起来做调度。

apscheduler 的使用也非常简单,直接看函数名大概就知道了。

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
# scheduler.add_executor('processpool')  # 使用进程池,默认是线程池
# scheduler.add_jobstore("redis")  # 使用 redis 作为 job store, 默认是内存

scheduler.add_job(
    myfunc,  # 要执行的函数
    trigger='cron',  # 触发机制
    id='my_job_id',  # job_id
    args=[],   # 执行函数的参数
    kwargs={},  # 执行函数的字典参数
    )
scheduler.remove_job('my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
scheduler.reschedule_job("my_job_id")  # 感觉叫 modify_job 更好一点。所有属性都可以改,除了 ID

scheduler.start()
scheduler.pause()
scheduler.resume()
scheduler.shutdown()

apscheduler 如何处理上面的三个问题

  1. 可以通过 max_instances 参数设置最大执行的实例个数;
  2. 可以通过 misfire_grace_time 参数设置错过的任务的捞回时间,也就是在如果错过的时间不超过该值,就补充触发一次;
  3. 可以通过 coalesce 参数设置当需要执行多次的时候是否合并为执行一次。

另外需要注意的一点是,apscheduler 并没有像传统的 vixie cron 一样每分钟都会唤醒一次,而是会休眠到最近的可执行任务需要触发的时候。同时为了能在休眠期间增加任务,每次调用 add_job 的时候会直接唤醒 scheduler。

在计算下次可运行时间的时候,apscheduler 会维护一个按照下次触发时间排序的队列,插入新任务会采用二分查找位置插入(不过我感觉用堆好一点啊……)。当使用其他的外部 job store 的时候则会利用这些数据库的不同机制,比如 redis 中就会使用 zset。

apscheduler 还支持添加 event listener 获取 job 的运行信息:

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

K8S 中的 cron job

在 kubernetes 中,除了 deployment 以外,我们也可以构建一次性或者定时运行的 job。定时任务也是按照 crontab 的格式来定义的。

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"  # cron format
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            args:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

在 K8S 中,我们可以通过 .spec.concurrencyPolicy 来控制最多有多少个实例运行。K8S 建议每个 cron job 最好是幂等的,以免并发执行造成不可预料的结果。可选参数为:

  • Allow(default),允许
  • Forbid, 不允许
  • Replace,干掉原来的,执行新的

当任务执行失败的时候,K8S 的行为非常令人迷惑,如果 .spec.startingDeadlineSeconds 没有设置的话,那么任务重试 100 次失败之后就彻底放弃了……WTF……关于这个具体实现不再赘述,可以参考后面的链接 9.

在现代的分布式系统中,除了定时任务之外,更重要的是不同的任务之间的执行次序和依赖关系,在后面的文章中,会介绍一下 airflow, luigi, argo 等工具的使用和实现。敬请期待。

PS. K8S 官方文档写得真是太烂了,典型的 over engineering。

参考资料

  1. https://serverfault.com/questions/52335/job-scheduling-using-crontab-what-will-happen-when-computer-is-shutdown-during
  2. https://apscheduler.readthedocs.io/en/latest/userguide.html
  3. https://badootech.badoo.com/cron-in-linux-history-use-and-structure-70d938569b40
  4. https://askubuntu.com/questions/848610/confused-about-relationship-between-cron-and-anacron
  5. https://www.digitalocean.com/community/tutorials/how-to-schedule-routine-tasks-with-cron-and-anacron-on-a-vps
  6. http://xiaorui.cc/archives/4228
  7. https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/
  8. https://medium.com/@hengfeng/what-does-kubernetes-cronjobs-startingdeadlineseconds-exactly-mean-cc2117f9795f
  9. https://stackoverflow.com/questions/51065538/what-does-kubernetes-cronjobs-startingdeadlineseconds-exactly-mean

面试刷题总结(三) – 深度优先搜索与回溯

回溯(backtrack)是一种系统性地搜索问题解答的方法。为了实现回溯,首先需要为问题定义一个解空间(solution space),这个空间必须至少包含问题的一个解(可能是最优的)。深度优先搜索方法本质上是对解空间的深度优先遍历,也就是说要做到「心中有树」。

回溯问题往往使用同一套代码可以解决一个问题的好几个变种。所以我们解决问题的时候应该首先尝试解决最基础的形式:判定是否有解或者有几个解,然后再去具体求解路径。

本质上来说,回溯就是一种暴力解法。

解题模板

核心就是 for 循环里面的递归,在递归调用之前「做选择」,在递归调用之后「撤销选择」。下面的 if 不可行 其实就是剪枝的过程。给定的函数往往不一定满足回溯的函数签名,添加一个 helper 函数就好了。track 就是路径的意思,所以回溯(backtrack)本身就是路径退回的意思。

需要注意的是,剪枝虽然能够提高程序运行速度,但是对于时间复杂度往往没有改善。回溯算法的时间复杂度往往是 O(2^n)

选择列表  # 题目给定
ans = []
def backtrack( track , pos ):
    """
    track: 当前路径
    pos: 当前位置,或者是剩下的未访问元素
    """
    if 满足结束条件 :  # base case
        ans.add( track )
        return

    # 通过考虑 pos,可以让选择列表小一点
    for 选择 in 选择列表 [pos] :
        if 不可行 :
            continue
        做选择  # track -> new_track
        backtrack( new_track , 选择 )
        撤销选择  # new_track -> track

我们可以看到回溯的空间复杂度实际上就是路径的最大长度。ans 参数完全可以写到 backtrack 函数中一直传递,但是本来 backtrack 函数的参数就够多了,我还是建议把他作为一个全局变量或者闭包外的变量,这样清晰一些。

另外值得注意的一点是,我们传递了 pos 这个变量,实际上从这里很容易转变成动态规划了。实际上这里的 pos 就是动态规划中遍历数组的索引。

改变函数签名与参数传递

正如上面所说的,回溯算法往往是需要定义一个 helper 函数的。

回溯最好把结果变量作为一个全局变量和输入变量都定义在外边,这样方便一些,不然就得一路传递下去,容易出问题。backtrack 函数接收两个参数:

  1. 当前路径
  2. 当前遍历位置

这里其实是更广义的 result in parameter 还是 result in return value 的问题。最好给用户用的函数是 result in return value 的,自己的内部实现可以是 result in parameter 的。另外也可以用闭包实现 result in parameter 的效果,也就是说不传递参数,直接从 closure 中读取。其实也可以用一个全局变量来做,都是类似的效果。

—存疑—
固定传递的参数也包括了向下传递还是向上返回。具体来说有以下几种:

  1. 向下传递额外参数,如边界等,这个参数可能是随着调用在变的。
  2. 向上返回额外参数,也就是后续遍历才能够使用到返回的值。
    —存疑结束—

例题

对于 Python 来说,一般没必要使用全局变量,可以使用内部函数访问闭包内的变量。

LeetCode 17

这道题可以说是最最简单的一道 dfs 题目了。

参考

  1. https://www.dailycodingproblem.com/blog/an-introduction-to-backtracking/
  2. https://labuladong.gitbook.io/algo/di-ling-zhang-bi-du-xi-lie/hui-su-suan-fa-xiang-jie-xiu-ding-ban
  3. https://www.cnblogs.com/hustcat/archive/2008/04/09/1144645.html
  4. https://www.1point3acres.com/bbs/thread-583166-1-1.html
  5. https://zhuanlan.zhihu.com/p/92782083