Location: OpenVMS/vmspython/python/local/rdb/dbapi2.py

Jean-Francois Pieronne
Bugfix from voalp
# -*- coding: iso-8859-15 -*-
"""Oracle/Rdb - A DB-API 2.0 interface to Oracle/Rdb.

The module exposes the DB-API module globals (``apilevel``,
``threadsafety``, ``paramstyle``), the ``Cursor`` and ``Connection``
classes and the ``connect`` factory.  All database work is delegated to
the ``rdb`` extension module.
"""
__version__ = "1.0"
import rdb
# Supplies the exception classes raised by this module (ProgrammingError
# is the only one referenced directly in this file).
from _rdb_exceptions import *
# Use a read/write default transaction.  This runs at import time, i.e.
# merely importing the module changes the rdb default transaction.
rdb.declare_transaction('READ WRITE')
# DB-API threadsafety level 0: threads may not share the module.
threadsafety = 0
# DB-API parameter style: question-mark placeholders in SQL text.
paramstyle = 'qmark'
# DB-API specification level implemented by this module.
apilevel = '2.0'
class Cursor(object):
    """DB-API 2.0 cursor that runs SQL through ``rdb.Statement`` objects.

    Attributes:
        connection: weak proxy to the owning connection; ``None`` once the
            cursor has been closed.
        description: result of ``stmt.description()`` after the last
            ``execute``/``executemany``; ``None`` before that.
        rowcount: result of ``stmt.rowcount()`` after the last
            ``execute``/``executemany``; ``-1`` before that.
        stmt: the current ``rdb.Statement`` or ``None``.
        arraysize: default number of rows returned by ``fetchmany``.
        lastrowid: always ``None``; this class never updates it.
        is_open: ``False`` once ``close`` has been called.
    """

    def __init__(self, connection):
        """Create an open cursor bound to *connection*."""
        from weakref import proxy
        # A weak proxy keeps the cursor from holding a strong reference
        # to its connection.
        self.connection = proxy(connection)
        self.description = None
        self.rowcount = -1
        self.stmt = None
        self.arraysize = 1
        self.lastrowid = None
        self.is_open = True

    # -- private helpers ---------------------------------------------------

    def _check_open(self):
        """Raise ``ProgrammingError`` if the cursor has been closed."""
        if not self.is_open:
            raise ProgrammingError("cursor is closed")

    def _release_statement(self):
        """Close the current statement if it is open, then forget it."""
        if self.stmt and self.stmt.isOpen():
            self.stmt.close()
        self.stmt = None

    def _prepare(self, operation):
        """Replace the current statement with a fresh one for *operation*."""
        self._check_open()
        self._release_statement()
        self.stmt = rdb.Statement(operation)
        # The new statement object can already report itself as open
        # (NOTE(review): presumably rdb hands back a reused statement for
        # identical SQL text - confirm); reset it before executing.
        if self.stmt.isOpen():
            self.stmt.close()
        return self.stmt

    def _record_results(self):
        """Copy description and row count from the current statement."""
        self.description = self.stmt.description()
        self.rowcount = self.stmt.rowcount()

    def _active_statement(self):
        """Return the current statement.

        Raises:
            ProgrammingError: the cursor is closed, or nothing has been
                executed on it yet.
        """
        self._check_open()
        if self.stmt is None:
            raise ProgrammingError("no statement has been executed")
        return self.stmt

    # -- DB-API interface --------------------------------------------------

    def close(self):
        """Close the cursor. No further queries will be possible.

        Raises:
            ProgrammingError: the cursor is already closed.
        """
        self._check_open()
        self._release_statement()
        self.is_open = False
        self.connection = None

    def execute(self, operation, params=None):
        """Execute *operation*, binding *params* positionally if given.

        Args:
            operation: SQL text passed to ``rdb.Statement``.
            params: optional sequence of values, unpacked as positional
                arguments to the statement's ``execute``.

        Raises:
            ProgrammingError: the cursor is closed.
        """
        stmt = self._prepare(operation)
        if params is None:
            stmt.execute()
        else:
            stmt.execute(*params)
        self._record_results()

    def executemany(self, operation, seq):
        """Execute *operation* once for every parameter sequence in *seq*.

        ``description`` and ``rowcount`` are read from the statement after
        the last execution.

        Raises:
            ProgrammingError: the cursor is closed.
        """
        stmt = self._prepare(operation)
        for args in seq:
            stmt.execute(*args)
        self._record_results()

    def fetchone(self):
        """Return the next row, or ``None`` when the result set is exhausted.

        The statement is closed as soon as it reports no more rows.

        Raises:
            ProgrammingError: the cursor is closed or nothing was executed.
        """
        stmt = self._active_statement()
        row = stmt.fetchone()
        if row is None:
            stmt.close()
        return row

    def fetchall(self):
        """Return whatever the statement's ``fetchall`` yields.

        Raises:
            ProgrammingError: the cursor is closed or nothing was executed.
        """
        return self._active_statement().fetchall()

    def fetchmany(self, size=None):
        """Return up to *size* rows; *size* defaults to ``self.arraysize``.

        Raises:
            ProgrammingError: the cursor is closed or nothing was executed.
        """
        stmt = self._active_statement()
        if size is None:
            size = self.arraysize
        return stmt.fetchmany(size)

    def setinputsizes(self, sizes):
        """Accepted for DB-API compatibility; does nothing."""
        return

    def setoutputsize(self, size, column=None):
        """Accepted for DB-API compatibility; does nothing."""
        return

    def __iter__(self):
        """Return the current statement as the row iterator.

        Relies on the statement object itself supporting iteration.

        Raises:
            ProgrammingError: the cursor is closed or nothing was executed.
        """
        return self._active_statement()
