做运维的都知道,最怕的不是机器直接挂掉,而是机器 hang 在那里:能 ping 通,但是又登录不上去。周末加班写了个检测脚本:先发送 ICMP 包做 ping 检查,如果有返回,再继续做 ssh 端口(22)的检查或者 ssh 登录的检查。Python 不像 Perl 下直接有个很好用的 Net::Ping,所以自己在网上找了个 python-ping,修改了一下,放到脚本里面直接用。
#!/usr/bin/env python2.7
import socket
import sys
import paramiko
import os
import select
import struct
import time
import threading
import Queue
import copy
import string
import hashlib
from collections import deque
ICMP_ECHO_REQUEST = 8  # Seems to be the same on Solaris.
ICMP_ECHO_REPLY = 0  # ICMP type of the answer to an echo request.


class CheckHang(object):
    """Probe a single server with ICMP ping, a TCP connect and an SSH login.

    A host that answers ping but refuses the TCP/SSH probe is what the
    surrounding script reports as "hung". Every ``check_*`` method returns
    1 on success and 0 on failure.
    """

    # struct layout of the 8-byte ICMP echo header:
    # type (8 bits), code (8), checksum (16), identifier (16), sequence (16).
    _ICMP_HEADER = "bbHHh"

    def __init__(self, server):
        """Remember the host name or IPv4 address to probe."""
        self.server = server

    def check_ssh(self, username="root", key_file="/home/pm/keys/id_rsa",
                  timeout=1):
        """Return 1 when an SSH login to the server succeeds, 0 otherwise.

        username -- account to log in with.
        key_file -- path of the RSA private key used for authentication.
        timeout  -- connect timeout in seconds.

        A missing or unreadable key file is a configuration problem, not a
        hung server, so that error is raised instead of being reported as 0.
        """
        key = paramiko.RSAKey.from_private_key_file(key_file)
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys; acceptable
        # for a liveness probe, do not reuse this client for real work.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.server, username=username, pkey=key,
                        timeout=timeout)
        except Exception:
            return 0
        finally:
            # Close on both paths so a failed login does not leak the client.
            ssh.close()
        return 1

    def check_ssh_port(self, port=22, timeout=1):
        """Return 1 when a TCP connection to `port` succeeds, 0 otherwise.

        port    -- TCP port to connect to (22 is the SSH port).
        timeout -- connect timeout in seconds.
        """
        port_test = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            port_test.settimeout(timeout)
            port_test.connect((self.server, port))
        except Exception:
            return 0
        finally:
            # The socket used to stay open whenever connect() failed.
            port_test.close()
        return 1

    def checksum(self, source_string):
        """Return the 16-bit ICMP checksum of `source_string`.

        The bytes are summed as 16-bit words (low byte first), the carry is
        folded back in, the result is complemented and finally its two
        bytes are swapped.
        """
        data = bytearray(source_string)  # yields ints on Python 2 and 3
        total = 0
        even_length = (len(data) // 2) * 2
        for index in range(0, even_length, 2):
            total = (total + data[index + 1] * 256 + data[index]) & 0xffffffff
        if even_length < len(data):
            # Odd length: the trailing byte is added on its own.
            total = (total + data[-1]) & 0xffffffff
        total = (total >> 16) + (total & 0xffff)
        total = total + (total >> 16)
        answer = ~total & 0xffff
        # Swap the two bytes of the result.
        return answer >> 8 | (answer << 8 & 0xff00)

    def _packet_id(self):
        """Return a 16-bit ICMP identifier derived from the server name.

        Hashing the name (instead of using the pid) gives every probed
        server its own identifier, so concurrent probes from one process
        can tell their replies apart.
        """
        name = self.server
        if not isinstance(name, bytes):
            name = name.encode("utf-8")
        return int(hashlib.md5(name).hexdigest(), 16) & 0xFFFF

    def _parse_echo_reply(self, packet):
        """Return (identifier, time_sent) for an echo reply, else None.

        `packet` is a raw IPv4 datagram. Truncated packets, malformed IP
        headers and ICMP messages other than echo replies yield None.
        """
        if not packet:
            return None
        # The low nibble of the first byte is the IP header length in
        # 32-bit words; it exceeds 20 bytes when IP options are present.
        ip_header_len = (bytearray(packet[:1])[0] & 0x0F) * 4
        if ip_header_len < 20:
            return None
        header_end = ip_header_len + struct.calcsize(self._ICMP_HEADER)
        payload_end = header_end + struct.calcsize("d")
        if len(packet) < payload_end:
            return None
        icmp_type, _code, _cksum, packet_id, _sequence = struct.unpack(
            self._ICMP_HEADER, packet[ip_header_len:header_end]
        )
        if icmp_type != ICMP_ECHO_REPLY:
            # E.g. our own echo request seen again when pinging loopback.
            return None
        time_sent = struct.unpack("d", packet[header_end:payload_end])[0]
        return packet_id, time_sent

    def receive_one_ping(self, my_socket, id, timeout):
        """Wait for the echo reply carrying identifier `id`.

        my_socket -- socket to read raw IPv4 datagrams from.
        id        -- ICMP identifier the reply must carry.
        timeout   -- maximum time to wait, in seconds.

        Returns the round-trip time in seconds, or None when no matching
        reply arrives before the timeout. Unrelated or malformed packets
        are skipped.
        """
        deadline = time.time() + timeout
        while True:
            time_left = max(deadline - time.time(), 0)
            readable = select.select([my_socket], [], [], time_left)[0]
            if not readable:  # Timeout
                return None
            time_received = time.time()
            received_packet, _addr = my_socket.recvfrom(1024)
            reply = self._parse_echo_reply(received_packet)
            if reply is not None and reply[0] == id:
                return time_received - reply[1]
            if time.time() >= deadline:
                return None

    def send_one_ping(self, my_socket, dest_addr, id, psize):
        """Send one ICMP echo request of `psize` bytes to `dest_addr`.

        The payload starts with the send time packed as a double, which
        receive_one_ping() reads back to compute the round-trip time, and
        is padded with "Q" bytes. Raises socket.gaierror when `dest_addr`
        cannot be resolved.
        """
        dest_addr = socket.gethostbyname(dest_addr)
        # Remove header size from packet size.
        payload_size = psize - struct.calcsize(self._ICMP_HEADER)
        timestamp = struct.pack("d", time.time())
        data = timestamp + b"Q" * (payload_size - len(timestamp))
        # The checksum is calculated over a header whose checksum is 0.
        dummy_header = struct.pack(
            self._ICMP_HEADER, ICMP_ECHO_REQUEST, 0, 0, id, 1
        )
        my_checksum = self.checksum(dummy_header + data)
        header = struct.pack(
            self._ICMP_HEADER, ICMP_ECHO_REQUEST, 0,
            socket.htons(my_checksum), id, 1
        )
        # The port number (1) is a placeholder required by sendto().
        my_socket.sendto(header + data, (dest_addr, 1))

    def do_one(self, timeout, psize):
        """Ping the server once.

        Returns the delay in seconds, or None on timeout. Raises
        socket.error when the raw socket cannot be created (with a hint
        about root privileges for errno 1) and socket.gaierror when the
        server name cannot be resolved.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as err:
            if err.errno == 1:
                # Operation not permitted
                raise socket.error(
                    "%s - Note that ICMP messages can only be sent from"
                    " processes running as root." % err.strerror
                )
            raise  # raise the original error
        try:
            my_id = self._packet_id()
            self.send_one_ping(my_socket, self.server, my_id, psize)
            return self.receive_one_ping(my_socket, my_id, timeout)
        finally:
            # Always release the raw socket, also when sending fails.
            my_socket.close()

    def check_ping(self, timeout=2, maxcount=4, psize=64):
        """Return 1 as soon as one ping is answered, 0 after `maxcount` tries.

        Any error raised by an attempt (unresolvable name, missing root
        privileges, ...) counts as a failed attempt, so such problems are
        reported as 0 as well.
        """
        for _attempt in range(maxcount):
            try:
                delay = self.do_one(timeout, psize)
            except Exception:
                continue
            # A measured delay of 0.0 is still an answer.
            if delay is not None:
                return 1
        return 0

    def verbose_ping(self, timeout=2, count=4, psize=64):
        """Send `count` pings of `psize` bytes and print each result.

        Stops early when the server name cannot be resolved.
        """
        for _attempt in range(count):
            sys.stdout.write("ping %s with ... " % self.server)
            try:
                delay = self.do_one(timeout, psize)
            except socket.gaierror as err:
                print("failed. (socket error: '%s')" % err.args[1])
                break
            if delay is None:
                print("failed. (timeout within %ssec.)" % timeout)
            else:
                print("get ping in %0.4fms" % (delay * 1000))
class Muti_Check(object):
    """Check a list of servers concurrently, one thread per server.

    After multi_check() has run, `hanglist` holds the servers that answer
    ping but refuse a TCP connection on port 22, and `downlist` holds the
    servers that do not answer ping at all.
    """

    def __init__(self, servers):
        """Store the server names; surrounding whitespace is tolerated."""
        self.servers = servers
        # deque.append is used from several worker threads at once.
        self.downlist = deque()
        self.hanglist = deque()

    def server_check(self, ser):
        """Probe one server and file it under `downlist` or `hanglist`.

        Healthy servers are not recorded. Blank names (e.g. an empty line
        in the input file) are ignored instead of being reported as down.
        """
        ser = ser.strip()
        if not ser:
            return
        test = CheckHang(ser)
        if test.check_ping(timeout=1) != 1:
            self.downlist.append(ser)
        elif test.check_ssh_port(22) != 1:
            # Pingable but port 22 is closed: the machine is hung.
            # test.check_ssh() is a stricter alternative (full SSH login).
            self.hanglist.append(ser)

    def get_result(self):
        """Print one "Hang: ..." / "Down: ..." line per affected server."""
        for ser in self.hanglist:
            print("Hang: %s" % ser)
        for ser in self.downlist:
            print("Down: %s" % ser)

    def multi_check(self, concurrent_max):
        """Run server_check() for every server, then wait for all of them.

        concurrent_max -- maximum number of checks running at the same
                          time; values below 1 are treated as 1.

        `self.servers` is left unmodified. Servers are started in list
        order; the order of the result lists depends on which checks
        finish first.
        """
        names = [ser.strip() for ser in self.servers]
        names = [ser for ser in names if ser]
        # A semaphore enforces the limit exactly and blocks instead of
        # polling threading.enumerate() in a sleep loop.
        slots = threading.BoundedSemaphore(max(1, concurrent_max))

        def run_check(ser):
            try:
                self.server_check(ser)
            finally:
                # Free the slot even if the check raised.
                slots.release()

        workers = []
        for ser in names:
            slots.acquire()
            worker = threading.Thread(target=run_check, args=(ser,))
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
if __name__ == "__main__":
    # Usage: <script> SERVER_LIST_FILE
    # The file holds one server name per line; the servers are checked
    # with up to 20 concurrent threads and the hung/down ones are printed.
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s SERVER_LIST_FILE\n" % sys.argv[0])
        sys.exit(2)
    # `with` closes the file even when reading fails.
    with open(sys.argv[1]) as fd:
        servers = fd.readlines()
    cluster = Muti_Check(servers)
    cluster.multi_check(20)
    cluster.get_result()
if __name__ == "__main2__":
    # Sequential variant without threads. It is disabled by the
    # "__main2__" guard; rename the guard to "__main__" (and disable the
    # threaded entry point) to use it.
    with open(sys.argv[1]) as fd:
        servers = [line.strip() for line in fd]
    # Blank lines are not servers; probing them would report them as down.
    servers = [ser for ser in servers if ser]
    hang_list = set()
    down_list = set()
    for ser in servers:
        test = CheckHang(ser)
        if test.check_ping(timeout=1) != 1:
            down_list.add(ser)
        elif test.check_ssh_port(22) != 1:
            # Pingable but port 22 is closed: the machine is hung.
            # test.check_ssh() is a stricter alternative (full SSH login).
            hang_list.add(ser)
    for ser in hang_list:
        print("Hang: %s" % ser)
    for ser in down_list:
        print("Down: %s" % ser)
    if not (hang_list | down_list):
        print("all %d server ok" % len(servers))
输入文件就是机器名列表,每行一个;脚本使用多线程进行检测,线程数可以通过 multi_check 的参数传入。如果不想用多线程的版本,那就用后面那个简单的顺序轮询版本(把 `__main2__` 改成 `__main__` 即可)。
另外说一下如何模拟 hang:用 iptables 放行 ICMP,但是把 22 端口封掉;对于宕机的场景,把这个检查服务器的 IP 直接封掉就 OK。