使用dnspython解析zone文件生成反解记录

今天试了下使用dnspython来解析zone文件,然后把IP->Domain信息输出来。按照PTR格式输出到zone文件里面。 #!/usr/bin/env python2.7 import dns.zone import dns.ipv4 import os.path import sys import string import re zonedir='/home/work/dns/var/named/zone/' tempdir='/home/work/dns/script/' master_zones=("zone1.xxx.com","zone2.xxx.com") ptr_zones=("10.rev","172.rev") class PTR: def __init__(self,zones): self.new_serial=dict() self.reverse_map=dict() for zonefile in zones: filename=zonedir+zonefile zone = dns.zone.from_file(filename,os.path.basename(filename),relativize=False) for (name, ttl, rdata) in zone.iterate_rdatas('SOA'): serial=str(rdata).split()[2] if serial > 0: self.new_serial[zonefile]=int(serial)+1 if len(sys.argv) ==2 : self.new_serial[zonefile]=int(sys.argv[1]) else: print "read old ptr zone file:%s err" % zonefile sys.exit(2) def load_master_zone(self,zones): for zonefile in zones: filename=zonedir+zonefile zone = dns.zone.from_file(filename,os.path.basename(filename),relativize=False) for (name, ttl, rdata) in zone.iterate_rdatas('A'): match=re.search(r'*.',str(name)) if match: print "ignore *.xxx domain" continue ...

October 10, 2013 · 1 min · pm

服务器hang的检查

做运维的都知道,最怕的不是机器直接挂掉,而是怕机器hang在那里,能ping通但是又登录不上去。周末加班写了个检测脚本,发送icmp包进行ping的检查,如果有返回再继续做ssh端口的检查或者ssh登录的检查。python不像perl下直接有个很好用的net::ping,自己网上找了个python-ping,修改了一下放脚本里面直接用。 #!/usr/bin/env python2.7 import socket import sys import paramiko import os import select import struct import time import threading import Queue import copy import string import hashlib from collections import deque ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris. class CheckHang: def __init__(self,server): self.server=server def check_ssh(self): """ return 1 when i can't ssh to the server """ ssh = paramiko.SSHClient() key = paramiko.RSAKey.from_private_key_file("/home/pm/keys/id_rsa") ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(self.server,username="root",pkey=key,timeout=1) flag=1 ssh.close() except: flag=0 return flag def check_ssh_port(self,port): """ check the 22 port alive, return 1 when the port is alive. """ port_test = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: port_test.settimeout(1) ...

August 18, 2013 · 4 min · pm

二分法查找指定时间的日志

二分法是很基础的一个查询方法。试想一个场景,应用的访问量非常大,单天的日志单个文件上100G,要准实时地统计出TPM的大小。没有什么storm之类的高级玩意,就自己写脚本进行统计的话其实不太好搞。这个时候可以试试每次用二分法找出上一分钟的日志所在的偏移量,然后再顺序读入日志进行处理,可以比较高效地跳过大量的日志。python简单写了个[python]#!/usr/bin/env pythonimport reimport datetimeimport sysclass logtools:"""this tools can get the bind qps and the ips which query with high frequency"""def __init__(self,filename="/xx/acess.log"):self.logname=filenametry:#print "logs is",filenameself.fd=open(filename,"r")except IOError:print "open log failed"sys.exit(1)def __del__(self):try:self.fd.close()except:print "close fd failed"def get_last_min(self):now=datetime.datetime.now()last=datetime.datetime.now()+datetime.timedelta(minutes=-2)qps_time=datetime.datetime.now()+datetime.timedelta(minutes=-1)t=qps_time.strftime('\s+%H:%M:')t2=qps_time.strftime('%H:%M')return (int(last.strftime("%s")),t,t2)def get_current_min(self):time_reg=re.compile("\s+(?P<hour>\d+):(?P<min>\d+):(?P<sec>\d+)")now=datetime.datetime.now()i=1while True:line=self.fd.readline()if not line:return Nonematch=time_reg.search(line)i=i+1if match:match_time=datetime.datetime(year=now.year,month=now.month,day=now.day,hour=int(match.group("hour")),minute=int(match.group("min")),second=int(match.group("sec")),)breakreturn int(match_time.strftime("%s"))def get_last_seek(self,last_time):old_seek=self.fd.tell()self.fd.seek(0,0)start_seek=self.fd.tell()start_time=self.get_current_min()pos_off=len(self.fd.readline())*2self.fd.seek(0,2)end_seek=self.fd.tell()self.fd.seek(-pos_off,2)end_time=self.get_current_min()#print "time range:",start_time,last_time,end_time#print "pos_off:",pos_offif last_time < start_time:print "error last-time"return end_seekelif last_time > end_time:print "error %d > %d"%(last_time,end_time)return end_seektime=0while (end_seek - start_seek > 2*pos_off and end_time - start_time > 3) :half_seek=int((end_seek+start_seek)/2)self.fd.seek(half_seek,0)half_time=self.get_current_min()#print "%d --<%d>--%d"%(start_seek,half_seek,end_seek)if last_time<=half_time:end_seek=half_seekself.fd.seek(end_seek,0)end_time=self.get_current_min()else:start_seek=half_seekself.fd.seek(start_seek,0)start_time=self.get_current_min()time+=1#print "search %d times"%timereturn half_seekdef get_tpm(self):reg=self.get_last_min()[1]+"\d{2}"reg_time=self.get_last_min()[2]regex=re.compile(reg)time_pre=self.get_last_min()[0]pos=self.get_last_seek(time_pre)self.fd.seek(pos,0)query=0line=self.fd.readline()while line:if line == None:breakelif regex.search(line):query+=1line=self.fd.readline()print "%s qps %d"%(str(reg_time),query)a=logtools(filename=sys.argv[1])a.get_tpm() [/python] ...

