create blog

go home go home
  1. about
  2. code
  3. wiki
  4. blog

Dolores & Cornelius, Sitting in a Tree. Of Comet, Orbited and Python.

I’ll give you a moment to allow you to get that disturbing image out of your head.

I have been working on a web application. I’ll hopefully be able to post the application soon.

I needed the server to be able to send updates to the client via Comet. I’d never done any Comet before, so I wasn’t sure what to do.

A lot of articles had examples making use of message queues and who knows what else. They were a tad overwhelming. While a message queue of some sort may eventually be a good idea, for now, all I needed to do was push a non-vital progress indicator for a job (15%, 20%, 30%, and so on) to the client so the user could see what was happening. I wanted more than one job to be supported, and the overall system should eventually allow sending updates when elements in the user’s filesystem are updated.

So, I’d like to be able to send:

UPDATE process/at/my/process/path; {progress:.5}
UPDATE file/at/my/file/path.txt

In short, I’d like to send paths, along (sometimes) with little tiny messages.

Also, each instance of the web app should only receive updates to paths they are listening to, and they should only be allowed to listen to paths they actually have access to. Or, at least, that should be the case once I implement permission management in the app &emdash; right now, anything goes, which is fine for the moment because it is limited to test use on our local network.

So, I made a little script, called Dolores, which handles all of this, and another script, named Cornelius, which connects to it (incidentally, there are two Corneliuses: a Python one, and a JavaScript one).

I have posted it here in case anyone is interested. I used Orbited in JavaScript to connect to the server. In a separate post in just a few moments, I’ll write about something a tad more useful than the Dolores & Cornelius code I’m writing about here: how to run the Orbited (and, actually, Dolores) as a Windows service.

Disclaimer: This is for interest only. I’m proud of it because I managed to make it, not because I actually think it is wonderful.

Before getting to the code, let me explain the API, which, if you have the server running, you can access manually using telnet:

You connect, and immediately receive:

I, Dolores, High Inquisitor, Hogwarts.
{A-THREAD-ID-HERE}

That thread ID can then be used by a controller thread to connect you to paths.

Speaking of controllers, to control, you’d just make a file named “control-{THREAD-ID}” in a folder named “threads” relative to Dolores.py. Then, you’d send:

CONTROL

Now, to connect or disconnect a thread to a path:

CONNECT {THREAD-ID} some/path/or/another
DISCONNECT {THREAD-ID} some/path/or/another

In the web application, a django view handles the connection of a client (who sends its thread id along with the connection request). By only allowing controllers to create the connections, the client is not able to listen in to places it isn’t allowed.

And to send an update to a path:

UPDATE some/path/or/another; {some: message, may: go, right: here, format: "any"}

Any listening thread will immediately receive:

UPDATE some/path/or/another; {some: message, may: go, right: here, format: "any"}

Just as you sent it.

And yes, part of the reason it is named Dolores (the name of a particularly nasty character in the Harry Potter universe) is because I believe it to be a somewhat ugly solution. But unlike the HP character, it works. The HP character&emdash;let’s not go there.

Now, without further ado:

from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor
from twisted.protocols.basic import LineReceiver
import random, string
import os, os.path
import threading
import time
 
# This is Delores the High Inquisitor.
 
# Dolores is a series of tubes. To prevent clogging, there are separate tubes,
# rather than one big pipe everyone might get stuck in.
 
# A user can be connected to a tube. This allows the user to receive notifications
# through said tubes.
 
# The pipe server is not made for actual COMET, believe it or not. There is no
# persistence or anything. It is as simple as possible. Instead, we will try to
# use Orbited to make a nice connection between Dolores and the client.
 
# I don't know how this will hold up performance-wise, but it is simple enough
# that I think it will work for now.
 
# API:
# There are two kinds of connections: control and client. The api 
# Creating a connection immediately results in a response with a thread id. Thread
# ids consist of a few main parts: IPADDRESS-THREAD#-SECRET
# Where thread# is a sequential thread number, IPADDRESS is the address of the client,
# and SECRET is a random string of bytes made for the thread.
 
# Clients connecting should check first for a string starting with "I,", which means they
# successfully connected. They should then read another string with their thread id.
 
