<?php
/*
 * Copyright 2016-2017 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\GridFS;

use IteratorIterator;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\GridFS\Exception\CorruptFileException;
use stdClass;
use function ceil;
use function floor;
use function is_integer;
use function property_exists;
use function sprintf;
use function strlen;
use function substr;

/**
 * ReadableStream abstracts the process of reading a GridFS file.
 *
 * The read position is tracked as a pair of offsets: the index of the current
 * chunk and the byte offset within that chunk's data. The chunk cursor is only
 * opened on the first read, so constructing or seeking an unread stream does
 * not query the chunks collection.
 *
 * @internal
 */
class ReadableStream
{
    /** @var string|null Data of the current chunk, or null if none is loaded */
    private $buffer;

    /** @var integer Byte offset of the read position within the current chunk */
    private $bufferOffset = 0;

    /** @var integer Chunk size declared by the file document */
    private $chunkSize;

    /** @var integer Index of the chunk containing the read position */
    private $chunkOffset = 0;

    /** @var IteratorIterator|null Chunk cursor, or null if it must be (re)created */
    private $chunksIterator;

    /** @var CollectionWrapper */
    private $collectionWrapper;

    /** @var float|integer Size the final chunk must have for the file to be intact */
    private $expectedLastChunkSize = 0;

    /** @var stdClass GridFS file document */
    private $file;

    /** @var integer File length in bytes declared by the file document */
    private $length;

    /** @var integer Number of chunks implied by the length and chunk size */
    private $numChunks = 0;

    /**
     * Constructs a readable GridFS stream.
     *
     * @param CollectionWrapper $collectionWrapper GridFS collection wrapper
     * @param stdClass          $file              GridFS file document
     * @throws CorruptFileException if the file document lacks a valid
     *                              chunkSize, length, or _id field
     */
    public function __construct(CollectionWrapper $collectionWrapper, stdClass $file)
    {
        if (! isset($file->chunkSize) || ! is_integer($file->chunkSize) || $file->chunkSize < 1) {
            throw new CorruptFileException('file.chunkSize is not an integer >= 1');
        }

        // A length of zero (empty file) is valid; only negative values are rejected.
        if (! isset($file->length) || ! is_integer($file->length) || $file->length < 0) {
            throw new CorruptFileException('file.length is not an integer >= 0');
        }

        /* isset() reports false for a null value, so property_exists() is also
         * consulted to accept a file document whose _id is present but null.
         */
        if (! isset($file->_id) && ! property_exists($file, '_id')) {
            throw new CorruptFileException('file._id does not exist');
        }

        $this->file = $file;
        // Both fields were validated as integers above, so no cast is needed.
        $this->chunkSize = $file->chunkSize;
        $this->length = $file->length;

        $this->collectionWrapper = $collectionWrapper;

        if ($this->length > 0) {
            $this->numChunks = (int) ceil($this->length / $this->chunkSize);
            // Every chunk but the last is full, so the last holds the remainder.
            $this->expectedLastChunkSize = $this->length - (($this->numChunks - 1) * $this->chunkSize);
        }
    }

    /**
     * Return internal properties for debugging purposes.
     *
     * @see http://php.net/manual/en/language.oop5.magic.php#language.oop5.magic.debuginfo
     * @return array
     */
    public function __debugInfo()
    {
        return [
            'bucketName' => $this->collectionWrapper->getBucketName(),
            'databaseName' => $this->collectionWrapper->getDatabaseName(),
            'file' => $this->file,
        ];
    }

    /**
     * Closes the stream.
     *
     * This is currently a no-op.
     */
    public function close()
    {
        // Nothing to do
    }

    /**
     * Return the stream's file document.
     *
     * @return stdClass
     */
    public function getFile()
    {
        return $this->file;
    }

    /**
     * Return the stream's size in bytes.
     *
     * @return integer
     */
    public function getSize()
    {
        return $this->length;
    }

    /**
     * Return whether the current read position is at the end of the stream.
     *
     * @return boolean
     */
    public function isEOF()
    {
        // Within the last chunk, EOF depends on the position inside the chunk.
        if ($this->chunkOffset === $this->numChunks - 1) {
            return $this->bufferOffset >= $this->expectedLastChunkSize;
        }

        // Otherwise EOF is reached only once positioned past the last chunk.
        return $this->chunkOffset >= $this->numChunks;
    }

