logo
Published on Input Jam (http://www.kleinfelter.com)

Bake-off Code 2

By Kevin Kleinfelter
Created 2007-05-03 20:13

Python Code

test-session.py

#!/usr/bin/env python
# Test suite for my Outlook and Google session objects.

import sys
sys.path.append('..')
import unittest
from google.gsession  import GoogleSession
from outlook.osession import OutlookSession


class TestSession(unittest.TestCase):
    # Integration tests for GoogleSession and OutlookSession.  Every test
    # builds both session kinds, so they need whatever those classes need to
    # come up (Google credentials, an Outlook folder named 'TestCalendar').

    #################################################################
    # I can set these to fixed values to prevent a prompt for ID/password,
    # but I don't want to post to my blog with meaningful values here!
    #################################################################
    test_user = None
    test_password = None

    #################################################################
    # unittest requires me to name these test_*
    # I'm naming things test_nn_* in order to control the order of execution (it
    # runs them in alphabetical order).
    #################################################################
    def test_01_session_start_up_and_shut_down(self):
        # Each session kind must survive a down -> up -> down cycle.
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password)
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy')
        self.sessionUpDown(gsess)
        self.sessionUpDown(osess)

    def test_02_fetch_full_calendar(self):
        # Fetching must hand back a list from both services.
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()

        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()

        assert isinstance(gcal, list)
        assert isinstance(ocal, list)

    def test_03_store_full_calendar(self):
        # Round trip: fetch each calendar and store it straight back.
        osess = OutlookSession('TestCalendar').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()

        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()

        # An empty calendar would make the store step vacuous, so report it
        # as a failure (string exceptions cannot be raised on newer Pythons).
        if len(gcal) < 1:
            self.fail("Need you to create events in your Google test calendar for testing.")
        if len(ocal) < 1:
            self.fail("Need you to create events in your Outlook test calendar for testing.")

        gsess.storeFullCalendar(gcal)
        osess.storeFullCalendar(ocal)

    # Helper (not a test_* method, so unittest does not run it directly):
    # checks that 'sess' starts down, comes up on up(), and goes back down
    # on down().
    def sessionUpDown(self, sess):
        assert (not sess.isUp())
        sess.up()
        assert sess.isUp()
        sess.down()
        assert (not sess.isUp())


# When run as a script, prompt for whichever Google credentials were not
# hard-coded on TestSession, then hand control to unittest.
if __name__ == '__main__':
    if TestSession.test_user is None:
        TestSession.test_user = raw_input("enter Google user ID:")
    if TestSession.test_password is None:
        TestSession.test_password = raw_input("enter Google password:")
    unittest.main()




asession.py

#!/usr/bin/env python
# Abstract Session objects -- GoogleSession and OutlookSession are derived
# from this.
# (There was a system file session.py that was interfering with my ability to
# define a session.py)

import urllib, urllib2, httplib, socket

class Session:
    # Base class for the calendar sessions.  It holds the credentials and
    # defines the integer keys used in the per-event dicts that the
    # subclasses build and consume.

    # Outlook AND GMail
    START          = 0
    END            = 1
    VISIBILITY     = 2
    CREATED        = 3
    MODIFIED       = 4
    LOCATION       = 5
    MEETING_STATUS = 6
    BUSY           = 7
    SUBJECT        = 8
    GUID           = 9
    RECURS         = 10

    # GMail
    AUTHOR_NAME    = 100
    AUTHOR_EMAIL   = 101
    EDIT_URL       = 102
    BODY           = 103   # Can't use Outlook's body because it triggers the security warning

    # Store the credentials, prompting on the console for whichever one was
    # not supplied.  The prompt text uses getServiceName(), so a subclass
    # that overrides it gets its own name in the prompt.
    def __init__(self, userid=None, password=None):
        # Identity test: only a genuinely missing value triggers a prompt.
        if userid is None:
            userid = raw_input("enter " + self.getServiceName() + " user ID:")
        if password is None:
            password = raw_input("enter " + self.getServiceName() + " password:")

        self.userid = userid
        self.password = password

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        return "Abstract Session"

