服务器hang的检查

做运维的都知道,最怕的不是机器直接挂掉,而是机器hang在那里:能ping通,但是又登录不上去。周末加班写了个检测脚本:先发送ICMP包做ping检查,如果有返回,再继续做ssh端口的检查或者ssh登录的检查。Python不像Perl那样直接有个很好用的Net::Ping,于是在网上找了个python-ping,修改了一下放到脚本里直接用。


#!/usr/bin/env python2.7
import socket
import sys
import paramiko
import os
import select
import struct
import time
import threading
import Queue
import copy
import string
import hashlib
from collections import deque
ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.
ICMP_ECHO_REPLY = 0  # ICMP type of the answer to an echo request (RFC 792).


class CheckHang:
    """Probe a single server with ICMP echo and SSH checks.

    A server that answers ping but refuses SSH is considered "hung"; a
    server that does not answer ping at all is considered "down".  The
    ICMP code needs a raw socket, which normally requires root.
    """

    def __init__(self, server):
        # Host name or IPv4 address of the server to probe.
        self.server = server

    def check_ssh(self):
        """Try a full SSH login as root with the configured private key.

        Returns 1 when the login succeeds and 0 when it fails.  Errors
        while loading the private key file are not caught here.
        """
        ssh = paramiko.SSHClient()
        key = paramiko.RSAKey.from_private_key_file("/home/pm/keys/id_rsa")
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.server, username="root", pkey=key, timeout=1)
            flag = 1
        except Exception:
            flag = 0
        finally:
            # Release the client whether or not the connection succeeded.
            ssh.close()
        return flag

    def check_ssh_port(self, port):
        """Check that a TCP connection to `port` can be established.

        Returns 1 when the connect succeeds within one second, else 0.
        """
        port_test = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            port_test.settimeout(1)
            port_test.connect((self.server, port))
            flag = 1
        except Exception:
            flag = 0
        finally:
            # Close on every path so failed probes do not leak sockets.
            port_test.close()
        return flag

    def checksum(self, source_string):
        """Compute the 16-bit checksum used for the ICMP header.

        `source_string` is the packet as a byte string.  Byte pairs are
        summed as little-endian 16-bit words, the carry is folded back
        in, the result is complemented and finally byte-swapped.
        """
        data = bytearray(source_string)
        total = 0
        count_to = (len(data) // 2) * 2
        for count in range(0, count_to, 2):
            total += data[count + 1] * 256 + data[count]
            total &= 0xffffffff

        # An odd-length input has one trailing byte left to add.
        if count_to < len(data):
            total += data[-1]
            total &= 0xffffffff

        # Fold the upper 16 bits into the lower 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total += total >> 16
        answer = ~total & 0xffff

        # Swap the two bytes of the result.
        answer = (answer >> 8) | ((answer << 8) & 0xff00)

        return answer

    def receive_one_ping(self, my_socket, id, timeout):
        """Wait for the echo reply that carries our packet id.

        Returns the round-trip delay in seconds, or None when nothing
        matching arrives before `timeout` seconds of waiting are used up.
        """
        stamp_len = struct.calcsize("d")
        time_left = timeout
        while True:
            started_select = time.time()
            what_ready = select.select([my_socket], [], [], time_left)
            how_long_in_select = time.time() - started_select
            if not what_ready[0]:  # Timeout
                return None

            time_received = time.time()
            received_packet, addr = my_socket.recvfrom(1024)
            # The ICMP header is read at offset 20, i.e. this assumes an
            # IP header without options.  Packets too short to hold the
            # header plus our timestamp are skipped instead of raising.
            if len(received_packet) >= 28 + stamp_len:
                icmp_type, code, checksum, packet_id, sequence = struct.unpack(
                    "bbHHh", received_packet[20:28]
                )
                # Only an echo *reply* counts: a raw socket may also see
                # other ICMP traffic, including our own looped-back request.
                if icmp_type == ICMP_ECHO_REPLY and packet_id == id:
                    time_sent = struct.unpack(
                        "d", received_packet[28:28 + stamp_len]
                    )[0]
                    return time_received - time_sent

            time_left = time_left - how_long_in_select
            if time_left <= 0:
                return None

    def send_one_ping(self, my_socket, dest_addr, id, psize):
        """Send one ICMP echo request of `psize` bytes to `dest_addr`.

        The payload starts with the send time packed as a double, which
        receive_one_ping() reads back to compute the delay.
        """
        dest_addr = socket.gethostbyname(dest_addr)

        # Remove header size from packet size
        payload_size = psize - 8

        # Header is type (8), code (8), checksum (16), id (16), sequence (16).
        # Build a dummy header with a zero checksum first.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, id, 1)
        stamp_len = struct.calcsize("d")
        padding = (payload_size - stamp_len) * b"Q"
        data = struct.pack("d", time.time()) + padding

        # Calculate the checksum on the data and the dummy header.
        my_checksum = self.checksum(header + data)

        # Rebuild the header with the real checksum in place.
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), id, 1
        )
        packet = header + data
        my_socket.sendto(packet, (dest_addr, 1))  # Don't know about the 1

    def do_one(self, timeout, psize):
        """Send one ping to self.server and wait for the answer.

        Returns either the delay (in seconds) or None on timeout.
        Raises socket.error when the raw socket cannot be created and
        lets errors from sending (e.g. name resolution) propagate.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as err:
            if err.errno == 1:
                # Operation not permitted
                raise socket.error(
                    err.errno,
                    "%s - Note that ICMP messages can only be sent from"
                    " processes running as root." % err.strerror
                )
            raise  # raise the original error

        # Derive the packet id from the server name rather than the pid,
        # so every probed server gets its own id within one process.
        # Only 16 bits are kept, so two names can still share an id.
        server_key = self.server
        if not isinstance(server_key, bytes):
            server_key = server_key.encode("utf-8")
        my_id = int(hashlib.md5(server_key).hexdigest(), 16) & 0xFFFF

        try:
            self.send_one_ping(my_socket, self.server, my_id, psize)
            return self.receive_one_ping(my_socket, my_id, timeout)
        finally:
            # Close the raw socket even when sending or receiving raises.
            my_socket.close()

    def check_ping(self, timeout = 2, maxcount = 4, psize = 64 ):
        """Ping the server up to `maxcount` times.

        Returns 1 as soon as one reply is received and 0 when every
        attempt times out or fails with an error.
        """
        for _ in range(maxcount):
            try:
                delay = self.do_one(timeout, psize)
            except Exception:
                continue
            # Compare against None: a measured delay of 0.0 is a reply too.
            if delay is not None:
                return 1
        return 0

    def verbose_ping(self, timeout = 2, count = 4, psize = 64):
        """Send `count` pings of `psize` bytes and print each result.

        Stops early when the server name cannot be resolved.
        """
        for _ in range(count):
            # No newline here: the result is printed on the same line.
            sys.stdout.write("ping %s with ... " % self.server)
            try:
                delay = self.do_one(timeout, psize)
            except socket.gaierror as e:
                print("failed. (socket error: '%s')" % e.args[1])
                break

            if delay is None:
                print("failed. (timeout within %ssec.)" % timeout)
            else:
                print("get ping in %0.4fms" % (delay * 1000))
        print("")

class Muti_Check:
    """Check many servers concurrently, one thread per server.

    Results are collected in `downlist` (no ping reply) and `hanglist`
    (ping reply but the SSH port check failed).
    """

    def __init__(self, servers):
        # Iterable of server names; entries may carry surrounding
        # whitespace such as the newline left by readlines().
        self.servers = servers
        self.downlist = deque()
        self.hanglist = deque()

    def server_check(self, ser):
        """Probe one server and record it as down or hung if it fails."""
        ser = ser.strip()
        if not ser:
            # A blank line in the input is not a server.
            return
        test = CheckHang(ser)
        if test.check_ping(timeout=1) != 1:
            self.downlist.append(ser)
        # test.check_ssh() can be used here instead for a full login check.
        elif test.check_ssh_port(22) != 1:
            self.hanglist.append(ser)

    def get_result(self):
        """Print every hung server, then every down server."""
        for ser in self.hanglist:
            print("Hang: %s" % ser)
        for ser in self.downlist:
            print("Down: %s" % ser)

    def multi_check(self, concurrent_max):
        """Run server_check() for every server in worker threads.

        At most `concurrent_max` threads run at the same time (at least
        one, so a value below 1 cannot stall the loop).  Blank entries
        are skipped.  Returns once every worker has finished.
        """
        limit = max(1, int(concurrent_max))
        stripped = [name.strip() for name in self.servers]
        pending = [name for name in stripped if name]
        running = []
        while pending or running:
            # Reap workers that have finished.
            still_running = []
            for worker in running:
                if worker.is_alive():
                    still_running.append(worker)
                else:
                    worker.join()
            running = still_running

            if pending and len(running) < limit:
                ser = pending.pop()
                worker = threading.Thread(
                    target=self.server_check, args=(ser,)
                )
                running.append(worker)
                worker.start()
            # Short pause so the polling loop does not spin the CPU.
            time.sleep(0.01)

if __name__ == "__main__":
    # Threaded entry point: argv[1] is a file with one server name per line.
    if len(sys.argv) < 2:
        sys.exit("usage: %s SERVER_LIST_FILE" % sys.argv[0])
    # The with-block closes the file even if reading fails.
    with open(sys.argv[1]) as fd:
        servers = fd.readlines()
    cluster = Muti_Check(servers)
    cluster.multi_check(20)  # upper bound on concurrent worker threads
    cluster.get_result()
# Sequential (single-threaded) variant of the check.  The guard compares
# against "__main2__", so this block does not run when the file is executed
# as a script; change the string to "__main__" to use it.
if __name__ == "__main2__":
    # argv[1] is a file with one server name per line; blank lines are skipped.
    with open(sys.argv[1]) as fd:
        servers = [line.strip() for line in fd]
    servers = [name for name in servers if name]
    hang_list = set()
    down_list = set()
    for ser in servers:
        test = CheckHang(ser)
        if test.check_ping(timeout=1) == 1:
            # Answers ping: "hung" if the SSH port is not reachable.
            # test.check_ssh() can be used instead for a full login check.
            if test.check_ssh_port(22) != 1:
                hang_list.add(ser)
        else:
            down_list.add(ser)
    for ser in hang_list:
        print("Hang: %s" % ser)
    for ser in down_list:
        print("Down: %s" % ser)
    if not (hang_list | down_list):
        print("all %d server ok" % len(servers))

输入的列表文件每行一个机器名,使用多线程进行检测,并发线程数通过multi_check的参数传入。如果不想用多线程的版本,就用后面那个简单的顺序轮询版本(把其中的"__main2__"改成"__main__"即可启用)。

另外说一下对hang的模拟,iptables放行icmp但是把22端口封掉,对于宕机的场景就是把这个检查服务器的IP直接封掉就OK。

此条目发表在OS, python分类目录。将固定链接加入收藏夹。