Skip to content

Commit

Permalink
lib/Requests: implement request_pool handling
Browse files Browse the repository at this point in the history
Implemented requests pool handling, as requests_multiple can't handle
pooling.
  • Loading branch information
Kis Attila Balint committed Jun 23, 2021
1 parent d82ea15 commit c88b9aa
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 0 deletions.
73 changes: 73 additions & 0 deletions library/Requests.php
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,79 @@ public static function request($url, $headers = array(), $data = array(), $type
return self::parse_response($response, $url, $headers, $data, $options);
}

public static function request_pool($requests, $options = array(), $pool_size = 2) {
$options = array_merge(self::get_default_options(true), $options);

if (!empty($options['hooks'])) {
$options['hooks']->register('transport.internal.parse_response', array('Requests', 'parse_multiple'));
if (!empty($options['complete'])) {
$options['hooks']->register('multiple.request.complete', $options['complete']);
}
}

foreach ($requests as $id => &$request) {
if (!isset($request['headers'])) {
$request['headers'] = array();
}
if (!isset($request['data'])) {
$request['data'] = array();
}
if (!isset($request['type'])) {
$request['type'] = self::GET;
}
if (!isset($request['options'])) {
$request['options'] = $options;
$request['options']['type'] = $request['type'];
}
else {
if (empty($request['options']['type'])) {
$request['options']['type'] = $request['type'];
}
$request['options'] = array_merge($options, $request['options']);
}

self::set_defaults($request['url'], $request['headers'], $request['data'], $request['type'], $request['options']);

// Ensure we only hook in once
if ($request['options']['hooks'] !== $options['hooks']) {
$request['options']['hooks']->register('transport.internal.parse_response', array('Requests', 'parse_multiple'));
if (!empty($request['options']['complete'])) {
$request['options']['hooks']->register('multiple.request.complete', $request['options']['complete']);
}
}
}
unset($request);

if (!empty($options['transport'])) {
$transport = $options['transport'];

if (is_string($options['transport'])) {
$transport = new $transport();
}
}
else {
$transport = self::get_transport();
}

if (get_class($transport) !== 'Requests_Transport_cURL') {
return array();
}

$responses = $transport->request_pool($requests, $options, $pool_size);

foreach ($responses as $id => &$response) {
// If our hook got messed with somehow, ensure we end up with the
// correct response
if (is_string($response)) {
$request = $requests[$id];
self::parse_multiple($response, $request);
$request['options']['hooks']->dispatch('multiple.request.complete', array(&$response, $id));
}
}

return $responses;
}

/**
* Send multiple HTTP requests simultaneously
*
Expand Down
128 changes: 128 additions & 0 deletions library/Requests/Transport/cURL.php
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,134 @@ public function request($url, $headers = array(), $data = array(), $options = ar
return $this->headers;
}


/**
* Send multiple requests simultaneously with a maximal concurrent conncetion (pool)
*
* @param array $requests Request data
* @param array $options Global options
* @param int $pool_size Maximal simultaneously connections
* @return array Array of Requests_Response objects (may contain Requests_Exception or string responses as well)
*/
public function request_pool($requests, $options, $pool_size = 2) {
if (empty($requests)) {
return array();
}

$pool = array();
$queue = array();
$responses = array();

$main_curl_executor_pool = curl_multi_init();

foreach ($requests as $id => $request) {
$queue[] = array(
$id,
$request,
);
}

$queue = array_reverse($queue);

$request['options']['hooks']->dispatch('curl.before_multi_exec', array(&$multihandle));

while (!empty($pool) || !empty($queue)) {
curl_multi_exec($main_curl_executor_pool, $active);
$done = curl_multi_info_read($main_curl_executor_pool);

if ($done !== false) {
$pool_key = (int) $done['handle'];
$pool_element = $pool[$pool_key];
unset($pool[$pool_key]);

$response = $this->handleCurlResponse($done, $pool_element);
$responses[$pool_element['id']] = $response;
curl_multi_remove_handle($main_curl_executor_pool, $done['handle']);
curl_close($done['handle']);

$pool_element['request']['options']['hooks']->dispatch(
'multiple.request.complete',
array(
&$responses[$pool_element['id']],
$pool_element['id'],
)
);
}

if (count($pool) >= $pool_size || empty($queue)) {
sleep(0.5);

continue;
}

list($id, $request) = array_pop($queue);
list($subhandle, $subrequest) = $this->addNewSubrequestHandle($request);

$pool[(int) $subhandle] = array(
'id' => $id,
'request' => $request,
'subhandle' => $subhandle,
'subrequest' => $subrequest,
);

curl_multi_add_handle($main_curl_executor_pool, $subhandle);
}

$request['options']['hooks']->dispatch('curl.after_multi_exec', array(&$multihandle));
curl_multi_close($main_curl_executor_pool);

return $responses;
}

private function addNewSubrequestHandle($request) {
$class = get_class($this);
$subrequest = new $class();
$subhandle = $subrequest->get_subrequest_handle(
$request['url'],
$request['headers'],
$request['data'],
$request['options']
);
$request['options']['hooks']->dispatch('curl.before_multi_add', array(&$subhandle));

return array($subhandle, $subrequest);
}

private function handleCurlResponse(array $done, $pool_element) {

if ($done['result'] === CURLE_OK) {
$parsed_response = $pool_element['subrequest']->process_response(
$pool_element['subrequest']->response_data,
$pool_element['request']['options']
);

$pool_element['request']['options']['hooks']->dispatch(
'transport.internal.parse_response',
array(
&$parsed_response,
$pool_element['request'],
)
);

return $parsed_response;
}

$reason = curl_error($done['handle']);
$exception = new Requests_Exception_Transport_cURL(
$reason,
Requests_Exception_Transport_cURL::EASY,
$done['handle'],
$done['result']
);

$pool_element['request']['options']['hooks']->dispatch(
'transport.internal.parse_error',
array(&$exception, $pool_element['request'])
);

return $exception;
}

/**
* Send multiple requests simultaneously
*
Expand Down

0 comments on commit c88b9aa

Please sign in to comment.