    /**
     * Read bytes from the stream.
     *
     * Note: this method may return a string smaller than the requested length
     * if data is not available to be read. An empty string is returned when
     * the read position is at the end of the stream.
     *
     * @param integer $length Number of bytes to read
     * @return string
     * @throws InvalidArgumentException if $length is negative
     * @throws CorruptFileException if an expected chunk could not be read successfully
     */
    public function readBytes($length)
    {
        if ($length < 0) {
            throw new InvalidArgumentException(sprintf('$length must be >= 0; given: %d', $length));
        }

        if ($this->chunksIterator === null) {
            $this->initChunksIterator();
        }

        if ($this->buffer === null && ! $this->initBufferFromCurrentChunk()) {
            return '';
        }

        $data = '';

        while (strlen($data) < $length) {
            // Current chunk exhausted: advance, or stop if there is no next chunk.
            if ($this->bufferOffset >= strlen($this->buffer) && ! $this->initBufferFromNextChunk()) {
                break;
            }

            $initialDataLength = strlen($data);
            $data .= substr($this->buffer, $this->bufferOffset, $length - $initialDataLength);
            // Advance by the number of bytes actually appended.
            $this->bufferOffset += strlen($data) - $initialDataLength;
        }

        return $data;
    }

    /**
     * Seeks the chunk and buffer offsets for the next read operation.
     *
     * Seeking to the file length (i.e. the end of the stream) is permitted.
     *
     * @param integer $offset
     * @throws InvalidArgumentException if $offset is out of range
     */
    public function seek($offset)
    {
        if ($offset < 0 || $offset > $this->file->length) {
            throw new InvalidArgumentException(sprintf('$offset must be >= 0 and <= %d; given: %d', $this->file->length, $offset));
        }

        /* Compute the offsets for the chunk and buffer (i.e. chunk data) from
         * which we will expect to read after seeking. If the chunk offset
         * changed, we'll also need to reset the buffer.
         */
        $lastChunkOffset = $this->chunkOffset;
        $this->chunkOffset = (int) floor($offset / $this->chunkSize);
        $this->bufferOffset = $offset % $this->chunkSize;

        if ($lastChunkOffset === $this->chunkOffset) {
            return;
        }

        /* Without an iterator there is nothing to reposition; the next read
         * will create one starting at the new chunk offset.
         */
        if ($this->chunksIterator === null) {
            return;
        }

        // Clear the buffer since the current chunk will be changed
        $this->buffer = null;

        /* If we are seeking to a previous chunk, we need to reinitialize the
         * chunk iterator.
         */
        if ($lastChunkOffset > $this->chunkOffset) {
            $this->chunksIterator = null;

            return;
        }

        /* If we are seeking to a subsequent chunk, we do not need to
         * reinitialize the chunk iterator. Instead, we can simply move forward
         * to $this->chunkOffset.
         */
        $numChunks = $this->chunkOffset - $lastChunkOffset;
        for ($i = 0; $i < $numChunks; $i++) {
            $this->chunksIterator->next();
        }
    }

    /**
     * Return the current position of the stream.
     *
     * This is the offset within the stream where the next byte would be read.
     *
     * @return integer
     */
    public function tell()
    {
        return ($this->chunkOffset * $this->chunkSize) + $this->bufferOffset;
    }

    /**
     * Initialize the buffer to the current chunk's data.
     *
     * @return boolean Whether there was a current chunk to read
     * @throws CorruptFileException if an expected chunk could not be read successfully
     */
    private function initBufferFromCurrentChunk()
    {
        /* No chunk exists at or beyond numChunks. This covers an empty file as
         * well as a position at the very end of a file whose length is an exact
         * multiple of the chunk size (e.g. after seeking to the file length).
         * Such a position is a legitimate EOF rather than a missing chunk.
         */
        if ($this->chunkOffset >= $this->numChunks) {
            return false;
        }

        if (! $this->chunksIterator->valid()) {
            throw CorruptFileException::missingChunk($this->chunkOffset);
        }

        $currentChunk = $this->chunksIterator->current();

        if ($currentChunk->n !== $this->chunkOffset) {
            throw CorruptFileException::unexpectedIndex($currentChunk->n, $this->chunkOffset);
        }

        $this->buffer = $currentChunk->data->getData();

        $actualChunkSize = strlen($this->buffer);

        // Only the final chunk may be shorter than the declared chunk size.
        $expectedChunkSize = $this->chunkOffset === $this->numChunks - 1
            ? $this->expectedLastChunkSize
            : $this->chunkSize;

        if ($actualChunkSize !== $expectedChunkSize) {
            throw CorruptFileException::unexpectedSize($actualChunkSize, $expectedChunkSize);
        }

        return true;
    }

    /**
     * Advance to the next chunk and initialize the buffer to its data.
     *
     * @return boolean Whether there was a next chunk to read
     * @throws CorruptFileException if an expected chunk could not be read successfully
     */
    private function initBufferFromNextChunk()
    {
        // Already on the last chunk: leave the offsets untouched.
        if ($this->chunkOffset === $this->numChunks - 1) {
            return false;
        }

        $this->bufferOffset = 0;
        $this->chunkOffset++;
        $this->chunksIterator->next();

        return $this->initBufferFromCurrentChunk();
    }

    /**
     * Initializes the chunk iterator starting from the current offset.
     */
    private function initChunksIterator()
    {
        $cursor = $this->collectionWrapper->findChunksByFileId($this->file->_id, $this->chunkOffset);

        $this->chunksIterator = new IteratorIterator($cursor);
        $this->chunksIterator->rewind();
    }
}