Bake-off Code 2

Python Code

test-session.py

#!/usr/bin/env python
# Test suite for my Outlook and Google session objects.

import sys
sys.path.append('..')
import unittest
from google.gsession  import GoogleSession
from outlook.osession import OutlookSession


class TestSession(unittest.TestCase):
    """Smoke tests for the GoogleSession and OutlookSession objects.

    The tests use the real session classes, so they need a Google account
    and an Outlook folder named 'TestCalendar'.
    """

    #################################################################
    # I can set these to fixed values to prevent a prompt for ID/password,
    # but I don't want to post to my blog with meaningful values here!
    #################################################################
    test_user = None
    test_password = None

    #################################################################
    # unittest requires me to name these test_*
    # I'm naming things test_nn_* in order to control the order of execution (it
    # runs them in alphabetical order).
    #################################################################
    def test_01_session_start_up_and_shut_down(self):
        """Each kind of session can be brought up and taken down again."""
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password)
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy')
        self.sessionUpDown(gsess)
        self.sessionUpDown(osess)

    def test_02_fetch_full_calendar(self):
        """fetchFullCalendar() returns a list for both services."""
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()

        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()

        self.assertTrue(isinstance(gcal, list))
        self.assertTrue(isinstance(ocal, list))

    def test_03_store_full_calendar(self):
        """A fetched calendar can be written straight back to its service."""
        osess = OutlookSession('TestCalendar').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()

        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()

        # Raising a plain string is not a valid exception; report missing
        # test data as an ordinary test failure instead.
        if len(gcal) < 1:
            self.fail("Need you to create events in your Google test calendar for testing.")
        if len(ocal) < 1:
            self.fail("Need you to create events in your Outlook test calendar for testing.")

        gsess.storeFullCalendar(gcal)
        osess.storeFullCalendar(ocal)

    def sessionUpDown(self, sess):
        """Check the down -> up -> down life cycle of one session object.

        Not named test_* so unittest does not run it on its own; the name
        and the isUp() calls match the spelling the tests and the session
        classes use.
        """
        self.assertTrue(not sess.isUp())
        sess.up()
        self.assertTrue(sess.isUp())
        sess.down()
        self.assertTrue(not sess.isUp())


if __name__ == '__main__':
    # Ask only for the credentials that were not hard-coded on TestSession,
    # so every test reuses one answer instead of prompting per session.
    if TestSession.test_user is None:
        TestSession.test_user = raw_input("enter Google user ID:")
    if TestSession.test_password is None:
        # getpass keeps the password from being echoed to the terminal.
        import getpass
        TestSession.test_password = getpass.getpass("enter Google password:")
    unittest.main()


asession.py
#!/usr/bin/env python
# Abstract Session objects -- GoogleSession and OutlookSession are derived
# from this.
# (There was a system file session.py that was interfering with my ability to
# define a session.py)

import urllib, urllib2, httplib, socket

