Project

General

Profile

1
<?php
2
/*
3
 * Licensed to the Apache Software Foundation (ASF) under one
4
 * or more contributor license agreements. See the NOTICE file
5
 * distributed with this work for additional information
6
 * regarding copyright ownership. The ASF licenses this file
7
 * to you under the Apache License, Version 2.0 (the
8
 * "License"); you may not use this file except in compliance
9
 * with the License. You may obtain a copy of the License at
10
 *
11
 *   http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing,
14
 * software distributed under the License is distributed on an
15
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16
 * KIND, either express or implied. See the License for the
17
 * specific language governing permissions and limitations
18
 * under the License.
19
 *
20
 * @package thrift.transport
21
 */
22

    
23

    
24
/**
25
 * Sockets implementation of the TTransport interface.
26
 *
27
 * @package thrift.transport
28
 */
29
class TSocket extends TTransport {
30

    
31
  /**
32
   * Handle to PHP socket
33
   *
34
   * @var resource
35
   */
36
  private $handle_ = null;
37

    
38
  /**
39
   * Remote hostname
40
   *
41
   * @var string
42
   */
43
  protected $host_ = 'localhost';
44

    
45
  /**
46
   * Remote port
47
   *
48
   * @var int
49
   */
50
  protected $port_ = '9090';
51

    
52
  /**
53
   * Send timeout in seconds.
54
   *
55
   * Combined with sendTimeoutUsec this is used for send timeouts.
56
   *
57
   * @var int
58
   */
59
  private $sendTimeoutSec_ = 0;
60

    
61
  /**
62
   * Send timeout in microseconds.
63
   *
64
   * Combined with sendTimeoutSec this is used for send timeouts.
65
   *
66
   * @var int
67
   */
68
  private $sendTimeoutUsec_ = 100000;
69

    
70
  /**
71
   * Recv timeout in seconds
72
   *
73
   * Combined with recvTimeoutUsec this is used for recv timeouts.
74
   *
75
   * @var int
76
   */
77
  private $recvTimeoutSec_ = 0;
78

    
79
  /**
80
   * Recv timeout in microseconds
81
   *
82
   * Combined with recvTimeoutSec this is used for recv timeouts.
83
   *
84
   * @var int
85
   */
86
  private $recvTimeoutUsec_ = 750000;
87

    
88
  /**
89
   * Persistent socket or plain?
90
   *
91
   * @var bool
92
   */
93
  protected $persist_ = FALSE;
94

    
95
  /**
96
   * Debugging on?
97
   *
98
   * @var bool
99
   */
100
  protected $debug_ = FALSE;
101

    
102
  /**
103
   * Debug handler
104
   *
105
   * @var mixed
106
   */
107
  protected $debugHandler_ = null;
108

    
109
  /**
110
   * Socket constructor
111
   *
112
   * @param string $host         Remote hostname
113
   * @param int    $port         Remote port
114
   * @param bool   $persist      Whether to use a persistent socket
115
   * @param string $debugHandler Function to call for error logging
116
   */
117
  public function __construct($host='localhost',
118
                              $port=9090,
119
                              $persist=FALSE,
120
                              $debugHandler=null) {
121
    $this->host_ = $host;
122
    $this->port_ = $port;
123
    $this->persist_ = $persist;
124
    $this->debugHandler_ = $debugHandler ? $debugHandler : 'error_log';
125
  }
126

    
127
  /**
128
   * @param resource $handle
129
   * @return void
130
   */
131
  public function setHandle($handle) {
132
    $this->handle_ = $handle;
133
  }
134

    
135
  /**
136
   * Sets the send timeout.
137
   *
138
   * @param int $timeout  Timeout in milliseconds.
139
   */
140
  public function setSendTimeout($timeout) {
141
    $this->sendTimeoutSec_ = floor($timeout / 1000);
142
    $this->sendTimeoutUsec_ =
143
            ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000;
144
  }
145

    
146
  /**
147
   * Sets the receive timeout.
148
   *
149
   * @param int $timeout  Timeout in milliseconds.
150
   */
151
  public function setRecvTimeout($timeout) {
152
    $this->recvTimeoutSec_ = floor($timeout / 1000);
153
    $this->recvTimeoutUsec_ =
154
            ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000;
155
  }
156

    
157
  /**
158
   * Sets debugging output on or off
159
   *
160
   * @param bool $debug
161
   */
162
  public function setDebug($debug) {
163
    $this->debug_ = $debug;
164
  }
165

    
166
  /**
167
   * Get the host that this socket is connected to
168
   *
169
   * @return string host
170
   */
171
  public function getHost() {
172
    return $this->host_;
173
  }
174

    
175
  /**
176
   * Get the remote port that this socket is connected to
177
   *
178
   * @return int port
179
   */
180
  public function getPort() {
181
    return $this->port_;
182
  }
183

    
184
  /**
185
   * Tests whether this is open
186
   *
187
   * @return bool true if the socket is open
188
   */
189
  public function isOpen() {
190
    return is_resource($this->handle_);
191
  }
192

    
193
  /**
194
   * Connects the socket.
195
   */
196
  public function open() {
197
    if ($this->isOpen()) {
198
      throw new TTransportException('Socket already connected', TTransportException::ALREADY_OPEN);
199
    }
200

    
201
    if (empty($this->host_)) {
202
      throw new TTransportException('Cannot open null host', TTransportException::NOT_OPEN);
203
    }
204

    
205
    if ($this->port_ <= 0) {
206
      throw new TTransportException('Cannot open without port', TTransportException::NOT_OPEN);
207
    }
208

    
209
    if ($this->persist_) {
210
      $this->handle_ = @pfsockopen($this->host_,
211
                                   $this->port_,
212
                                   $errno,
213
                                   $errstr,
214
                                   $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
215
    } else {
216
      $this->handle_ = @fsockopen($this->host_,
217
                                  $this->port_,
218
                                  $errno,
219
                                  $errstr,
220
                                  $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
221
    }
222

    
223
    // Connect failed?
224
    if ($this->handle_ === FALSE) {
225
      $error = 'TSocket: Could not connect to '.$this->host_.':'.$this->port_.' ('.$errstr.' ['.$errno.'])';
226
      if ($this->debug_) {
227
        call_user_func($this->debugHandler_, $error);
228
      }
229
      throw new TException($error);
230
    }
231
  }
232

    
233
  /**
234
   * Closes the socket.
235
   */
236
  public function close() {
237
    if (!$this->persist_) {
238
      @fclose($this->handle_);
239
      $this->handle_ = null;
240
    }
241
  }
242

    
243
  /**
244
   * Read from the socket at most $len bytes.
245
   *
246
   * This method will not wait for all the requested data, it will return as
247
   * soon as any data is received.
248
   *
249
   * @param int $len Maximum number of bytes to read.
250
   * @return string Binary data
251
   */
252
  public function read($len) {
253
    $null = null;
254
    $read = array($this->handle_);
255
    $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec_, $this->recvTimeoutUsec_);
256

    
257
    if ($readable > 0) {
258
      $data = @stream_socket_recvfrom($this->handle_, $len);
259
      if ($data === false) {
260
          throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
261
                               $this->host_.':'.$this->port_);
262
      } elseif($data == '' && feof($this->handle_)) {
263
          throw new TTransportException('TSocket read 0 bytes');
264
        }
265

    
266
      return $data;
267
    } else if ($readable === 0) {
268
        throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
269
                             $this->host_.':'.$this->port_);
270
      } else {
271
        throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
272
                             $this->host_.':'.$this->port_);
273
      }