#######################################################################
# Based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/456195
class ProxyHTTPConnection(httplib.HTTPConnection):
    # HTTPConnection that reaches the real server through a proxy tunnel.
    # connect() opens the base connection and then issues a CONNECT for the
    # host and port that request() extracted from the absolute URL.

    # Default port per URL scheme, used when the URL does not name a port.
    _ports = {'http' : 80, 'https' : 443}

    # Remember the real destination named in 'url', then send the request.
    #   url     -- absolute URL (scheme://host[:port]/path)
    #   headers -- optional dict of request headers
    # Raises ValueError when the URL has no host, or has neither a port nor
    # a scheme listed in _ports.
    def request(self, method, url, body=None, headers=None):
        # A fresh dict per call; a shared default dict is an accident waiting
        # to happen.
        if headers is None:
            headers = {}
        proto, rest = urllib.splittype(url)
        host,  rest = urllib.splithost(rest)
        if host is None:
            raise ValueError("Proxy request needs an absolute URL, got: %r" % (url,))
        host,  port = urllib.splitport(host)
        if port is None:
            try:
                port = self._ports[proto]
            except KeyError:
                raise ValueError("No default port for URL scheme %r in %r" % (proto, url))
        else:
            port = int(port)
        self._real_host = host
        self._real_port = port
        httplib.HTTPConnection.request(self, method, url, body, headers)

    # Open the base connection, then ask for a tunnel to the real host.
    # Raises socket.error when the CONNECT reply is not 200 or when the
    # connection ends before the reply headers do.
    def connect(self):
        httplib.HTTPConnection.connect(self)
        self.send("CONNECT %s:%d HTTP/1.0\r\n\r\n" % (self._real_host, self._real_port))
        response = self.response_class(self.sock, strict=self.strict, method=self._method)
        (version, code, message) = response._read_status()

        if code != 200:
            self.close()
            raise socket.error("Proxy CONNECT %s:%d failed: %d %s" % (self._real_host, self._real_port, code, message.strip()))

        # Eat up the header block from the proxy; it ends at the first blank
        # line.
        while True:
            line = response.fp.readline()
            if not line:
                # EOF: readline() would keep returning '' forever, so turn
                # the missing blank line into an error instead of spinning.
                self.close()
                raise socket.error("Proxy CONNECT %s:%d: connection closed before end of response headers" % (self._real_host, self._real_port))
            if line in ('\r\n', '\n'):
                return

##################################################
class ProxyHTTPSConnection(ProxyHTTPConnection):
    # ProxyHTTPConnection that switches the tunneled socket over to SSL.

    default_port = 443

    # Same arguments as the base class; 'port' may be omitted.
    def __init__(self, host, port = None):
        ProxyHTTPConnection.__init__(self, host, port)

    # Let the base class open the CONNECT tunnel, then wrap the resulting
    # socket so that everything sent afterwards goes through SSL.
    def connect(self):
        ProxyHTTPConnection.connect(self)
        tunnel = self.sock
        self.sock = httplib.FakeSocket(tunnel, socket.ssl(tunnel))




gsession.py

#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Google about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Google.
#########################################################

from asession import Session, ProxyHTTPSConnection
from urllib import splittype
import urllib, urllib2, httplib
from xml.dom import minidom


# TODO - make sure no user data can mess this up (maybe by """ in the appointment text?)
# Atom <entry> template for one calendar event.  GoogleSession._toXML fills
# the nine %s slots in this order: title, content, author name, author email,
# transparency, event status, where, start time, end time.
# NOTE(review): the markup had been stripped from this listing (only three %s
# were left, so the nine-value substitution in _toXML raised TypeError).  It
# is rebuilt here from the element and attribute names that
# GoogleSession._makeSessionEvent reads back out of Google's feed -- confirm
# against the Google Calendar data API before relying on it.
APPOINTMENT_XML_MIDDLE = """
<entry xmlns='http://www.w3.org/2005/Atom' xmlns:gd='http://schemas.google.com/g/2005'>
  <category scheme='http://schemas.google.com/g/2005#kind' term='http://schemas.google.com/g/2005#event'></category>
  <title type='text'>%s</title>
  <content type='text'>%s</content>
  <author>
    <name>%s</name>
    <email>%s</email>
  </author>
  <gd:transparency value='%s'></gd:transparency>
  <gd:eventStatus value='%s'></gd:eventStatus>
  <gd:where valueString='%s'></gd:where>
  <gd:when startTime='%s' endTime='%s'></gd:when>
</entry>
"""


