# test-session.py
#!/usr/bin/env python
# Test suite for my Outlook and Google session objects.
import sys
sys.path.append('..')
import unittest
from google.gsession import GoogleSession
from outlook.osession import OutlookSession
class TestSession(unittest.TestCase):
    """Smoke tests for the GoogleSession and OutlookSession objects.

    The tests construct real sessions, so they need a Google login and an
    Outlook folder named 'TestCalendar'.
    """

    #################################################################
    # I can set these to fixed values to prevent a prompt for ID/password,
    # but I don't want to post to my blog with meaningful values here!
    #################################################################
    test_user = None
    test_password = None

    #################################################################
    # unittest requires me to name these test_*
    # I'm naming things test_nn_* in order to control the order of execution
    # (it runs them in alphabetical order).
    #################################################################
    def test_01_session_start_up_and_shut_down(self):
        """Each kind of session can be brought up and taken down again."""
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password)
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy')
        self.sessionUpDown(gsess)
        self.sessionUpDown(osess)

    def test_02_fetch_full_calendar(self):
        """fetchFullCalendar() returns a list for both services."""
        osess = OutlookSession('TestCalendar', 'dummy', 'dummy').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()
        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()
        self.assertTrue(isinstance(gcal, list))
        self.assertTrue(isinstance(ocal, list))

    def test_03_store_full_calendar(self):
        """A fetched calendar can be stored back to the service it came from."""
        osess = OutlookSession('TestCalendar').up()
        gsess = GoogleSession(TestSession.test_user, TestSession.test_password).up()
        gcal = gsess.fetchFullCalendar()
        ocal = osess.fetchFullCalendar()
        # String exceptions are not raisable on current Python 2; report the
        # missing fixture data through unittest instead.
        if len(gcal) < 1:
            self.fail("Need you to create events in your Google test calendar for testing.")
        if len(ocal) < 1:
            self.fail("Need you to create events in your Outlook test calendar for testing.")
        gsess.storeFullCalendar(gcal)
        osess.storeFullCalendar(ocal)

    def sessionUpDown(self, sess):
        """Check that sess starts down, reports up after up(), and down after down()."""
        self.assertFalse(sess.isUp())
        sess.up()
        self.assertTrue(sess.isUp())
        sess.down()
        self.assertFalse(sess.isUp())
# Script entry point: prompt once for any Google credential that was not
# hard-coded on TestSession, then hand over to unittest.
if __name__ == '__main__':
    if TestSession.test_user is None:
        TestSession.test_user = raw_input("enter Google user ID:")
    if TestSession.test_password is None:
        TestSession.test_password = raw_input("enter Google password:")
    unittest.main()
