Python

吐槽一下 Python 混乱的 threading 和 multiprocessing

最近要写一个库往 influxdb 中打点, 因为要被很多程序使用, 而又要创建新的进程, 为了避免引起使用方的异常, 简单深入了解了下 Python 的并发控制, 这才发现标准库真是坑. 之前没过多考虑过, 只是凭感觉在 CPU 密集的时候使用 multiprocessing, 而默认使用 threading, 其实两个还是有很多不一样的, 除了都是并发执行以外还有很大的不同. Python 中试图用 threading 和 multiprocessing 实现类似的接口来统一两方面, 结果导致更混乱了. 本文探讨几个坑.

在多线程环境中 fork

首先不谈 Python, 我们思考一下, 在多线程环境下如果执行 fork 会怎样? 在新的进程中, 会不会所有线程都在运行? 答案是否定的, 在 fork 之后, 只有执行 fork 的线程在运行, 而其他线程都不会运行. 这是 POSIX 标准规定的:

A process shall be created with a single thread. If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space, possibly including the states of mutexes and other resources. Consequently, to avoid errors, the child process may only execute async-signal-safe operations until such time as one of the exec functions is called. Fork handlers may be established by means of the pthread_atfork() function in order to maintain application invariants across fork() calls.

但是这时候其他线程持有的锁并不会自动转化到当前线程, 所以可能造成死锁. 关于在多线程程序中执行 fork 会造成的问题, 有好多文章有详细的讨论:

  1. http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them
  2. https://stackoverflow.com/questions/1073954/fork-and-existing-threads/1074663#1074663

在 python 的 daemon thread 中 fork 又会怎样

在 Python 中可以把线程设置为 daemon 状态, 如果一个进程中只有 daemon thread, 这个进程就会自动退出. 那么问题来了, 如果我们 daemon thread 中执行 fork 会怎样呢?

理论上来说, 既然 fork 之后只有一个线程, 而这个线程又是 daemon 线程, 那么显然这个进程应该直接退出的, 然而并不会这样, 这个进程会一直运行, 直到该线程退出. 这是因为 fork 之后, 唯一的线程自动成为了 main thread, 而 Python 中硬编码了 main thread 不是 daemon thread, 所以这个线程不会退出.

参考:

  1. https://stackoverflow.com/questions/31055960/is-it-a-python-bug-that-the-main-thread-of-a-process-created-in-a-daemon-thread

在新创建的进程中创建线程又会怎样

在普通进程中, 进程在所有非daemon 的线程退出之后才会推出, 但是在新创建的进程中, 不论创建的线程是 daemon thread 还是不是 daemon thread 都会在主线程退出后退出. 这是 Python 的一个 bug, 这个 bug 最早在 2013-09-08 01:20 报告出来, 而直到 2017-08-16 18:54 的 Python 3.7 才修复…

如何复现这个 bug

#!/usr/bin/env python

from multiprocessing import Process
from threading import Thread
from time import sleep

def proc():
    mythread = Thread(target=run)
    mythread.start()
    print("Thread.daemon = %s" % mythread.daemon)
    sleep(4)
    #mythread.join()

def run():
    for i in range(10):
        sleep(1)
        print("Tick: %s" % i)

if __name__ == "__main__":
    p = Process(target=proc)
    p.start()
    print("Process.daemon = %s" % p.daemon)

下面大概说下这个 bug 的原因:

  1. 普通进程会调用 sys.exit() 退出, 在这个函数中会调用 thread.join() 也就是会等待其他线程运行结束
  2. 在 Python 3.4 之前, 默认只会使用 fork 创建线程, 而对于 fork 创建的线程, 会使用 os._exit() 退出, 也就是不会调用 thread.join(). 所以也就不会等待其他线程退出
  3. 在 Python 3.4 中引入了对 spawn 系统调用的支持, 可以通过 multiprocessing.set_start_method 来设定创建进程使用的系统调用. 而使用 spawn 调用创建的进程会通过 sys.exit() 退出, 也就避免了这个 bug 的影响. 而使用 fork 创建的进程依然受到这个 bug 的影响.
  4. 在 Python 3.7 中终于在添加了 thread._shutdown 的调用, 也就是会 join 其他的 thread.

