#!/usr/bin/python from atomicinteger import AtomicInteger from json import loads, dumps from random import random from subprocess import call, check_output from tempfile import mkdtemp from time import sleep, time import os import socket import stat import threading #from https://github.com/littlehedgehog/base/blob/master/atomicinteger.py class AtomicInteger: def __init__(self, integer = 0): self.counter = integer self.lock = threading.RLock() return def increase(self, inc = 1): self.lock.acquire() self.counter = self.counter + inc self.lock.release() return def decrease(self, dec = 1): self.lock.acquire() self.counter = self.counter - dec self.lock.release() return def get(self): return self.counter def get_sockname(): result = check_output(["watchman", "get-sockname"]) result = loads(result) return result['sockname'] def connect(): sockname = get_sockname() sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(sockname) sock.setblocking(False) return sock def watch(sock, directory): watch = ['watch', directory] sock.sendall(dumps(watch) + "\n") result = readline(sock) result = loads(result) if not result.get("watch"): print result def readline(sock): message = [] start = time() while True: elapsed = time() - start if elapsed > 5: print "We have been waiting a very long time for data from watchman. We have so far: %s" % "".join(message) try: data = sock.recv(1024, socket.MSG_DONTWAIT) if "\n" in data: message.append(data[:data.index("\n")]) break except socket.error: pass sleep(0.001) return "".join(message) def since(sock, directory, since="c:1:2:3:4"): expression = {'since' : since} watch = ["query", directory, expression] sock.sendall(dumps(watch) + "\n") message = readline(sock) result = loads(message) if "error" in result: print "Error in since: %s (since = %s)" % (result["error"], since) return result def create(sock, directory=None): directory = mkdtemp(dir=directory) watch(sock, directory) return directory def touch(directory): f = open(os.path.join(directory, "file-%s" % random()), "w") f.write("x") f.close() def isdir(f): result = os.lstat(f) return stat.S_ISDIR(result.st_mode) def recursive_rmdir(directory): for f in os.listdir(directory): qualified = os.path.join(directory, f) if isdir(qualified): recursive_rmdir(qualified) else: os.unlink(qualified) os.rmdir(directory) nthreads = AtomicInteger() runs = AtomicInteger() def run(): nthreads.increase() runs.increase() directory = None sock = None try: print "RUN: %d %d" % (runs.get(), nthreads.get()) sock = connect() directory = create(sock) result = since(sock, directory) assert "files" not in result or len(result["files"]) == 0 clock = result.get("clock") if not clock: print "Failed since: %s" % result assert clock #this stanza is only necesary on unbuntu 3.11; on 3.15, it can be skipped touch(directory) result = since(sock, directory, clock) assert result["clock"] != clock clock = result["clock"] assert len(result["files"]) == 1 sleep(0.1) #ditto for i in range(5): touch(directory) result = since(sock, directory, clock) assert result["clock"] != clock if len(result["files"]) < 5: print result finally: if sock: sock.close() if directory: recursive_rmdir(directory) nthreads.decrease() def threaded(target, *args, **kwargs): thread = threading.Thread(target=target, args=args, kwargs=kwargs) thread.start() while True: if nthreads.get() < 15: threaded(run) else: sleep(0.1)