#!/usr/bin/env python
# Abstract Session objects -- GoogleSession and OutlookSession are derived
# from this.
# (There was a system file session.py that was interfering with my ability to
# define a session.py)
import urllib, urllib2, httplib, socket
class Session(object):
    """Abstract base for calendar sessions; holds credentials and field keys.

    The integer class attributes are the keys of the dict that represents one
    calendar event ("session-event") in the derived session classes.
    """

    # Outlook AND GMail
    START = 0
    END = 1
    VISIBILITY = 2
    CREATED = 3
    MODIFIED = 4
    LOCATION = 5
    MEETING_STATUS = 6
    BUSY = 7
    SUBJECT = 8
    GUID = 9
    RECURS = 10

    # GMail
    AUTHOR_NAME = 100
    AUTHOR_EMAIL = 101
    EDIT_URL = 102
    BODY = 103  # Can't use Outlook's body because it triggers the security warning

    def __init__(self, userid=None, password=None):
        """Store the credentials, prompting on the console for any that is None.

        Args:
            userid: login name, or None to prompt for it.
            password: password, or None to prompt for it.
        """
        if userid is None:
            userid = raw_input("enter " + self.getServiceName() + " user ID:")
        if password is None:
            # getpass keeps the password from being echoed to the terminal.
            import getpass
            password = getpass.getpass("enter " + self.getServiceName() + " password:")
        self.userid = userid
        self.password = password

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return a display name for the service; subclasses override this."""
        return "Abstract Session"
#######################################################################
# Based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/456195
class ProxyHTTPConnection(httplib.HTTPConnection):
    """HTTP connection that reaches the real host through a proxy CONNECT tunnel.

    The connection itself is opened to the proxy (the host/port given to the
    constructor); request() takes an absolute URL and remembers its host and
    port so that connect() can ask the proxy to tunnel there.
    """

    _ports = {'http': 80, 'https': 443}

    def request(self, method, url, body=None, headers=None):
        """Record the URL's real host/port, then send the request as usual.

        Args:
            method: HTTP method name.
            url: absolute URL; its scheme supplies the port when none is given.
            body: optional request body.
            headers: optional dict of request headers.
        """
        if headers is None:
            headers = {}
        proto, rest = urllib.splittype(url)
        host, rest = urllib.splithost(rest)
        host, port = urllib.splitport(host)
        if port is None:
            port = self._ports[proto]
        else:
            port = int(port)
        self._real_host = host
        self._real_port = port
        httplib.HTTPConnection.request(self, method, url, body, headers)

    def connect(self):
        """Connect to the proxy and open a CONNECT tunnel to the real host.

        Raises:
            socket.error: if the proxy answers CONNECT with a status other
                than 200.
        """
        httplib.HTTPConnection.connect(self)
        self.send("CONNECT %s:%d HTTP/1.0\r\n\r\n" % (self._real_host, self._real_port))
        response = self.response_class(self.sock, strict=self.strict, method=self._method)
        (version, code, message) = response._read_status()
        if code != 200:
            self.close()
            raise socket.error("Proxy CONNECT %s:%d failed: %d %s"
                               % (self._real_host, self._real_port, code, message.strip()))
        # Eat up the header block from the proxy. Also stop on an empty read:
        # readline() returns '' at end-of-stream, and looping on that would
        # never terminate.
        while True:
            line = response.fp.readline()
            if line in ('\r\n', '\n', ''):
                return
##################################################
class ProxyHTTPSConnection(ProxyHTTPConnection):
    """ProxyHTTPConnection that switches to SSL once the tunnel is open."""

    default_port = 443

    def __init__(self, host, port=None):
        """Create a connection to the proxy at host:port."""
        ProxyHTTPConnection.__init__(self, host, port)

    # wrapper for httplib 'connect' - supporting http via CONNECT
    def connect(self):
        """Open the CONNECT tunnel, then wrap the socket for SSL."""
        ProxyHTTPConnection.connect(self)
        try:
            import ssl
        except ImportError:
            # Python <= 2.5 has no ssl module; use the original API there.
            legacy_ssl = socket.ssl(self.sock)
            self.sock = httplib.FakeSocket(self.sock, legacy_ssl)
        else:
            # socket.ssl() / httplib.FakeSocket are deprecated in favour of
            # using the result of ssl.wrap_socket() directly.
            self.sock = ssl.wrap_socket(self.sock)
#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Google about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Google.
#########################################################
from asession import Session, ProxyHTTPSConnection
from urllib import splittype
import urllib, urllib2, httplib
from xml.dom import minidom
# TODO - make sure no user data can mess this up (maybe by """ in the appointment text?)
# Atom <entry> for one calendar event, in three pieces that are concatenated
# as BEGIN + (MIDDLE % values) + END.
#
# NOTE(review): the markup in these literals appears to have been stripped
# when this listing was published (only three bare "%s" were left). It has
# been rebuilt from the element/attribute names this module parses (title,
# content, author/name, author/email, gd:transparency, gd:eventStatus,
# gd:where, gd:when) -- confirm against the original file.
#
# MIDDLE takes nine values, in this order: title, content, author name,
# author email, transparency, event status, where, start time, end time.
APPOINTMENT_XML_MIDDLE = """
  <category scheme='http://schemas.google.com/g/2005#kind' term='http://schemas.google.com/g/2005#event'></category>
  <title type='text'>%s</title>
  <content type='text'>%s</content>
  <author>
    <name>%s</name>
    <email>%s</email>
  </author>
  <gd:transparency value='%s'></gd:transparency>
  <gd:eventStatus value='%s'></gd:eventStatus>
  <gd:where valueString='%s'></gd:where>
  <gd:when startTime='%s' endTime='%s'></gd:when>
"""
APPOINTMENT_XML_BEGIN = "<entry xmlns='http://www.w3.org/2005/Atom' xmlns:gd='http://schemas.google.com/g/2005'>"
APPOINTMENT_XML_END = "</entry>"
class GoogleSession(Session):
    """Session that reads and writes a Google Calendar over HTTP.

    up() logs in and stores the returned token in self.google_key; every
    later request sends it in a "GoogleLogin auth=" Authorization header.
    down() simply forgets the token.
    """

    USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
    # Default number of 302 responses _http_post follows for one request.
    MAX_POST_REDIRECTS = 5

    def __init__(self, userid=None, password=None):
        """Store credentials and pick up any configured HTTP/HTTPS proxies."""
        Session.__init__(self, userid, password)
        self.google_key = None
        # Result of the most recent fetchFullCalendar(); None until then.
        self.lastFullFetchedCalendar = None
        proxies = urllib.getproxies()
        # .get(): a scheme with no proxy configured has no entry at all.
        self.http_proxy = proxies.get('http')
        self.https_proxy = proxies.get('https')

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return the display name of this service."""
        return "Google"

    def isUp(self):
        """Return True while a login token is held."""
        return self.google_key is not None

    def itemCount(self):
        """Return 0 when the session is down.

        NOTE(review): when the session is up this returns None -- the
        original left that branch as a bare ``return``; the intended count
        is not visible in this file.
        """
        if not self.isUp():
            return 0
        return None

    def _unpackProxyString(self, astring):
        """Split a proxy URL such as http://host:port/bar into (host, port)."""
        urltype, the_rest = splittype(astring)         # http and //host:port/bar
        host, the_rest2 = urllib.splithost(the_rest)   # host:port and /bar
        host, port = urllib.splitport(host)            # host and port
        return (host, port)

    def _login(self, max_redirect_count, userid, password):
        """POST the credentials to ClientLogin and store the Auth token.

        Args:
            max_redirect_count: accepted for compatibility; not used.
            userid: Google account name.
            password: Google account password.

        Returns:
            [response, text] -- the HTTP response object and its body.

        Raises:
            urllib2.URLError: on a non-200 status, or when the body holds no
                "Auth=" line.
        """
        # urllib.urlopen is supposed to transparently handle proxy server via
        # an http_proxy environment variable. It turns out that this is
        # true -- and that you need an httpS_proxy environment variable if you
        # want to proxy httpS.
        # urllib2's automatic proxy failed on BellSouth's proxy (auth required)
        # for httpS, so I'll use CONNECT instead. Unfortunately it turned out
        # to be tough to have gsession handle the proxy -- urllib2 kept trying
        # to do so IN ADDITION. I found a seriously hacky way to tell it not
        # to, but going straight to httplib is easier and safer than
        # subverting urllib2.
        # Note: urllib2 first tries to pick up HTTP_PROXY / HTTPS_PROXY from
        # the environment. Failing that it ALSO tries to pick up proxy
        # settings from Internet Explorer (in the registry). All of this even
        # if I'm trying to manually handle proxy.
        data = urllib.urlencode({'Email': userid, 'Passwd': password,
                                 'service': 'cl', 'source': 'test-login-seq'})
        headers = {"Content-type": "application/x-www-form-urlencoded",
                   'User-Agent': self.USER_AGENT}
        if self.https_proxy:
            host, port = self._unpackProxyString(self.https_proxy)
            con1 = ProxyHTTPSConnection(host, port)
            target = "https://www.google.com:443/accounts/ClientLogin"
        else:
            con1 = httplib.HTTPSConnection("www.google.com", 443)
            target = "/accounts/ClientLogin"
        try:
            con1.request("POST", target, data, headers)
            response = con1.getresponse()
            text = response.read()
        finally:
            con1.close()

        if response.status != 200:
            # Use a local label: overwriting self.https_proxy with "(none)"
            # would make the next login attempt treat it as a real proxy.
            proxy_label = self.https_proxy or "(none)"
            raise urllib2.URLError("Bad response from login (proxy=" + proxy_label + "). "
                                   + str(response.status) + "; " + response.reason)

        # Get the Google magic token from the response.
        key = None
        for line in text.splitlines():
            if line.startswith('Auth='):
                # Split once only, so a token containing '=' stays intact.
                key = line.split("=", 1)[1]
        if key is None:
            raise urllib2.URLError("could not find login key in server response")
        self.google_key = key
        return [response, text]

    # Start the session.
    def up(self):
        """Log in with the stored credentials; returns self for chaining."""
        self._login(5, self.userid, self.password)
        return self

    # Shut-down the session.
    def down(self):
        """Forget the login token."""
        self.google_key = None

    # Do an HTTP GET to Google, using the Google magic token for security.
    def _gget(self, url, max_retry_count=3):
        """GET url with the auth header; returns [result, text].

        max_retry_count is accepted for compatibility; not used.
        """
        headers = {'User-Agent': self.USER_AGENT,
                   'Authorization': 'GoogleLogin auth=' + self.google_key}
        req = urllib2.Request(url, None, headers)
        # Note: urlopen handles redirects and proxies. See comments about
        # proxy handling elsewhere in this file.
        result = urllib2.urlopen(req)
        text = result.read()
        return [result, text]

    def getLocationHeader(self, response):
        """Return the response's Location header, or None if it has none."""
        try:
            return response.msg.dict['location']
        except (AttributeError, KeyError):
            return None

    # Does a post, adding the GoogleLogin Auth header
    def _http_post(self, url, body=None, content_type=None, override=None,
                   max_redirects=None):
        """POST body to url and follow 302 redirects by re-POSTing.

        Args:
            url: absolute target URL.
            body: request body, or None.
            content_type: value for the Content-Type header, if any.
            override: value for X-HTTP-Method-Override, if any.
            max_redirects: how many 302 responses may still be followed;
                None means MAX_POST_REDIRECTS.

        Returns:
            True on a 200 or 201 response.

        Raises:
            urllib2.URLError: on any other final status, on a 302 without a
                Location header, or when the redirect limit is exhausted.
        """
        # Note: urlopen 'follows' a POST that gets redirected with a GET.
        # Conclusion: using urllib/urllib2 for http is more trouble than it
        # is worth with GDATA, except for simple http GETs.
        if max_redirects is None:
            max_redirects = self.MAX_POST_REDIRECTS
        body_length = 0
        if body:
            body_length = len(body)
        header1 = {"Authorization": "GoogleLogin auth=" + self.google_key}
        if override:
            header1['X-HTTP-Method-Override'] = override
        if content_type:
            header1["Content-Type"] = content_type
        header1['Content-Length'] = str(body_length)

        if self.http_proxy:
            # Through a proxy the request line carries the absolute URL.
            host, port = self._unpackProxyString(self.http_proxy)
            con1 = httplib.HTTPConnection(host, port)
            target = url
        else:
            urltype, the_rest = splittype(url)               # http and //host:port/bar
            hostport, target = urllib.splithost(the_rest)    # host:port and /bar
            # Connect to the URL's own host so a redirect to another host is
            # honoured; fall back to the original hard-coded host.
            con1 = httplib.HTTPConnection(hostport or "www.google.com")
        try:
            con1.request("POST", target, body, header1)
            response = con1.getresponse()
            response.read()
        finally:
            con1.close()

        if response.status in (200, 201):
            return True
        if response.status == 302:
            location = self.getLocationHeader(response)
            if location is None:
                raise urllib2.URLError("Bad response from POST. 302 without a Location header")
            if max_redirects <= 0:
                raise urllib2.URLError("Bad response from POST. Too many redirects")
            return self._http_post(location, body, content_type, override,
                                   max_redirects - 1)
        raise urllib2.URLError("Bad response from POST." + str(response.status)
                               + "; " + response.reason)

    # HTTP POST with X-HTTP-Method-Override=DELETE -- to get through the
    # corp firewall.
    def _http_post_delete(self, url):
        """Delete the resource at url via a method-override POST."""
        return self._http_post(url, None, 'application/atom+xml', 'DELETE')

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagAttribute(self, aitem, tagname, attrname):
        """Return attribute attrname of the first tagname element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].value
        except (AttributeError, IndexError, KeyError):
            print('DEBUG exception 1 caught')
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagAttributeValue(self, aitem, tagname, attrname):
        """Like _safeGetTagAttribute, but reads the attribute's nodeValue."""
        try:
            return aitem.getElementsByTagName(tagname)[0].attributes[attrname].nodeValue
        except (AttributeError, IndexError, KeyError):
            print('DEBUG exception 1 caught on ' + tagname + ', ' + attrname)
            return None

    # Don't want to quit if DOM objects of this form are missing from the data.
    def _safeGetTagValue(self, aitem, tagname):
        """Return the text of the first tagname element, or None."""
        try:
            return aitem.getElementsByTagName(tagname)[0].firstChild.nodeValue
        except (AttributeError, IndexError, KeyError):
            print('DEBUG exception 2 caught on ' + tagname)
            return None

    # Convert a DOM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        """Build a session-event dict (keyed by Session constants) from an <entry>."""
        # Defaults cover entries that have neither gd:when nor gd:recurrence
        # and entries without self/edit links.
        item = {
            Session.GUID: None,
            Session.EDIT_URL: None,
            Session.START: None,
            Session.END: None,
            Session.RECURS: False,
        }
        if aitem.getElementsByTagName('gd:when'):
            item[Session.START] = self._safeGetTagAttribute(aitem, 'gd:when', 'startTime')
            item[Session.END] = self._safeGetTagAttribute(aitem, 'gd:when', 'endTime')
        elif aitem.getElementsByTagName('gd:recurrence'):
            item[Session.RECURS] = True
        item[Session.VISIBILITY] = self._safeGetTagAttribute(aitem, 'gd:visibility', 'value')
        item[Session.CREATED] = self._safeGetTagValue(aitem, 'published')
        item[Session.MODIFIED] = self._safeGetTagValue(aitem, 'updated')
        item[Session.LOCATION] = self._safeGetTagAttributeValue(aitem, 'gd:where', 'valueString')
        item[Session.BUSY] = self._safeGetTagAttributeValue(aitem, 'gd:transparency', 'value')
        item[Session.SUBJECT] = self._safeGetTagValue(aitem, 'title')
        item[Session.BODY] = self._safeGetTagValue(aitem, 'content')
        item[Session.MEETING_STATUS] = self._safeGetTagAttributeValue(aitem, 'gd:eventStatus', 'value')

        # A missing <author> yields None here; the safe getters then return
        # None for the name and e-mail instead of raising.
        authors = aitem.getElementsByTagName('author')
        auth = None
        if authors:
            auth = authors[0]
        item[Session.AUTHOR_NAME] = self._safeGetTagValue(auth, 'name')
        item[Session.AUTHOR_EMAIL] = self._safeGetTagValue(auth, 'email')

        for link in aitem.getElementsByTagName('link'):
            # getAttribute returns '' for a missing attribute, so a <link>
            # without rel is skipped rather than raising KeyError.
            rel = link.getAttribute('rel')
            if rel == 'self':
                item[Session.GUID] = link.getAttribute('href')
            if rel == 'edit':
                item[Session.EDIT_URL] = link.getAttribute('href')
        return item

    # retrieve the full calendar
    def fetchFullCalendar(self):
        """Fetch every event; returns (and remembers) a list of session-event dicts."""
        result, text = self._gget(
            "http://www.google.com/calendar/feeds/default/private/full?max-results=999999")
        xml = minidom.parseString(text)
        items2 = [self._makeSessionEvent(x) for x in xml.getElementsByTagName('entry')]
        # TODO: Make this a deep copy?
        self.lastFullFetchedCalendar = items2
        return items2

    def _make_safe_for_xml(self, source):
        """Escape source for use as XML text or as a quoted attribute value.

        None becomes the empty string.
        """
        from xml.sax.saxutils import escape
        if source is None:
            return ""
        # escape() handles &, < and >; the extra entities make the result
        # safe inside single- or double-quoted attribute values as well.
        return escape(source, {"'": "&apos;", '"': "&quot;"})

    # Convert a dict session-event into an XML event. (Don't need full DOM,
    # so I just do string substitution.)
    def _toXML(self, item):
        """Format a session-event dict with APPOINTMENT_XML_MIDDLE."""
        mytitle = self._make_safe_for_xml(item[Session.SUBJECT])
        # .get(): events read from Outlook carry no BODY key.
        mycontent = self._make_safe_for_xml(item.get(Session.BODY))
        myname = "Ann E. Body"
        myemail = "nobody@example.com"
        mytransparency = item[Session.BUSY]
        mystatus = item[Session.MEETING_STATUS]
        mywhere = self._make_safe_for_xml(item[Session.LOCATION])
        mystart = item[Session.START]  # Must be format yyyy-mm-ddThh:mm:ssZ
        myend = item[Session.END]
        return APPOINTMENT_XML_MIDDLE % (mytitle, mycontent, myname, myemail,
                                         mytransparency, mystatus, mywhere,
                                         mystart, myend)

    # TODO: Look into batch API at http://code.google.com/apis/gdata/batch.html.
    # No batch support for Calendar as of 12/8/2006, but keep checking.
    def _deleteAllEventsFromGoogle(self):
        """Delete every event of the last fetched calendar, one request each."""
        cal = self.lastFullFetchedCalendar
        if cal is None:
            # Nothing fetched yet in this session: fetch now so there is a
            # list of edit URLs to delete.
            cal = self.fetchFullCalendar()
        for event in cal:
            self._http_post_delete(event[Session.EDIT_URL])
        return True

    def storeFullCalendar(self, cal):
        """Replace the Google calendar's contents with the events in cal.

        Args:
            cal: list of session-event dicts.

        Returns:
            True.
        """
        assert isinstance(cal, list)
        if len(cal) > 0:
            assert isinstance(cal[0], dict)
        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but
        # before the save, I don't want to lose data!
        self._deleteAllEventsFromGoogle()
        url = "http://www.google.com/calendar/feeds/default/private/full"
        # One POST per event: several entries concatenated into one body do
        # not form a single XML document.
        for item in cal:
            xml = APPOINTMENT_XML_BEGIN + self._toXML(item) + APPOINTMENT_XML_END
            self._http_post(url, xml, 'application/atom+xml')
        return True
