1 | """ |
---|
2 | Tests for allmydata.util.consumer. |
---|
3 | |
---|
4 | Ported to Python 3. |
---|
5 | """ |
---|
6 | |
---|
7 | from zope.interface import implementer |
---|
8 | from twisted.internet.interfaces import IPushProducer, IPullProducer |
---|
9 | |
---|
10 | from allmydata.util.consumer import MemoryConsumer |
---|
11 | |
---|
12 | from .common import ( |
---|
13 | SyncTestCase, |
---|
14 | ) |
---|
15 | from testtools.matchers import ( |
---|
16 | Equals, |
---|
17 | ) |
---|
18 | |
---|
19 | |
---|
@implementer(IPushProducer)
@implementer(IPullProducer)
class Producer(object):
    """
    Test producer that declares both the push and the pull producer interfaces.

    Each call to ``iterate()`` hands one queued item to the consumer.  When
    registered as a streaming producer the test is expected to drive the
    remaining writes by calling ``iterate()`` itself.
    """

    def __init__(self, consumer, data):
        self.consumer = consumer
        self.data = data
        # Flipped to True once the queue has been found empty.
        self.done = False

    def iterate(self):
        """
        Perform one step: write the next item, or finish if none remain.

        Raises RuntimeError when called again after the producer finished.
        """
        if self.done:
            raise RuntimeError(
                "There's a bug somewhere, shouldn't iterate after being done"
            )
        if not self.data:
            # Nothing left to send: mark completion and detach from the
            # consumer.
            self.done = True
            self.consumer.unregisterProducer()
            return
        # Items are taken from the front of the list, which mutates the
        # list that was passed to the constructor.
        self.consumer.write(self.data.pop(0))

    def resumeProducing(self):
        """Run a single iteration; this is what starts the writing."""
        self.iterate()

    def pauseProducing(self):
        """Do nothing."""
        pass

    def stopProducing(self):
        """Do nothing."""
        pass
---|
54 | |
---|
55 | |
---|
class MemoryConsumerTests(SyncTestCase):
    """Exercise MemoryConsumer with streaming and non-streaming producers."""

    def test_push_producer(self):
        """
        Every chunk written by a streaming producer ends up in the
        MemoryConsumer, and it is only done once the producer unregisters.
        """
        expected = [b"abc", b"def", b"ghi"]
        consumer = MemoryConsumer()
        # The producer pops from its list, so give it its own copy.
        producer = Producer(consumer, list(expected))

        # After streaming registration the first chunk is expected to have
        # been delivered already.
        consumer.registerProducer(producer, True)
        self.assertThat(consumer.chunks, Equals(expected[:1]))

        # Two manual iterations deliver the remaining two chunks.
        for _ in range(2):
            producer.iterate()
        self.assertThat(consumer.chunks, Equals(expected))
        self.assertFalse(consumer.done)

        # One more iteration finds the queue empty and unregisters; no new
        # data arrives.
        producer.iterate()
        self.assertThat(consumer.chunks, Equals(expected))
        self.assertTrue(consumer.done)

    def test_pull_producer(self):
        """
        Every chunk written by a non-streaming producer ends up in the
        MemoryConsumer.
        """
        expected = [b"abc", b"def", b"ghi"]
        consumer = MemoryConsumer()
        # The producer pops from its list, so give it its own copy.
        producer = Producer(consumer, list(expected))

        # Non-streaming registration alone is expected to collect all the
        # data and leave the consumer done.
        consumer.registerProducer(producer, False)
        self.assertThat(consumer.chunks, Equals(expected))
        self.assertTrue(consumer.done)
---|
84 | |
---|
85 | |
---|
86 | # download_to_data() is effectively tested by some of the filenode tests, e.g. |
---|
87 | # test_immutable.py. |
---|