Package paramiko :: Module buffered_pipe
[frames] | no frames]

Source Code for Module paramiko.buffered_pipe

  1  # Copyright (C) 2006-2007  Robey Pointer <robeypointer@gmail.com> 
  2  # 
  3  # This file is part of paramiko. 
  4  # 
  5  # Paramiko is free software; you can redistribute it and/or modify it under the 
  6  # terms of the GNU Lesser General Public License as published by the Free 
  7  # Software Foundation; either version 2.1 of the License, or (at your option) 
  8  # any later version. 
  9  # 
 10  # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 
 11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
 12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 13  # details. 
 14  # 
 15  # You should have received a copy of the GNU Lesser General Public License 
 16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
 17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
 18   
 19  """ 
 20  Attempt to generalize the "feeder" part of a `.Channel`: an object which can be 
 21  read from and closed, but is reading from a buffer fed by another thread.  The 
 22  read operations are blocking and can have a timeout set. 
 23  """ 
 24   
 25  import array 
 26  import threading 
 27  import time 
 28  from paramiko.py3compat import PY2, b 
 29   
 30   
31 -class PipeTimeout (IOError):
32 """ 33 Indicates that a timeout was reached on a read from a `.BufferedPipe`. 34 """ 35 pass
36 37
38 -class BufferedPipe (object):
39 """ 40 A buffer that obeys normal read (with timeout) & close semantics for a 41 file or socket, but is fed data from another thread. This is used by 42 `.Channel`. 43 """ 44
45 - def __init__(self):
46 self._lock = threading.Lock() 47 self._cv = threading.Condition(self._lock) 48 self._event = None 49 self._buffer = array.array('B') 50 self._closed = False
51 52 if PY2:
53 - def _buffer_frombytes(self, data):
54 self._buffer.fromstring(data)
55
56 - def _buffer_tobytes(self, limit=None):
57 return self._buffer[:limit].tostring()
58 else:
59 - def _buffer_frombytes(self, data):
60 self._buffer.frombytes(data)
61
62 - def _buffer_tobytes(self, limit=None):
63 return self._buffer[:limit].tobytes()
64
65 - def set_event(self, event):
66 """ 67 Set an event on this buffer. When data is ready to be read (or the 68 buffer has been closed), the event will be set. When no data is 69 ready, the event will be cleared. 70 71 :param threading.Event event: the event to set/clear 72 """ 73 self._event = event 74 if len(self._buffer) > 0: 75 event.set() 76 else: 77 event.clear()
78
79 - def feed(self, data):
80 """ 81 Feed new data into this pipe. This method is assumed to be called 82 from a separate thread, so synchronization is done. 83 84 :param data: the data to add, as a `str` 85 """ 86 self._lock.acquire() 87 try: 88 if self._event is not None: 89 self._event.set() 90 self._buffer_frombytes(b(data)) 91 self._cv.notifyAll() 92 finally: 93 self._lock.release()
94
95 - def read_ready(self):
96 """ 97 Returns true if data is buffered and ready to be read from this 98 feeder. A ``False`` result does not mean that the feeder has closed; 99 it means you may need to wait before more data arrives. 100 101 :return: 102 ``True`` if a `read` call would immediately return at least one 103 byte; ``False`` otherwise. 104 """ 105 self._lock.acquire() 106 try: 107 if len(self._buffer) == 0: 108 return False 109 return True 110 finally: 111 self._lock.release()
112
113 - def read(self, nbytes, timeout=None):
114 """ 115 Read data from the pipe. The return value is a string representing 116 the data received. The maximum amount of data to be received at once 117 is specified by ``nbytes``. If a string of length zero is returned, 118 the pipe has been closed. 119 120 The optional ``timeout`` argument can be a nonnegative float expressing 121 seconds, or ``None`` for no timeout. If a float is given, a 122 `.PipeTimeout` will be raised if the timeout period value has elapsed 123 before any data arrives. 124 125 :param int nbytes: maximum number of bytes to read 126 :param float timeout: 127 maximum seconds to wait (or ``None``, the default, to wait forever) 128 :return: the read data, as a `str` 129 130 :raises PipeTimeout: 131 if a timeout was specified and no data was ready before that 132 timeout 133 """ 134 out = bytes() 135 self._lock.acquire() 136 try: 137 if len(self._buffer) == 0: 138 if self._closed: 139 return out 140 # should we block? 141 if timeout == 0.0: 142 raise PipeTimeout() 143 # loop here in case we get woken up but a different thread has 144 # grabbed everything in the buffer. 145 while (len(self._buffer) == 0) and not self._closed: 146 then = time.time() 147 self._cv.wait(timeout) 148 if timeout is not None: 149 timeout -= time.time() - then 150 if timeout <= 0.0: 151 raise PipeTimeout() 152 153 # something's in the buffer and we have the lock! 154 if len(self._buffer) <= nbytes: 155 out = self._buffer_tobytes() 156 del self._buffer[:] 157 if (self._event is not None) and not self._closed: 158 self._event.clear() 159 else: 160 out = self._buffer_tobytes(nbytes) 161 del self._buffer[:nbytes] 162 finally: 163 self._lock.release() 164 165 return out
166
167 - def empty(self):
168 """ 169 Clear out the buffer and return all data that was in it. 170 171 :return: 172 any data that was in the buffer prior to clearing it out, as a 173 `str` 174 """ 175 self._lock.acquire() 176 try: 177 out = self._buffer_tobytes() 178 del self._buffer[:] 179 if (self._event is not None) and not self._closed: 180 self._event.clear() 181 return out 182 finally: 183 self._lock.release()
184
185 - def close(self):
186 """ 187 Close this pipe object. Future calls to `read` after the buffer 188 has been emptied will return immediately with an empty string. 189 """ 190 self._lock.acquire() 191 try: 192 self._closed = True 193 self._cv.notifyAll() 194 if self._event is not None: 195 self._event.set() 196 finally: 197 self._lock.release()
198
199 - def __len__(self):
200 """ 201 Return the number of bytes buffered. 202 203 :return: number (`int`) of bytes buffered 204 """ 205 self._lock.acquire() 206 try: 207 return len(self._buffer) 208 finally: 209 self._lock.release()
210