1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
import rumps
import yaml
import subprocess
import os
import re
import socket
import psutil
class AppManager(rumps.App):
    """Menu-bar app that starts and stops applications listed in a YAML file.

    The YAML file is expected to contain a top-level ``applications`` list.
    Each entry is a mapping using these keys (as read by this class):

    * ``name``          -- menu item title (required).
    * ``command``       -- shell command used to start the app (required).
    * ``path``          -- working directory for the start/stop commands.
    * ``check``         -- how to detect a running instance: either
      ``{'type': 'port', 'port': N}`` or ``{'type': 'ps', 'pattern': REGEX}``.
      Defaults to a ``ps`` check on the literal ``command`` text.
    * ``stop_command``  -- optional shell command used to stop the app; when
      empty, matching processes are terminated through psutil instead.

    A menu item's ``state`` is 1 while its application is considered running
    and 0 otherwise.
    """

    def __init__(self, config_path, icon_path):
        """Load the configuration and build the menu.

        Args:
            config_path: Path of the YAML configuration file.
            icon_path: Path of the icon passed to ``rumps.App``.
        """
        super().__init__('App Manager', icon=icon_path)
        self.config_path = config_path
        self.apps = self.load_apps()
        app_items = [
            rumps.MenuItem(app['name'], callback=self.on_click_app)
            for app in self.apps
        ]
        self.menu = ['Refresh', rumps.separator] + app_items + [rumps.separator]
        self.update_menu_states()

    def load_apps(self):
        """Read the application list from ``self.config_path``.

        Returns:
            The list of application mappings, each guaranteed to have a
            ``check`` and a ``stop_command`` key. An empty file or a missing /
            null ``applications`` key yields an empty list.
        """
        # Read as UTF-8 explicitly so non-ASCII names do not depend on the
        # locale the GUI process happens to be launched with.
        with open(self.config_path, 'r', encoding='utf-8') as file:
            # safe_load returns None for an empty document.
            config = yaml.safe_load(file) or {}
        apps = config.get('applications') or []
        for app in apps:
            # Default check: look for the literal start command in the
            # process table.
            if 'check' not in app:
                app['check'] = {
                    'type': 'ps',
                    'pattern': re.escape(app['command']),
                }
            # Make sure stop_command exists; empty means "terminate via psutil".
            app.setdefault('stop_command', '')
        return apps

    def update_menu_states(self):
        """Set every application menu item's state from a live running check."""
        for app in self.apps:
            self.menu[app['name']].state = 1 if self.is_process_running(app) else 0

    def is_process_running(self, app):
        """Return True if ``app`` looks running according to its ``check`` entry.

        Unknown or missing check types are reported as not running.
        """
        check_method = app.get('check') or {}
        method_type = check_method.get('type')
        if method_type == 'port':
            return self.is_port_in_use(check_method.get('port'))
        if method_type == 'ps':
            return self.is_process_matching(check_method.get('pattern'))
        return False

    @staticmethod
    def is_port_in_use(port):
        """Return True if a TCP connection to ``localhost:port`` succeeds.

        A missing or non-numeric port is reported as not in use instead of
        raising.
        """
        try:
            port = int(port)
        except (TypeError, ValueError):
            return False
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Bound the probe so a filtered port cannot stall the UI.
            s.settimeout(1.0)
            try:
                return s.connect_ex(('localhost', port)) == 0
            except OverflowError:
                # Port outside 0-65535.
                return False

    @staticmethod
    def _matching_processes(compiled_pattern):
        """Yield each process whose space-joined command line matches the regex."""
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            cmdline = ' '.join(proc.info['cmdline'] or [])
            if compiled_pattern.search(cmdline):
                yield proc

    @staticmethod
    def is_process_matching(pattern):
        """Return True if any process command line matches regex ``pattern``.

        A missing or empty pattern is reported as no match (an empty regex
        would otherwise match every process).
        """
        if not pattern:
            return False
        compiled_pattern = re.compile(pattern)
        try:
            matches = AppManager._matching_processes(compiled_pattern)
            return next(matches, None) is not None
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            return False

    @staticmethod
    def _terminate_by_command(command):
        """Best-effort terminate every process whose command line contains ``command``."""
        literal = re.compile(re.escape(command))
        own_pid = os.getpid()
        for proc in AppManager._matching_processes(literal):
            if proc.pid == own_pid:
                # Never terminate the manager itself.
                continue
            try:
                proc.terminate()  # proc.kill() would force termination
                proc.wait(timeout=5)  # wait for the process to exit
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired):
                # Already gone, not ours to stop, or slow to exit: move on.
                pass

    @rumps.clicked('Refresh')
    def refresh(self, _):
        """Menu callback: re-check every application and update the menu."""
        self.update_menu_states()

    def on_click_app(self, sender):
        """Menu callback: toggle the application whose name is ``sender.title``.

        If the item is unchecked the start command is launched; otherwise the
        configured ``stop_command`` is run, or matching processes are
        terminated when no stop command is configured.
        """
        app = next((a for a in self.apps if a['name'] == sender.title), None)
        if app is None:
            return

        # Run the child in the app's directory via cwd= rather than chdir-ing
        # this process, so a failure cannot leave the manager in another cwd.
        workdir = app.get('path') or None

        if not sender.state:  # Only start the application if it's not running
            # start_new_session=True lets the child keep running after this
            # process exits. Commands come from the local config file and are
            # intentionally run through the shell.
            subprocess.Popen(
                app['command'],
                shell=True,
                cwd=workdir,
                start_new_session=True,
            )
            sender.state = 1  # Assume the process starts successfully
            return

        stop_command = (app.get('stop_command') or '').strip()
        if stop_command:
            subprocess.run(stop_command, shell=True, cwd=workdir)
        else:
            # Default action: terminate the matching processes.
            self._terminate_by_command(app['command'])
        sender.state = 0
if __name__ == "__main__":
    # Both resources are looked up next to this script, independent of the
    # directory the app is launched from.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(script_dir, 'apps.yaml')  # YAML application list
    icon_path = os.path.join(script_dir, 'gtasks.icns')  # menu-bar icon (.icns)
    manager = AppManager(config_path, icon_path)
    manager.run()
|