Skip to content

Commit

Permalink
[PBE-1321] support sync channel & segment res objects
Browse files Browse the repository at this point in the history
  • Loading branch information
kanat committed Feb 14, 2024
1 parent 793340c commit 10fb316
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 80 deletions.
6 changes: 3 additions & 3 deletions stream_chat/async_chat/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from urllib.parse import urlparse

from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions
from stream_chat.types.segment import SegmentType, SegmentData, QuerySegmentsOptions, UpdateSegmentData
from stream_chat.types.segment import SegmentType, SegmentData, QuerySegmentsOptions

if sys.version_info >= (3, 8):
from typing import Literal
Expand Down Expand Up @@ -547,7 +547,7 @@ async def query_segments(self, filter_conditions: Dict, options: QuerySegmentsOp
payload = {"filter": filter_conditions, **options}
return await self.post("segments/query", params=payload)

async def update_segment(self, segment_id: str, data: UpdateSegmentData) -> StreamResponse:
async def update_segment(self, segment_id: str, data: SegmentData) -> StreamResponse:
return await self.put(f"segments/{segment_id}", data=data)

async def delete_segment(self, segment_id: str) -> StreamResponse:
Expand All @@ -567,7 +567,7 @@ async def delete_campaign(self, campaign_id: str, **options: Any) -> StreamRespo
return await self.delete(f"campaigns/{campaign_id}", params=options)

async def start_campaign(
self, campaign_id: str, scheduled_for: Optional[datetime.datetime] = None
self, campaign_id: str, scheduled_for: Optional[Union[str, datetime.datetime]] = None
) -> StreamResponse:
return await self.patch(
f"campaigns/{campaign_id}/start", data={"scheduled_for": scheduled_for}
Expand Down
5 changes: 3 additions & 2 deletions stream_chat/base/campaign.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import datetime
from typing import Awaitable, Dict, List, Union, Optional

from stream_chat.base.client import StreamChatInterface
Expand Down Expand Up @@ -26,15 +27,15 @@ def get(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def update(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
def update(self, data: CampaignData) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def delete(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def start(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
def start(self, scheduled_for: Optional[Union[str, datetime.datetime]] = None) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
Expand Down
55 changes: 41 additions & 14 deletions stream_chat/base/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Any, Awaitable, Dict, Iterable, List, TypeVar, Union, Optional

from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions
from stream_chat.types.segment import SegmentType, SegmentData, QuerySegmentsOptions, UpdateSegmentData
from stream_chat.types.segment import SegmentType, SegmentData, QuerySegmentsOptions

if sys.version_info >= (3, 8):
from typing import Literal
Expand Down Expand Up @@ -924,7 +924,7 @@ def list_roles(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
pass

def segment(self, segment_type: SegmentType, segment_id: str, segment_name: str) -> TSegment:
def segment(self, segment_type: SegmentType, data: Optional[SegmentData]) -> TSegment:
"""
Creates a channel object
:param segment_type: the segment type
Expand All @@ -937,13 +937,22 @@ def segment(self, segment_type: SegmentType, segment_id: str, segment_name: str)

@abc.abstractmethod
def create_segment(
self, segment_type: SegmentType, segment_id: str, segment_name: str, data: SegmentData
self, segment_type: SegmentType, segment_id: Optional[str], data: Optional[SegmentData] = None
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Create a segment
"""
pass

@abc.abstractmethod
def get_segment(
self, segment_id: str
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Query segments
"""
pass

@abc.abstractmethod
def query_segments(
self, filter_conditions: Dict, options: QuerySegmentsOptions
Expand All @@ -955,7 +964,7 @@ def query_segments(

@abc.abstractmethod
def update_segment(
self, segment_id: str, data: UpdateSegmentData
self, segment_id: str, data: SegmentData
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Update a segment by id
Expand All @@ -971,6 +980,33 @@ def delete_segment(
"""
pass

@abc.abstractmethod
def segment_target_exists(
self, segment_id: str, target_id: str
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Check if a target exists in a segment
"""
pass

@abc.abstractmethod
def add_segment_targets(
self, segment_id: str, target_ids: List[str]
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Add targets to a segment
"""
pass

@abc.abstractmethod
def delete_segment_targets(
self, segment_id: str, target_ids: List[str]
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Delete targets from a segment
"""
pass

def campaign(self, campaign_id: Optional[str]):
"""
Creates a channel object
Expand Down Expand Up @@ -1028,7 +1064,7 @@ def delete_campaign(

@abc.abstractmethod
def start_campaign(
self, campaign_id: str, scheduled_for: Optional[datetime.datetime] = None
self, campaign_id: str, scheduled_for: Optional[Union[str, datetime.datetime]] = None
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Schedule a campaign at given time
Expand All @@ -1044,15 +1080,6 @@ def stop_campaign(
"""
pass

@abc.abstractmethod
def resume_campaign(
self, campaign_id: str
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Resume a stopped campaign
"""
pass

@abc.abstractmethod
def test_campaign(
self, campaign_id: str, users: Iterable[str]
Expand Down
20 changes: 15 additions & 5 deletions stream_chat/base/segment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import abc
from typing import Optional, Awaitable, Union
from typing import Optional, Awaitable, Union, List

from stream_chat.base.client import StreamChatInterface
from stream_chat.types.segment import SegmentType, SegmentData, UpdateSegmentData
from stream_chat.types.segment import SegmentType, SegmentData
from stream_chat.types.stream_response import StreamResponse


Expand All @@ -12,12 +12,10 @@ def __init__(
client: StreamChatInterface,
segment_type: SegmentType,
segment_id: Optional[str] = None,
segment_name: Optional[str] = None,
data: Optional[SegmentData] = None,
):
self.segment_type = segment_type
self.segment_id = segment_id
self.segment_name = segment_name
self.client = client
self.data = data

Expand All @@ -30,9 +28,21 @@ def get(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def update(self, data: UpdateSegmentData) -> Union[StreamResponse, Awaitable[StreamResponse]]:
def update(self, data: SegmentData) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def delete(self) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def target_exists(self, target_id: str) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def add_targets(self, target_ids: List[str]) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass

@abc.abstractmethod
def delete_targets(self, target_ids: List[str]) -> Union[StreamResponse, Awaitable[StreamResponse]]:
pass
22 changes: 11 additions & 11 deletions stream_chat/campaign.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import datetime
from typing import Any, Optional, Union

from stream_chat.base.campaign import CampaignInterface
from stream_chat.types.campaign import CampaignData
from stream_chat.types.stream_response import StreamResponse


Expand All @@ -14,18 +18,14 @@ def create(self) -> StreamResponse:
def get(self) -> StreamResponse:
return self.client.get_campaign(campaign_id=self.campaign_id)

def update(self, name: str = None, description: str = None) -> StreamResponse:
payload = {"name": name, "description": description}
return self.client.put(f"campaigns/{self.campaign_id}", data=payload)
def update(self, data: CampaignData) -> StreamResponse:
return self.client.update_campaign(campaign_id=self.campaign_id, data=data)

def delete(self) -> StreamResponse:
return self.client.delete(f"campaigns/{self.campaign_id}")
def delete(self, **options: Any) -> StreamResponse:
return self.client.delete_campaign(campaign_id=self.campaign_id, options=options)

def start(self) -> StreamResponse:
return self.client.get(f"campaigns/{self.campaign_id}")
def start(self, scheduled_for: Optional[Union[str, datetime.datetime]] = None) -> StreamResponse:
return self.client.start_campaign(campaign_id=self.campaign_id, scheduled_for=scheduled_for)

def stop(self) -> StreamResponse:
return self.client.get(f"campaigns/{self.campaign_id}")

def query(self) -> StreamResponse:
return self.client.get(f"campaigns/{self.campaign_id}")
return self.client.stop_campaign(campaign_id=self.campaign_id)
59 changes: 40 additions & 19 deletions stream_chat/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from stream_chat.campaign import Campaign
from stream_chat.segment import Segment
from stream_chat.types.campaign import QueryCampaignsOptions, CampaignData
from stream_chat.types.segment import SegmentType, QuerySegmentsOptions, UpdateSegmentData, SegmentData
from stream_chat.types.segment import SegmentType, QuerySegmentsOptions, SegmentData

if sys.version_info >= (3, 8):
from typing import Literal
Expand Down Expand Up @@ -522,26 +522,45 @@ def delete_role(self, name: str) -> StreamResponse:
def list_roles(self) -> StreamResponse:
return self.get("roles")

def segment(self, segment_type: SegmentType, segment_id: Optional[str] = None, segment_name: Optional[str] = None,
data: Optional[SegmentData] = None) -> Segment:
return Segment(client=self, segment_type=segment_type, segment_id=segment_id, segment_name=segment_name,
data=data)
def segment(self, segment_type: SegmentType, segment_id: Optional[str] = None, data: Optional[SegmentData] = None) -> Segment:
return Segment(client=self, segment_type=segment_type, segment_id=segment_id, data=data)

def create_segment(self, segment_type: SegmentType, segment_id: Optional[str] = None,
segment_name: Optional[str] = None, data: Dict = None) -> StreamResponse:
return self.post("segments",
data={"type": segment_type.value, "id": segment_id, "name": segment_name, "data": data})
def create_segment(self, segment_type: SegmentType, segment_id: Optional[str]=None, data: Optional[SegmentData] = None) -> StreamResponse:
payload = {"type": segment_type.value}
if segment_id is not None:
payload["id"] = segment_id
if data is not None:
payload.update(data)
return self.post("segments", data=payload)

def get_segment(self, segment_id: str) -> StreamResponse:
return self.get(f"segments/{segment_id}")

def query_segments(self, filter_conditions: Dict, options: QuerySegmentsOptions) -> StreamResponse:
payload = {"filter": filter_conditions, **options}
return self.post("segments/query", data=payload)

def update_segment(self, segment_id: str, data: UpdateSegmentData) -> StreamResponse:
def update_segment(self, segment_id: str, data: SegmentData) -> StreamResponse:
return self.put(f"segments/{segment_id}", data=data)

def delete_segment(self, segment_id: str) -> StreamResponse:
return self.delete(f"segments/{segment_id}")

def segment_target_exists(
self, segment_id: str, target_id: str
) -> StreamResponse:
return self.get(f"segments/{segment_id}/target/{target_id}")

def add_segment_targets(
self, segment_id: str, target_ids: List[str]
) -> StreamResponse:
return self.post(f"segments/{segment_id}/addtargets", data={"targets": target_ids})

def delete_segment_targets(
self, segment_id: str, target_ids: List[str]
) -> StreamResponse:
return self.post(f"segments/{segment_id}/deletetargets", data={"targets": target_ids})

def campaign(self, campaign_id: Optional[str] = None, data: CampaignData = None) -> Campaign:
return Campaign(client=self, campaign_id=campaign_id, data=data)

Expand All @@ -559,24 +578,26 @@ def query_campaigns(self, filter_conditions: Dict[str, Any],
payload = {"filter": filter_conditions, **options}
return self.post("campaigns/query", data=payload)

def update_campaign(self, campaign_id: str, params: CampaignData) -> StreamResponse:
return self.put(f"campaigns/{campaign_id}", data=params)
def update_campaign(self, campaign_id: str, data: CampaignData) -> StreamResponse:
return self.put(f"campaigns/{campaign_id}", data=data)

def delete_campaign(self, campaign_id: str, **options: Any) -> StreamResponse:
return self.delete(f"campaigns/{campaign_id}", params=options)

def start_campaign(
self, campaign_id: str, scheduled_for: Optional[datetime.datetime] = None
self, campaign_id: str, scheduled_for: Optional[Union[str, datetime.datetime]] = None
) -> StreamResponse:
return self.patch(
f"campaigns/{campaign_id}/start", data={"scheduled_for": scheduled_for}
payload = {}
if scheduled_for is not None:
if isinstance(scheduled_for, datetime.datetime):
scheduled_for = scheduled_for.isoformat()
payload["scheduled_for"] = scheduled_for
return self.post(
f"campaigns/{campaign_id}/start", data=payload
)

def stop_campaign(self, campaign_id: str) -> StreamResponse:
return self.patch(f"campaigns/{campaign_id}/stop")

def resume_campaign(self, campaign_id: str) -> StreamResponse:
return self.patch(f"campaigns/{campaign_id}/resume")
return self.post(f"campaigns/{campaign_id}/stop")

def test_campaign(self, campaign_id: str, users: Iterable[str]) -> StreamResponse:
return self.post(f"campaigns/{campaign_id}/test", data={"users": users})
Expand Down
28 changes: 12 additions & 16 deletions stream_chat/segment.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import uuid

from stream_chat.base.segment import SegmentInterface
from stream_chat.types.segment import UpdateSegmentData
from stream_chat.types.segment import SegmentData
from stream_chat.types.stream_response import StreamResponse


class Segment(SegmentInterface):

def create(self) -> StreamResponse:
if self.segment_name is None:
self.segment_name = str(uuid.uuid4())

state = self.client.create_segment(
segment_type=self.segment_type,
segment_id=self.segment_id,
segment_name=self.segment_name,
data=self.data
)

Expand All @@ -23,20 +17,22 @@ def create(self) -> StreamResponse:
return state

def get(self) -> StreamResponse:
return self.client.query_segments(
filter_conditions={
"id": self.segment_id,
},
options={
"limit": 1,
}
)
return self.client.get_segment(segment_id=self.segment_id)

def update(self, data: UpdateSegmentData) -> StreamResponse:
def update(self, data: SegmentData) -> StreamResponse:
return self.client.update_segment(
segment_id=self.segment_id,
data=data
)

def delete(self) -> StreamResponse:
return self.client.delete_segment(segment_id=self.segment_id)

def target_exists(self, target_id: str) -> StreamResponse:
return self.client.segment_target_exists(segment_id=self.segment_id, target_id=target_id)

def add_targets(self, target_ids: list) -> StreamResponse:
return self.client.add_segment_targets(segment_id=self.segment_id, target_ids=target_ids)

def delete_targets(self, target_ids: list) -> StreamResponse:
return self.client.delete_segment_targets(segment_id=self.segment_id, target_ids=target_ids)
Loading

0 comments on commit 10fb316

Please sign in to comment.