diff --git a/README.md b/README.md index c16dbd7..821ac9a 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ $client = $database->getClient(); **Important:** don't forget to `urlencode()` the password (and username for that matter) when using a DSN, especially if it contains non-alphanumeric characters. Not doing so might cause exceptions to be thrown. - + ### Reading data To fetch records from InfluxDB you can do a query directly on a database: @@ -103,6 +103,16 @@ In production if you are querying InfluxDB to generate a response to a web or AP $database = InfluxDB\Client::fromDSN(sprintf('influxdb://user:pass@%s:%s/%s', $host, $port, $dbname), 5); ``` +### Reading large data sets + +Reading large data sets may exceed available memory. To overcome this obstacle, use the iterate() method to return each point without loading them all first into memory. This method supports only a single measurement, however, you can manually target another measurement if necessary. + +```php +foreach($result->iterate() as $index=>$point) { + // Process each $point as needed. +} +``` + ### Writing data Writing data is done by providing an array of points to the writePoints method on a database: @@ -280,6 +290,27 @@ $result = $client->listUsers(); $result = $client->listDatabases(); ``` +### Other Database functions + +Several additional database functions are available: + +```php +// list measurements +$result = $database->listMeasurements(); + +// list all field keys +$result = $database->listFieldKeys(); + +// list field keys for a single measurement +$result = $database->listFieldKeys('yourMeasurement'); + +// list all tag keys +$result = $database->listTagKeys(); + +// list tag keys for a single measurement +$result = $database->listTagKeys('yourMeasurement'); +``` + ### Admin functionality You can use the client's $client->admin functionality to administer InfluxDB via the API. @@ -371,10 +402,10 @@ $client->admin->revoke(\InfluxDB\Client\Admin::PRIVILEGE_ALL, 'admin_user'); * Fixed tag with Boolean/Null value trigger parse error #### 1.4.1 -* Fixed bug: Escape field values as per line protocol. +* Fixed bug: Escape field values as per line protocol. #### 1.4.0 -* Updating Influx Database with support for writing direct payloads, thanks @virgofx +* Updating Influx Database with support for writing direct payloads, thanks @virgofx #### 1.3.1 * Added ability to write data to a specific retention policy, thanks @virgofx ! diff --git a/composer.json b/composer.json index 4b779c5..81b598d 100644 --- a/composer.json +++ b/composer.json @@ -27,6 +27,7 @@ ], "require": { "php": "^5.5 || ^7.0", + "halaxa/json-machine": "0.3.2", "guzzlehttp/guzzle": "^6.0" }, "require-dev": { diff --git a/src/InfluxDB/Client.php b/src/InfluxDB/Client.php index 14c8c51..e850253 100644 --- a/src/InfluxDB/Client.php +++ b/src/InfluxDB/Client.php @@ -349,6 +349,7 @@ public function getVerifySSL() public function setDriver(DriverInterface $driver) { $this->driver = $driver; + return $this; } /** diff --git a/src/InfluxDB/Database.php b/src/InfluxDB/Database.php index e730aff..086f5ae 100644 --- a/src/InfluxDB/Database.php +++ b/src/InfluxDB/Database.php @@ -8,6 +8,7 @@ use InfluxDB\Database\Exception as DatabaseException; use InfluxDB\Database\RetentionPolicy; use InfluxDB\Query\Builder as QueryBuilder; +use InfluxDB\Query\QueryBuilder as QueryBuilder2; /** * Class Database @@ -205,6 +206,63 @@ public function listRetentionPolicies() return $this->query(sprintf('SHOW RETENTION POLICIES ON "%s"', $this->name))->getPoints(); } + /** + * @return array + */ + public function listMeasurements():array + { + return array_map(function($m){ + return $m[0]; + }, $this->query('SHOW MEASUREMENTS')->getSeries()[0]['values']); + } + + /** + * @param $measurement + * @return array + * @throws Exception + */ + public function listFieldKeys(?string $measurement=null):array + { + $rs = $measurement + ?$this->query(sprintf('SHOW FIELD KEYS FROM "%s"', $measurement))->getSeries() + :$this->query('SHOW FIELD KEYS')->getSeries(); + $arr=[]; + foreach($rs as $m) { + $arr[$m['name']]=[]; + foreach($m['values'] as $v) { + $arr[$m['name']][$v[0]]=$v[1]; + } + } + if($measurement) { + if(!isset($arr[$measurement])) throw new Exception("Measurement $measurement does not exist."); + return $arr[$measurement]; + } + else return $arr; + } + + /** + * @param $measurement + * @return array + * @throws Exception + */ + public function listTagKeys(?string $measurement=null):array + { + $rs = $measurement + ?$this->query(sprintf('SHOW TAG KEYS FROM "%s"', $measurement))->getSeries() + :$this->query('SHOW TAG KEYS')->getSeries(); + $arr=[]; + foreach($rs as $m) { + $arr[$m['name']]=array_map(function($v){ + return $v[0]; + }, $m['values']); + } + if($measurement) { + if(!isset($arr[$measurement])) throw new Exception("Measurement $measurement does not exist."); + return $arr[$measurement]; + } + else return $arr; + } + /** * Drop this database */ @@ -223,6 +281,16 @@ public function getQueryBuilder() return new QueryBuilder($this); } + /** + * Retrieve the optional second query builder + * + * @return QueryBuilder + */ + public function getQueryBuilder2() + { + return new QueryBuilder2($this); + } + /** * @return Client */ diff --git a/src/InfluxDB/Driver/Curl.php b/src/InfluxDB/Driver/Curl.php index 079efc6..f73a8d5 100644 --- a/src/InfluxDB/Driver/Curl.php +++ b/src/InfluxDB/Driver/Curl.php @@ -136,8 +136,10 @@ public function isSuccess() */ public function query() { - $response = $this->execute($this->parameters['url'], $this->getCurlOptions()); - return new ResultSet($response); + $stream=fopen('php://temp', 'w+'); + $this->execute($this->parameters['url'], $this->getCurlOptions() + [CURLOPT_FILE => $stream]); + rewind($stream); + return new ResultSet($stream); } protected function execute($url, $curlOptions = []) @@ -145,25 +147,27 @@ protected function execute($url, $curlOptions = []) $this->lastRequestInfo = null; $ch = curl_init(); - foreach ($curlOptions as $option => $value) { - curl_setopt($ch, $option, $value); - } - - curl_setopt($ch, CURLOPT_URL, $this->dsn . '/' . $url); - curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); + $curlOptions=[ + CURLOPT_URL => $this->dsn . '/' . $url, + CURLOPT_RETURNTRANSFER => true, /* CURLOPT_FILE must be set after CURLOPT_RETURNTRANSFER */ + CURLOPT_HEADER => 0, + CURLOPT_BUFFERSIZE => 256, + ] + $curlOptions; - $result = curl_exec($ch); - $this->lastRequestInfo = curl_getinfo($ch); + curl_setopt_array( $ch, $curlOptions); - if ($result === false) { + if(isset($curlOptions[CURLOPT_FILE])) { + curl_exec($ch); + rewind($curlOptions[CURLOPT_FILE]); + } + elseif (curl_exec($ch) === false) { // in case of total failure - socket/port is closed etc throw new Exception('Request failed! curl_errno: ' . curl_errno($ch)); } + $this->lastRequestInfo = curl_getinfo($ch); curl_close($ch); - - return $result; } /** diff --git a/src/InfluxDB/Driver/Guzzle.php b/src/InfluxDB/Driver/Guzzle.php index 66cf405..4013fc8 100644 --- a/src/InfluxDB/Driver/Guzzle.php +++ b/src/InfluxDB/Driver/Guzzle.php @@ -7,6 +7,7 @@ use GuzzleHttp\Client; use GuzzleHttp\Psr7\Response; +use GuzzleHttp\Psr7\StreamWrapper; use InfluxDB\ResultSet; /** @@ -86,25 +87,13 @@ public function write($data = null) /** * @throws \Exception - * @return ResultSet + * @return stream */ public function query() { $response = $this->httpClient->get($this->parameters['url'], $this->getRequestParameters()); - $raw = (string) $response->getBody(); - - return $this->asResultSet($raw); - } - - /** - * @param $raw - * @return ResultSet - * @throws \InfluxDB\Client\Exception - */ - protected function asResultSet($raw) - { - return new ResultSet($raw); + return new ResultSet(StreamWrapper::getResource($response->getBody())); } /** diff --git a/src/InfluxDB/Driver/QueryDriverInterface.php b/src/InfluxDB/Driver/QueryDriverInterface.php index 0966cec..69966e8 100644 --- a/src/InfluxDB/Driver/QueryDriverInterface.php +++ b/src/InfluxDB/Driver/QueryDriverInterface.php @@ -16,7 +16,8 @@ interface QueryDriverInterface { /** - * @return ResultSet + * @throws \Exception + * @return stream */ public function query(); } \ No newline at end of file diff --git a/src/InfluxDB/Query/QueryBuilder.php b/src/InfluxDB/Query/QueryBuilder.php new file mode 100644 index 0000000..c9f4264 --- /dev/null +++ b/src/InfluxDB/Query/QueryBuilder.php @@ -0,0 +1,396 @@ +','!=','>','>=','<','<=']; + const VALID_DURATIONS = ['ns'=>1/1000000000,'u'=>1/1000000,'ms'=>1/1000,'s'=>1,'m'=>60,'h'=>60*60,'d'=>60*60*24,'w'=>60*60*24*7]; + const VALID_PROPERTIES = ['select'=>[],'where'=>[],'measurement'=>'','limitClause'=>'','offsetClause'=>'','groupBy'=>[],'groupByTime'=>'','orderBy'=>[],'retentionPolicy'=>'']; + + /** + * @var Database + */ + private $db; + + /** + * @var string[] + */ + private $select = []; + + /** + * @var string[] + */ + private $where = []; + + /** + * @var string + */ + private $retentionPolicy; + + /** + * @var string + */ + private $measurement; + + /** + * @var string + */ + private $limitClause = ''; + + /** + * @var string + */ + private $offsetClause = ''; + + /** + * @var array + */ + private $groupBy = []; + + private $groupByTime = ''; + + private $fieldKeys; + private $tagKeys; + + /** + * @var array + */ + private $orderBy = []; + + /** + * @param Database $db + */ + public function __construct(Database $db) + { + $this->db = $db; + } + + /** + * @param array $fields + * @param array $methods + * + * Example: ['field1', 'field2'], ['min', 'max']; + * + * @return $this + */ + public function select(array $fields, array $methods=[]):self + { + foreach($fields as $field) { + if($methods) { + foreach($methods as $method) { + $this->addFunctionField($field, $method); + } + } + else { + $this->select[$field] = $field; + } + } + return $this; + } + + public function reset(array $properties=[], bool $perserveRetention=true):self + { + if($properties) { + if($err = array_diff_key($properties, array_keys(self::VALID_PROPERTIES))) { + throw new \InvalidArgumentException(implode(', ', $err). ' are not valid properties'); + } + } + else { + $properties=self::VALID_PROPERTIES; + if(!$perserveRetention) { + unset($properties['retentionPolicy']); + } + $properties=array_keys($properties); + } + foreach($properties as $property) { + $this->$property=self::VALID_PROPERTIES[$property]; + } + if(in_array('measurement', $properties)) { + $this->fieldKeys=null; + $this->tagKeys=null; + } + return $this; + } + + /** + * @param string $measurement The measurement to select (required) + * @return $this + */ + public function from($measurement, bool $perserveRetention=true):self + { + $this->reset([], $perserveRetention); + $this->measurement = $measurement; + return $this; + } + + /** + * @param array $field + * + * Example: ['time > now()', 'time < now() -1d']; + * + * @return $this + */ + public function where(string $field, string $operator, $value):self + { + if(!in_array($operator, self::VALID_OPERATORS)) throw new \InvalidArgumentException("Invalid operator: $operator"); + switch($field) { + case 'time': + if($this->validateRFCDate($value)) { + $this->where[] = "time $operator '$value'"; + } + elseif($this->isLiteralTime($value)) { + $this->where[] = "time $operator $value"; + } + elseif($this->isDigit($value)) { + $this->where[] = "time $operator {$value}s"; + } + else throw new \InvalidArgumentException("'$value' is not a valided time"); + break; + case 'timeOffset': + if($value===0) $value=(string) $value; + if(!$this->isLiteralTime($value)) throw new \InvalidArgumentException("'$value' is not a valided literal time"); + $this->where[] = "time $operator now()".($value?" -$value":''); + break; + default: + $this->where[] = is_numeric($value) + ?"$field $operator $value" + :"$field $operator '$value'"; + } + return $this; + } + + private function addFunctionField(string $field, string $method):self + { + $method=strtoupper($method); + switch($method) { + case 'COUNT': + case 'MEDIAN': + case 'MEAN': + case 'SUM': + case 'FIRST': + case 'MIN': + case 'MAX': + case 'SPREAD': + case 'LAST': + $this->select[strtolower($method).($field==='*'?'':'_'.$field)] = "$method($field)"; + break; + default: throw new \InvalidArgumentException("Invalid method: $method"); + } + return $this; + } + + public function groupBy(string $field):self + { + $this->groupBy[] = $field; + + return $this; + } + + public function groupByTime(string $field):self + { + $this->groupByTime = $field; + + return $this; + } + + public function orderBy($field, $order = 'ASC'):self + { + $this->orderBy[] = "$field $order"; + + return $this; + } + + /** + * @param int $percentile Percentage to select (for example 95 for 95th percentile billing) + * + * @return $this + */ + public function percentile($percentile = 95):self + { + $this->select = sprintf('percentile(value, %d)', (int) $percentile); + + return $this; + } + + /** + * Limit the ResultSet to n records + * + * @param int $count + * + * @return $this + */ + public function limit($count):self + { + $this->limitClause = sprintf(' LIMIT %s', (int) $count); + + return $this; + } + + /** + * Offset the ResultSet to n records + * + * @param int $count + * + * @return $this + */ + public function offset($count):self + { + $this->offsetClause = sprintf(' OFFSET %s', (int) $count); + + return $this; + } + + public function subQuery($foo):self + { + $this->measurement = $foo; + throw new \Exception('QueryBuilder::subQuery() is not complete'); + return $this; + } + + /** + * Add retention policy to query + * + * @param string $rp + * + * @return $this + */ + public function retentionPolicy(string $rp):self + { + $this->retentionPolicy = $rp; + + return $this; + } + + /** + * Gets the result from the database (builds the query) + * + * @return ResultSet + * @throws \Exception + */ + public function getResultSet():ResultSet + { + $this->validate(); + + return $this->db->query($this->getQuery()); + } + + /** + * @return string + */ + public function getQuery():string + { + //Future. Add subqueries + + foreach($this->select as $alias=>$function) { + $fields[]=$alias === $function + ?$function + :"$function AS $alias"; + } + + $query=['SELECT', implode(', ', $fields)]; + $query[]=$this->retentionPolicy + ?"FROM $this->retentionPolicy.$this->measurement" + :"FROM $this->measurement"; + if($this->where) $query[]='WHERE '.implode(' AND ', $this->where); + $groupBy=$this->groupBy; + if($this->groupByTime) $groupBy[]="time($this->groupByTime)"; + if($groupBy) $query[]="GROUP BY ".implode(',',$groupBy); + if($this->orderBy) $query[]="ORDER BY ".implode(',',$this->orderBy); + if($this->limitClause) $query[] = $this->limitClause; + if($this->offsetClause) $query[]= $this->offsetClause; + + return implode(' ', $query); + } + + public function isValidQuery():bool + { + try { + $this->validate(); + } + catch(\InvalidArgumentException $e) { + return false; + } + return true; + } + + public function validate():self + { + $err=[]; + if (! $this->measurement) { + $err[] = 'Measurement is required'; + } + if (! $this->select) { + $err[] = 'At least one select field is required'; + } + if($err) throw new \InvalidArgumentException(implode(', ', $err)); + return $this; + } + + public function listFieldKeys():array + { + if(!$this->measurement) throw new \InvalidArgumentException('Measurement must be set'); + if(is_null($this->fieldKeys)) { + $this->fieldKeys = $this->db->listFieldKeys($this->measurement); + } + return $this->fieldKeys; + } + + public function listTagKeys():array + { + if(!$this->measurement) throw new \InvalidArgumentException('Measurement must be set'); + if(is_null($this->tagKeys)) { + $this->tagKeys = $this->db->listTagKeys($this->measurement); + } + return $this->tagKeys; + } + + public function getDatabase():Database + { + return $this->db; + } + + public function createDate(int ...$values):string + { + if(count($values)<3) throw new \InvalidArgumentException('Year, month, and day must be provided'); + $date=implode('-',array_slice($values, 0, 3)); + if(isset($values[3])) $date.' '.implode(':',array_slice($values, 3, 3)); + return (new \DateTime($date))->format(DATE_RFC3339); + } + + public function createDateFromTimestamp(int $timestamp):string + { + return (new \DateTime())->setTimestamp($timestamp)->format(DATE_RFC3339); + } + + private function getTimeDigit(string $time):?int + { + return ($digit=substr($time, 0, -1)) && $this->isDigit($digit)?(int)$digit:null; + } + private function getTimeUnit(string $time):?string + { + return ($unit=substr($time, -1)) && isset(self::VALID_DURATIONS[$unit])?$unit:null; + } + private function getUnitSeconds(string $unit):?int + { + return self::VALID_DURATIONS[$unit]??null; + } + private function isDigit($value):bool + { + return is_int($value) || ctype_digit($value); + } + private function getSeconds($time):?int{ + if($time==0) return 0; + return ($digit=$this->getTimeDigit($time)) && ($unit=$this->getTimeUnit($time))?$digit*self::VALID_DURATIONS[$unit]:null; + } + private function isLiteralTime(string $time):bool{ + return !is_null($this->getSeconds($time)); + } + private function validateRFCDate(string $date):bool + { + return \DateTime::createFromFormat(\DateTime::RFC3339, $date) !== FALSE; + } +} diff --git a/src/InfluxDB/ResultSet.php b/src/InfluxDB/ResultSet.php index 7d3be02..829d484 100644 --- a/src/InfluxDB/ResultSet.php +++ b/src/InfluxDB/ResultSet.php @@ -3,6 +3,8 @@ namespace InfluxDB; use InfluxDB\Client\Exception as ClientException; +use JsonMachine\JsonMachine; +use JsonMachine\StreamBytes; /** * Class ResultSet @@ -12,10 +14,11 @@ */ class ResultSet { + /** * @var array|mixed */ - protected $parsedResults = []; + private $parsedResults; /** * @var string @@ -23,35 +26,79 @@ class ResultSet protected $rawResults = ''; /** - * @param string $raw + * @var array|mixed + */ + private $parsedResultsMap; + + /** + * @var stream + */ + private $stream; + + /** + * @var int + */ + private $maxParseSize; + + /** + * @var int + */ + private $streamSize; + + const MAX_PARSE_SIZE = 1000000; + + /** + * @param stream $stream + */ + public function __construct($stream, ?int $maxParseSize=null) + { + $this->stream = $stream; + $this->maxParseSize = $maxParseSize??self::MAX_PARSE_SIZE; + } + + /** * @throws \InvalidArgumentException * @throws ClientException + * @return array $parsedResults */ - public function __construct($raw) + private function getParsedResults() { - $this->rawResults = $raw; - $this->parsedResults = json_decode((string)$raw, true); - - if (json_last_error() !== JSON_ERROR_NONE) { - throw new \InvalidArgumentException('Invalid JSON'); + if(is_null($this->parsedResults)) { + $this->rawResults=stream_get_contents($this->stream); + $this->parsedResults=json_decode($this->rawResults, true)['results']; + if (json_last_error() !== JSON_ERROR_NONE) { + throw new \InvalidArgumentException('Invalid JSON'); + } + $this->validate($this->parsedResults); } + return $this->parsedResults; + } - $this->validate(); + private function getParsedResultsMap() + { + if(is_null($this->parsedResultsMap)) { + $this->parsedResultsMap=[]; + foreach($this->getParsedResults() as $index=>$values) { + $this->parsedResultsMap[$values['time']]=$index; + } + } + return $this->parsedResultsMap; } /** + * @param array $parsedResults * @throws ClientException */ - protected function validate() + private function validate($parsedResults) { // There was an error in the query thrown by influxdb - if (isset($this->parsedResults['error'])) { - throw new ClientException($this->parsedResults['error']); + if (isset($parsedResults['error'])) { + throw new ClientException($parsedResults['error']); } // Check if there are errors in the first serie - if (isset($this->parsedResults['results'][0]['error'])) { - throw new ClientException($this->parsedResults['results'][0]['error']); + if (isset($parsedResults['results'][0]['error'])) { + throw new ClientException($parsedResults['results'][0]['error']); } } @@ -101,7 +148,7 @@ public function getPoints($metricName = '', array $tags = []) */ public function getSeries($queryIndex = 0) { - $results = $this->parsedResults['results']; + $results = $this->getParsedResults(); if ($queryIndex !== null && !array_key_exists($queryIndex, $results)) { throw new \InvalidArgumentException('Invalid statement index provided'); @@ -171,4 +218,67 @@ private function getPointsFromSerie(array $serie) return $points; } + + /** + * Used to obtain large result sets. + * @param int $measurement + * @throws JsonMachine\Exception\SyntaxError + * @yield array + */ + public function iterate(int $measurement=0) + { + if($this->getResponseSize() <= $this->maxParseSize) { + foreach($this->getPoints() as $point) { + yield $point; + } + } + else { + $json_pointer="/results/$measurement/series"; + foreach(JsonMachine::fromStream($this->stream, $json_pointer) as $series) { + foreach($series['values'] as $point) { + $point = array_combine($series['columns'], $point); + if (!empty($series['tags'])) { + $point += $series['tags']; + } + yield $point; + } + } + rewind($this->stream); + } + } + + public function getByTime(string $time, int $measurement=0):?array + { + if(\DateTime::createFromFormat(\DateTime::RFC3339, $time) === FALSE) { + throw new \InvalidArgumentException("'$time' is not a valid RFC3339 time"); + } + if($this->getResponseSize() <= $this->maxParseSize) { + $point = $this->getParsedResultsMap()[$time]??null; + } + else { + $json_pointer="/results/$measurement/series"; + $point=null; + foreach(JsonMachine::fromStream($this->stream, $json_pointer) as $series) { + foreach($series['values'] as $p) { + if($time===$p[0]) { + $p = array_combine($series['columns'], $p); + if (!empty($series['tags'])) { + $p += $series['tags']; + } + $point=$p; + break(2); + } + } + } + rewind($this->stream); + } + return $point; + } + + public function getResponseSize():int + { + //rewind($this->stream); + if(!$this->streamSize) $this->streamSize = fstat($this->stream)['size']; + return $this->streamSize; + } }