fork vs spawn 造成的 OS 平台差异性

我们知道, 在 *nix 系统中创建一个一个新的进程可以使用系统调用 fork, 父进程的所有资源都会被复制到子进程中, 当然是 Copy On Write 的. 如果要执行一个新的程序, 必须在 fork 之后调用 exec* 家族的系统调用, 后来 Linux 中添加了 spawn 系统调用, spawnfork 的不同是, 他是从头创建了一个新的子程序, 而不是像 fork 一样复制了一份父进程.

而在 Windows 上, 从来没有类似 fork 的系统调用, 只有类似 spawn 的系统调用, 也就是从头创建一个新的程序.

对于 Python 的影响. 在 *nix 操作系统上, 当使用 multiprocessing 的时候, 默认调用的是 fork, 在新的进程中所有导入的包都已经在了, 所以不会再 import 一次. 而在 Windows 系统上, 使用 multiprocessing 创建新的进程时, 所有包都会被在新进程中重新 import 一遍, 如果 import 操作是对外部系统有副作用的, 就会造成不同.

当然如上文所述, 在 Python 3.4 之后可以选择创建进程时使用的系统调用, 如果选择了 spawn, 那么在各个平台上行为就是统一的了.

参考:

  1. 为什么要区别 fork 和 exec: https://www.zhihu.com/question/66902460
  2. fork 和 spawn 造成的有趣影响: https://zhuanlan.zhihu.com/p/39542342
  3. https://stackoverflow.com/questions/38236211/why-multiprocessing-process-behave-differently-on-windows-and-linux-for-global-o

fork 和 asyncio

多进程和 Event Loop 也可能引起一些问题, 这篇文章 给了一个很好的例子:

假设现在有一个场景,主进程运行着一个event loop,在某个时候会fork出一个子进程,子进程再去运行一个新建的event loop:

async def coro(loop):
    pid = os.fork()
    if pid != 0:  # parent
        pass
    else:  # child
        cloop = asyncio.new_event_loop()
        cloop.run_forever()

loop = asyncio.get_event_loop()
asyncio.ensure_future(coro(loop), loop=loop)
loop.run_forever()
loop.close()

这段代码看起来没有什么问题, 在子进程中开了一个新的 Event Loop, 然而在 Python 3.5 和以下, 在真正运行时会报错:

...
cloop.run_forever()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 411, in run_forever
    "Cannot run the event loop while another loop is running")
RuntimeError: Cannot run the event loop while another loop is running

原因就在于标准库的 Event Loop 没有考虑多进程环境, 只是使用一个 thread local 来表示当前的 loop, 在多线程条件下, 这样当然是可以的, 但是在 fork 之后, 数据结构全部都得到了复制, 因此子进程就会检查到已经有 event loop 在运行了.

在 Python 3.6 中, 这个问题得到了简单粗暴的修复, 在每个 loop 上都标记一个 pid, 检查的时候再加上 pid 验证是不是当前进程就好了.

总而言之, 尽量不要同时使用多进程和多线程, 如果非要用的话, 首先尽早创建好需要的进程, 然后在进程中再开始创建线程或者开启 Event Loop.

还有一篇文章没看, 用空了再看下吧, 是讲 multiprocessing.Pool 的坑:

  1. https://codewithoutrules.com/2018/09/04/python-multiprocessing/

Python 高性能请求库 aiohttp 的基本用法

aiohttp 是 Python 异步编程最常用的一个 web 请求库了, 依托于 asyncio, 性能非常吓人. 下面列举几个常见的用法:

最基础: 并发下载网页

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
            "http://python.org",
            "https://google.com",
            "http://yifei.me"
        ]
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(fetch(session, url))
        htmls = await asyncio.gather(*tasks)
        for html in htmls:
            print(html[:100])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

requests cookies 为空的一个坑

有时候,requests 返回的 cookies 会为空,原因是链接发生了 301/302 跳转,而 cookies 是跟着第一个响应返回的,第二个响应没有返回 Set-Cookie header。所以直接读取 r.cookies 是空的,而在 session.cookies 中是有数据的。