class Session:
    """Common base for the Google and Outlook calendar sessions.

    Holds the credentials and defines the integer keys that the session
    classes use to index their per-event dicts.
    """

    # Outlook AND GMail
    START          = 0
    END            = 1
    VISIBILITY     = 2
    CREATED        = 3
    MODIFIED       = 4
    LOCATION       = 5
    MEETING_STATUS = 6
    BUSY           = 7
    SUBJECT        = 8
    GUID           = 9
    RECURS         = 10

    # GMail
    AUTHOR_NAME    = 100
    AUTHOR_EMAIL   = 101
    EDIT_URL       = 102
    BODY           = 103   # Can't use Outlook's body because it triggers the security warning

    def __init__(self, userid=None, password=None):
        """Store the credentials, prompting for whichever one is None.

        userid   -- account name, or None to ask on the console.
        password -- account password, or None to ask on the console.
        """
        if userid is None:
            userid = raw_input("enter " + self.getServiceName() + " user ID:")
        if password is None:
            # getpass does not echo what is typed, unlike raw_input.
            import getpass
            password = getpass.getpass("enter " + self.getServiceName() + " password:")

        self.userid = userid
        self.password = password

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return a display name for the service; subclasses override it."""
        return "Abstract Session"

#######################################################################
# Based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/456195
class ProxyHTTPConnection(httplib.HTTPConnection):
    """HTTPConnection to a proxy that tunnels to the real host via CONNECT.

    The connection is opened to the proxy (the host/port given to the
    constructor).  request() must be given an absolute URL; its host and
    port are remembered and used for the CONNECT line sent by connect().
    """

    # Default port for each URL scheme when the URL does not name one.
    _ports = {'http' : 80, 'https' : 443}

    def request(self, method, url, body=None, headers=None):
        """Record the real destination taken from url, then send the request.

        Raises ValueError if url has no host part or uses a scheme with no
        known default port.
        """
        if headers is None:
            headers = {}
        proto, rest = urllib.splittype(url)
        host,  rest = urllib.splithost(rest)
        if host is None:
            raise ValueError("absolute URL required for a proxied request: %r" % (url,))
        host,  port = urllib.splitport(host)
        if port is None:
            if proto not in self._ports:
                raise ValueError("no default port known for URL scheme %r" % (proto,))
            port = self._ports[proto]
        else:
            port = int(port)
        self._real_host = host
        self._real_port = port
        httplib.HTTPConnection.request(self, method, url, body, headers)

    def connect(self):
        """Connect to the proxy and ask it to open a tunnel to the real host.

        Raises socket.error if the proxy answers with anything but 200, or
        closes the connection before finishing its response headers.
        """
        httplib.HTTPConnection.connect(self)
        self.send("CONNECT %s:%d HTTP/1.0\r\n\r\n" % (self._real_host, self._real_port))
        response = self.response_class(self.sock, strict=self.strict, method=self._method)
        (version, code, message) = response._read_status()

        if code != 200:
            self.close()
            raise socket.error("Proxy CONNECT %s:%d failed: %d %s" % (self._real_host, self._real_port, code, message.strip()))

        # Eat up the header block from the proxy; it ends at the first
        # blank line.
        while True:
            line = response.fp.readline()
            if not line:
                # readline() returns '' at end of stream; without this check
                # a proxy that hangs up early would make the loop spin forever.
                self.close()
                raise socket.error("Proxy CONNECT %s:%d failed: connection closed before end of response headers" % (self._real_host, self._real_port))
            if line in ('\r\n', '\n'):
                return

##################################################
class ProxyHTTPSConnection(ProxyHTTPConnection):
    """ProxyHTTPConnection that speaks SSL through the tunnel once it is open."""

    default_port = 443

    def __init__(self, host, port = None):
        """host and port identify the proxy to connect to."""
        ProxyHTTPConnection.__init__(self, host, port)

    def connect(self):
        """Open the CONNECT tunnel, then wrap the tunnelled socket in SSL."""
        ProxyHTTPConnection.connect(self)
        tunnel = self.sock
        secure_layer = socket.ssl(tunnel)
        # FakeSocket gives the SSL object the socket-style interface that
        # the rest of httplib expects of self.sock.
        self.sock = httplib.FakeSocket(tunnel, secure_layer)


gsession.py
#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Google about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Google.
#########################################################

from asession import Session, ProxyHTTPSConnection
from urllib import splittype
import urllib, urllib2, httplib
from xml.dom import minidom


# TODO - make sure no user data can mess this up (maybe by """ in the appointment text?)
# Child elements of one calendar <entry>.  _toXML fills the nine %s slots,
# in this order: title, content, author name, author e-mail, transparency,
# event status, location, start time, end time.
# NOTE(review): the markup around the placeholders was missing from this
# copy of the file (only three bare %s were left, which cannot take the
# nine values _toXML supplies).  It has been rebuilt as a GData calendar
# event entry -- confirm the element and attribute names against the
# Google Calendar Data API before relying on it.
APPOINTMENT_XML_MIDDLE = """
  <category scheme='http://schemas.google.com/g/2005#kind' term='http://schemas.google.com/g/2005#event'></category>
  <title type='text'>%s</title>
  <content type='text'>%s</content>
  <author>
    <name>%s</name>
    <email>%s</email>
  </author>
  <gd:transparency value='%s'></gd:transparency>
  <gd:eventStatus value='%s'></gd:eventStatus>
  <gd:where valueString='%s'></gd:where>
  <gd:when startTime='%s' endTime='%s'></gd:when>
"""


# Opening and closing tags wrapped around APPOINTMENT_XML_MIDDLE; the opening
# tag declares the Atom and gd: namespaces used by the child elements.
APPOINTMENT_XML_BEGIN = "<entry xmlns='http://www.w3.org/2005/Atom' xmlns:gd='http://schemas.google.com/g/2005'>"
APPOINTMENT_XML_END = "</entry>"


