Nagios check to test for changes in DNS records
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

check_dns_change.py 1.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. # -*- coding: utf-8 -*-
  2. import argparse
  3. import json
  4. import os
  5. import re
  6. import sys
  7. import dns.resolver
  8. from dns.resolver import NoAnswer
  9. class DNSChange(object):
  10. def __init__(self, state_file):
  11. self.state_file = state_file
  12. def test(self, records):
  13. state = {}
  14. changes = []
  15. if os.path.exists(self.state_file):
  16. try:
  17. with open(self.state_file, 'r') as f:
  18. state = json.load(f)
  19. except:
  20. pass
  21. for record_def in records:
  22. type, name = record_def.split(':')
  23. current_records = self.test_record(type, name)
  24. if current_records is None:
  25. print("FIXME")
  26. else:
  27. if record_def in state.keys():
  28. if state[record_def] != current_records:
  29. changes.append((type, name, current_records))
  30. state[record_def] = current_records
  31. with open(self.state_file, 'w') as f:
  32. json.dump(state, f)
  33. if changes:
  34. changes_texts = [
  35. "%s (%s) changed to %s" % (name, type, ', '.join(records)) for type, name, records in changes
  36. ]
  37. return 1, '. '.join(changes_texts)
  38. else:
  39. return 0, 'No changes found'
  40. def test_record(self, type, name):
  41. try:
  42. answer = dns.resolver.query(name, type)
  43. return sorted([x.address for x in answer])
  44. except NoAnswer:
  45. return []
  46. except:
  47. return None
  48. if __name__ == "__main__":
  49. parser = argparse.ArgumentParser()
  50. parser.add_argument('--state-file', default='/var/lib/check_dns_change/state.json')
  51. parser.add_argument('records', metavar='RECORD', nargs='+', help='records to test for. Either as TYPE:NAME or just as name')
  52. args = parser.parse_args()
  53. o = DNSChange(args.state_file)
  54. records = []
  55. for x in args.records:
  56. m = re.match("^(\w+):(.+)$", x, re.IGNORECASE)
  57. if m:
  58. if m.group(1).upper() == 'MX':
  59. raise Exception("MX records not supported")
  60. records.append(x)
  61. else:
  62. records.append('A:%s' % x)
  63. exit_code, message = o.test(records)
  64. print(message)
  65. sys.exit(exit_code)