Python中的定时任务

TOC

1、time+循环实现

简单使用

使用while来写一个死循环,再用sleep来设置等待下一次执行的时间。

import time
from datetime import datetime

while True:
    now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'{now_time} This is a timed task.')
    # 每多少秒执行一次秒执行一次任务
    time.sleep(5)

用函数来封装方法

import time
from datetime import datetime


def task(s):
    now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'{now_time} This is a timed task.')
    while True:
        # 每多少秒执行一次秒执行一次任务
        time.sleep(int(s))


if __name__ == '__main__':
    task(5)   # 可以设置每多少s来执行一次任务

2、schedule库

安装schedule

pip install schedule

使用方法:

import schedule

# 每 5 秒执行一次任务
schedule.every(5).seconds.do(print('I am schedule'))
# 每10分钟执行一次任务
schedule.every(10).minutes.do(print('I am schedule'))
# 每天上午 18点13分 执行任务
schedule.every().day.at("18:13").do(print('I am schedule'))
# 每周一执行任务
schedule.every().monday.do(print('I am schedule'))
# 启动任务
schedule.run_pending()

注意:该模块的方法无法持续维持,所以需要配合while死循环来使用。
使用实例:

import schedule
import time
from datetime import datetime


def task():
    now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'{now_time} This is a timed task.')


def set_schedule():
    # 每 5 秒执行一次任务
    schedule.every(5).seconds.do(task)
    # 每10分钟执行一次任务
    schedule.every(10).minutes.do(task)
    # 每天上午 18点13分 执行任务
    schedule.every().day.at("18:13").do(task)
    # 每周一执行任务
    schedule.every().monday.do(task)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    set_schedule()

3、sched库

sched库需要依赖time库来实现时间的计算。
安装sched

pip install sched

使用方法:

s = sched.scheduler(time.time, time.sleep)
s.enter(delay, priority, action, argument=())
"""
delay: 从现在开始的延迟时间(秒)。
priority: 优先级,数字越小优先级越高。
action: 要执行的函数。
argument: 传递给函数的参数(元组)。
"""
# 开始执行调度任务
s.run()

使用实例:

import sched
import time


def task(name):
    print(f"This is a timed task for {name}.")


def run_tasks():
    scheduler = sched.scheduler(time.time, time.sleep)
    scheduler.enter(2, 1, task, argument=("Task 1",))
    scheduler.enter(5, 1, task, argument=("Task 2",))
    scheduler.run()


if __name__ == "__main__":
    run_tasks()