class GoogleSession(Session):
    """Session for reading and writing a Google Calendar over HTTP.

    up() logs in and stores the returned token in self.google_key; every
    later request sends that token in a GoogleLogin Authorization header.
    Events are exchanged as dicts keyed by the Session.* constants.
    """

    USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'

    # Most 302 responses _http_post will follow for one request.
    MAX_REDIRECTS = 5

    def __init__(self, userid=None, password=None):
        """Store credentials and pick up the system proxy settings."""
        Session.__init__(self, userid, password)
        self.google_key  = None
        # Filled by fetchFullCalendar(); starts empty so itemCount() and
        # storeFullCalendar() also work before the first fetch.
        self.lastFullFetchedCalendar = []
        proxies          = urllib.getproxies()
        # .get(): getproxies() only has entries for proxies that are
        # actually configured, and the rest of the class treats None as
        # "no proxy".
        self.http_proxy  = proxies.get('http')
        self.https_proxy = proxies.get('https')

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return the display name of this service."""
        return "Google"

    def isUp(self):
        """Return True once a login token has been obtained."""
        return self.google_key is not None

    def itemCount(self):
        """Return the number of events in the last fetched calendar.

        Returns 0 while the session is down.
        NOTE(review): the original returned None when the session was up;
        the last fetched calendar is the only item collection this class
        keeps, so its length is reported -- confirm that is the intent.
        """
        if not self.isUp():
            return 0
        return len(self.lastFullFetchedCalendar)

    def _unpackProxyString(self, astring):
        """Split a proxy URL into (host, port); port is an int or None."""
        urltype, the_rest = splittype(astring)          # Split http://host:port/bar into http and //host:port/bar
        host, the_rest2   = urllib.splithost(the_rest)  # split into host:port and /bar
        host, port        = urllib.splitport(host)      # Split into host and port
        if port is not None:
            port = int(port)
        return (host, port)


    def _login(self, max_redirect_count, userid, password):
        """Log in with ClientLogin and store the token in self.google_key.

        max_redirect_count is accepted for callers but not used here.
        Returns [response, text].  Raises urllib2.URLError if the server
        does not answer 200 or the answer contains no 'Auth=' line.
        """
        # urllib.urlopen is supposed to transparently handle proxy server via
        # an http_proxy environment variable.  It turns out that this is
        # true -- and that you need an httpS_proxy environment variable if you
        # want to proxy httpS.
        # urllib2's automatic proxy failed on BellSouth's proxy (auth required)
        # for httpS, so I'll use CONNECT instead. Unfortunately it turned out to
        # be tough to have gsession handle the proxy -- urllib2 kept trying to
        # do so IN ADDITION. I found a seriously hacky way to tell it not to,
        # but going straight to httplib is easier and safer than subverting
        # urllib2.
        # Note: urllib2 first tries to pick up HTTP_PROXY / HTTPS_PROXY from
        # the environment.  Failing that it ALSO tries to pick up proxy settings
        # from Internet Explorer (in the registry).  All of this even if I'm
        # trying to manually handle proxy.

        data = urllib.urlencode({'Email':userid, 'Passwd':password, 'service':'cl', 'source':'test-login-seq'})
        headers = {"Content-type":"application/x-www-form-urlencoded", 'User-Agent':self.USER_AGENT}

        if self.https_proxy:
            host, port = self._unpackProxyString(self.https_proxy)
            con1 = ProxyHTTPSConnection(host, port)
            target = "https://www.google.com:443/accounts/ClientLogin"
        else:
            con1 = httplib.HTTPSConnection("www.google.com", 443)
            target = "/accounts/ClientLogin"
        try:
            con1.request("POST", target, data, headers)
            response = con1.getresponse()
            text = response.read()
        finally:
            con1.close()

        if response.status != 200:
            # Use a local label: overwriting self.https_proxy with "(none)"
            # would make the next login attempt treat that text as a proxy.
            proxy_label = self.https_proxy
            if proxy_label is None:
                proxy_label = "(none)"
            raise urllib2.URLError("Bad response from login (proxy=" + proxy_label + "). " + str(response.status) + "; " + response.reason)

        # Get the Google magic token from the response.
        key = None
        for line in text.splitlines():
            if line.startswith('Auth='):
                # Split once only, so a token containing '=' stays whole.
                key = line.split("=", 1)[1]
        if key is None:
            raise urllib2.URLError("could not find login key in server response")
        self.google_key = key

        return [response, text]

    # Start the session.
    def up(self):
        """Log in; returns self so calls can be chained."""
        self._login(5, self.userid, self.password)
        return self

    # Shut-down the session.
    def down(self):
        """Forget the login token."""
        self.google_key = None

    # Do an HTTP GET to Google, using the Google magic token for security.
    def _gget(self, url, max_retry_count = 3):
        """GET url with the login token; returns [result, text].

        max_retry_count is accepted for callers but not used here.
        """
        headers = { 'User-Agent' : self.USER_AGENT }
        headers['Authorization'] = 'GoogleLogin auth=' + self.google_key

        req = urllib2.Request(url, None, headers)
        # Note: urlopen handles redirects and proxies. See comments about
        # proxy handling elsewhere in this file.
        result = urllib2.urlopen(req)
        try:
            text = result.read()
        finally:
            result.close()
        return [result, text]

    def getLocationHeader(self, response):
        """Return the response's 'location' header, or None if it has none."""
        try:
            return response.msg.dict['location']
        except (AttributeError, KeyError):
            return None

    # Does a post, adding the GoogleLogin Auth header
    def _http_post(self, url, body = None, content_type = None, override = None, redirects_left = None):
        """POST body to url, following 302 redirects by re-POSTing.

        override       -- value for the X-HTTP-Method-Override header, if any.
        redirects_left -- how many more 302s may be followed; defaults to
                          MAX_REDIRECTS.
        Returns True on a 200 or 201 answer; raises urllib2.URLError for any
        other outcome, including a redirect loop.
        """
        # Note: urlopen 'follows' a POST that gets redirected with a GET.
        # Conclusion: using urllib/urllib2 for http is more trouble than it is worth with
        # GDATA, except for simple http GETs.
        if redirects_left is None:
            redirects_left = self.MAX_REDIRECTS

        body_length = 0
        if body:
            body_length = len(body)
        header1 = {}
        header1["Authorization"] = "GoogleLogin auth=" + self.google_key
        if override:
            header1['X-HTTP-Method-Override'] = override
        if content_type:
            header1["Content-Type"] = content_type
            header1['Content-Length'] = str(body_length)

        if self.http_proxy:
            # A proxy is given the absolute URL.
            host, port = self._unpackProxyString(self.http_proxy)
            target = url
        else:
            urltype, the_rest = splittype(url)          # Split http://host:port/bar into http and //host:port/bar
            host, target = urllib.splithost(the_rest)   # split into host:port and /bar
            port = None
            if not host:
                # No host in the URL: fall back to the host this code always
                # used before.
                host = "www.google.com"

        # Connect to the host named in the URL (not a fixed one) so that a
        # redirect to another host is actually followed there.
        con1 = httplib.HTTPConnection(host, port)
        try:
            con1.request("POST", target, body, header1)
            response = con1.getresponse()
            text = response.read()
        finally:
            con1.close()

        if response.status in (200, 201):
            return True
        if response.status == 302:
            location = self.getLocationHeader(response)
            if location is None:
                raise urllib2.URLError("Bad response from POST." + str(response.status) + "; redirect without a location header")
            if redirects_left <= 0:
                raise urllib2.URLError("Bad response from POST." + str(response.status) + "; too many redirects")
            return self._http_post(location, body, content_type, override, redirects_left - 1)
        raise urllib2.URLError("Bad response from POST." + str(response.status) + "; " + response.reason)

    # HTTP POST with X-HTTP-Method-Override=DELETE -- to get through the
    # corp firewall.
    def _http_post_delete(self, url):
        """Delete the resource at url by POSTing with a DELETE override."""
        return self._http_post(url, None, 'application/atom+xml', 'DELETE')

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagAttribute(self, aitem, tagname, attrname):
        """Return attribute attrname of the first tagname element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].value
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 1 caught')
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagAttributeValue(self, aitem, tagname, attrname):
        """Return attribute attrname of the first tagname element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].nodeValue
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 1 caught on ' + tagname + ', ' + attrname)
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagValue(self, aitem, tagname):
        """Return the text of the first tagname element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].firstChild.nodeValue
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 2 caught on ' + tagname)
            return None

    # Convert a DOM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        """Build a session-event dict from one <entry> DOM element."""
        item = {}
        item[Session.GUID]     = None
        item[Session.EDIT_URL] = None

        if aitem.getElementsByTagName('gd:when'):
            item[Session.START]        = self._safeGetTagAttribute(aitem, 'gd:when', 'startTime')
            item[Session.END]          = self._safeGetTagAttribute(aitem, 'gd:when', 'endTime')
            item[Session.RECURS]       = False
        elif aitem.getElementsByTagName('gd:recurrence'):
            item[Session.START]        = None
            item[Session.END]          = None
            item[Session.RECURS]       = True
        else:
            # Neither element present: still set the keys, so later lookups
            # do not fail with KeyError.
            item[Session.START]        = None
            item[Session.END]          = None
            item[Session.RECURS]       = False

        item[Session.VISIBILITY]       = self._safeGetTagAttribute(aitem, 'gd:visibility', 'value')
        item[Session.CREATED]          = self._safeGetTagValue(aitem, 'published')
        item[Session.MODIFIED]         = self._safeGetTagValue(aitem, 'updated')
        item[Session.LOCATION]         = self._safeGetTagAttributeValue(aitem, 'gd:where', 'valueString')
        item[Session.BUSY]             = self._safeGetTagAttributeValue(aitem, 'gd:transparency', 'value')
        item[Session.SUBJECT]          = self._safeGetTagValue(aitem, 'title')
        item[Session.BODY]             = self._safeGetTagValue(aitem, 'content')
        item[Session.MEETING_STATUS]   = self._safeGetTagAttributeValue(aitem, 'gd:eventStatus', 'value')

        # An entry without <author> gets None for both fields instead of
        # aborting the whole fetch with IndexError.
        authors = aitem.getElementsByTagName('author')
        if authors:
            item[Session.AUTHOR_NAME]  = self._safeGetTagValue(authors[0], 'name')
            item[Session.AUTHOR_EMAIL] = self._safeGetTagValue(authors[0], 'email')
        else:
            item[Session.AUTHOR_NAME]  = None
            item[Session.AUTHOR_EMAIL] = None

        for link in aitem.getElementsByTagName('link'):
            # getAttribute returns '' for a missing attribute, so a <link>
            # without rel is simply skipped.
            rel = link.getAttribute('rel')
            if rel == 'self':
                item[Session.GUID]     = link.getAttribute('href')
            if rel == 'edit':
                item[Session.EDIT_URL] = link.getAttribute('href')

        return item


    #retrieve the full calendar
    def fetchFullCalendar(self):
        """Fetch every event and return them as a list of session-event dicts.

        The list is also kept in self.lastFullFetchedCalendar.
        """
        result, text = self._gget("http://www.google.com/calendar/feeds/default/private/full?max-results=999999")
        feed         = minidom.parseString(text)
        entries      = feed.getElementsByTagName('entry')
        events       = [self._makeSessionEvent(x) for x in entries]
        # TODO: Make this a deep copy?
        self.lastFullFetchedCalendar = events
        return events

    def _make_safe_for_xml(self, source):
        """Return source with XML special characters turned into entities.

        None becomes '' so that a missing field is not written out as the
        text "None".  Quotes are escaped too because some values are placed
        inside attribute values.
        """
        if source is None:
            return ''
        from xml.sax.saxutils import escape
        return escape(source, {'"': '&quot;', "'": '&apos;'})

    # Convert a dict session-event into an XML event.  (Don't need full DOM,
    # so I just do string substitution.)
    def _toXML(self, item):
        """Return the XML for one session-event dict."""
        mytitle         = self._make_safe_for_xml(item[Session.SUBJECT])
        # .get(): events read from Outlook carry no BODY key.
        mycontent       = self._make_safe_for_xml(item.get(Session.BODY))
        myname          = "Ann E. Body"
        myemail         = "nobody@example.com"
        mytransparency  = item[Session.BUSY]
        mystatus        = item[Session.MEETING_STATUS]
        mywhere         = self._make_safe_for_xml(item[Session.LOCATION])
        mystart         = item[Session.START] # Must be format yyyy-mm-ddThh:mm:ssZ
        myend           = item[Session.END]
        return APPOINTMENT_XML_MIDDLE % (mytitle, mycontent, myname, myemail, mytransparency, mystatus, mywhere, mystart, myend)

    # TODO: Look into batch API at http://code.google.com/apis/gdata/batch.html.
    #       No batch support for Calendar as of 12/8/2006, but keep checking.
    def _deleteAllEventsFromGoogle(self):
        """Delete every event of the last fetched calendar from Google."""
        for event in self.lastFullFetchedCalendar:
            edit_url = event.get(Session.EDIT_URL)
            if edit_url is None:
                # No edit link was found for this event, so there is no URL
                # to send the delete to.
                continue
            self._http_post_delete(edit_url)
        # Rebind rather than clear in place: the caller may be holding (and
        # about to store) this very list.  The deleted events must not be
        # deleted again by a later call.
        self.lastFullFetchedCalendar = []
        return True

    def storeFullCalendar(self, cal):
        """Replace the Google calendar's contents with the events in cal.

        cal is a list of session-event dicts.  Returns True.
        """
        assert(isinstance(cal, list))
        if len(cal) > 0:
            assert(isinstance(cal[0], dict))

        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but before the save, I don't want to lose data!

        self._deleteAllEventsFromGoogle()

        url = "http://www.google.com/calendar/feeds/default/private/full"
        # One POST per event: several entries glued together do not form a
        # single XML document, and there is no batch support (see the TODO
        # above _deleteAllEventsFromGoogle).
        for item in cal:
            entry = APPOINTMENT_XML_BEGIN + self._toXML(item) + APPOINTMENT_XML_END
            self._http_post(url, entry, 'application/atom+xml')
        return True

osession.py
#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Outlook about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Outlook.
#########################################################

import win32com.client
from asession import Session

class OutlookSession(Session):
    """Session for reading and writing one Outlook calendar folder via COM.

    Events are exchanged as dicts keyed by the Session.* constants.
    """

    # OlRecurrenceState Constants
    OlApptNotRecurring    = 0
    OlApptMaster          = 1
    OlApptOccurrence      = 2
    OlApptException       = 3

    # Outlook appointment statuses
    OlNonMeeting          = 0
    OlMeeting             = 1
    OlMeetingReceived     = 3
    OlMeetingCanceled     = 5
    OlMeetingCanceledToo  = 7   # Not documented; my live data contained an example of this.

    # NOTE(review): _makeSessionEvent referenced this name but it was never
    # defined, so reading any non-meeting appointment raised AttributeError.
    # It is defined as OlNonMeeting so such appointments keep the status
    # Outlook reported -- confirm the value that was intended.
    OUTLOOK_STATUS_CONFIRMED = OlNonMeeting

    # Outlook visibility constants
    OlNormal        = 0
    OlPersonal      = 1
    OlPrivate       = 2
    OlConfidential  = 3

    # Outlook appointments can be busy/free/out-of-office/tentative
    OlFree        = 0
    OlTentative   = 1
    OlBusy        = 2
    OlOutOfOffice = 3

    # Items in an Outlook folder can be one of these types of item
    OlMailItem        = 0
    OlAppointmentItem = 1
    OlContactItem     = 2
    OlTaskItem        = 3


    # Values for GetDefaultFolder(x)
    OlFolderCalendar   = 9
    OlFolderContacts   = 10
    OlFolderJournal    = 11
    OlFolderNotes      = 12
    OlFolderTasks      = 13
    OlFolderReminders1 = 14
    OlFolderReminders2 = 15
    OlFolderDrafts     = 16

    def __init__(self, calendar_name='Calendar', userid='dummy', password='dummy'):
        """Remember which folder to use; nothing is opened until up().

        The userid/password defaults are non-None placeholders, so the base
        class does not prompt for credentials.
        """
        Session.__init__(self, userid, password)
        self.down()
        self.calendar_name = calendar_name

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return the display name of this service."""
        return "Outlook"

    # Start the session.
    def up(self):
        """Attach to Outlook and look up the calendar folder; returns self."""
        self.outlook = win32com.client.gencache.EnsureDispatch("Outlook.Application")
        self.mapi = self.outlook.GetNamespace("MAPI")
        self.folders = self.mapi.Folders.Item(1).Folders  #Usually the user's folders
        self.olCalendar =  self.folders.Item(self.calendar_name)
        return self

    # Shut-down the session.
    def down(self):
        """Drop every reference to the Outlook objects."""
        self.outlook = None
        self.olCalendar = None
        self.folders = None
        self.mapi = None

    def isUp(self):
        """Return True while the session holds a MAPI namespace."""
        return self.mapi is not None

    # Convert a COM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        """Build a session-event dict from one Outlook appointment item."""
        item = {}
        item[Session.START]          = aitem.Start
        item[Session.END]            = aitem.End
        item[Session.VISIBILITY]     = aitem.Sensitivity
        item[Session.CREATED]        = aitem.CreationTime
        item[Session.MODIFIED]       = aitem.LastModificationTime
        item[Session.LOCATION]       = aitem.Location
        item[Session.MEETING_STATUS] = aitem.MeetingStatus
        item[Session.BUSY]           = aitem.BusyStatus
        item[Session.SUBJECT]        = aitem.Subject
        item[Session.GUID]           = aitem.EntryID
        item[Session.RECURS]         = (aitem.RecurrenceState != OutlookSession.OlApptNotRecurring)

        # Touching Body seems to set off the Outlook a-program-is-accessing-your-email
        # TODO: Use Redemption DLL to prevent the warning.
        # item[Session.BODY] = aitem.Body

        # These are appointments with no one else invited?
        if aitem.MeetingStatus == OutlookSession.OlNonMeeting:
            item[Session.MEETING_STATUS] = OutlookSession.OUTLOOK_STATUS_CONFIRMED

        return item

    #retrieve the full calendar
    def fetchFullCalendar(self):
        """Return every item in the calendar folder as session-event dicts."""
        assert self.isUp()
        # Keep ONE Items collection for the whole operation: the sort order
        # and IncludeRecurrences belong to the collection object, so setting
        # them on one self.olCalendar.Items result and reading from another
        # would lose them.
        entries = self.olCalendar.Items
        entries.Sort("[Start]")
        entries.IncludeRecurrences = False
        # Outlook collections are indexed from 1.
        return [self._makeSessionEvent(entries.Item(i))
                for i in range(1, entries.Count + 1)]

    # TODO: Can I bulk-delete without iterating?
    def _deleteAllEventsFromOutlook(self):
        """Delete every item in the calendar folder."""
        entries = self.olCalendar.Items
        entries.IncludeRecurrences = False
        # Collect the IDs first and delete by ID afterwards, so deleting
        # does not shift the indexes still to be visited.
        guids = [entries.Item(i).EntryID for i in range(1, entries.Count + 1)]
        for guid in guids:
            self.olCalendar.Session.GetItemFromID(guid).Delete()
        return True

    def storeFullCalendar(self, cal):
        """Replace the folder's contents with the events in cal.

        cal is a list of session-event dicts; each dict's Session.GUID is
        updated to the EntryID of the item created for it.  Returns True.
        """
        assert(isinstance(cal, list))
        if len(cal) > 0:
            assert(isinstance(cal[0], dict))

        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but before the save, I don't want to lose data!

        self._deleteAllEventsFromOutlook()

        entries = self.olCalendar.Items
        for item in cal:
            oEvent             = entries.Add(self.OlAppointmentItem)
            oEvent.Subject     = item[Session.SUBJECT]
            oEvent.Start       = item[Session.START]
            oEvent.End         = item[Session.END]
            oEvent.BusyStatus  = item[Session.BUSY]
            oEvent.Location    = item[Session.LOCATION]
            oEvent.MeetingStatus = item[Session.MEETING_STATUS]
            oEvent.ReminderMinutesBeforeStart = 0
            oEvent.ReminderSet = True
            oEvent.Sensitivity = item[Session.VISIBILITY]
            oEvent.ClearRecurrencePattern()
            oEvent.Save()

            item[Session.GUID] = oEvent.EntryID

        return True

Learnings During this Cycle

Python:
  • I don't like explicit "self." It is all too easy to leave it out, creating bugs to fix. It is overly verbose, and it includes just as many special characters as using the "@" sigil (I'm counting the period). I know it is necessary in Python.
  • I miss 'unless' and my statement modifiers ('if' and 'unless' after a statement).
  • I'm fine with Python strings being immutable. I'm not happy about not being able to point a string function parameter at another string. It means I have to return a new string, and it prevents me from modifying multiple string parameters. And it is one more way in which Python is irregular.
  • I've been using ActivePython 2.4.3. Then I got "urlopen error unknown url type: https" on https URLs. That's when I learned that ActivePython doesn't include SSL support. Replaced it with Python 2.5 from python.org.
Rants:
  • Why is it that only Visual Basic (V5) lets you modify a function as you step through it and provides full access to data/functions from its debug/immediate window?
  • Code intelligence (function prototype checking) is important in a dynamic language's IDE/editor. It is the only clue you get to an improper method call until runtime.

Add new comment