Index: projects/portbuild/scripts/pollmachine =================================================================== --- projects/portbuild/scripts/pollmachine (revision 243878) +++ projects/portbuild/scripts/pollmachine (revision 243879) @@ -1,352 +1,355 @@ #!/usr/bin/env python # # pollmachine # # Monitors build machines and notifies qmanager of changes # # pollmachine [options] [arch] ... # - update every machine in the mlist file for [arch] # # pollmachine [options] [arch/mach] ... # - update individual machine(s) for specified architecture # # options are: # -daemon : poll repeatedly # # TODO: # XXX qmanager notification of new/removed machines # XXX counter before declaring a machine as dead # Declares a machine as online if it reports 0 data from infoseek? # * Deal with machines change OS/kernel version # - ACL list might change! # - take machine offline, update ACL/arch/etc, reboot, bring online import sys, threading, socket from time import sleep import os, subprocess, logging EXPECTED_LINES = 6 DEBUG=False pbc = os.getenv('PORTBUILD_CHECKOUT') \ if os.getenv('PORTBUILD_CHECKOUT') else "/var/portbuild" pbd = os.getenv('PORTBUILD_DATA') \ if os.getenv('PORTBUILD_DATA') else "/var/portbuild" sys.path.insert(0, '%s/lib/python' % pbc) if len(sys.argv) < 1: print "Usage: %s [ ...]" % sys.argv[0] sys.exit(1) arches=set() mlist={} polldelay=0 for i in sys.argv[1:]: if i == "-daemon": polldelay = 180 continue if "/" in i: item=i.partition("/") arch=item[0] mach=item[2] arches.add(arch) try: mlist[arch].add(mach) except KeyError: mlist[arch] = set((mach,)) else: arches.add(i) # set of machines for each arch machines={} for i in arches: machines[i]=set() # Mapping from machine names to monitor threads pollthreads={} class MachinePoll(threading.Thread): """ Poll a machine regularly """ mach = None # Which machine name to poll arch = None # Which arch is this assigned to # Which host/port to poll for this machine status (might be SSH # tunnel endpoint) host = None port = 414 timeout = None # How often to poll shutdown = False # Exit at next poll wakeup # State variables tracked online = False # Dictionary of variables reported by the client vars = None def __init__(self, mach, arch, timeout, host, port): super(MachinePoll, self).__init__() self.mach = mach self.arch = arch self.timeout = timeout self.host = host self.port = port # How many times the connection timed out since last success self.timeouts = 0 self.vars = {} self.setDaemon(True) def run(self): while True: if self.shutdown: break self.poll() if not self.timeout: break else: sleep(self.timeout) def poll(self): """ Poll the status of this machine """ nowonline = False lines = [] try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(60) retval = s.connect_ex((self.host, self.port)) if retval != 0: if self.online: logging.info("[%s] Connection error: %s" % (self.mach, `retval`)) self.timeouts += 1 else: if DEBUG: logging.info("%s connected to socket for %s" % ( str(self), self.mach )) data = "" while len(data) < 65536: chunk = s.recv(8192) if not chunk: break data += chunk if DEBUG: logging.info("%s: len(data) = %d" % (self.mach, len(data))) if len(data) > 0: lines = data.split("\n") if len(lines) >= EXPECTED_LINES: nowonline = True self.timeouts = 0 else: # XXX MCL if DEBUG or True: logging.info("%s: truncated reply: %s" % (self.mach, lines)) except socket.timeout: if self.online: logging.info("[%s] Connection timeout" % self.mach) self.timeouts += 1 if self.timeouts < 3: nowonline = self.online except Exception, e: print "pollmachine: exception in poll for %s:" %self.mach print e pass finally: try: s.close() except: pass if nowonline != self.online: logging.info("[%s] Now %s" % (self.mach, "online" if nowonline else "OFFLINE")) self.online = nowonline if self.online: self.timeouts = 0 # XXX inform qmanager of state change if self.online and not lines and not self.timeouts: # reportload script is missing dosetup=1 else: dosetup=0 for line in lines: if line == "": continue line=line.rstrip() part=line.partition('=') if part[1] != '=' or not part[0]: logging.info("[%s] Bad input: %s" % (self.mach, line)) if "No such file or directory" in line: # client requires manual setup, do not attempt to set up # to avoid just filling up the error log logging.info("[%s] you MUST set the client up manually" % self.mach) dosetup=0 else: # attempt to set client up dosetup=1 try: old = self.vars[part[0]] except KeyError: old = "" if old != part[2]: self.vars[part[0]] = part[2] # logging.info("%s@%s: \"%s\" -> \"%s\"" % (part[0], self.mach, old, part[2])) # XXX inform qmanager try: envs = self.vars['buildenvs'] for e in envs.split(): (arch, branch, buildid) = e.split("/") f = "%s/%s/%s/builds/%s/.active" % \ (pbd, arch, branch, buildid) if os.path.exists(f): continue # Clean up a stale buildenv logging.info("[%s] Cleaning up stale build: %s" % (self.mach, e)) (err, out) = self.setup(branch, buildid, "-nocopy -full") if err: logging.info("[%s] Error from cleanup" % (self.mach)) for l in out.split("\n"): if l == "": continue logging.info("[%s] %s" % (self.mach, l)) except KeyError: pass if dosetup: logging.info("[%s] Setting up machine" % (self.mach)) (err, out) = self.setup("-", "-") if err: logging.info("[%s] Error from setup" % (self.mach)) for l in out.split("\n"): if l == "": continue logging.info("[%s] %s" % (self.mach, l)) logging.info("[%s] Setup complete" % (self.mach)) # Validate that arch has not changed (e.g. i386 -> amd64) try: if self.arch != self.vars['arch']: logging.info("[%s] Unexpected arch: %s -> %s" % \ (self.mach, self.arch, self.vars['arch'])) except KeyError: pass if DEBUG: logging.info("%s recording current system load for %s" % ( str(self), self.mach )) # Record current system load # note: can fail on "file system full" try: f = file("%s/%s/loads/%s" % (pbd, self.arch, self.mach), "w") except Exception, e: print "pollmachine: exception in creating %s/%s/loads/%s:" % (pbd, self.arch, self.mach) print e return try: if 'jobs' in self.vars and 'load' in self.vars: f.write("%s %s\n" % (self.vars['jobs'], self.vars['load'])) else: # machine is not responding to poll. # XXX MCL remove from machines self.online = False # XXX inform qmanager f.write("") f.close() except Exception, e: print "pollmachine: exception in writing %s/%s/loads/%s:" % (pbd, self.arch, self.mach) print self.vars print e if DEBUG: logging.info("%s finished polling for %s" % ( str(self), self.mach )) def setup(self, branch, buildid, args = ""): cmd = "su ports-%s -c \"%s/scripts/dosetupnode %s %s %s %s %s\""\ % (self.arch, pbc, self.arch, branch, buildid, self.mach, args) child = subprocess.Popen(cmd, shell=True, stderr = subprocess.STDOUT, stdout = subprocess.PIPE) err = child.wait() out = "".join(child.stdout.readlines()) return (err, out) logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s', datefmt='%d %b %Y %H:%M:%S', filename='/var/log/pollmachine.log', filemode='w') log_console = logging.StreamHandler() log_console.setLevel(logging.INFO) formatter = logging.Formatter('[%(asctime)s] %(message)s', datefmt = '%d %b %Y %H:%M:%S') log_console.setFormatter(formatter) logging.getLogger('').addHandler(log_console) while True: for arch in arches: + now = set() try: now = mlist[arch] except KeyError: mlistfile="%s/%s/mlist" % (pbd, arch) try: f = file(mlistfile, "r") + now=set(mach.rstrip() for mach in f.readlines()) + try: + f.close() + except: + pass except OSError, error: - raise - - now=set(mach.rstrip() for mach in f.readlines()) - f.close() + print "pollmachine: could not find %s, skipping" % mlistfile gone = machines[arch].difference(now) new = now.difference(machines[arch]) machines[arch]=now for mach in gone: logging.info("Removing machine %s/%s" % (arch, mach)) # XXX disable from qmanager pollthreads[mach].shutdown=True del pollthreads[mach] for mach in new: logging.info("Adding machine %s/%s" % (arch, mach)) # XXX set up qmanager pc="%s/%s/portbuild.conf" % (pbd, arch) pch="%s/%s/portbuild.%s" % (pbd, arch, mach) cmd = "test -f %s && . %s; test -f %s && . %s; echo $infoseek_host; echo $infoseek_port" % (pc, pc, pch, pch) config = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE) host=config.stdout.readline().rstrip() if not host: host = mach port=config.stdout.readline().rstrip() try: port = int(port) except (TypeError, ValueError): port = 414 pollthreads[mach] = MachinePoll(mach, arch, polldelay, host, port) pollthreads[mach].start() if not polldelay: break if DEBUG: logging.info("Ready to sleep") sleep(polldelay) if DEBUG: logging.info("Wakeup") logging.info("pollmachine: exiting.")