class Connection(object):
    """DB-API 2.0 connection backed by the ``rdb`` extension module.

    Attaching, transaction control, commit and rollback are all delegated
    to module-level ``rdb`` functions.

    Attributes:
        cursorclass: class used by ``cursor()`` when none is passed in.
        is_open: ``False`` once ``close`` has been called.
    """

    default_cursor = Cursor

    def __init__(self, *args, **kwargs):
        """Attach to a database.

        Args:
            *args: optionally the database name as the single positional
                argument, so that ``connect('name')`` works.
            **kwargs: ``database`` (required unless given positionally)
                and optional ``cursorclass``.

        Raises:
            TypeError: more than one positional argument, or the database
                given both positionally and by keyword.
            KeyError: no database supplied.
        """
        if len(args) > 1:
            raise TypeError(
                "Connection() takes at most 1 positional argument "
                "(%d given)" % len(args))
        if args and 'database' in kwargs:
            raise TypeError(
                "Connection() got multiple values for argument 'database'")
        self.cursorclass = kwargs.get('cursorclass', self.default_cursor)
        if args:
            database = args[0]
        else:
            # Plain indexing on purpose: a missing name keeps raising
            # KeyError, as it always did for keyword-only callers.
            database = kwargs['database']
        rdb.attachDB(database)
        self.is_open = True

    def _check_open(self):
        """Raise ``ProgrammingError`` if the connection has been closed."""
        if not self.is_open:
            raise ProgrammingError("connection is closed")

    def close(self):
        """Mark the connection closed, rolling back any active transaction.

        Raises:
            ProgrammingError: the connection is already closed.
        """
        self._check_open()
        if rdb.transaction_active():
            rdb.rollback()
        self.is_open = False

    def declare_transaction(self, default_transaction):
        """Pass *default_transaction* to ``rdb.declare_transaction``.

        Raises:
            ProgrammingError: the connection is closed.
        """
        self._check_open()
        rdb.declare_transaction(default_transaction)

    def start_read_only(self):
        """Call ``rdb.read_only()``.

        Raises:
            ProgrammingError: the connection is closed.
        """
        self._check_open()
        rdb.read_only()

    def start_read_write(self):
        """Call ``rdb.read_write()``.

        Raises:
            ProgrammingError: the connection is closed.
        """
        self._check_open()
        rdb.read_write()

    def commit(self):
        """Commit via ``rdb.commit()``.

        Raises:
            ProgrammingError: the connection is closed.
        """
        self._check_open()
        rdb.commit()

    def rollback(self):
        """Roll back via ``rdb.rollback()``.

        Raises:
            ProgrammingError: the connection is closed.
        """
        self._check_open()
        rdb.rollback()

    def cursor(self, cursorclass=None):
        """Return a new cursor bound to this connection.

        Args:
            cursorclass: cursor class to instantiate; falls back to
                ``self.cursorclass`` when omitted or falsy.

        Raises:
            ProgrammingError: the connection is closed.
        """
        self._check_open()
        return (cursorclass or self.cursorclass)(self)
def connect(*args, **kwargs):
    """Build and return a ``Connection``, forwarding every argument as-is."""
    connection = Connection(*args, **kwargs)
    return connection