Skip to content

Commit

Permalink
fixing up the library, adding tox tests, adding travis config
Browse files Browse the repository at this point in the history
  • Loading branch information
jpetrucciani committed Jul 22, 2019
1 parent fb2e87b commit 0ccc430
Show file tree
Hide file tree
Showing 17 changed files with 995 additions and 47 deletions.
22 changes: 14 additions & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
dist: xenial
sudo: false
language: python
python:
- 3.5
- 3.6
- 3.7
matrix:
include:
- python: 3.5
env: TOXENV=py35
- python: 3.6
env: TOXENV=py36
- python: 3.7
env: TOXENV=py37
before_install:
- sudo rm -f /etc/boto.cfg
install:
- pip install -r requirements.txt
- pip install -r requirements-dev.txt
script:
- make
- pip install tox pipenv
- pipenv install
script: tox
deploy:
provider: pypi
user: jacobi
Expand Down
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2019 Jacobi Petrucciani

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
11 changes: 11 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true

[packages]
boto3 = "*"

[dev-packages]
tox = "*"
moto = "*"
pytest = "*"
603 changes: 603 additions & 0 deletions Pipfile.lock

Large diffs are not rendered by default.

75 changes: 66 additions & 9 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,39 @@
qoo: a simple SQS client
========================

qoo
========
.. image:: https://travis-ci.org/jpetrucciani/qoo.svg?branch=master
:target: https://travis-ci.org/jpetrucciani/qoo

Quick start
-----------

.. image:: https://badge.fury.io/py/qoo.svg
:target: https://badge.fury.io/py/qoo
:alt: PyPI version


.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/ambv/black
:alt: Code style: black


.. image:: https://img.shields.io/badge/python-3.5+-blue.svg
:target: https://www.python.org/downloads/release/python-350/
:alt: Python 3.5+ supported


**qoo** is a very simple Amazon SQS client, written in Python. It
aims to be much more straight-forward to use than boto3, and specializes
only in Amazon SQS, ignoring the rest of the AWS ecosystem.


Features
--------

- Easier interaction with SQS queues
- Automatic support for ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY``, and ``AWS_DEFAULT_REGION`` environment variables.
- automatic useful message/job metadata

Usage
-----

Installation
^^^^^^^^^^^^
Expand All @@ -17,17 +47,44 @@ Basic Usage

.. code-block:: python
pass
import qoo
# list bucket names
qoo.list()
# get an existing bucket
queue = qoo.get("$QUEUE_NAME")
# or create a queue
queue = qoo.create("$QUEUE_NAME")
# send a job, pass info/keys as kwargs
queue.send(info="foo", user_id="test_user") # etc.
# get an approximate count of messages in the queue
len(queue) # approximate total messages
queue.approx_not_visible # approximate number of message in the visibility timeout
# get a job
job = queue.receive(wait_time=1)
job.elapsed # time between sending the job and receiving it
job.md5_matches # boolean property to show that the md5 of the job matches what was sent
# and the data from the job is automatically converted into attrs
job.info # the string "foo"
job.user_id # the string "test_user"
# delete the job from the SQS queue
del job
job.delete()
Testing
-------

I'm currently working on rewriting many of the tests with `pytest <https://docs.pytest.org/en/latest/>`_\ .
Tests can be run with tox!

.. code-block:: bash
# run tests
make
# or
make test_all
tox
73 changes: 73 additions & 0 deletions qoo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,76 @@
"""
qoo module
"""
import boto3
import os
from qoo.errors import FailedToCreateQueue
from qoo.queues import Job, Queue # noqa
from typing import List


AWS_DEFAULT_REGION = "us-east-1"


def _client(region: str = ""):
"""returns a boto3 sqs client"""
region_name = region or os.environ.get("AWS_DEFAULT_REGION", AWS_DEFAULT_REGION)
return boto3.client("sqs", region_name=region_name)


def login(
access_key_id: str, secret_access_key: str, region: str = AWS_DEFAULT_REGION
) -> None:
"""sets environment variables for boto3."""
os.environ["AWS_ACCESS_KEY_ID"] = access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = secret_access_key
os.environ["AWS_DEFAULT_REGION"] = region


def get(queue_name: str, **kwargs) -> Queue:
"""gets a qoo Queue object by SQS queue_name"""
return Queue(queue_name, **kwargs)


def list(region: str = "", verbose: bool = False) -> List[str]:
"""
list all queues in the default or given region
:note: this will only list up to 1000 queue names
"""
sqs_client = _client(region=region)
return [
(x if verbose else x.split("/")[-1])
for x in sqs_client.list_queues()["QueueUrls"]
]


def create(
queue_name: str,
region: str = "",
delay_seconds: int = 0,
maximum_message_size: int = 262144,
message_retention_period: int = 345600,
visibility_timeout: int = 30,
fifo: bool = False,
receive_message_wait_time_seconds: int = 0,
**additional_attributes
) -> Queue:
"""
attempt to create an SQS queue and return it
:note: most of the common params are here, but you can pass additional_attributes if needed
"""
sqs_client = _client(region=region)
new_queue_url = sqs_client.create_queue(
QueueName=queue_name,
Attributes=dict(
DelaySeconds=str(delay_seconds),
MaximumMessageSize=str(maximum_message_size),
MessageRetentionPeriod=str(message_retention_period),
ReceiveMessageWaitTimeSeconds=str(receive_message_wait_time_seconds),
VisibilityTimeout=str(visibility_timeout),
FifoQueue=str(fifo).lower(),
**additional_attributes
),
)
if not new_queue_url:
raise FailedToCreateQueue()
return get(new_queue_url["QueueUrl"].split("/")[-1])
4 changes: 4 additions & 0 deletions qoo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ def __init__(self, *args, **kwargs): # type: ignore

