D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
usr
/
share
/
doc
/
python-zope-component-4.1.0
/
html
/
_sources
/
api
/
Filename :
persistent.txt
back
Copy
Persistent Registries ===================== .. testsetup:: from zope.component.testing import setUp setUp() Conforming Adapter Lookup ------------------------- Here, we'll demonstrate that changes work even when data are stored in a database and when accessed from multiple connections. Start by setting up a database and creating two transaction managers and database connections to work with. .. doctest:: >>> import ZODB.tests.util >>> db = ZODB.tests.util.DB() >>> import transaction >>> t1 = transaction.TransactionManager() >>> c1 = db.open(transaction_manager=t1) >>> r1 = c1.root() >>> t2 = transaction.TransactionManager() >>> c2 = db.open(transaction_manager=t2) >>> r2 = c2.root() Create a set of components registries in the database, alternating connections. .. doctest:: >>> from zope.component.persistentregistry import PersistentComponents >>> from zope.component.tests.examples import I1 >>> from zope.component.tests.examples import I2 >>> from zope.component.tests.examples import U >>> from zope.component.tests.examples import U1 >>> from zope.component.tests.examples import U12 >>> from zope.component.tests.examples import handle1 >>> from zope.component.tests.examples import handle2 >>> from zope.component.tests.examples import handle3 >>> from zope.component.tests.examples import handle4 >>> _ = t1.begin() >>> r1[1] = PersistentComponents('1') >>> t1.commit() >>> _ = t2.begin() >>> r2[2] = PersistentComponents('2', (r2[1], )) >>> t2.commit() >>> _ = t1.begin() >>> r1[3] = PersistentComponents('3', (r1[1], )) >>> t1.commit() >>> _ = t2.begin() >>> r2[4] = PersistentComponents('4', (r2[2], r2[3])) >>> t2.commit() >>> _ = t1.begin() >>> r1[1].__bases__ () >>> r1[2].__bases__ == (r1[1], ) True >>> r1[1].registerUtility(U1(1)) >>> r1[1].queryUtility(I1) U1(1) >>> r1[2].queryUtility(I1) U1(1) >>> t1.commit() >>> _ = t2.begin() >>> r2[1].registerUtility(U1(2)) >>> r2[2].queryUtility(I1) U1(2) >>> r2[4].queryUtility(I1) U1(2) >>> t2.commit() >>> _ = t1.begin() >>> r1[1].registerUtility(U12(1), I2) >>> r1[4].queryUtility(I2) U12(1) >>> t1.commit() >>> _ = t2.begin() >>> r2[3].registerUtility(U12(3), I2) >>> r2[4].queryUtility(I2) U12(3) >>> t2.commit() >>> _ = t1.begin() >>> r1[1].registerHandler(handle1, info="First handler") >>> r1[2].registerHandler(handle2, required=[U]) >>> r1[3].registerHandler(handle3) >>> r1[4].registerHandler(handle4) >>> r1[4].handle(U1(1)) handle1 U1(1) handle3 U1(1) handle2 (U1(1),) handle4 U1(1) >>> t1.commit() >>> _ = t2.begin() >>> r2[4].handle(U1(1)) handle1 U1(1) handle3 U1(1) handle2 (U1(1),) handle4 U1(1) >>> t2.abort() >>> db.close() Subscription to Events in Persistent Registries ----------------------------------------------- .. doctest:: >>> import ZODB.tests.util >>> db = ZODB.tests.util.DB() >>> import transaction >>> t1 = transaction.TransactionManager() >>> c1 = db.open(transaction_manager=t1) >>> r1 = c1.root() >>> t2 = transaction.TransactionManager() >>> c2 = db.open(transaction_manager=t2) >>> r2 = c2.root() >>> from zope.component.persistentregistry import PersistentComponents >>> _ = t1.begin() >>> r1[1] = PersistentComponents('1') >>> r1[1].registerHandler(handle1) >>> r1[1].registerSubscriptionAdapter(handle1, provided=I2) >>> _ = r1[1].unregisterHandler(handle1) >>> _ = r1[1].unregisterSubscriptionAdapter(handle1, provided=I2) >>> t1.commit() >>> _ = t1.begin() >>> r1[1].registerHandler(handle1) >>> r1[1].registerSubscriptionAdapter(handle1, provided=I2) >>> t1.commit() >>> _ = t2.begin() >>> len(list(r2[1].registeredHandlers())) 1 >>> len(list(r2[1].registeredSubscriptionAdapters())) 1 >>> t2.abort() Adapter Registrations after Serialization / Deserialization ----------------------------------------------------------- We want to make sure that we see updates corrextly. .. doctest:: >>> import persistent >>> import transaction >>> from zope.interface import Interface >>> from zope.interface import implements >>> class IFoo(Interface): ... pass >>> class Foo(persistent.Persistent): ... implements(IFoo) ... name = '' ... def __init__(self, name=''): ... self.name = name ... def __repr__(self): ... return 'Foo(%r)' % self.name >>> from zope.component.tests.examples import base >>> from zope.component.tests.examples import clear_base >>> len(base._v_subregistries) 0 >>> import ZODB.tests.util >>> db = ZODB.tests.util.DB() >>> tm1 = transaction.TransactionManager() >>> c1 = db.open(transaction_manager=tm1) >>> from zope.component.persistentregistry import PersistentAdapterRegistry >>> r1 = PersistentAdapterRegistry((base,)) >>> r2 = PersistentAdapterRegistry((r1,)) >>> c1.root()[1] = r1 >>> c1.root()[2] = r2 >>> tm1.commit() >>> r1._p_deactivate() >>> len(base._v_subregistries) 0 >>> tm2 = transaction.TransactionManager() >>> c2 = db.open(transaction_manager=tm2) >>> r1 = c2.root()[1] >>> r2 = c2.root()[2] >>> r1.lookup((), IFoo, '') >>> base.register((), IFoo, '', Foo('')) >>> r1.lookup((), IFoo, '') Foo('') >>> r2.lookup((), IFoo, '1') >>> r1.register((), IFoo, '1', Foo('1')) >>> r2.lookup((), IFoo, '1') Foo('1') >>> r1.lookup((), IFoo, '2') >>> r2.lookup((), IFoo, '2') >>> base.register((), IFoo, '2', Foo('2')) >>> r1.lookup((), IFoo, '2') Foo('2') >>> r2.lookup((), IFoo, '2') Foo('2') >>> db.close() >>> clear_base() .. testcleanup:: from zope.component.testing import tearDown tearDown()
Name
Size
Last Modified
Owner
Permissions
Actions
adapter.txt
10.405
KB
February 28 2013 2:31:18
root
0644
factory.txt
0.175
KB
February 28 2013 2:31:18
root
0644
interface.txt
4.918
KB
February 28 2013 2:31:18
root
0644
interfaces.txt
0.614
KB
February 28 2013 2:31:18
root
0644
persistent.txt
5.922
KB
February 28 2013 2:31:18
root
0644
security.txt
2.546
KB
February 28 2013 2:31:18
root
0644
sitemanager.txt
2.283
KB
February 28 2013 2:31:18
root
0644
utility.txt
7.338
KB
February 28 2013 2:31:18
root
0644
2017 © D7net | D704T team