def singleton_scheduler():
"""文件锁实现单例定时启动"""
import os
import platform, atexit
def start_scheduler():
if platform.system() != "Windows":
print('开始执行')
# path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scheduler.lock')
pid_file = open("scheduler.lock", "wb")
try:
import fcntl
fcntl.flock(pid_file, fcntl.LOCK_EX | fcntl.LOCK_NB) # 创建一个排他锁,并且所被锁住其他进程不会阻塞
start_scheduler()
except IOError as e:
print("已经有一个实例在运行")
except ModuleNotFoundError as e:
print("windows 没有这个fcntl模块, 定时任务未启动")
def unlock():
print('退出回调')
fcntl.flock(pid_file, fcntl.LOCK_UN)
pid_file.close()
'''注册一个退出回调函数,用于在程序退出时进行一些清理工作,关闭文件,解除锁'''
atexit.register(unlock)
singleton_scheduler()
fcntl linux系统才可以用
测试:
完整代码:
from flask import Flask
import sys
app = Flask(__name__)
def singleton():
"""文件锁"""
import os
import platform, atexit
def start_scheduler():
if platform.system() != "Windows":
print('开始执行')
# path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scheduler.lock')
pid_file = open("scheduler.lock", "wb")
try:
import fcntl
fcntl.flock(pid_file, fcntl.LOCK_EX | fcntl.LOCK_NB) # 创建一个排他锁,并且所被锁住其他进程不会阻塞
start_scheduler()
except IOError as e:
print("已经有一个实例在运行")
except ModuleNotFoundError as e:
print("windows 没有这个fcntl模块, 定时任务未启动")
def unlock():
print('退出回调')
fcntl.flock(pid_file, fcntl.LOCK_UN)
pid_file.close()
'''注册一个退出回调函数,用于在程序退出时进行一些清理工作,关闭文件,解除锁'''
atexit.register(unlock)
@app.route('/')
def hello_world():
return 'Hello, World!'
singleton()
if __name__ == '__main__':
app.run(port=sys.argv[1])