解决方法是直接读 s.cookies。

s = requests.Session()
r = s.get("http://httpbin.org/cookies/set?foo=bar")
cookies = requests.utils.dict_from_cookiejar(s.cookies)
s.cookies.clear()

不过需要注意的是如果在多线程环境中使用 session 需要注意锁的问题,建议把 session 设置成 thread local 的类型。

Python 中的 GC(垃圾回收)

一般来说,垃圾回收有两种算法:引用计数和标记删除。

引用计数(reference counting)

CPython 中默认使用的垃圾回收算法是 Reference Counting。也就是对每个元素标记有多少个其他元素引用了它,当引用数降到零的时候就删除。

  1. 当对象增加一个引用,比如赋值给变量,属性或者传入一个方法,引用计数执行加1运算。
  2. 当对象减少一个引用,比如变量离开作用域,属性被赋值为另一个对象引用,属性所在的对象被回收或者之前传入参数的方法返回,引用计数执行减1操作。
  3. 当引用计数变为0,代表该对象不被引用,可以标记成垃圾进行回收。

为了解决循环引用的问题,CPython 使用了 Cyclic GC,遍历所有的环,并且把每一个元素的引用减一,来检测每一个引用环是不是循环应用。

标记删除(Mark and Sweep)

  1. 从某一个已知的还活着的对象开始,便利对象,如果经过了某个对象就认为是活着的
  2. 如果没有被标记的就删除

避免了循环引用的问题

实际的处理过程

Pluggable
Generational
Incremental

引用计数(reference counting)

CPython 中默认使用的垃圾回收算法是 Reference Counting。也就是对每个元素标记有多少个其他元素引用了它,当引用数降到零的时候就删除。

  1. 当对象增加一个引用,比如赋值给变量,属性或者传入一个方法,引用计数执行加 1 运算。
  2. 当对象减少一个引用,比如变量离开作用域,属性被赋值为另一个对象引用,属性所在的对象被回收或者之前传入参数的方法返回,引用计数执行减 1 操作。
  3. 当引用计数变为 0,代表该对象不被引用,可以标记成垃圾进行回收。

为了解决循环引用的问题,CPython 使用了 Cyclic GC,遍历所有的环,并且把每一个元素的引用减一,来检测每一个引用环是不是循环应用。

标记删除(Mark and Sweep)

  1. 从某一个已知的还活着的对象开始,便利对象,如果经过了某个对象就认为是活着的
  2. 如果没有被标记的就删除

避免了循环引用的问题

实际的处理过程

Pluggable
Generational
Incremental

参考资料

  1. https://www.youtube.com/watch?v=iHVs_HkjdmI
  2. https://droidyue.com/blog/2015/06/05/how-garbage-collector-handles-circular-references/
  3. https://www.cnblogs.com/Xjng/p/5128269.html
  4. https://foofish.net/python-gc.html

在 Python 中优雅地处理 SIGTERM 信号

昨天写了一个服务,在本地运行很好,使用 Ctrl-C 结束运行之后会清理资源,然后取消注册,然而放到 Docker 中跑之后发现结束之后资源没有释放。查了查发现原来是下面是几个因素造成的:

  1. Ctrl-C 发送的是 SIGINT 信号,Python 会转化成 KeyboardInterrupt 异常,而我的资源是在 finally 释放资源,所以使用 Ctrl-C 可以优雅地退出
  2. Python 中对其他的信号(比如 SIGTERM、SIGHUP)都不会处理,而是直接退出
  3. Docker 在推出的时候默认发送的是 SIGTERM 信号

所以在 docker stop 的时候服务并不能优雅的推出。

解决方法

使用 atexit 模块是不可以的,atexit 不会处理 SIGTERM。需要使用 signal 模块来,在网上找到了一份源码。这个代码注册了一个 SIGTERM 的 handler,把 SIGTERM 转换成正常的 sys.exit 调用,当运行 sys.exit 的时候会运行 finally 子句中的语句。

# Author: Giampaolo Rodola" <g.rodola [AT] gmail [DOT] com>
# License: MIT

from __future__ import with_statement
import contextlib
import signal
import sys