274
    }
275

    
276
  /**
277
   * Write to the socket.
278
   *
279
   * @param string $buf The data to write
280
   */
281
  public function write($buf) {
282
    $null = null;
283
    $write = array($this->handle_);
284

    
285
    // keep writing until all the data has been written
286
    while (strlen($buf) > 0) {
287
      // wait for stream to become available for writing
288
      $writable = @stream_select($null, $write, $null, $this->sendTimeoutSec_, $this->sendTimeoutUsec_);
289
      if ($writable > 0) {
290
        // write buffer to stream
291
        $written = @stream_socket_sendto($this->handle_, $buf);
292
        if ($written === -1 || $written === false) {
293
          throw new TTransportException('TSocket: Could not write '.strlen($buf).' bytes '.
294
                                   $this->host_.':'.$this->port_);
295
        }
296
        // determine how much of the buffer is left to write
297
        $buf = substr($buf, $written);
298
      } else if ($writable === 0) {
299
          throw new TTransportException('TSocket: timed out writing '.strlen($buf).' bytes from '.
300
                               $this->host_.':'.$this->port_);
301
        } else {
302
            throw new TTransportException('TSocket: Could not write '.strlen($buf).' bytes '.
303
                                 $this->host_.':'.$this->port_);
304
        }
305
      }
306
    }
307

    
308
  /**
309
   * Flush output to the socket.
310
   *
311
   * Since read(), readAll() and write() operate on the sockets directly,
312
   * this is a no-op
313
   *
314
   * If you wish to have flushable buffering behaviour, wrap this TSocket
315
   * in a TBufferedTransport.
316
   */
317
  public function flush() {
318
    // no-op
319
    }
320
  }
(9-9/12)