#	CONTROL
#			Attempts to make the thread a controller. This only works if a file
#			named "control-" and the thread id is in the "threads" folder (relative
#			to the Dolores.py file -- messy, I know)
#
#	CONNECT UID PATH 			(controller only)
#			Connects the thread with id UID to the specified path.
#
#	DISCONNECT UID PATH			(controller only)
#			Disconnects the thread with id UID from the specified path.
#
#	UPDATE PATH MESSAGE 		(controller only)
#			Sens a message regarding the path. Please keep the message twitter length.
#
#	CRASH UID					(controller only)
#			Aborts a thread. Could be used for testing whole (un-optimized/cometized) reload.
#
 
 
class Comet(LineReceiver):
	def __init__(self):
		self.listening = set()
		self.controls = False
 
	def connectionMade(self):
		self.transport.write("I, Dolores, High Inquisitor, Hogwarts.\n")
 
		secret = ""
		for i in range(32):
			secret += random.choice(string.ascii_letters + string.digits)
 
		self.id = str(self.transport.getPeer().host) + "-" + str(self.factory.nextThreadNumber()) + "-" + secret
		print "New Thread " + self.id
 
		self.transport.write(self.id + "\n")
		self.factory.comets[self.id] = self;
 
	def control(self):
		# if the file exists, do it.
		if os.path.exists(os.path.join(os.path.dirname(__file__), "threads", "control-" + self.id)):
			self.controls = True
			os.remove(os.path.join(os.path.dirname(__file__), "threads", "control-" + self.id))
		else:
			self.controls = False
 
	def connectionLost(self, reason):
		print "Closed Thread " + self.id
		del self.factory.comets[self.id]
		for listen in self.listening:
			self.factory.deregister(listen, self)
 
	def listen(self, path):
		self.factory.register(path.strip(), self)
		self.listening.add(path.strip())
 
	def ignore(self, path):
		self.factory.deregister(path.strip(), self)
		if path.strip() in self.listening:
				self.listening.remove(path.strip())
 
	def lineReceived(self, data):
		if data.startswith("CONNECT"):
			if not self.controls:
				self.transport.write("NOALLOW\n")
				return
			try:
				uid, path = data[7:].strip().split(" ", 1)
				uid = uid.strip()
				path = path.strip()
				print "Connect " + uid + " to " + path
 
				self.factory.comets[uid].listen(path)
				self.listening.add(data[6:].strip())
				self.transport.write("SUCCESS\n")
			except:
				print "Failed a connect attempt."
				self.transport.write("FAIL\n")
 
		elif data.startswith("DISCONNECT"):
			if not self.controls:
				self.transport.write("NOALLOW\n")
				return
			try:
				uid, path = data[10:].strip().split(" ", 1)
				uid = uid.strip()
				path = path.strip()
				if uid in self.factory.comets:
					self.factory.comets[uid].ignore(path)
					self.transport.write("SUCCESS")
					return
			except:
				pass
			self.transport.write("FAIL")
 
		elif data.startswith("CRASH"):
			if not self.controls:
				self.transport.write("NOALLOW\n")
				return
			try:
				uid = data[5:].strip()
				if uid in self.factory.comets:
					self.factory.comets[uid].transport.loseConnection()
				self.transport.write("SUCCESS")
			except:
				self.transport.write("FAIL")
 
		elif data.startswith("EXIT"):
			self.transport.loseConnection()
 
		elif data.startswith("UPDATE"):
			if not self.controls:
				self.transport.write("NOALLOW\n")
				return
			try:
				pieces = data[6:].split(";",1)
				path = pieces[0]
				message = ""
				if len(pieces) > 1:
					message = pieces[1]
				print "UPDATE " + path.strip() + ": " + message.strip()
				self.factory.message(path.strip(), message.strip())
				self.transport.write("SUCCESS")
			except:
				self.transport.write("FAIL\n")
 
		elif data.startswith("ALL"):
			if not self.controls:
				self.transport.write("NOALLOW\n")
				return
			try:
				path = data[3:]
				for i in self.factory.comets:
					self.factory.comets[i].sendMessage(path)
				self.transport.write("SUCCESS")
			except:
				self.transport.write("FAIL")
 
		elif data.startswith("CONTROL"):
			self.control()
			if not self.controls:
				self.transport.write("NOALLOW\n")
				return
 
	def sendMessage(self, path, message):
		self.transport.write("UPDATE " + path + ";" + message + "\n")
 