July 10, 2013 · 1 min · pm

服务影响时间测量

这几天配置dns集群,为了测试各种场景下的服务影响时间,需要自己写个脚本进行统计。没有什么现成的好的工具,就用python里面的dnspython模块写了个小的脚本[python]#!/usr/bin/python#****************************************************************## ScriptName: dnsquery.py# Author: GNUer# Create Date: 2013-06-28 12:52# Modify Author: GNUer# Modify Date: 2013-06-28 12:52# Function:#***************************************************************#import dns.resolverimport dns.exceptionimport timeimport datetimeimport signalimport sysdef get_time():t=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")return tdef get_mtime():t=datetime.datetime.now().strftime("%H:%M:%S.%f")return tdef get_second():return time.time()def do_exit(sig,arg):print "exit dns test"sys.exit(1)def dns_test(num,sleep_time):resolver = dns.resolver.Resolver()#resolver.timeout = 0.01resolver.lifetime =0.0013#change this time according to response timeresolver.nameservers=['7.7.7.7','6.6.6.6']testlist=("search.xxx.com","obs.xxx.com","if.xxx.com")last_flag=Truefail_start=0fail_finish=0for i in range(num):for test in testlist:try:for target in resolver.query(qname=test):print "\x1b[32m",get_time(),test,target,"\x1b[m"if last_flag == False:last_flag= Truefail_finish=get_second()errortime=fail_finish-fail_startprint "\x1b[1;31m","error time is:",errortime,"\x1b[m"except dns.exception.Timeout:print "\x1b[31m",get_time(),test,"failed","\x1b[m"if last_flag == True:fail_start=get_second()last_flag = Falsetime.sleep(sleep_time)signal.signal(signal.SIGINT,do_exit)dns_test(10000000,0.33)[/python]需要比较注意的是需要根据自己的实际情况把resolver.lifetime设置为比dns服务器的响应时间稍微大一点的值,但是需要小于平均响应时间的2倍。不然测试的时间不是太准确。因为总共测试的域名是3个,所以我把每次sleep的时间设置为0.32左右,使得1s总共能发3个请求左右。测试的时候可以先把这个脚本一直跑着,然后去做各种操作,看中间的影响时间是多少。虽然这个脚本是测试dns的,但是也可以修改一下做HTTP请求等等,方便在做各种HA切换的时候测试影响的时间。

June 28, 2013 · 1 min · pm

页面元素检测脚本

