1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
# Copyright Mateusz Kobos, (c) 2011
# https://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/
# released under the MIT licence
import unittest
import threading
import time
import copy
from ._rwlock import RWLock
class Writer(threading.Thread):
def __init__(
self, buffer_, rw_lock, init_sleep_time, sleep_time, to_write
):
"""
@param buffer_: common buffer_ shared by the readers and writers
@type buffer_: list
@type rw_lock: L{RWLock}
@param init_sleep_time: sleep time before doing any action
@type init_sleep_time: C{float}
@param sleep_time: sleep time while in critical section
@type sleep_time: C{float}
@param to_write: data that will be appended to the buffer
"""
threading.Thread.__init__(self)
self.__buffer = buffer_
self.__rw_lock = rw_lock
self.__init_sleep_time = init_sleep_time
self.__sleep_time = sleep_time
self.__to_write = to_write
self.entry_time = None
"""Time of entry to the critical section"""
self.exit_time = None
"""Time of exit from the critical section"""
def run(self):
time.sleep(self.__init_sleep_time)
self.__rw_lock.writer_acquire()
self.entry_time = time.time()
time.sleep(self.__sleep_time)
self.__buffer.append(self.__to_write)
self.exit_time = time.time()
self.__rw_lock.writer_release()
class Reader(threading.Thread):
def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time):
"""
@param buffer_: common buffer shared by the readers and writers
@type buffer_: list
@type rw_lock: L{RWLock}
@param init_sleep_time: sleep time before doing any action
@type init_sleep_time: C{float}
@param sleep_time: sleep time while in critical section
@type sleep_time: C{float}
"""
threading.Thread.__init__(self)
self.__buffer = buffer_
self.__rw_lock = rw_lock
self.__init_sleep_time = init_sleep_time
self.__sleep_time = sleep_time
self.buffer_read = None
"""a copy of a the buffer read while in critical section"""
self.entry_time = None
"""Time of entry to the critical section"""
self.exit_time = None
"""Time of exit from the critical section"""
def run(self):
time.sleep(self.__init_sleep_time)
self.__rw_lock.reader_acquire()
self.entry_time = time.time()
time.sleep(self.__sleep_time)
self.buffer_read = copy.deepcopy(self.__buffer)
self.exit_time = time.time()
self.__rw_lock.reader_release()
class RWLockTestCase(unittest.TestCase):
def test_readers_nonexclusive_access(self):
(buffer_, rw_lock, threads) = self.__init_variables()
threads.append(Reader(buffer_, rw_lock, 0, 0))
threads.append(Writer(buffer_, rw_lock, 0.2, 0.4, 1))
threads.append(Reader(buffer_, rw_lock, 0.3, 0.3))
threads.append(Reader(buffer_, rw_lock, 0.5, 0))
self.__start_and_join_threads(threads)
## The third reader should enter after the second one but it should
## exit before the second one exits
## (i.e. the readers should be in the critical section
## at the same time)
self.assertEqual([], threads[0].buffer_read)
self.assertEqual([1], threads[2].buffer_read)
self.assertEqual([1], threads[3].buffer_read)
self.assertTrue(threads[1].exit_time <= threads[2].entry_time)
self.assertTrue(threads[2].entry_time <= threads[3].entry_time)
self.assertTrue(threads[3].exit_time < threads[2].exit_time)
def test_writers_exclusive_access(self):
(buffer_, rw_lock, threads) = self.__init_variables()
threads.append(Writer(buffer_, rw_lock, 0, 0.4, 1))
threads.append(Writer(buffer_, rw_lock, 0.1, 0, 2))
threads.append(Reader(buffer_, rw_lock, 0.2, 0))
self.__start_and_join_threads(threads)
## The second writer should wait for the first one to exit
self.assertEqual([1, 2], threads[2].buffer_read)
self.assertTrue(threads[0].exit_time <= threads[1].entry_time)
self.assertTrue(threads[1].exit_time <= threads[2].exit_time)
def test_writer_priority(self):
(buffer_, rw_lock, threads) = self.__init_variables()
threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
threads.append(Reader(buffer_, rw_lock, 0.1, 0.4))
threads.append(Writer(buffer_, rw_lock, 0.2, 0, 2))
threads.append(Reader(buffer_, rw_lock, 0.3, 0))
threads.append(Reader(buffer_, rw_lock, 0.3, 0))
self.__start_and_join_threads(threads)
## The second writer should go before the second and the third reader
self.assertEqual([1], threads[1].buffer_read)
self.assertEqual([1, 2], threads[3].buffer_read)
self.assertEqual([1, 2], threads[4].buffer_read)
self.assertTrue(threads[0].exit_time < threads[1].entry_time)
self.assertTrue(threads[1].exit_time <= threads[2].entry_time)
self.assertTrue(threads[2].exit_time <= threads[3].entry_time)
self.assertTrue(threads[2].exit_time <= threads[4].entry_time)
def test_many_writers_priority(self):
(buffer_, rw_lock, threads) = self.__init_variables()
threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
threads.append(Reader(buffer_, rw_lock, 0.1, 0.6))
threads.append(Writer(buffer_, rw_lock, 0.2, 0.1, 2))
threads.append(Reader(buffer_, rw_lock, 0.3, 0))
threads.append(Reader(buffer_, rw_lock, 0.4, 0))
threads.append(Writer(buffer_, rw_lock, 0.5, 0.1, 3))
self.__start_and_join_threads(threads)
## The two last writers should go first -- after the first reader and
## before the second and the third reader
self.assertEqual([1], threads[1].buffer_read)
self.assertEqual([1, 2, 3], threads[3].buffer_read)
self.assertEqual([1, 2, 3], threads[4].buffer_read)
self.assertTrue(threads[0].exit_time < threads[1].entry_time)
self.assertTrue(threads[1].exit_time <= threads[2].entry_time)
self.assertTrue(threads[1].exit_time <= threads[5].entry_time)
self.assertTrue(threads[2].exit_time <= threads[3].entry_time)
self.assertTrue(threads[2].exit_time <= threads[4].entry_time)
self.assertTrue(threads[5].exit_time <= threads[3].entry_time)
self.assertTrue(threads[5].exit_time <= threads[4].entry_time)
@staticmethod
def __init_variables():
buffer_ = []
rw_lock = RWLock()
threads = []
return (buffer_, rw_lock, threads)
@staticmethod
def __start_and_join_threads(threads):
for t in threads:
t.start()
for t in threads:
t.join()
|