def _sigterm_handler(signum, frame):
    sys.exit(0)
_sigterm_handler.__enter_ctx__ = False


@contextlib.contextmanager
def handle_exit(callback=None, append=False):
    """A context manager which properly handles SIGTERM and SIGINT
    (KeyboardInterrupt) signals, registering a function which is
    guaranteed to be called after signals are received.
    Also, it makes sure to execute previously registered signal
    handlers as well (if any).

    >>> app = App()
    >>> with handle_exit(app.stop):
    ...     app.start()
    ...
    >>>

    If append == False raise RuntimeError if there"s already a handler
    registered for SIGTERM, otherwise both new and old handlers are
    executed in this order.
    """
    old_handler = signal.signal(signal.SIGTERM, _sigterm_handler)
    if (old_handler != signal.SIG_DFL) and (old_handler != _sigterm_handler):
        if not append:
            raise RuntimeError("there is already a handler registered for "
                               "SIGTERM: %r" % old_handler)

        def handler(signum, frame):
            try:
                _sigterm_handler(signum, frame)
            finally:
                old_handler(signum, frame)
        signal.signal(signal.SIGTERM, handler)

    if _sigterm_handler.__enter_ctx__:
        raise RuntimeError("can"t use nested contexts")
    _sigterm_handler.__enter_ctx__ = True

    try:
        yield
    except KeyboardInterrupt:
        pass
    except SystemExit, err:
        # code != 0 refers to an application error (e.g. explicit
        # sys.exit("some error") call).
        # We don"t want that to pass silently.
        # Nevertheless, the "finally" clause below will always
        # be executed.
        if err.code != 0:
            raise
    finally:
        _sigterm_handler.__enter_ctx__ = False
        if callback is not None:
            callback()


if __name__ == "__main__":
    # ===============================================================
    # --- test suite
    # ===============================================================

    import unittest
    import os

    class TestOnExit(unittest.TestCase):

        def setUp(self):
            # reset signal handlers
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            self.flag = None

        def tearDown(self):
            # make sure we exited the ctx manager
            self.assertTrue(self.flag is not None)

        def test_base(self):
            with handle_exit():
                pass
            self.flag = True

        def test_callback(self):
            callback = []
            with handle_exit(lambda: callback.append(None)):
                pass
            self.flag = True
            self.assertEqual(callback, [None])

        def test_kinterrupt(self):
            with handle_exit():
                raise KeyboardInterrupt
            self.flag = True

        def test_sigterm(self):
            with handle_exit():
                os.kill(os.getpid(), signal.SIGTERM)
            self.flag = True

        def test_sigint(self):
            with handle_exit():
                os.kill(os.getpid(), signal.SIGINT)
            self.flag = True

        def test_sigterm_old(self):
            # make sure the old handler gets executed
            queue = []
            signal.signal(signal.SIGTERM, lambda s, f: queue.append("old"))
            with handle_exit(lambda: queue.append("new"), append=True):
                os.kill(os.getpid(), signal.SIGTERM)
            self.flag = True
            self.assertEqual(queue, ["old", "new"])

        def test_sigint_old(self):
            # make sure the old handler gets executed
            queue = []
            signal.signal(signal.SIGINT, lambda s, f: queue.append("old"))
            with handle_exit(lambda: queue.append("new"), append=True):
                os.kill(os.getpid(), signal.SIGINT)
            self.flag = True
            self.assertEqual(queue, ["old", "new"])

        def test_no_append(self):
            # make sure we can"t use the context manager if there"s
            # already a handler registered for SIGTERM
            signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
            try:
                with handle_exit(lambda: self.flag.append(None)):
                    pass
            except RuntimeError:
                pass
            else:
                self.fail("exception not raised")
            finally:
                self.flag = True

        def test_nested_context(self):
            self.flag = True
            try:
                with handle_exit():
                    with handle_exit():
                        pass
            except RuntimeError:
                pass
            else:
                self.fail("exception not raised")

    unittest.main()

参考:

  1. docker 退出信号:https://www.ctl.io/developers/blog/post/gracefully-stopping-docker-containers/
  2. finally 中的语句并不总会执行:https://stackoverflow.com/questions/49262379/does-finally-always-execute-in-python
  3. Python 不处理 SIGTERM 信号 https://stackoverflow.com/questions/9930576/python-what-is-the-default-handling-of-sigterm
  4. sys.exit 也会运行 finally 中的语句 https://stackoverflow.com/questions/7709411/why-finally-block-is-executing-after-calling-sys-exit0-in-except-block

一篇简单的 Python gRPC 教程

安装

pip install grpcio grpcio-tools protobuf googleapis-common-protos

IDL

grpc 使用 protobuf 来定义接口。按照 protobuf 的 Style Guide 的要求,service 和其中的方法都应该使用 CamelCase。

service 关键字定义一个服务,相当于一个接口。把下面的文件保存为 helloworld.proto

需要注意的是,grpc 中的方法只能接受一个参数,返回一个参数。

// The greeter service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user"s name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloResponse {
  string message = 1;
}

