#!/usr/bin/env python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""
16Health probe script for OpenStack service that uses RPC/unix domain socket for
17communication. Check's the RPC tcp socket status on the process and send
18message to service through rpc call method and expects a reply.
19Use nova's ping method that is designed just for such simple purpose.
20
21Script returns failure to Kubernetes only when
22 a. TCP socket for the RPC communication are not established.
23 b. service is not reachable or
24 c. service times out sending a reply.
25
26sys.stderr.write() writes to pod's events on failures.
27
28Usage example for Nova Compute:
29# python health-probe.py --config-file /etc/nova/nova.conf \
30# --service-queue-name compute
31
32"""

import json
import os
import psutil
import re
import signal
import socket
import sys

from oslo_config import cfg
from oslo_context import context
from oslo_log import log
import oslo_messaging

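# Tunables for the RPC ping: timeout in seconds and retry count, overridable
# through the RPC_PROBE_TIMEOUT and RPC_PROBE_RETRIES environment variables.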
rpc_timeout = int(os.getenv('RPC_PROBE_TIMEOUT', '60'))
rpc_retries = int(os.getenv('RPC_PROBE_RETRIES', '2'))

tcp_established = "ESTABLISHED"


def _get_hostname(topic, use_fqdn):
    if use_fqdn and topic == "compute":
        return socket.getfqdn()
    return socket.gethostname()


def check_service_status(transport):
    """Verify service status. Return success if service consumes message"""
    try:
        service_queue_name = cfg.CONF.service_queue_name
        use_fqdn = cfg.CONF.use_fqdn
        target = oslo_messaging.Target(
            topic=service_queue_name,
            server=_get_hostname(service_queue_name, use_fqdn),
            namespace='baseapi',
            version="1.1")
        if hasattr(oslo_messaging, 'get_rpc_client'):
            client = oslo_messaging.get_rpc_client(transport, target,
                                                   timeout=rpc_timeout,
                                                   retry=rpc_retries)
        else:
            client = oslo_messaging.RPCClient(transport, target,
                                              timeout=rpc_timeout,
                                              retry=rpc_retries)
        client.call(context.RequestContext(),
                    'ping',
                    arg=None)
    except oslo_messaging.exceptions.MessageDeliveryFailure:
        # Log to pod events
        sys.stderr.write("Health probe unable to reach message bus")
        sys.exit(0)  # return success
    except oslo_messaging.rpc.client.RemoteError as remote_ex:
        message = getattr(remote_ex, "message", str(remote_ex))
        if ("Endpoint does not support RPC method" in message) or \
                ("Endpoint does not support RPC version" in message):
            sys.exit(0)  # Call reached the service
        else:
            sys.stderr.write("Health probe unable to reach service")
            sys.exit(1)  # return failure
    except oslo_messaging.exceptions.MessagingTimeout:
        sys.stderr.write("Health probe timed out. Agent is down or response "
                         "timed out")
        sys.exit(1)  # return failure
    except Exception as ex:
        message = getattr(ex, "message", str(ex))
        sys.stderr.write("Health probe caught exception sending message to "
                         "service: %s" % message)
        sys.exit(0)
    except:
        sys.stderr.write("Health probe caught exception sending message to"
                         " service")
        sys.exit(0)


def tcp_socket_status(process, ports):
    """Check whether the process has an ESTABLISHED tcp socket on the ports"""
    for p in psutil.process_iter():
        try:
            with p.oneshot():
                if process in " ".join(p.cmdline()):
                    pcon = p.connections()
                    for con in pcon:
                        try:
                            rport = con.raddr[1]
                            status = con.status
                        except IndexError:
                            # Connection has no remote address
                            continue
                        if rport in ports and status == tcp_established:
                            return 1
        except psutil.Error:
            # Process disappeared or access was denied; skip it
            continue
    return 0


def configured_port_in_conf():
    """Get the RabbitMQ/database ports configured in the config file"""

    rabbit_ports = set()
    database_ports = set()

    try:
        transport_url = oslo_messaging.TransportURL.parse(cfg.CONF)
        for host in transport_url.hosts:
            rabbit_ports.add(host.port)
    except Exception as ex:
        message = getattr(ex, "message", str(ex))
        sys.stderr.write("Health probe caught exception reading "
                         "RabbitMQ ports: %s" % message)
        sys.exit(0)  # return success

    try:
        # sys.argv[2] is expected to be the path passed to --config-file
        with open(sys.argv[2]) as conf_file:
            for line in conf_file:
                if re.match(r'^\s*connection\s*=', line):
                    # Extract the port and database name from the
                    # SQLAlchemy connection URL
                    service = line.split(':', 3)[3].split('/')[1].rstrip('\n')
                    if service == "nova":
                        database_ports.add(
                            int(line.split(':', 3)[3].split('/')[0]))
    except IOError:
        sys.stderr.write("Nova config file not present")
        sys.exit(1)

    return rabbit_ports, database_ports


def test_tcp_socket(service):
    """Check the tcp sockets to rabbitmq/database are in ESTABLISHED state"""
    dict_services = {
        "compute": "nova-compute",
        "conductor": "nova-conductor",
        "scheduler": "nova-scheduler"
    }
    r_ports, d_ports = configured_port_in_conf()

    if service in dict_services:
        proc = dict_services[service]
        transport = oslo_messaging.TransportURL.parse(cfg.CONF)
        if r_ports and tcp_socket_status(proc, r_ports) == 0:
            sys.stderr.write("RabbitMQ socket not established for service "
                             "%s with transport %s" % (proc, transport))
            # Do not kill the pod if RabbitMQ is not reachable/down
            if not cfg.CONF.liveness_probe:
                sys.exit(1)

        # let's do the db check
        if service != "compute":
            if d_ports and tcp_socket_status(proc, d_ports) == 0:
                sys.stderr.write("Database socket not established for service "
                                 "%s with transport %s" % (proc, transport))
                # Do not kill the pod if the database is not reachable/down.
                # There may also be no socket at all, since connections are
                # typically closed after an idle timeout.
                # Just log it to pod events.
                if not cfg.CONF.liveness_probe:
                    sys.exit(1)


def test_rpc_liveness():
    """Test if the service can consume a message from its queue"""
    oslo_messaging.set_transport_defaults(control_exchange='nova')

    rabbit_group = cfg.OptGroup(name='oslo_messaging_rabbit',
                                title='RabbitMQ options')
    cfg.CONF.register_group(rabbit_group)
    cfg.CONF.register_cli_opt(cfg.StrOpt('service-queue-name'))
    cfg.CONF.register_cli_opt(cfg.BoolOpt('liveness-probe', default=False,
                                          required=False))
    cfg.CONF.register_cli_opt(cfg.BoolOpt('use-fqdn', default=False,
                                          required=False))

    cfg.CONF(sys.argv[1:])

    # The log level is templated in from the chart values
    log.logging.basicConfig(level=log.{{ .Values.health_probe.logging.level }})

    try:
        transport = oslo_messaging.get_rpc_transport(cfg.CONF)
    except Exception as ex:
        message = getattr(ex, "message", str(ex))
        sys.stderr.write("Message bus driver load error: %s" % message)
        sys.exit(0)  # return success

    if not cfg.CONF.transport_url or \
            not cfg.CONF.service_queue_name:
        sys.stderr.write("Both message bus URL and service's queue name are "
                         "required for health probe to work")
        sys.exit(0)  # return success

    try:
        cfg.CONF.set_override('rabbit_max_retries', 2,
                              group=rabbit_group)  # 3 attempts
    except cfg.NoSuchOptError as ex:
        cfg.CONF.register_opt(cfg.IntOpt('rabbit_max_retries', default=2),
                              group=rabbit_group)

    service = cfg.CONF.service_queue_name
    test_tcp_socket(service)

    check_service_status(transport)


def check_pid_running(pid):
    return psutil.pid_exists(int(pid))


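# Each probe invocation records its pid and an exit counter in a pidfile.
# If a previous probe of the same kind is still running, the new invocation
# bumps the counter and exits successfully; on the third overlap the stale
# probe is sent SIGTERM so a hung probe cannot block the pod indefinitely.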
if __name__ == "__main__":

    if "liveness-probe" in ','.join(sys.argv):
        pidfile = "/tmp/liveness.pid"  # nosec
    else:
        pidfile = "/tmp/readiness.pid"  # nosec
    data = {}
    if os.path.isfile(pidfile):
        with open(pidfile, 'r') as f:
            file_content = f.read().strip()
            if file_content:
                data = json.loads(file_content)

    if 'pid' in data and check_pid_running(data['pid']):
        if 'exit_count' in data and data['exit_count'] > 1:
            # Third time in, kill the previous process
            os.kill(int(data['pid']), signal.SIGTERM)
        else:
            data['exit_count'] = data.get('exit_count', 0) + 1
            with open(pidfile, 'w') as f:
                json.dump(data, f)
            sys.exit(0)
    data['pid'] = os.getpid()
    data['exit_count'] = 0
    with open(pidfile, 'w') as f:
        json.dump(data, f)

    test_rpc_liveness()

    sys.exit(0)  # return success