Bake-off Code 2
Submitted by Kevin Kleinfelter on Thu, 2007-05-03 20:13.
asession.py
gsession.py
osession.py
Python Code
test-session.py
#!/usr/bin/env python
# Test suite for my Outlook and Google session objects.
import sys
sys.path.append('..')
import unittest
from google.gsession import GoogleSession
from outlook.osession import OutlookSession
class TestSession(unittest.TestCase):
    """Exercise the GoogleSession and OutlookSession objects end to end.

    unittest only collects methods named ``test_*`` and runs them in
    alphabetical order, so the ``test_nn_`` prefixes control the order of
    execution.
    """

    #################################################################
    # I can set these to fixed values to prevent a prompt for ID/password,
    # but I don't want to post to my blog with meaningful values here!
    #################################################################
    test_user = None
    test_password = None

    def test_01_session_start_up_and_shut_down(self):
        """Both kinds of session can be brought up and taken down again."""
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password)
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy')
        self.sessionUpDown(gsess)
        self.sessionUpDown(osess)

    def test_02_fetch_full_calendar(self):
        """fetchFullCalendar() returns a list for both services."""
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()
        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()
        self.assertTrue(isinstance(gcal, list))
        self.assertTrue(isinstance(ocal, list))

    def test_03_store_full_calendar(self):
        """A fetched calendar can be stored back to the service it came from."""
        osess = OutlookSession('TestCalendar').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()
        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()
        # Storing an empty calendar exercises nothing, so insist on test data.
        # self.fail() replaces the original string exceptions, which Python
        # rejects with a TypeError instead of showing the message.
        if len(gcal) < 1:
            self.fail("Need you to create events in your Google test calendar for testing.")
        if len(ocal) < 1:
            self.fail("Need you to create events in your Outlook test calendar for testing.")
        gsess.storeFullCalendar(gcal)
        osess.storeFullCalendar(ocal)

    def sessionUpDown(self, sess):
        """Check that `sess` starts down, reports up after up(), and down after down()."""
        self.assertFalse(sess.isUp())
        sess.up()
        self.assertTrue(sess.isUp())
        sess.down()
        self.assertFalse(sess.isUp())
if __name__ == '__main__':
    # Prompt only for the Google credentials that were not hard-coded on
    # TestSession, then hand control to the unittest runner.
    if TestSession.test_user is None:
        TestSession.test_user = raw_input("enter Google user ID:")
    if TestSession.test_password is None:
        TestSession.test_password = raw_input("enter Google password:")
    unittest.main()