class InvalidCredentials(QooException):
"""invalid credentials were passed for a queue object"""


class FailedToCreateQueue(QooException):
"""attempting to create a queue has failed"""
6 changes: 0 additions & 6 deletions qoo/globals.py

This file was deleted.

60 changes: 48 additions & 12 deletions qoo/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
import time
import hashlib
from qoo.utils import jsond, jsonl
from typing import List, Optional


class Job(object):
"""a single unit of work"""

def __init__(self, sqs_message: dict) -> None:
def __init__(self, sqs_message: dict, queue: "Queue") -> None:
"""job constructor"""
self._queue = queue
self._data = sqs_message
self._md5 = self._data["MD5OfBody"]
self._id = self._data["MessageId"]
self._body = jsonl(self._data["Body"])
self._received_at = int(time.time())
self._received_at = float(time.time())
self.elapsed = time.time() - self._body["created_at"]
for key in self._body:
setattr(self, key, self._body[key])
self.handle = self._data["ReceiptHandle"]
Expand All @@ -30,6 +33,14 @@ def __repr__(self) -> str:
"""repr"""
return self.__str__()

def __del__(self) -> dict:
"""del keyword for the job"""
return self.delete()

def delete(self) -> dict:
"""delete this object"""
return self._queue.delete_job(self.handle)

@property
def md5_matches(self) -> bool:
"""verify contents of the message"""
Expand Down Expand Up @@ -82,14 +93,22 @@ def __repr__(self) -> str:
"""repr"""
return self.__str__()

def __len__(self) -> int:
"""
attempt to get how many messages are in the queue
:note: this is an approximate
"""
self._update_attributes()
return self.approx_messages

def _update_attributes(self) -> None:
"""pull the latest attributes and parse them into class attributes"""
self._attributes = self._client.get_queue_attributes(
QueueUrl=self._queue_url, AttributeNames=["All"]
)["Attributes"]
self.arn = self._attributes["QueueArn"]
self.created_at = int(self._attributes["CreatedTimestamp"])
self.updated_at = int(self._attributes["LastModifiedTimestamp"])
self.created_at = float(self._attributes["CreatedTimestamp"])
self.updated_at = float(self._attributes["LastModifiedTimestamp"])
self.visibility_timeout = int(self._attributes["VisibilityTimeout"])
self.delay_seconds = int(self._attributes["DelaySeconds"])
self.maximum_message_size = int(self._attributes["MaximumMessageSize"])
Expand All @@ -104,26 +123,43 @@ def _update_attributes(self) -> None:
self.approx_delayed = int(
self._attributes["ApproximateNumberOfMessagesDelayed"]
)
self.fifo = "FifoQueue" in self._attributes

def send(self, **attributes) -> str:
"""shorthand for send_job"""
return self.send_job(**attributes)

def send_job(self, **attributes) -> str:
"""using the kwarg attributes, send a job to this queue."""
"""
using the kwarg attributes, send a job to this queue.
pass job attributes to set the message body
"""
attributes.update({"created_at": int(time.time())})
response = self._client.send_message(
MessageBody=jsond(attributes), QueueUrl=self._queue_url
)
return response["MessageId"]

def receive_jobs(self) -> list:
def receive_jobs(
self, max_messages: int = None, wait_time: int = None
) -> List[Job]:
"""receive a list of jobs from the queue"""
jobs = self._client.receive_message(
QueueUrl=self._queue_url,
MaxNumberOfMessages=self._max_messages,
WaitTimeSeconds=self._wait_time,
MaxNumberOfMessages=max_messages if max_messages else self._max_messages,
WaitTimeSeconds=wait_time if wait_time else self._wait_time,
)
if "Messages" not in jobs:
return []
return [Job(x) for x in jobs["Messages"]]
return [Job(x, self) for x in jobs["Messages"]]

def receive(self, wait_time: int = None) -> Optional[Job]:
"""receive a single job from the queue"""
jobs = self.receive_jobs(max_messages=1, wait_time=wait_time)
return jobs[0] if jobs else None

def delete_job(self, handle: str) -> None:
"""delete a job"""
self._client.delete_message(QueueUrl=self._queue_url, ReceiptHandle=handle)
def delete_job(self, handle: str) -> dict:
"""delete a job by the message handle"""
return self._client.delete_message(
QueueUrl=self._queue_url, ReceiptHandle=handle
)
7 changes: 3 additions & 4 deletions qoo/workers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
worker class
worker class - not yet used
"""
from time import sleep

Expand Down Expand Up @@ -27,6 +27,5 @@ def work(self, burst=False, wait_seconds=5):
job.run()
queue.remove_job(job)

if burst:
break
sleep(wait_seconds)
if not burst:
sleep(wait_seconds)
2 changes: 0 additions & 2 deletions requirements-dev.txt

This file was deleted.

1 change: 0 additions & 1 deletion requirements.txt

This file was deleted.

Loading

0 comments on commit 0ccc430

Please sign in to comment.