# Text placed before/after each event's XML by storeFullCalendar.
# NOTE(review): both are empty in this listing; if they originally held
# markup it was lost the same way -- confirm.
APPOINTMENT_XML_BEGIN = ""
APPOINTMENT_XML_END = ""


class GoogleSession(Session):
    # Session for Google Calendar.  Logging in (up) obtains an auth token
    # from ClientLogin; every later GET/POST sends that token in an
    # Authorization header.
    USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'

    # Most redirects _http_post follows before giving up.
    MAX_REDIRECTS = 5

    def __init__(self, userid=None, password=None):
        Session.__init__(self, userid, password)
        self.google_key  = None
        # Result of the most recent fetchFullCalendar(); None until then.
        self.lastFullFetchedCalendar = None
        proxies          = urllib.getproxies()
        # .get(): a machine with no proxy configured has neither key.
        self.http_proxy  = proxies.get('http')
        self.https_proxy = proxies.get('https')

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        return "Google"

    # The session is up exactly while it holds a login token.
    def isUp(self):
        return self.google_key is not None

    # Returns 0 while the session is down.
    # NOTE(review): the 'up' branch returns None -- it looks unfinished (or
    # lost part of its return expression); left as found, confirm intent.
    def itemCount(self):
        if not self.isUp():
            return 0
        else:
            return

    # Split a proxy URL such as http://host:port/bar into (host, port).
    # 'port' is the digit string from the URL, or None when it names none.
    def _unpackProxyString(self, astring):
        urltype, the_rest = splittype(astring)          # Split http://host:port/bar into http and //host:port/bar
        hostport, path    = urllib.splithost(the_rest)  # split into host:port and /bar
        return urllib.splitport(hostport)               # Split into host and port


    # POST the credentials to ClientLogin and keep the 'Auth=' token from
    # the reply in self.google_key.  Returns [response, text].
    # Raises urllib2.URLError when the reply is not a 200 or holds no token.
    # (max_redirect_count is accepted but not used.)
    def _login(self, max_redirect_count, userid, password):
        # urllib.urlopen is supposed to transparently handle proxy server via
        # an http_proxy environment variable.  It turns out that this is
        # true -- and that you need an httpS_proxy environment variable if you
        # want to proxy httpS.
        # urllib2's automatic proxy failed on BellSouth's proxy (auth required)
        # for httpS, so I'll use CONNECT instead. Unfortunately it turned out to
        # be tough to have gsession handle the proxy -- urllib2 kept trying to
        # do so IN ADDITION. I found a seriously hacky way to tell it not to,
        # but going straight to httplib is easier and safer than subverting
        # urllib2.
        # Note: urllib2 first tries to pick up HTTP_PROXY / HTTPS_PROXY from
        # the environment.  Failing that it ALSO tries to pick up proxy settings
        # from Internet Explorer (in the registry).  All of this even if I'm
        # trying to manually handle proxy.

        data = urllib.urlencode({'Email':userid, 'Passwd':password, 'service':'cl', 'source':'test-login-seq'})
        headers = {"Content-type":"application/x-www-form-urlencoded", 'User-Agent':self.USER_AGENT}

        if self.https_proxy:
            host, port = self._unpackProxyString(self.https_proxy)
            con1 = ProxyHTTPSConnection(host, port)
            target = "https://www.google.com:443/accounts/ClientLogin"
        else:
            con1 = httplib.HTTPSConnection("www.google.com", 443)
            target = "/accounts/ClientLogin"
        try:
            con1.request("POST", target, data, headers)
            response = con1.getresponse()
            text = response.read()
        finally:
            # Close even when the request or the read blows up.
            con1.close()

        if response.status != 200:
            # Use a local label: storing "(none)" on self would make the next
            # login try to use it as a proxy.
            proxy_label = self.https_proxy
            if proxy_label is None:
                proxy_label = "(none)"
            raise urllib2.URLError("Bad response from login (proxy=" + proxy_label + "). " + str(response.status) + "; " + response.reason)

        # Get the Google magic token from the response.  Collected in a local
        # so that a token left over from an earlier login cannot hide a reply
        # that has none.
        key = None
        for line in text.splitlines():
            if line.startswith('Auth='):
                # Split once only; the token itself may contain '='.
                key = line.split("=", 1)[1]
        if key is None:
            raise urllib2.URLError("could not find login key in server response")
        self.google_key = key

        return [response, text]

    # Start the session.  Returns self so calls can be chained.
    def up(self):
        self._login(5, self.userid, self.password)
        return self

    # Shut-down the session.
    def down(self):
        self.google_key = None

    # Do an HTTP GET to Google, using the Google magic token for security.
    # Returns [result, text].  (max_retry_count is accepted but not used.)
    def _gget(self, url, max_retry_count = 3):
        headers = {
            'User-Agent'    : self.USER_AGENT,
            'Authorization' : 'GoogleLogin auth=' + self.google_key,
        }

        req = urllib2.Request(url, None, headers)
        # Note: urlopen handles redirects and proxies. See comments about
        # proxy handling elsewhere in this file.
        result = urllib2.urlopen(req)
        text = result.read()
        return [result, text]

    # Returns the 'location' header of an httplib response, or None when the
    # response does not carry one.
    def getLocationHeader(self, response):
        try:
            return response.msg.dict['location']
        except (AttributeError, KeyError, TypeError):
            return None

    # Does a post, adding the GoogleLogin Auth header.
    #   override      -- value for X-HTTP-Method-Override, if any
    #   max_redirects -- how many 302s may still be followed (defaults to
    #                    MAX_REDIRECTS)
    # Returns True on a 200 or 201.  Raises urllib2.URLError on any other
    # final status, on a 302 without a Location, or on too many redirects.
    def _http_post(self, url, body = None, content_type = None, override = None, max_redirects = None):
        # Note: urlopen 'follows' a POST that gets redirected with a GET.
        # Conclusion: using urllib/urllib2 for http is more trouble than it is worth with
        # GDATA, except for simple http GETs.
        if max_redirects is None:
            max_redirects = self.MAX_REDIRECTS
        body_length = 0
        if body:
            body_length = len(body)
        header1 = {}
        header1["Authorization"] = "GoogleLogin auth=" + self.google_key
        if override:
            header1['X-HTTP-Method-Override'] = override
        if content_type:
            header1["Content-Type"] = content_type
            header1['Content-Length'] = str(body_length)

        if self.http_proxy:
            # Through a proxy: connect to the proxy and send the full URL.
            host, port = self._unpackProxyString(self.http_proxy)
            target = url
        else:
            # Direct: connect to the host the URL names (a redirect may name
            # a different one) and send only the path.
            urltype, the_rest = splittype(url)             # Split http://host:port/bar into http and //host:port/bar
            hostport, target  = urllib.splithost(the_rest) # split into host:port and /bar
            host, port = "www.google.com", 80
            if hostport:
                host, port = urllib.splitport(hostport)
                if port is None:
                    port = 80
                else:
                    port = int(port)

        con1 = httplib.HTTPConnection(host, port)
        try:
            con1.request("POST", target, body, header1)
            response = con1.getresponse()
            response.read()     # drain the reply before closing
        finally:
            con1.close()

        if response.status in (200, 201):
            return True
        if response.status == 302:
            redirect_url = self.getLocationHeader(response)
            if redirect_url is None:
                raise urllib2.URLError("Redirect from POST carried no Location header.")
            if max_redirects <= 0:
                raise urllib2.URLError("Too many redirects from POST.")
            # Re-send the same POST to the new location and report ITS result.
            return self._http_post(redirect_url, body, content_type, override, max_redirects - 1)
        raise urllib2.URLError("Bad response from POST." + str(response.status) + "; " + response.reason)

    # HTTP POST with X-HTTP-Method-Override=DELETE -- to get through the
    # corp firewall.
    def _http_post_delete(self, url):
        return self._http_post(url, None, 'application/atom+xml', 'DELETE')

    # Don't want to quit if DOM objects of this form are missing from the data.
    # Returns the attribute's value from the first <tagname> under aitem, or
    # None when the tag or the attribute is absent.
    def _safeGetTagAttribute(self, aitem, tagname, attrname):
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].value
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 1 caught')
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    # Same lookup as _safeGetTagAttribute, read through nodeValue.
    def _safeGetTagAttributeValue(self, aitem, tagname, attrname):
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].nodeValue
        except (IndexError, KeyError, AttributeError):
            print('DEBUG exception 1 caught on ' + tagname + ', ' + attrname)
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    # Returns the nodeValue of the first child of the first <tagname> under
    # aitem, or None when the tag is absent or empty.
    def _safeGetTagValue(self, aitem, tagname):
        try:
            return aitem.getElementsByTagName(tagname)[0].firstChild.nodeValue
        except (IndexError, AttributeError):
            print('DEBUG exception 2 caught on ' + tagname)
            return None

    # Convert a DOM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        item = {}
        # Every key gets a value up front so that later consumers never hit a
        # KeyError on an entry that lacks the matching element.
        item[Session.GUID]     = None
        item[Session.EDIT_URL] = None
        item[Session.START]    = None
        item[Session.END]      = None
        item[Session.RECURS]   = False

        if aitem.getElementsByTagName('gd:when'):
            item[Session.START]        = self._safeGetTagAttribute(aitem, 'gd:when', 'startTime')
            item[Session.END]          = self._safeGetTagAttribute(aitem, 'gd:when', 'endTime')
        elif aitem.getElementsByTagName('gd:recurrence'):
            item[Session.RECURS]       = True

        item[Session.VISIBILITY]       = self._safeGetTagAttribute(aitem, 'gd:visibility', 'value')
        item[Session.CREATED]          = self._safeGetTagValue(aitem, 'published')
        item[Session.MODIFIED]         = self._safeGetTagValue(aitem, 'updated')
        item[Session.LOCATION]         = self._safeGetTagAttributeValue(aitem, 'gd:where', 'valueString')
        item[Session.BUSY]             = self._safeGetTagAttributeValue(aitem, 'gd:transparency', 'value')
        item[Session.SUBJECT]          = self._safeGetTagValue(aitem, 'title')
        item[Session.BODY]             = self._safeGetTagValue(aitem, 'content')
        item[Session.MEETING_STATUS]   = self._safeGetTagAttributeValue(aitem, 'gd:eventStatus', 'value')

        # An entry without an <author> simply has no author details.
        item[Session.AUTHOR_NAME]  = None
        item[Session.AUTHOR_EMAIL] = None
        authors = aitem.getElementsByTagName('author')
        if authors:
            item[Session.AUTHOR_NAME]  = self._safeGetTagValue(authors[0], 'name')
            item[Session.AUTHOR_EMAIL] = self._safeGetTagValue(authors[0], 'email')

        # rel='self' is used as the event's GUID, rel='edit' as its edit URL.
        # getAttribute() gives '' for a missing attribute instead of raising.
        for link in aitem.getElementsByTagName('link'):
            rel  = link.getAttribute('rel')
            href = link.getAttribute('href') or None
            if rel == 'self':
                item[Session.GUID]     = href
            if rel == 'edit':
                item[Session.EDIT_URL] = href

        return item


    #retrieve the full calendar
    # Returns a list of session-event dicts and remembers it in
    # self.lastFullFetchedCalendar.
    def fetchFullCalendar(self):
        result, text = self._gget("http://www.google.com/calendar/feeds/default/private/full?max-results=999999")
        entries      = minidom.parseString(text).getElementsByTagName('entry')
        events       = [self._makeSessionEvent(entry) for entry in entries]
        # TODO: Make this a deep copy?
        self.lastFullFetchedCalendar = events
        return events

    # Escape 'source' for use as XML text or inside a quoted attribute value.
    # None becomes the empty string.
    def _make_safe_for_xml(self, source):
        from xml.sax.saxutils import escape
        if source is None:
            return ''
        # escape() covers &, < and >; both quote characters are added because
        # some values end up inside attribute quotes.
        return escape(source, {"'": "&apos;", '"': "&quot;"})

    # Convert a dict session-event into an XML event.  (Don't need full DOM,
    # so I just do string substitution.)
    def _toXML(self, item):
        mytitle         = self._make_safe_for_xml(item[Session.SUBJECT])
        # .get(): events that came from Outlook carry no BODY key.
        mycontent       = self._make_safe_for_xml(item.get(Session.BODY))
        myname          = "Ann E. Body"
        myemail         = "nobody@example.com"
        mytransparency  = item[Session.BUSY]
        mystatus        = item[Session.MEETING_STATUS]
        mywhere         = self._make_safe_for_xml(item[Session.LOCATION])
        mystart         = item[Session.START] # Must be format yyyy-mm-ddThh:mm:ssZ
        myend           = item[Session.END]
        return APPOINTMENT_XML_MIDDLE % (mytitle, mycontent, myname, myemail, mytransparency, mystatus, mywhere, mystart, myend)

    # TODO: Look into batch API at http://code.google.com/apis/gdata/batch.html.
    #       No batch support for Calendar as of 12/8/2006, but keep checking.
    # Deletes, one POST per event, every event in the last fetched calendar
    # (fetching it first if nothing has been fetched yet).
    def _deleteAllEventsFromGoogle(self):
        if self.lastFullFetchedCalendar is None:
            self.fetchFullCalendar()
        for event in self.lastFullFetchedCalendar:
            self._http_post_delete(event[Session.EDIT_URL])
        return True

    # Replace the Google calendar with 'cal', a list of session-event dicts:
    # delete what is there, then POST the new events.  Returns True.
    def storeFullCalendar(self, cal):
        assert(isinstance(cal, list))
        if len(cal) > 0:
            assert(isinstance(cal[0], dict))

        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but before the save, I don't want to lose data!

        self._deleteAllEventsFromGoogle()

        xml = "".join([APPOINTMENT_XML_BEGIN + self._toXML(item) + APPOINTMENT_XML_END for item in cal])
        if len(xml) < 1:
            return True
        url = "http://www.google.com/calendar/feeds/default/private/full"
        self._http_post(url, xml, 'application/atom+xml')
        return True