# Factory
class Manager(Factory):
	def __init__(self):
		self.protocol = Comet
		self.comets = {}
		self.lookup = {}
		self.threadCount = 0
 
	def startFactory(self):
		pass
 
	def stopFactory(self):
		pass
 
	def register(self, path, comet):
		if path not in self.lookup:
			self.lookup[path] = set()
 
		self.lookup[path].add(comet)
 
	def deregister(self, path, comet):
		if path not in self.lookup:
			return
 
		if comet not in self.lookup[path]:
			return
 
		self.lookup[path].remove(comet)
 
		if len(self.lookup[path]) == 0:
			del self.lookup[path]
 
	def message(self, path, message):
		if path not in self.lookup:
			return
 
		for i in self.lookup[path]:
			i.sendMessage(path, message)
 
	def nextThreadNumber(self):
		self.threadCount = self.threadCount + 1
		return self.threadCount
 
def thread_start():
	global dolores_reactor
	factory = Manager()
	reactor.listenTCP(8007, factory)
	reactor.run(installSignalHandlers=0)
	print "Dolores has been taken by the Centaurs. Clip clop."
 
 
dolores_thread = None
def start():
	print "Starting Dolores..."
	global dolores_thread
	dolores_thread = threading.Thread(target=thread_start)
	dolores_thread.start()
	print "Dolores has  now been appointed High Inquisitor."
 
def stop_reactor():
	reactor.stop()
 
def stop():
	reactor.callFromThread(stop_reactor)
 
if __name__ == "__main__":
    start()
    run = True
    while run:
    	try:
    		time.sleep(5)
    	except KeyboardInterrupt:
    		stop()
    		run = False

And, the arguably even uglier Cornelius, with raw socket communication:

cornelius_connection = None
 
import socket
import time
import os, os.path
 
class CorneliusFault(Exception):
	def __init__(self, value):
		self.value = value
	def __str__(self):
		return repr(self.value)
 
 
def _connect(reconnect = False):
	global cornelius_connection
	if cornelius_connection and reconnect:
		try:
			cornelius_connection.shutdown()
		except:
			pass
		cornelius_connection = None
 
	if not cornelius_connection:
		cornelius_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		cornelius_connection.connect(("localhost", 8007))
		cornelius_connection.settimeout(1) # wait one second for response if needed. Hopefully won't be that long.
 
		buffer = read()
		if len(buffer) < 2:
			raise CorneliusFault("He's Not Back!")
 
		verify_dolores = buffer[0].strip()
		if verify_dolores.strip() != "I, Dolores, High Inquisitor, Hogwarts.":
			raise CorneliusFault("Lord Thingy! Lord Thingy! Dolores, why have you turned into Lord Thingy?")
 
		cornelius_thread_id = buffer[1].strip()
		f = open(os.path.dirname(os.path.abspath(__file__)) + "/threads/control-" + cornelius_thread_id, "w")
		f.write("you big dummy.")
		f.close()
		send("CONTROL\n\n", True)
		read()
 
def read():
	try:
		buffer = cornelius_connection.recv(4096)
		return buffer.split("\n")
	except:
		pass
 
def send(what, noconnect = False):
	if not noconnect:
		_connect()
	global cornelius_connection
	try:
		cornelius_connection.sendall(what + "\r\n")
	except:
		if not noconnect:
			_connect(True)
		cornelius_connection.sendall(what + "\r\n")
 
	try:
		read() # don't do anythin' with output yet. Just get rid of it... so it doesn't sit in some buffer
	except:
		pass
 
def update(path, message = ""):
	send("UPDATE " + path + "; " + message)
 
 
def connect(uid, path):
	send("CONNECT " + uid + " " + path)
 
def disconnect(uid, path):
	send("DISCONNECT " + uid + " " + path)
 
 
def crash(uid):
	send("CRASH " + uid)
 
def log():
	global cornelius_connection
	print cornelius_connection.read_very_eager()

Well, again, at least they work. And they are quite simple.

One Response to “Dolores & Cornelius, Sitting in a Tree. Of Comet, Orbited and Python.”

  1. [...] blog « Dolores & Cornelius, Sitting in a Tree. Of Comet, Orbited and Python. [...]

Leave a Reply