asession.py
#!/usr/bin/env python
# Abstract Session objects -- GoogleSession and OutlookSession are derived
# from this.
# (There was a system file session.py that was interfering with my ability to
# define a session.py)
import urllib, urllib2, httplib, socket
class Session(object):
    """Abstract base for the calendar sessions; holds the login credentials.

    The integer class attributes are the keys used in the dict that
    represents a single calendar event (a "session-event").
    """

    # Outlook AND GMail
    START = 0
    END = 1
    VISIBILITY = 2
    CREATED = 3
    MODIFIED = 4
    LOCATION = 5
    MEETING_STATUS = 6
    BUSY = 7
    SUBJECT = 8
    GUID = 9
    RECURS = 10
    # GMail
    AUTHOR_NAME = 100
    AUTHOR_EMAIL = 101
    EDIT_URL = 102
    BODY = 103  # Can't use Outlook's body because it triggers the security warning

    def __init__(self, userid=None, password=None):
        """Remember the credentials, prompting on the console for any left as None.

        userid   -- account name, or None to prompt for it.
        password -- account password, or None to prompt for it.
        """
        if userid is None:
            userid = raw_input("enter " + self.getServiceName() + " user ID:")
        if password is None:
            # getpass keeps the password from being echoed to the terminal.
            import getpass
            password = getpass.getpass("enter " + self.getServiceName() + " password:")
        self.userid = userid
        self.password = password

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return the display name of the service; subclasses override this."""
        return "Abstract Session"
#######################################################################
# Based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/456195
class ProxyHTTPConnection(httplib.HTTPConnection):
    """HTTP connection that reaches the real server through a proxy CONNECT.

    The connection itself is opened to the proxy passed to the constructor.
    request() must be given an absolute URL; the real host and port are
    taken from it and used for the CONNECT request issued by connect().
    """

    # Default port per URL scheme, used when the URL names no port.
    _ports = {'http': 80, 'https': 443}

    def request(self, method, url, body=None, headers=None):
        """Record the real host/port from `url`, then send the request.

        headers defaults to an empty dict; None is used as the default so
        that no dict object is shared between calls.
        """
        if headers is None:
            headers = {}
        proto, rest = urllib.splittype(url)
        host, rest = urllib.splithost(rest)
        host, port = urllib.splitport(host)
        if port is None:
            port = self._ports[proto]
        else:
            port = int(port)
        # connect() runs lazily from inside the base-class request(), so the
        # target must be stored before delegating.
        self._real_host = host
        self._real_port = port
        httplib.HTTPConnection.request(self, method, url, body, headers)

    def connect(self):
        """Connect to the proxy and ask it to tunnel to the real host.

        Raises socket.error if the proxy answers CONNECT with anything but
        200, or closes the connection before its header block is complete.
        """
        httplib.HTTPConnection.connect(self)
        self.send("CONNECT %s:%d HTTP/1.0\r\n\r\n" % (self._real_host, self._real_port))
        response = self.response_class(self.sock, strict=self.strict, method=self._method)
        (version, code, message) = response._read_status()
        if code != 200:
            self.close()
            raise socket.error("Proxy CONNECT %s:%d failed: %d %s" % (self._real_host, self._real_port, code, message.strip()))
        # Eat up the header block from the proxy; it ends at the first
        # blank line.
        while True:
            line = response.fp.readline()
            if line in ('\r\n', '\n'):
                return
            if not line:
                # readline() returns '' at end of stream; without this check
                # the loop would never terminate.
                self.close()
                raise socket.error("Proxy CONNECT %s:%d failed: connection closed before end of proxy headers" % (self._real_host, self._real_port))
##################################################
class ProxyHTTPSConnection(ProxyHTTPConnection):
    """Proxy-tunnelled connection that switches to SSL once the tunnel is open."""

    default_port = 443

    def __init__(self, host, port = None):
        """Open the connection object against the proxy at host[:port]."""
        ProxyHTTPConnection.__init__(self, host, port)

    def connect(self):
        """Establish the CONNECT tunnel, then layer SSL over the tunnel socket."""
        # The parent class connects to the proxy and issues the CONNECT.
        ProxyHTTPConnection.connect(self)
        tunnel_sock = self.sock
        self.sock = httplib.FakeSocket(tunnel_sock, socket.ssl(tunnel_sock))
gsession.py
#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Google about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Google.
#########################################################
from asession import Session, ProxyHTTPSConnection
from urllib import splittype
import urllib, urllib2, httplib
from xml.dom import minidom
# TODO - make sure no user data can mess this up (maybe by """ in the appointment text?)
# NOTE(review): the element markup of these templates was lost when the
# listing was published (only three bare %s survived, while _toXML supplies
# nine values).  The markup below is reconstructed from the order of those
# nine values and the GData calendar event entry format -- confirm it
# against the original source before relying on it.
# Placeholders, in order: title, content, author name, author email,
# transparency, event status, where, start time, end time.
APPOINTMENT_XML_MIDDLE = """
<category scheme='http://schemas.google.com/g/2005#kind' term='http://schemas.google.com/g/2005#event'></category>
<title type='text'>%s</title>
<content type='text'>%s</content>
<author>
<name>%s</name>
<email>%s</email>
</author>
<gd:transparency value='%s'></gd:transparency>
<gd:eventStatus value='%s'></gd:eventStatus>
<gd:where valueString='%s'></gd:where>
<gd:when startTime='%s' endTime='%s'></gd:when>
"""
APPOINTMENT_XML_BEGIN = "<entry xmlns='http://www.w3.org/2005/Atom' xmlns:gd='http://schemas.google.com/g/2005'>"
APPOINTMENT_XML_END = "</entry>"
class GoogleSession(Session):
    """Session for reading and writing a Google Calendar.

    up() logs in and keeps the returned token in ``google_key``; every other
    request sends that token in a ``GoogleLogin`` Authorization header.
    Events are exchanged as dicts keyed by the Session.* constants.
    """

    USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
    # Most 302 redirects _http_post will follow for a single request.
    MAX_REDIRECTS = 5
    # Replacements beyond &, < and > so that values are also safe inside
    # quoted XML attributes.
    _XML_QUOTE_ENTITIES = {"'": "&apos;", '"': "&quot;"}

    def __init__(self, userid=None, password=None):
        """Store the credentials and look up the configured proxies.

        userid / password -- Google credentials; None makes Session prompt.
        """
        Session.__init__(self, userid, password)
        self.google_key = None
        # None until fetchFullCalendar() has run (or after a bulk delete has
        # made the previous fetch stale).
        self.lastFullFetchedCalendar = None
        proxies = urllib.getproxies()
        # Use .get(): a scheme with no proxy configured may simply be absent
        # from the mapping, and None means "connect directly" below.
        self.http_proxy = proxies.get('http')
        self.https_proxy = proxies.get('https')

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return the display name of this service."""
        return "Google"

    def isUp(self):
        """Return True once a login token has been obtained."""
        return self.google_key is not None

    def itemCount(self):
        """Return the number of events in the most recent full fetch.

        Returns 0 when the session is down or nothing has been fetched.
        """
        if not self.isUp():
            return 0
        # NOTE(review): the published listing fell through with a bare
        # `return` here; counting the cached calendar is an assumption about
        # the intended result -- confirm.
        return len(self.lastFullFetchedCalendar or [])

    def _unpackProxyString(self, astring):
        """Split a proxy URL such as http://host:port/bar into (host, port)."""
        urltype, the_rest = splittype(astring)  # Split http://host:port/bar into http and //host:port/bar
        host, the_rest2 = urllib.splithost(the_rest)  # split into host:port and /bar
        host, port = urllib.splitport(host)  # Split into host and port
        return (host, port)

    def _authHeader(self):
        """Return the Authorization header value for the current login token.

        Raises urllib2.URLError if the session has not been brought up.
        """
        if self.google_key is None:
            raise urllib2.URLError("Not logged in to Google; call up() first")
        return 'GoogleLogin auth=' + self.google_key

    def _login(self, max_redirect_count, userid, password):
        """Log in via ClientLogin and store the token in self.google_key.

        max_redirect_count -- accepted for compatibility; currently unused.
        Returns [response, text].  Raises urllib2.URLError on a non-200
        response or when the response carries no Auth= line.
        """
        # urllib.urlopen is supposed to transparently handle proxy server via
        # an http_proxy environment variable. It turns out that this is
        # true -- and that you need an httpS_proxy environment variable if you
        # want to proxy httpS.
        # urllib2's automatic proxy failed on BellSouth's proxy (auth required)
        # for httpS, so I'll use CONNECT instead. Unfortunately it turned out to
        # be tough to have gsession handle the proxy -- urllib2 kept trying to
        # do so IN ADDITION. I found a seriously hacky way to tell it not to,
        # but going straight to httplib is easier and safer than subverting
        # urllib2.
        # Note: urllib2 first tries to pick up HTTP_PROXY / HTTPS_PROXY from
        # the environment. Failing that it ALSO tries to pick up proxy settings
        # from Internet Explorer (in the registry). All of this even if I'm
        # trying to manually handle proxy.
        data = urllib.urlencode({'Email':userid, 'Passwd':password, 'service':'cl', 'source':'test-login-seq'})
        headers = {"Content-type":"application/x-www-form-urlencoded", 'User-Agent':self.USER_AGENT}
        if self.https_proxy:
            host, port = self._unpackProxyString(self.https_proxy)
            con1 = ProxyHTTPSConnection(host, port)
            target = "https://www.google.com:443/accounts/ClientLogin"
        else:
            con1 = httplib.HTTPSConnection("www.google.com", 443)
            target = "/accounts/ClientLogin"
        try:
            con1.request("POST", target, data, headers)
            response = con1.getresponse()
            text = response.read()
        finally:
            con1.close()
        if response.status != 200:
            # Build the label locally: overwriting self.https_proxy with a
            # placeholder would make the next attempt treat it as a proxy.
            proxy_label = self.https_proxy
            if proxy_label is None:
                proxy_label = "(none)"
            raise urllib2.URLError("Bad response from login (proxy=" + proxy_label + "). " + str(response.status) + "; " + response.reason)
        # Get the Google magic token from the response.
        key = None
        for line in text.splitlines():
            if line.startswith('Auth='):
                # Split once only, so a token containing '=' stays intact.
                key = line.split("=", 1)[1]
        if key is None:
            raise urllib2.URLError("could not find login key in server response")
        self.google_key = key
        return [response, text]

    # Start the session.
    def up(self):
        """Log in with the stored credentials; returns self for chaining."""
        self._login(5, self.userid, self.password)
        return self

    # Shut-down the session.
    def down(self):
        """Forget the login token."""
        self.google_key = None

    # Do an HTTP GET to Google, using the Google magic token for security.
    def _gget(self, url, max_retry_count = 3):
        """GET `url` with the login token; returns [result, text].

        max_retry_count -- accepted for compatibility; currently unused.
        """
        headers = {'User-Agent': self.USER_AGENT, 'Authorization': self._authHeader()}
        req = urllib2.Request(url, None, headers)
        # Note: urlopen handles redirects and proxies. See comments about
        # proxy handling elsewhere in this file.
        result = urllib2.urlopen(req)
        text = result.read()
        return [result, text]

    def getLocationHeader(self, response):
        """Return the response's Location header, or None if there is none."""
        try:
            return response.msg.dict['location']
        except (AttributeError, KeyError):
            return None

    # Does a post, adding the GoogleLogin Auth header
    def _http_post(self, url, body = None, content_type = None, override = None, redirects_left = None):
        """POST `body` to `url`, following 302 redirects with a fresh POST.

        content_type   -- value for the Content-Type header, if any.
        override       -- value for X-HTTP-Method-Override, if any.
        redirects_left -- redirects still allowed; defaults to MAX_REDIRECTS.
        Returns True on a 200 or 201 response; raises urllib2.URLError for
        any other outcome.
        """
        # Note: urlopen 'follows' a POST that gets redirected with a GET.
        # Conclusion: using urllib/urllib2 for http is more trouble than it is worth with
        # GDATA, except for simple http GETs.
        if redirects_left is None:
            redirects_left = self.MAX_REDIRECTS
        body_length = 0
        if body:
            body_length = len(body)
        header1 = {"Authorization": self._authHeader()}
        if override:
            header1['X-HTTP-Method-Override'] = override
        if content_type:
            header1["Content-Type"] = content_type
        header1['Content-Length'] = str(body_length)
        if self.http_proxy:
            # Through a proxy the request line carries the absolute URL.
            host, port = self._unpackProxyString(self.http_proxy)
            con1 = httplib.HTTPConnection(host, port)
            target = url
        else:
            urltype, the_rest = splittype(url)  # Split http://host:port/bar into http and //host:port/bar
            hostport, path = urllib.splithost(the_rest)  # split into host:port and /bar
            # Connect to the host the URL names, so a redirect to another
            # host is honoured; a URL without a host falls back to Google.
            host, port = urllib.splitport(hostport or "www.google.com")
            con1 = httplib.HTTPConnection(host, int(port or 80))
            target = path or "/"
        try:
            con1.request("POST", target, body, header1)
            response = con1.getresponse()
            response.read()
        finally:
            con1.close()
        if response.status in (200, 201):
            return True
        if response.status == 302:
            location = self.getLocationHeader(response)
            if location is None:
                raise urllib2.URLError("Bad response from POST.302; redirect without a Location header")
            if redirects_left <= 0:
                raise urllib2.URLError("Bad response from POST.302; too many redirects")
            return self._http_post(location, body, content_type, override, redirects_left - 1)
        raise urllib2.URLError("Bad response from POST." + str(response.status) + "; " + response.reason)

    # HTTP POST with X-HTTP-Method-Override=DELETE -- to get through the
    # corp firewall.
    def _http_post_delete(self, url):
        """Delete the resource at `url` using a POST with a DELETE override."""
        return self._http_post(url, None, 'application/atom+xml', 'DELETE')

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagAttribute(self, aitem, tagname, attrname):
        """Return attribute `attrname` of the first `tagname` element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].value
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 1 caught')
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagAttributeValue(self, aitem, tagname, attrname):
        """Return attribute `attrname` of the first `tagname` element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].nodeValue
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 1 caught on ' + tagname + ', ' + attrname)
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagValue(self, aitem, tagname):
        """Return the text of the first `tagname` element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].firstChild.nodeValue
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 2 caught on ' + tagname)
            return None

    # Convert a DOM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        """Build a session-event dict from one <entry> DOM element.

        Fields missing from the entry come back as None.
        """
        item = {}
        item[Session.GUID] = None
        item[Session.EDIT_URL] = None
        # Defaults, so the keys exist even when the entry has neither a
        # gd:when nor a gd:recurrence element.
        item[Session.START] = None
        item[Session.END] = None
        item[Session.RECURS] = False
        if aitem.getElementsByTagName('gd:when'):
            item[Session.START] = self._safeGetTagAttribute(aitem, 'gd:when', 'startTime')
            item[Session.END] = self._safeGetTagAttribute(aitem, 'gd:when', 'endTime')
        elif aitem.getElementsByTagName('gd:recurrence'):
            item[Session.RECURS] = True
        item[Session.VISIBILITY] = self._safeGetTagAttribute(aitem, 'gd:visibility', 'value')
        item[Session.CREATED] = self._safeGetTagValue(aitem, 'published')
        item[Session.MODIFIED] = self._safeGetTagValue(aitem, 'updated')
        item[Session.LOCATION] = self._safeGetTagAttributeValue(aitem, 'gd:where', 'valueString')
        item[Session.BUSY] = self._safeGetTagAttributeValue(aitem, 'gd:transparency', 'value')
        item[Session.SUBJECT] = self._safeGetTagValue(aitem, 'title')
        item[Session.BODY] = self._safeGetTagValue(aitem, 'content')
        item[Session.MEETING_STATUS] = self._safeGetTagAttributeValue(aitem, 'gd:eventStatus', 'value')
        # An entry without an <author> element must not abort the conversion.
        authors = aitem.getElementsByTagName('author')
        if authors:
            item[Session.AUTHOR_NAME] = self._safeGetTagValue(authors[0], 'name')
            item[Session.AUTHOR_EMAIL] = self._safeGetTagValue(authors[0], 'email')
        else:
            item[Session.AUTHOR_NAME] = None
            item[Session.AUTHOR_EMAIL] = None
        for link in aitem.getElementsByTagName('link'):
            # getAttribute() yields '' for a missing attribute rather than
            # raising, so a <link> without rel is simply skipped.
            rel = link.getAttribute('rel')
            if rel == 'self':
                item[Session.GUID] = link.getAttribute('href')
            elif rel == 'edit':
                item[Session.EDIT_URL] = link.getAttribute('href')
        return item

    #retrieve the full calendar
    def fetchFullCalendar(self):
        """Fetch every event and return them as a list of session-event dicts.

        The list is also kept in self.lastFullFetchedCalendar.
        """
        result, text = self._gget("http://www.google.com/calendar/feeds/default/private/full?max-results=999999")
        xml = minidom.parseString(text)
        items = xml.getElementsByTagName('entry')
        items2 = [self._makeSessionEvent(x) for x in items]
        # TODO: Make this a deep copy?
        self.lastFullFetchedCalendar = items2
        return items2

    def _make_safe_for_xml(self, source):
        """Return `source` as text that is safe in XML content and attributes.

        None becomes an empty string; non-string values are converted with
        %s formatting before escaping.
        """
        from xml.sax.saxutils import escape
        if source is None:
            return ""
        if not hasattr(source, 'replace'):
            source = "%s" % (source,)
        return escape(source, self._XML_QUOTE_ENTITIES)

    # Convert a dict session-event into an XML event. (Don't need full DOM,
    # so I just do string substitution.)
    def _toXML(self, item):
        """Render one session-event dict through APPOINTMENT_XML_MIDDLE."""
        safe = self._make_safe_for_xml
        # .get(): events that did not come from Google (e.g. Outlook ones)
        # carry no BODY key.
        mytitle = safe(item.get(Session.SUBJECT))
        mycontent = safe(item.get(Session.BODY))
        myname = "Ann E. Body"
        myemail = "nobody@example.com"
        mytransparency = safe(item.get(Session.BUSY))
        mystatus = safe(item.get(Session.MEETING_STATUS))
        mywhere = safe(item.get(Session.LOCATION))
        mystart = safe(item.get(Session.START))  # Must be format yyyy-mm-ddThh:mm:ssZ
        myend = safe(item.get(Session.END))
        xml = APPOINTMENT_XML_MIDDLE % (mytitle, mycontent, myname, myemail, mytransparency, mystatus, mywhere, mystart, myend)
        return xml

    # TODO: Look into batch API at http://code.google.com/apis/gdata/batch.html.
    # No batch support for Calendar as of 12/8/2006, but keep checking.
    def _deleteAllEventsFromGoogle(self):
        """Delete every event listed in the last full fetch; returns True."""
        if self.lastFullFetchedCalendar is None:
            # Nothing cached (or the cache was invalidated): fetch first.
            self.fetchFullCalendar()
        for event in self.lastFullFetchedCalendar:
            edit_url = event.get(Session.EDIT_URL)
            if edit_url:
                self._http_post_delete(edit_url)
        # The cached edit URLs now point at deleted events; force a fresh
        # fetch before the next bulk delete.
        self.lastFullFetchedCalendar = None
        return True

    def storeFullCalendar(self, cal):
        """Replace the Google calendar contents with the events in `cal`.

        cal -- list of session-event dicts.  Returns True.
        """
        assert(isinstance(cal, list))
        if len(cal) > 0:
            assert(isinstance(cal[0], dict))
        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but before the save, I don't want to lose data!
        self._deleteAllEventsFromGoogle()
        # NOTE(review): all events are concatenated into one POST body, as in
        # the original; confirm the server accepts more than one entry per
        # request.
        xml = "".join([APPOINTMENT_XML_BEGIN + self._toXML(item) + APPOINTMENT_XML_END for item in cal])
        if len(xml) < 1:
            return True
        url = "http://www.google.com/calendar/feeds/default/private/full"
        self._http_post(url, xml, 'application/atom+xml')
        return True
osession.py
#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Outlook about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Outlook.
#########################################################
import win32com.client
from asession import Session
class OutlookSession(Session):
    """Session for reading and writing an Outlook calendar folder via COM.

    Events are exchanged as dicts keyed by the Session.* constants.
    """

    # OlRecurrenceState Constants
    OlApptNotRecurring = 0
    OlApptMaster = 1
    OlApptOccurrence = 2
    OlApptException = 3
    # Outlook appointment statuses
    OlNonMeeting = 0
    OlMeeting = 1
    OlMeetingReceived = 3
    OlMeetingCanceled = 5
    OlMeetingCanceledToo = 7  # Not documented; my live data contained an example of this.
    # NOTE(review): _makeSessionEvent referenced OUTLOOK_STATUS_CONFIRMED but
    # the published listing never defined it, so fetching any non-meeting
    # appointment raised AttributeError.  Aliasing it to OlNonMeeting keeps
    # the value storeFullCalendar writes back a valid Outlook status --
    # confirm the intended value.
    OUTLOOK_STATUS_CONFIRMED = OlNonMeeting
    # Outlook visibility constants
    OlNormal = 0
    OlPersonal = 1
    OlPrivate = 2
    OlConfidential = 3
    # Outlook appointments can be busy/free/out-of-office/tentative
    OlFree = 0
    OlTentative = 1
    OlBusy = 2
    OlOutOfOffice = 3
    # Items in an Outlook folder can be one of these types of item
    OlMailItem = 0
    OlAppointmentItem = 1
    OlContactItem = 2
    OlTaskItem = 3
    # Values for GetDefaultFolder(x)
    OlFolderCalendar = 9
    OlFolderContacts = 10
    OlFolderJournal = 11
    OlFolderNotes = 12
    OlFolderTasks = 13
    OlFolderReminders1 = 14
    OlFolderReminders2 = 15
    OlFolderDrafts = 16

    def __init__(self, calendar_name='Calendar', userid='dummy', password='dummy'):
        """Create a session (initially down) for the folder `calendar_name`.

        userid / password default to placeholders so Session never prompts.
        """
        Session.__init__(self, userid, password)
        self.down()
        self.calendar_name = calendar_name

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return the display name of this service."""
        return "Outlook"

    # Start the session.
    def up(self):
        """Attach to Outlook and locate the calendar folder; returns self."""
        self.outlook = win32com.client.gencache.EnsureDispatch("Outlook.Application")
        self.mapi = self.outlook.GetNamespace("MAPI")
        self.folders = self.mapi.Folders.Item(1).Folders  #Usually the user's folders
        self.olCalendar = self.folders.Item(self.calendar_name)
        return self

    # Shut-down the session.
    def down(self):
        """Drop every COM reference held by the session."""
        self.outlook = None
        self.olCalendar = None
        self.folders = None
        self.mapi = None

    def isUp(self):
        """Return True while the session holds a MAPI namespace."""
        return self.mapi is not None

    # Convert a COM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        """Build a session-event dict from one Outlook appointment item."""
        item = {}
        item[Session.START] = aitem.Start
        item[Session.END] = aitem.End
        item[Session.VISIBILITY] = aitem.Sensitivity
        item[Session.CREATED] = aitem.CreationTime
        item[Session.MODIFIED] = aitem.LastModificationTime
        item[Session.LOCATION] = aitem.Location
        item[Session.MEETING_STATUS] = aitem.MeetingStatus
        item[Session.BUSY] = aitem.BusyStatus
        item[Session.SUBJECT] = aitem.Subject
        item[Session.GUID] = aitem.EntryID
        item[Session.RECURS] = (aitem.RecurrenceState != OutlookSession.OlApptNotRecurring)
        # Touching Body seems to set off the Outlook a-program-is-accessing-your-email
        # TODO: Use Redemption DLL to prevent the warning.
        # item[Session.BODY] = aitem.Body
        # These are appointments with no one else invited?
        if aitem.MeetingStatus == OutlookSession.OlNonMeeting:
            item[Session.MEETING_STATUS] = OutlookSession.OUTLOOK_STATUS_CONFIRMED
        return item

    #retrieve the full calendar
    def fetchFullCalendar(self):
        """Return every item of the calendar folder as session-event dicts."""
        assert self.isUp()
        # Bind the collection once.  Per the Outlook object model docs the
        # Items property hands back a new collection on each access, so Sort
        # and IncludeRecurrences must be set on the object that is iterated.
        entries = self.olCalendar.Items
        entries.Sort("[Start]")
        entries.IncludeRecurrences = False
        # Outlook collections are 1-based.
        return [self._makeSessionEvent(entries.Item(i)) for i in range(1, len(entries) + 1)]

    # TODO: Can I bulk-delete without iterating?
    def _deleteAllEventsFromOutlook(self):
        """Delete every item in the calendar folder; returns True."""
        entries = self.olCalendar.Items
        entries.IncludeRecurrences = False
        # Collect the IDs first: deleting while walking the 1-based indexes
        # would shift the remaining items.
        guids = [entries.Item(i).EntryID for i in range(1, len(entries) + 1)]
        for guid in guids:
            self.olCalendar.Session.GetItemFromID(guid).Delete()
        return True

    def storeFullCalendar(self, cal):
        """Replace the folder contents with the events in `cal`.

        cal -- list of session-event dicts; each dict's GUID entry is
        updated to the EntryID of the appointment created for it.
        Returns True.
        """
        assert(isinstance(cal, list))
        if len(cal) > 0:
            assert(isinstance(cal[0], dict))
        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but before the save, I don't want to lose data!
        self._deleteAllEventsFromOutlook()
        entries = self.olCalendar.Items
        for item in cal:
            oEvent = entries.Add(self.OlAppointmentItem)
            oEvent.Subject = item[Session.SUBJECT]
            oEvent.Start = item[Session.START]
            oEvent.End = item[Session.END]
            oEvent.BusyStatus = item[Session.BUSY]
            oEvent.Location = item[Session.LOCATION]
            oEvent.MeetingStatus = item[Session.MEETING_STATUS]
            oEvent.ReminderMinutesBeforeStart = 0
            oEvent.ReminderSet = True
            oEvent.Sensitivity = item[Session.VISIBILITY]
            oEvent.ClearRecurrencePattern()
            oEvent.Save()
            item[Session.GUID] = oEvent.EntryID
        return True
Learnings During this Cycle
Python:
- I don't like explicit "self." It is all too easy to leave it out, creating bugs to fix. It is overly verbose, and it includes just as many special characters as using the "@" sigil (I'm counting the period). I know it is necessary in Python.
- I miss 'unless' and my statement modifiers ('if' and 'unless' after a statement).
- I'm fine with Python strings being immutable. I'm not happy about not being able to point a string function parameter at another string. It means I have to return a new string, and it prevents me from modifying multiple string parameters. And it is one more way in which Python is irregular.
- I've been using ActivePython 2.4.3. Then I got "urlopen error unknown url type: https" on https URLs. That's when I learned that ActivePython doesn't include SSL support. Replaced it with Python 2.5 from python.org.
- Why is it that only Visual Basic (V5) lets you modify a function as you step through it and provides full access to data/functions from its debug/immediate window?
- Code intelligence (function prototype checking) is important in a dynamic language's IDE/editor. It is the only clue you get to an improper method call until runtime.

Delicious
Digg
Reddit