生成 rpc 代码

python -m grpc_tools.protoc  --python_out=. --grpc_python_out=. helloworld.proto

生成了两个文件:

  • helloworld_pb2,包含了 protobuf 中结构的定义
  • helloworldpb2grpc, 包含了 protobuf grpc 接口的定义

实现 rpc 服务

from current.futures import ThreadPoolExecutor
from helloworld_pb2 import HelloRepsonse
from helloworld_pb2_grpc import GreeterServicer, add_GreeterServicer_to_server

class Greeter(GreeterServicer):

  def SayHello(self, request, context):
    return HelloResponse(message="Hello, %s!" % request.name)

  def SayHelloAgain(self, request, context):
    return HelloResponse(message="Hello again, %s!" % request.name)


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

客户端调用

import grpc
from helloworld_pb2 import HelloRequest
from helloworld_pb2_grpc import GreeterStub


def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = GreeterStub(channel)
        response = stub.SayHello(HelloRequest(name="you"))
    print("Greeter client received: " + response.message)

高级话题

stream

未完待续

参考

  1. [A simplified guide to gRPC in Python](

父进程退出后如何退出子进程

我们知道当子进程推出的时候,父进程会收到 SIGCHLD 信号,从而可以采取相应的操作。但是当父进程退出的时候,系统会把子进程的父进程更改为pid=0的 init 进程,而且子进程不会收到任何信号。而我们经常想在父进程退出的时候,让子进程也推出。在 Python 中可以有如下几种做法。

设置子进程为 daemon

这里的 daemon 和系统的守护进程没有任何关系,是 quitwhenparent_dies 的意思。也就是当父进程退出的时候,会自动尝试关闭 daemon=True 的子进程。

p = multiprocessing.Process(target=foo)
p.daemon = True
p.start()

官方文档

在子进程中设置 PDEATHSIG

在 Linux 中,进程可以要求内核在父进程退出的时候给自己发信号。使用系统调用 prctl。

prctl(PR_SET_PDEATHSIG, SIGHUP);

在 Python中也有对应的包 python-prctl,可以在子进程中这样使用,这样在父进程挂掉的时候,子进程就会收到 SIGHUP 信号:

# apt-get install build-essential libcap-dev
# pip install python-prctl

import signal
import prctl

prctl.set_pdeathsig(signal.SIGHUP)

缺点:只支持 linux

父进程在终止的时候回收子进程

可以使用 atexit.register 在主进程中注册代码:

# pip install psutil


import psutil
import atexit
import os
import signal

@atexit.register
def kill_children():
    print("quitting, press Ctrl-C to force quit")
    current_process = psutil.Process()
    children = current_process.children(recursive=True)
    for child in children:
        print("Child pid is {}".format(child.pid))
        os.kill(child.pid, signal.SIGTERM)

使用 atexit 在收到 SIGTERM 的时候并不能触发,所以最好使用 signal 注册到主进程对应的信号上。

缺点是当使用 kill -9 或者使用 os._exit 的时候不会调用这些函数。

curio asks 源码解析

asks 是 Python 的异步框架 curio 中的 一个 http 库。它基于 h11 这个库来做 http 协议的解析,然后提供了在 curio 下的 IO 操作。下面按照功能逐个介绍其中的每个部分。

杂项

auth.py

该文件中主要包含了 http auth 相关函数, 支持了 Basic Auth 的 Digest Auth。值得注意的是,digest auth 作为一种既很复杂又不安全的认证方式,已经没有人用了。如果需要使用 http auth 的话,现在推荐的方式使用 https + basic auth。

base_funcs.py

提供了一些快捷方式函数,比如 curio.get。

cookie_utils.py

该文件主要包含了 CookieTracker, 对外的方法主要有两个 get_additional_cookies 用于获取域名对应的 cookie,_store_cookies 用于添加 cookie。

parse_cookies 函数主要用于解析 set-cookie 头部,并把解析到的 cookie 附加到 response 对象上。

errors.py

asks 中抛出的异常的类

http_utils.py

处理编码和压缩的两个函数。

请求与响应

request_object.py

该文件中主要是定义了 RequestProcessor 类。RequestProcessor 用于生成一个 HTTP 请求。

makerequest 方法。hconnection定义和使用的地方相距太远了。cookie的生成应该使用join。之后调用 _requestio 发送请求

_request_io 调用 首先掉用 _send, 然后调用 _catch_response

_catch_response 调用 recv_event

_recv_event 不断调用 _async_lib.recv(self.sock, 10000) 从而不断产生数据,知道读完为之

sessions.py

session 类

request 调用 grabconnection 获取一个socket,然后把这个socket交给Request对象
grab
connection 调用 checkoutconnection 获得一个socket或者,调用makeconnection产生一个新的socket,注意其中有一个奇怪的 await sleep(0),可能意思是把循环交回给event loop

make_connection 调用 _connect 方法,并把host和port作为属性写到socket上

session 中有两个SocketQ的类,connpool, checkedout_sockets 分别用来保存已连接未使用的socket和正在使用中的socket

response_objects.py

Response 表示了一个响应。如果在发起请求的时候选择了 stream=True, response.body 会是一个 StreamBody 实例,StreamBody 用于流式处理响应。

Cookie 类表示了一个 Cookie,不知道这里为什么没有用标准库的 cookie。

Connection Pool

如果使用代理的话

req_structs.py

SocketQ 是一个 socket 的连接池。使用一个 deque 作为存储,实际上相当于又模拟了一个字典 {netloc => socket}(思考:为什么不使用OrderedDict呢?)index 返回指定 hostloc 对应的 index。pull 弹出指定 index 的 socket。__contains__ 遍历看是否包含对应的socket。需要注意的是这个类不是线程安全的,不过对于 curio 来说,线程安全似乎无关紧要,毕竟只有一个线程。

CaseIncesitiveDict 是一个对大小写不敏感的词典,直接从 requests 里面拿过来的。

curio 的网络通信

首先,需要引入curio.socket 而不是使用内置的socket

TCP通信,使用 sock.bind/listen/accept 等建立服务器,使用recv和sendall发送接收消息。
UDP通信,使用recvfrom和sendto函数通信

作为客户端使用 curio.open_connection 打开到服务器的链接,其中 ssl=True打开的是HTTPS连接诶

对于其他要使用ssl的情况,应该使用curio.ssl而不是标准库的ssl

curio.network.

ssl.wrapsocket 不支持serverhostname sslcontext.wrap_socket 支持

不要把 proxy 传递给 request 对象

添加 http 代理支持

asks 把繁重的 http 解析工作都用 h11 这个库巧妙的解决了,所以 asks 本身是非常轻量的一个库。很遗憾的是,在 asks 中尚未支持代理,下面我们尝试为 asks 增加 http 代理的支持 😛

在 http 代理模式中,每个请求都需要添加上 Proxy-Authorization 的 Header。而在 https 请求中,只有 Connect 的时候需要添加上 Proxy-Authorization 的 Header。

代理的 socket 池和真正的 socket 池要分开,这样设计还简单一点。

Python 操作 ssh

import paramiko
ip="server ip"
port=22
username="username"
password="password"
cmd="some useful command" 
ssh=paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip,port,username,password)
stdin,stdout,stderr=ssh.exec_command(cmd)
outlines=stdout.readlines()
resp="".join(outlines)
print(resp)