我们经常遇到某个页面缺少一个js之类的情况,一般使用firefox的httpfox或者chrome自带的工具都可以查看那些元素有问题.用python写了一个脚本,分析页面里面引用的元素,然后逐个进行请求,查看是否有的元素不能成功获取.不过使用urllib2有的地方异常处理也做的不完善,目前只是自己简单测试了一些,就当熟练一下Python的用法了.目前从perl切换到python还是有很多的地方不是太习惯.尤其是很多列表和字符串的操作有点不一样.[python]#!/usr/bin/env pythonimport urllib2import gzipimport binasciiimport re,sysimport stringfrom StringIO import StringIOdef gunziptxt(data):buf = StringIO(data)of =gzip.GzipFile(fileobj=buf,mode="rb")outdata=of.read()return outdatadef http_code(url):request=urllib2.Request(url,headers={'User-agent':"python urllib browser","Accept-Encoding":'gzip'})try:response=urllib2.urlopen(request,timeout=5)return response.getcode()except urllib2.HTTPError,error:print "url:",error.reasonreturn error.codeexcept urllib2.URLError,error:print url,error.reasonreturn 1000def http_client(url):request=urllib2.Request(url,headers={'User-agent':"python urllib browser","Accept-Encoding":'gzip'})try:response=urllib2.urlopen(request,timeout=5)info=response.info()data=response.read()except urllib2.HTTPError,error:print "%s error:%s" %(url,error.reason)return Noneexcept urllib2.URLError,error:print error.reasonreturn None if info.get("content-encoding",None) == 'gzip':outdata=gunziptxt(data)else:outdata=datareturn outdatadef get_src(page):src_re=re.compile(r'src\s*=\s*["|\']\s*(https?://[^\"\']+?)["|\']')if page:link_urls=src_re.findall(page)return set(link_urls)else:return set()if len(sys.argv)<2:print "usage:\n\t",sys.argv[0],"url"exit(1)if __name__ == "__main__":urls=sys.argv[1]pages=http_client(urls)if pages:links=get_src(pages)else:exit(1)for link in links:code=http_code(link)if code >399:print "%s \x1B[1;31m%d\x1B[m"%(link,code)else:print "%s \x1B[1;32m%d\x1B[m"%(link,code)else:print "pagecheck test"

June 12, 2013 · 1 min · pm

python多线程测试

仿照之前自己写的一个perl的多线程的脚本,简单试了下python的多线程模块threading.只能说因为之前习惯了perl的很多用法,刚切换到python非常不习惯,一些小的操作上不熟练. #!/usr/bin/env python import threading import time import random import sys import string def fun_test(x): sleep_time=random.random()*1 print "args is %s sleep time is %.4f s" % (x,sleep_time) time.sleep(sleep_time) def multi_do(lists,concurrent_max,func): concurrent=0 thread_list=set() while( len( lists ) > 0 ): if len(thread_list) <= concurrent_max: ser=lists.pop() pid=threading.Thread(target=func,name=ser,args=(ser,)) thread_list.add(pid) pid.start() else: alive_threads=set(threading.enumerate()) join_threads=thread_list-alive_threads for job in join_threads: print "%s is done"% job.getName() job.join() thread_list.remove(job) while(len(thread_list)>0): alive_threads=set(threading.enumerate()) join_threads=thread_list-alive_threads for job in join_threads: print "%s is done"% job.getName() job.join() thread_list.remove(job) print "all job have been done" if __name__ == "__main__": try: fd=open(sys.argv[1],"r") print "open %s ok" % sys.argv[1] todo_list=fd.readlines() ...

June 11, 2013 · 1 min · pm

用python解压web服务器返回的gzip数据

之前用tcpdump抓包的时候,只要是gzip压缩过的数据就没有办法直接还原原始的数据。这段时间学了一下python正好看里面有gzip模块。今天先尝试了一下解压web server返回的压缩过的数据。测试了一下OK #!/usr/bin/env python import urllib2 import gzip import binascii from StringIO import StringIO def gunziptxt(data): buf = StringIO(data) of =gzip.GzipFile(fileobj=buf,mode="rb") outdata=of.read() return outdata url="http://127.0.0.1/index.html" request=urllib2.Request(url,headers={'User-agent':"python urllib browser","Accept-Encoding":'gzip'}) try: response=urllib2.urlopen(request,timeout=5) data=response.read() except: print "get %s response failed" %url print "headers:\n",response.info() if response.info()["content-encoding"] == 'gzip': print "http response is gzip" outdata=gunziptxt(data) lbuf=StringIO() with gzip.GzipFile(mode='wb',fileobj=lbuf) as inf: inf.write(data) gziplen=len(lbuf.getvalue()) print "gzip %d and gunzip %d"%(gziplen,len(outdata)) else: print "http response is not gzip" outdata=data print "http response:\n",outdata ...

June 2, 2013 · 1 min · pm