#!/usr/bin/env python
#########################################################
# Provides a 'session' for communicating with Outlook about my Calendar,
# and provides primitive operations for retrieving and storing my
# calendar to Outlook.
#########################################################
import win32com.client
from asession import Session
class OutlookSession(Session):
    """Session that reads and writes one Outlook calendar folder through COM.

    up() attaches to Outlook and looks up the folder named calendar_name;
    down() drops the COM references.
    """

    # OlRecurrenceState Constants
    OlApptNotRecurring = 0
    OlApptMaster = 1
    OlApptOccurrence = 2
    OlApptException = 3

    # Outlook appointment statuses
    OlNonMeeting = 0
    OlMeeting = 1
    OlMeetingReceived = 3
    OlMeetingCanceled = 5
    OlMeetingCanceledToo = 7  # Not documented; my live data contained an example of this.

    # Status stored for appointments with no one else invited.
    # NOTE(review): this name was referenced but never defined, so every
    # non-meeting appointment raised AttributeError. Its intended value is not
    # visible in this file; OlNonMeeting keeps the value Outlook reported --
    # confirm.
    OUTLOOK_STATUS_CONFIRMED = OlNonMeeting

    # Outlook visibility constants
    OlNormal = 0
    OlPersonal = 1
    OlPrivate = 2
    OlConfidential = 3

    # Outlook appointments can be busy/free/out-of-office/tentative
    OlFree = 0
    OlTentative = 1
    OlBusy = 2
    OlOutOfOffice = 3

    # Items in an Outlook folder can be one of these types of item
    OlMailItem = 0
    OlAppointmentItem = 1
    OlContactItem = 2
    OlTaskItem = 3

    # Values for GetDefaultFolder(x)
    OlFolderCalendar = 9
    OlFolderContacts = 10
    OlFolderJournal = 11
    OlFolderNotes = 12
    OlFolderTasks = 13
    OlFolderReminders1 = 14
    OlFolderReminders2 = 15
    OlFolderDrafts = 16

    def __init__(self, calendar_name='Calendar', userid='dummy', password='dummy'):
        """Create a session (initially down) for the folder calendar_name.

        userid/password default to placeholders so the base class does not
        prompt for them.
        """
        Session.__init__(self, userid, password)
        self.down()
        self.calendar_name = calendar_name

    # Handy if you have a Session and don't know which kind it is.
    def getServiceName(self):
        """Return the display name of this service."""
        return "Outlook"

    # Start the session.
    def up(self):
        """Attach to Outlook and locate the calendar folder; returns self."""
        self.outlook = win32com.client.gencache.EnsureDispatch("Outlook.Application")
        self.mapi = self.outlook.GetNamespace("MAPI")
        self.folders = self.mapi.Folders.Item(1).Folders  # Usually the user's folders
        self.olCalendar = self.folders.Item(self.calendar_name)
        return self

    # Shut-down the session.
    def down(self):
        """Drop all COM references held by the session."""
        self.outlook = None
        self.olCalendar = None
        self.folders = None
        self.mapi = None

    def isUp(self):
        """Return True while the MAPI namespace is attached."""
        return self.mapi is not None

    # Convert a COM event into a dict session-event
    def _makeSessionEvent(self, aitem):
        """Build a session-event dict (keyed by Session constants) from a COM item."""
        item = {
            Session.START: aitem.Start,
            Session.END: aitem.End,
            Session.VISIBILITY: aitem.Sensitivity,
            Session.CREATED: aitem.CreationTime,
            Session.MODIFIED: aitem.LastModificationTime,
            Session.LOCATION: aitem.Location,
            Session.MEETING_STATUS: aitem.MeetingStatus,
            Session.BUSY: aitem.BusyStatus,
            Session.SUBJECT: aitem.Subject,
            Session.GUID: aitem.EntryID,
            Session.RECURS: (aitem.RecurrenceState != OutlookSession.OlApptNotRecurring),
        }
        # Touching Body seems to set off the Outlook a-program-is-accessing-your-email
        # TODO: Use Redemption DLL to prevent the warning.
        # item[Session.BODY] = aitem.Body

        # These are appointments with no one else invited?
        if aitem.MeetingStatus == OutlookSession.OlNonMeeting:
            item[Session.MEETING_STATUS] = OutlookSession.OUTLOOK_STATUS_CONFIRMED
        return item

    # retrieve the full calendar
    def fetchFullCalendar(self):
        """Return every item of the calendar folder as session-event dicts."""
        assert self.isUp()
        # Hold ONE Items collection: each read of .Items hands back a fresh
        # collection, so Sort/IncludeRecurrences set on a throwaway one would
        # not affect the items read afterwards.
        items = self.olCalendar.Items
        items.Sort("[Start]")
        items.IncludeRecurrences = False
        # Outlook collections are indexed from 1.
        return [self._makeSessionEvent(items.Item(i))
                for i in range(1, len(items) + 1)]

    # TODO: Can I bulk-delete without iterating?
    def _deleteAllEventsFromOutlook(self):
        """Delete every item in the calendar folder."""
        items = self.olCalendar.Items
        items.IncludeRecurrences = False
        # Collect the IDs first: deleting while walking the collection by
        # index would shift the remaining items. Outlook has to be called
        # with successive 1-based integers.
        guids = [items.Item(i).EntryID for i in range(1, len(items) + 1)]
        mapi_session = self.olCalendar.Session
        for guid in guids:
            mapi_session.GetItemFromID(guid).Delete()
        return True

    def storeFullCalendar(self, cal):
        """Replace the folder's contents with the events in cal.

        Args:
            cal: list of session-event dicts; each dict's Session.GUID is
                overwritten with the EntryID of the item created for it.

        Returns:
            True.
        """
        assert isinstance(cal, list)
        if len(cal) > 0:
            assert isinstance(cal[0], dict)
        # TODO - improve gross inefficiency
        # TODO - improve data integrity -- If it crashes after the delete but
        # before the save, I don't want to lose data!
        self._deleteAllEventsFromOutlook()
        for item in cal:
            oEvent = self.olCalendar.Items.Add(self.OlAppointmentItem)
            oEvent.Subject = item[Session.SUBJECT]
            oEvent.Start = item[Session.START]
            oEvent.End = item[Session.END]
            oEvent.BusyStatus = item[Session.BUSY]
            oEvent.Location = item[Session.LOCATION]
            oEvent.MeetingStatus = item[Session.MEETING_STATUS]
            oEvent.ReminderMinutesBeforeStart = 0
            oEvent.ReminderSet = True
            oEvent.Sensitivity = item[Session.VISIBILITY]
            oEvent.ClearRecurrencePattern()
            oEvent.Save()
            item[Session.GUID] = oEvent.EntryID
        return True
# (web page residue, not part of the source: "Add new comment")