osession.py
#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Outlook about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Outlook.
#########################################################

import win32com.client
from asession import Session

class OutlookSession(Session):
    # Session for one Outlook calendar folder, driven through COM
    # (win32com).  The folder is looked up by name when the session comes up.

    # OlRecurrenceState Constants
    OlApptNotRecurring    = 0
    OlApptMaster          = 1
    OlApptOccurrence      = 2
    OlApptException       = 3

    # Outlook appointment statuses
    OlNonMeeting          = 0
    OlMeeting             = 1
    OlMeetingReceived     = 3
    OlMeetingCanceled     = 5
    OlMeetingCanceledToo  = 7   # Not documented; my live data contained an example of this.

    # Status recorded by _makeSessionEvent for appointments that are not
    # meetings.
    # NOTE(review): this name was referenced but never defined, so every
    # non-meeting appointment raised AttributeError.  Aliasing OlNonMeeting
    # keeps the stored value one that storeFullCalendar can write back to
    # MeetingStatus -- confirm the intended value.
    OUTLOOK_STATUS_CONFIRMED = OlNonMeeting

    # Outlook visibility constants
    OlNormal        = 0
    OlPersonal      = 1
    OlPrivate       = 2
    OlConfidential  = 3

    # Outlook appointments can be busy/free/out-of-office/tentative
    OlFree        = 0
    OlTentative   = 1
    OlBusy        = 2
    OlOutOfOffice = 3

    # Items in an Outlook folder can be one of these types of item
    OlMailItem        = 0
    OlAppointmentItem = 1
    OlContactItem     = 2
    OlTaskItem        = 3


    # Values for GetDefaultFolder(x)
    OlFolderCalendar   = 9
    OlFolderContacts   = 10
    OlFolderJournal    = 11
    OlFolderNotes      = 12
    OlFolderTasks      = 13
    OlFolderReminders1 = 14
    OlFolderReminders2 = 15
    OlFolderDrafts     = 16

    # userid/password default to placeholders so the base class never
    # prompts for them.
    def __init__(self, calendar_name='Calendar', userid='dummy', password='dummy'):
        Session.__init__(self, userid, password)
        self.down()
        self.calendar_name = calendar_name

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        return "Outlook"

    # Start the session.  Returns self so calls can be chained.
    def up(self):
        self.outlook = win32com.client.gencache.EnsureDispatch("Outlook.Application")
        self.mapi = self.outlook.GetNamespace("MAPI")
        self.folders = self.mapi.Folders.Item(1).Folders  #Usually the user's folders
        self.olCalendar =  self.folders.Item(self.calendar_name)
        return self

    # Shut-down the session.
    def down(self):
        self.outlook = None
        self.olCalendar = None
        self.folders = None
        self.mapi = None

    # The session is up exactly while it holds a MAPI namespace.
    def isUp(self):
        return self.mapi is not None

    # Convert a COM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        item = {}
        item[Session.START]          = aitem.Start
        item[Session.END]            = aitem.End
        item[Session.VISIBILITY]     = aitem.Sensitivity
        item[Session.CREATED]        = aitem.CreationTime
        item[Session.MODIFIED]       = aitem.LastModificationTime
        item[Session.LOCATION]       = aitem.Location
        item[Session.MEETING_STATUS] = aitem.MeetingStatus
        item[Session.BUSY]           = aitem.BusyStatus
        item[Session.SUBJECT]        = aitem.Subject
        item[Session.GUID]           = aitem.EntryID
        item[Session.RECURS]         = (aitem.RecurrenceState != OutlookSession.OlApptNotRecurring)

        # Touching Body seems to set off the Outlook a-program-is-accessing-your-email
        # TODO: Use Redemption DLL to prevent the warning.
        # item[Session.BODY] = aitem.Body

        # These are appointments with no one else invited?
        if aitem.MeetingStatus == OutlookSession.OlNonMeeting:
            item[Session.MEETING_STATUS] = OutlookSession.OUTLOOK_STATUS_CONFIRMED

        return item

    #retrieve the full calendar
    # Returns a list of session-event dicts, sorted by start time.
    def fetchFullCalendar(self):
        assert self.isUp()
        # Bind the collection once: each read of olCalendar.Items can hand
        # back a fresh collection, which would throw away the Sort and the
        # IncludeRecurrences setting before the items are read.
        calendar_items = self.olCalendar.Items
        count = len(calendar_items)
        calendar_items.Sort("[Start]")
        calendar_items.IncludeRecurrences = False
        # Outlook collections are indexed from 1.
        return [self._makeSessionEvent(calendar_items.Item(i))
                for i in range(1, count + 1)]

    # TODO: Can I bulk-delete without iterating?
    def _deleteAllEventsFromOutlook(self):
        calendar_items = self.olCalendar.Items
        calendar_items.IncludeRecurrences = False
        count = len(calendar_items)
        # Collect every EntryID first, then delete by ID, so that deleting
        # does not shift the indexes still to be visited.
        # I realize this is not Pythonic, but you have to call Outlook with successive integers.
        guids = [calendar_items.Item(i+1).EntryID for i in range(count)]
        for guid in guids:
            self.olCalendar.Session.GetItemFromID(guid).Delete()
        return True

    # Replace the Outlook calendar with 'cal', a list of session-event
    # dicts: delete what is there, then add and save one appointment per
    # event.  Each event's GUID is updated to the new EntryID.  Returns True.
    def storeFullCalendar(self, cal):
        assert(isinstance(cal, list))
        if len(cal) > 0:
            assert(isinstance(cal[0], dict))

        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but before the save, I don't want to lose data!

        self._deleteAllEventsFromOutlook()

        calendar_items = self.olCalendar.Items
        for item in cal:
            oEvent             = calendar_items.Add(self.OlAppointmentItem)
            oEvent.Subject     = item[Session.SUBJECT]
            oEvent.Start       = item[Session.START]
            oEvent.End         = item[Session.END]
            oEvent.BusyStatus  = item[Session.BUSY]
            oEvent.Location    = item[Session.LOCATION]
            oEvent.MeetingStatus = item[Session.MEETING_STATUS]
            oEvent.ReminderMinutesBeforeStart = 0
            oEvent.ReminderSet = True
            oEvent.Sensitivity = item[Session.VISIBILITY]
            oEvent.ClearRecurrencePattern()
            oEvent.Save()

            item[Session.GUID] = oEvent.EntryID

        return True

Learnings During this Cycle

Python: Rants:

Source URL:
http://www.kleinfelter.com/node/54