Skip to content

Commit

Permalink
Batch generate composites (#128)
Browse files Browse the repository at this point in the history
* Add config and batching to tables

* Add batch support to search and views

* Tests

* Bump versions; remove 5.4 support

* PR comments

* Code review comments
  • Loading branch information
rsinger authored Sep 4, 2018
1 parent 5536177 commit 77abf3e
Show file tree
Hide file tree
Showing 13 changed files with 686 additions and 338 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ php:
- 7.0
- 5.6
- 5.5
- 5.4
services:
- redis
- docker
Expand Down
8 changes: 4 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
}
],
"require": {
"php" : ">=5.4",
"php" : ">=5.5",
"semsol/arc2": "v2.2.4",
"chrisboulton/php-resque": "dev-master#98fde571db008a8b48e73022599d1d1c07d4a7b5",
"monolog/monolog" : "~1.13",
"mongodb/mongodb": "1.0.4"
},
"require-dev": {
"phpunit/phpunit": "4.1.*",
"phpunit/phpunit": "4.8.",
"squizlabs/php_codesniffer": "3.2.*"
},
},
"autoload": {
"classmap": ["src/"]
}
}
}
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mongo:
image: mongo/2.6.12:latest
image: rossfsinger/mongo-2.6.12:latest
ports:
- "27017:27017"
246 changes: 122 additions & 124 deletions src/mongo/Config.class.php

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/mongo/Driver.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Driver extends DriverBase implements \Tripod\IDriver
* <li>readPreference: The Read preference to set for Mongo: Default is ReadPreference::RP_PRIMARY_PREFERRED</li>
* <li>retriesToGetLock: Retries to do when unable to get lock on a document, default is 20</li></ul>
*/
public function __construct($podName, $storeName, $opts=array())
public function __construct($podName, $storeName, $opts = [])
{

$opts = array_merge(array(
Expand Down
8 changes: 8 additions & 0 deletions src/mongo/IConfigInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ public function getCollectionForJobGroups($storeName, $readPreference = ReadPref
*/
public function getTransactionLogDatabase($readPreference = ReadPreference::RP_PRIMARY_PREFERRED);

/**
* Return the maximum batch size for async operations
*
* @param string $operation Async operation, e.g. OP_TABLES, OP_VIEWS
* @return integer
*/
public function getBatchSize($operation);

/**
* @return string
*/
Expand Down
153 changes: 85 additions & 68 deletions src/mongo/base/CompositeBase.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Tripod\Mongo\Composites;

use \Tripod\Mongo\JobGroup;

/**
* Class CompositeBase
* @package Tripod\Mongo\Composites
Expand All @@ -12,111 +14,103 @@ abstract class CompositeBase extends \Tripod\Mongo\DriverBase implements \Tripod
* @var \Tripod\Mongo\Jobs\ApplyOperation
*/
protected $applyOperation;

/**
* Returns an array of ImpactedSubjects based on the subjects and predicates of change
* @param array $subjectsAndPredicatesOfChange
* @param string $contextAlias
* @return \Tripod\Mongo\ImpactedSubject[]
*/
public function getImpactedSubjects(Array $subjectsAndPredicatesOfChange,$contextAlias)
public function getImpactedSubjects(array $subjectsAndPredicatesOfChange, $contextAlias)
{
$candidates = array();
$filter = array();
$subjectsToAlias = array();
foreach(array_keys($subjectsAndPredicatesOfChange) as $s){
$candidates = [];
$filter = [];
$subjectsToAlias = [];
foreach (array_keys($subjectsAndPredicatesOfChange) as $s) {
$resourceAlias = $this->labeller->uri_to_alias($s);
$subjectsToAlias[$s] = $resourceAlias;
// build $filter for queries to impact index
$filter[] = array(_ID_RESOURCE=>$resourceAlias,_ID_CONTEXT=>$contextAlias);
$filter[] = [_ID_RESOURCE=>$resourceAlias, _ID_CONTEXT=>$contextAlias];
}
$query = array(_ID_KEY=>array('$in'=>$filter));
$docs = $this->getCollection()->find($query, array(
'projection' => array(_ID_KEY=>true, 'rdf:type'=>true)
));
$query = [_ID_KEY => ['$in' => $filter]];
$docs = $this->getCollection()->find(
$query,
['projection' => [_ID_KEY => true, 'rdf:type' => true]]
);

$types = $this->getTypesInSpecifications();

if($this->getCollection()->count($query) !== 0 ) {
foreach($docs as $doc)
{
if ($this->getCollection()->count($query) !== 0) {
foreach ($docs as $doc) {
$docResource = $doc[_ID_KEY][_ID_RESOURCE];
$docContext = $doc[_ID_KEY][_ID_CONTEXT];
$docHash = md5($docResource.$docContext);

$docTypes = array();
if(isset($doc["rdf:type"])) {
if(isset($doc["rdf:type"][VALUE_URI])){
$docTypes[] = $doc["rdf:type"][VALUE_URI];
$docTypes = [];
if (isset($doc['rdf:type'])) {
if (isset($doc['rdf:type'][VALUE_URI])) {
$docTypes[] = $doc['rdf:type'][VALUE_URI];
} else {
foreach($doc["rdf:type"] as $t){
if(isset($t[VALUE_URI]))
{
foreach ($doc['rdf:type'] as $t) {
if (isset($t[VALUE_URI])) {
$docTypes[] = $t[VALUE_URI];
}
}
}
}

$currentSubjectProperties = array();
if(isset($subjectsAndPredicatesOfChange[$docResource]))
{
$currentSubjectProperties = [];
if (isset($subjectsAndPredicatesOfChange[$docResource])) {
$currentSubjectProperties = $subjectsAndPredicatesOfChange[$docResource];
}
elseif(isset($subjectsToAlias[$docResource]) &&
isset($subjectsAndPredicatesOfChange[$subjectsToAlias[$docResource]]))
{
} elseif (isset($subjectsToAlias[$docResource]) &&
isset($subjectsAndPredicatesOfChange[$subjectsToAlias[$docResource]])) {
$currentSubjectProperties = $subjectsAndPredicatesOfChange[$subjectsToAlias[$docResource]];
}
foreach($docTypes as $type)
{
if($this->checkIfTypeShouldTriggerOperation($type, $types, $currentSubjectProperties)) {
if(!array_key_exists($this->getPodName(), $candidates))
{
$candidates[$this->getPodName()] = array();
foreach ($docTypes as $type) {
if ($this->checkIfTypeShouldTriggerOperation($type, $types, $currentSubjectProperties)) {
if (!array_key_exists($this->getPodName(), $candidates)) {
$candidates[$this->getPodName()] = [];
}
if(!array_key_exists($docHash, $candidates[$this->getPodName()])){
$candidates[$this->getPodName()][$docHash] = array('id'=>$doc[_ID_KEY]);
if (!array_key_exists($docHash, $candidates[$this->getPodName()])) {
$candidates[$this->getPodName()][$docHash] = ['id'=>$doc[_ID_KEY]];
}
}
}
}
}

// add to this any composites
foreach($this->findImpactedComposites($subjectsAndPredicatesOfChange, $contextAlias) as $doc) {
foreach ($this->findImpactedComposites($subjectsAndPredicatesOfChange, $contextAlias) as $doc) {
$spec = $this->getSpecification($this->storeName, $doc[_ID_KEY]['type']);
if(is_array($spec) && array_key_exists('from', $spec)){
if(!array_key_exists($spec['from'], $candidates))
{
$candidates[$spec['from']] = array();
if (is_array($spec) && array_key_exists('from', $spec)) {
if (!array_key_exists($spec['from'], $candidates)) {
$candidates[$spec['from']] = [];
}
$docHash = md5($doc[_ID_KEY][_ID_RESOURCE] . $doc[_ID_KEY][_ID_CONTEXT]);

if(!array_key_exists($docHash, $candidates[$spec['from']])){
$candidates[$spec['from']][$docHash] = array(
'id'=>array(
if (!array_key_exists($docHash, $candidates[$spec['from']])) {
$candidates[$spec['from']][$docHash] = [
'id' => [
_ID_RESOURCE=>$doc[_ID_KEY][_ID_RESOURCE],
_ID_CONTEXT=>$doc[_ID_KEY][_ID_CONTEXT],
)
);
]
];
}
if(!array_key_exists('specTypes', $candidates[$spec['from']][$docHash])) {
$candidates[$spec['from']][$docHash]['specTypes'] = array();
if (!array_key_exists('specTypes', $candidates[$spec['from']][$docHash])) {
$candidates[$spec['from']][$docHash]['specTypes'] = [];
}
// Save the specification type so we only have to regen resources in that table type
if(!in_array($doc[_ID_KEY][_ID_TYPE], $candidates[$spec['from']][$docHash]['specTypes']))
{
if (!in_array($doc[_ID_KEY][_ID_TYPE], $candidates[$spec['from']][$docHash]['specTypes'])) {
$candidates[$spec['from']][$docHash]['specTypes'][] = $doc[_ID_KEY][_ID_TYPE];
}
}
}

// convert operations to subjects
$impactedSubjects = array();
foreach(array_keys($candidates) as $podName){
foreach($candidates[$podName] as $candidate)
{
$specTypes = (isset($candidate['specTypes']) ? $candidate['specTypes'] : array());
$impactedSubjects = [];
foreach (array_keys($candidates) as $podName) {
foreach ($candidates[$podName] as $candidate) {
$specTypes = (isset($candidate['specTypes']) ? $candidate['specTypes'] : []);
$impactedSubjects[] = new \Tripod\Mongo\ImpactedSubject($candidate['id'], $this->getOperationType(), $this->getStoreName(), $podName, $specTypes);
}
}
Expand All @@ -128,22 +122,22 @@ public function getImpactedSubjects(Array $subjectsAndPredicatesOfChange,$contex
* Returns an array of the rdf types that will trigger the specification
* @return array
*/
public abstract function getTypesInSpecifications();
abstract public function getTypesInSpecifications();

/**
* @param array $resourcesAndPredicates
* @param string $contextAlias
* @return mixed // @todo: This may eventually return a either a Cursor or array
*/
public abstract function findImpactedComposites(Array $resourcesAndPredicates,$contextAlias);
abstract public function findImpactedComposites(array $resourcesAndPredicates, $contextAlias);

/**
* Returns the specification config
* @param string $storeName
* @param string $specId The specification id
* @return array|null
*/
public abstract function getSpecification($storeName, $specId);
abstract public function getSpecification($storeName, $specId);

/**
* Test if the a particular type appears in the array of types associated with a particular spec and that the changeset
Expand All @@ -153,20 +147,20 @@ public abstract function getSpecification($storeName, $specId);
* @param array $subjectPredicates
* @return bool
*/
protected function checkIfTypeShouldTriggerOperation($rdfType, array $validTypes, Array $subjectPredicates)
protected function checkIfTypeShouldTriggerOperation($rdfType, array $validTypes, array $subjectPredicates)
{
// We don't know if this is an alias or a fqURI, nor what is in the valid types, necessarily
$types = array($rdfType);
try
{
$types = [$rdfType];
try {
$types[] = $this->labeller->qname_to_uri($rdfType);
} catch (\Tripod\Exceptions\LabellerException $e) {
// Not a qname, apparently
}
catch(\Tripod\Exceptions\LabellerException $e) {}
try
{
try {
$types[] = $this->labeller->uri_to_alias($rdfType);
} catch (\Tripod\Exceptions\LabellerException $e) {
// Not a declared uri, apparently
}
catch(\Tripod\Exceptions\LabellerException $e) {}

$intersectingTypes = array_unique(array_intersect($types, $validTypes));
// If views have a matching type *at all*, the operation is triggered
Expand All @@ -175,15 +169,38 @@ protected function checkIfTypeShouldTriggerOperation($rdfType, array $validTypes

/**
* For mocking
*
*
* @return \Tripod\Mongo\Jobs\ApplyOperation
*/
protected function getApplyOperation()
{
if(!isset($this->applyOperation))
{
if (!isset($this->applyOperation)) {
$this->applyOperation = new \Tripod\Mongo\Jobs\ApplyOperation();
}
return $this->applyOperation;
}

/**
* Queues a batch of ImpactedSubjects in a single ApplyOperation job
*
* @param \Tripod\Mongo\ImpactedSubject[] $subjects Array of ImpactedSubjects
* @param string $queueName Queue name
* @param array $jobOptions Job options
* @return void
*/
protected function queueApplyJob(array $subjects, $queueName, array $jobOptions)
{
$this->getApplyOperation()->createJob($subjects, $queueName, $jobOptions);
}

/**
* For mocking
*
* @param string $storeName
* @return JobGroup
*/
protected function getJobGroup($storeName)
{
return new JobGroup($storeName);
}
}
Loading

0 comments on commit 77abf3e

Please sign in to comment.