Skip to content

Commit

Permalink
🚧 fix(wip): stream management on indexing
Browse files (browse the repository at this point in the history)
  • Loading branch information
mxchinegod committed Jul 10, 2024
1 parent f022fdb commit 57cfa34
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 13 deletions.
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
project = 'magnet'
copyright = '2023, Prismadic, LLC'
author = 'Prismadic, LLC.'
release = '0.3.1'
release = '0.3.11'

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
Expand All @@ -29,7 +29,7 @@
display_github = True
html_logo = "../magnet.png"
pygments_style = 'dracula'
version = "v0.3.1"
version = "v0.3.11"
release = "latest"
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
Expand Down
13 changes: 6 additions & 7 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000,
, max_ack_pending=bandwidth
, ack_wait=3600
)
_f('wait', f'connecting to {self.magnet.config.host}')
_f('wait', f'connecting to {self.magnet.config.host.split("@")[1]}')
try:
if obj:
self.sub = await self.magnet.os.watch(include_history=False)
Expand All @@ -222,7 +222,6 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000,
self.sub = await self.magnet.js.pull_subscribe(
durable=self.magnet.config.session
, subject=self.magnet.config.category
, stream=self.magnet.config.stream_name
, config=self.consumer_config
)
_f('info',
Expand Down Expand Up @@ -271,13 +270,13 @@ async def deliver_messages(msgs):
await self.download(msg)
await cb(self.magnet.os, msg)
else:
_f("info", f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"')
_f("info", f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
msgs = await self.sub.fetch(batch=job_n, timeout=60)
await deliver_messages(msgs)
except ValueError as e:
_f('warn', f'{self.magnet.config.session} reached the end of {self.magnet.config.category}, {self.magnet.config.name}')
except Exception as e:
_f('warn', "no more data")
_f('warn', f"no more data\n{e}")
else:
if type(self.sub).__name__ == "ObjectWatcher":
_f("info", f'consuming objects from [{self.magnet.config.host.split("@")[1]}] from\n🛰️ bucket: {self.magnet.config.stream_name}"')
Expand All @@ -300,7 +299,7 @@ async def deliver_messages(msgs):
_f('warn', 'encountered a timeout, retrying in 1s')
else:
_f('fatal', str(e))
_f("warn", f'retrying connection to {self.magnet.config.host}\n{e}')
_f("warn", f'retrying connection to {self.magnet.config.host.split("@")[1]}\n{e}')
_f("info", "this can also be a problem with your callback")
await asyncio.sleep(1)
_f("info", f'consuming delta from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
Expand Down Expand Up @@ -350,8 +349,8 @@ async def off(self):
:return: None
"""
await self.magnet.js.sub.unsubscribe()
await self.sub.unsubscribe()
_f('warn', f'unsubscribed from {self.magnet.config.stream_name}')
await self.nc.drain()
_f('warn', f'safe to disconnect from {self.magnet.config.host}')
_f('warn', f'safe to disconnect from {self.magnet.config.host.split("@")[1]}')

4 changes: 2 additions & 2 deletions magnet/ize/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ async def index(self, payload, msg, field=None, v=False, instruction="Represent
self.db.collection.insert([
[payload.document], [_chunk], [embedding.tolist()]
])
_f('success', f'embedding indexed\n{payload}') if v else None
_f('success', f'embedding indexed\n{_chunk}') if v else None
if field:
payload = EmbeddingPayload(
model=self.config.index.model,
embedding=embedding.tolist(),
text=_chunk,
document=payload.document
)
_f('info', f'sending payload\n{payload}') if v else None
_f('info', f'sending payload\n{_chunk}') if v else None
await self.field.pulse(payload)
else:
embedding = self._model.encode(text_to_encode, normalize_embeddings=True)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "llm_magnet"
version = "0.3.1"
version = "0.3.11"
description = "the small distributed language model toolkit. fine-tune state-of-the-art LLMs anywhere, rapidly."
readme = "dynamic"

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='llm_magnet',
version='0.3.1',
version='0.3.11',
description="the small distributed language model toolkit. fine-tune state-of-the-art LLMs anywhere, rapidly.",
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
Expand Down

0 comments on commit 57cfa34

Please sign in to comment.