Open-source findings: Python
Easy open-source tasks from the Python world

Chat: @opensource_findings_chat
🚀 New issue to ag2ai/faststream by @GrigoriyKuzevanov
📝 Bug: min_idle_time ignored when group and consumer are specified (#2678)


Describe the bug
When a StreamSub has 'group', 'consumer', and 'min_idle_time' all specified, FastStream uses 'XREADGROUP' instead of 'XAUTOCLAIM'.

How to reproduce
Include source code:

from faststream import FastStream
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker("redis://localhost:6379")


@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="processors",
        consumer="claimer",
        min_idle_time=10000,  # Should trigger XAUTOCLAIM
    )
)
async def claiming_handler(msg):
    print("Should use XAUTOCLAIM, but uses XREADGROUP")


app = FastStream(broker)

Redis MONITOR output shows:

XREADGROUP GROUP processors claimer BLOCK 100 STREAMS orders >

Expected behavior

XAUTOCLAIM orders processors claimer 10000 0-0 COUNT 1

Observed behavior
I suppose the root cause is in faststream/redis/subscriber/use_cases/stream_subscriber, in the _StreamHandlerMixin.start() method:

if stream.group and stream.consumer:  # ← Checked FIRST
    # Uses XREADGROUP
    ...
elif self.stream_sub.min_idle_time is None:
    # Uses XREAD
    ...
else:
    # Uses XAUTOCLAIM  ← Never reached when group is set!
    ...

Or maybe I just misunderstand the logic.
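
For illustration, a minimal sketch of the reordering the report seems to imply, keeping the attribute names from the snippet above; this is an assumption, not the actual FastStream source:

# Hypothetical reordering: check min_idle_time before the plain group/consumer branch
if stream.group and stream.consumer and self.stream_sub.min_idle_time is not None:
    # claim idle pending messages via XAUTOCLAIM
    ...
elif stream.group and stream.consumer:
    # regular consumer-group read via XREADGROUP
    ...
else:
    # plain XREAD
    ...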

Environment
Running FastStream 0.6.3 with CPython 3.12.3 on Linux


#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @gaby
📝 bug: Usage of custom logger results in no logs (#2677)


Is your feature request related to a problem? Please describe.
The built-in logger is configured to always add colors, even when passing a logger to faststream. This is hardcoded here: https://github.com/ag2ai/faststream/blob/main/faststream/_internal/logger/logging.py#L80 and affects systems collecting logs from faststream hosts: logs generated by faststream show up in raw text as "\033[36mDEBUG\033[0m" instead of DEBUG.

Describe the solution you'd like
Make the use_colors param configurable instead of a hardcoded value.

Describe alternatives you've considered
Writing a custom log parser.
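
For illustration, a minimal sketch of the kind of use_colors toggle the issue asks for, written against the standard logging module; the class name, color table, and format string are assumptions, not FastStream's actual implementation:

import logging


class LevelFormatter(logging.Formatter):
    """Add ANSI colors to the level name only when use_colors is True."""

    COLORS = {"DEBUG": "\033[36m", "INFO": "\033[32m", "WARNING": "\033[33m"}
    RESET = "\033[0m"

    def __init__(self, fmt: str, use_colors: bool = False) -> None:
        super().__init__(fmt)
        self.use_colors = use_colors

    def format(self, record: logging.LogRecord) -> str:
        if self.use_colors and record.levelname in self.COLORS:
            # work on a copy so other handlers still see the plain level name
            color = self.COLORS[record.levelname]
            record = logging.makeLogRecord(record.__dict__)
            record.levelname = f"{color}{record.levelname}{self.RESET}"
        return super().format(record)


handler = logging.StreamHandler()
# use_colors=False keeps the output machine-readable for log collectors
handler.setFormatter(LevelFormatter("%(levelname)s - %(message)s", use_colors=False))
logger = logging.getLogger("app")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.debug("plain, parseable output")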


#enhancement #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issues to the taskiq-python project

Hello everyone interested in contributing to open-source projects! We appreciate your intentions and ask for your help.

The taskiq-python project aims to migrate from Poetry to UV and drop support for Python 3.9. Since Taskiq has many different repositories, we would greatly appreciate your assistance. Here's a list of issues:

- https://github.com/taskiq-python/taskiq-fastapi/issues/24
- https://github.com/taskiq-python/taskiq-redis/issues/108
- https://github.com/taskiq-python/taskiq-psqlpy/issues/10
- https://github.com/taskiq-python/taskiq-valkey/issues/3
- https://github.com/taskiq-python/taskiq-pipelines/issues/28
- https://github.com/taskiq-python/taskiq-litestar/issues/3
- https://github.com/taskiq-python/taskiq-aiogram/issues/4
- https://github.com/taskiq-python/taskiq-aiohttp/issues/7

#good_first_issue #taskiq #enhancement
🚀 New issue to ag2ai/faststream by @nectarindev
📝 feat: merge `Broker(context=...)` and `FastStream(context=...)` at broker-level (#2693)


I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:

from faststream.annotations import ContextRepo
from faststream.kafka import KafkaBroker
from faststream.utils.context import context

broker = KafkaBroker()


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
    context: ContextRepo,
):
    print("dependency: ", context.get("dependency"))  # 42


async def lifespan(*args, **kwargs):
    context.set_global("dependency", 42)

    await broker.start()
    try:
        yield
    finally:
        await broker.stop()

I launched the broker as part of my application in lifespan without using the FastStream class.

For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:

broker = KafkaBroker(context=ContextRepo({"dependency": 42}))

...

async def lifespan(*args, **kwargs):
    await broker.start()
    try:
        yield
    finally:
        await broker.stop()

But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.

from fastapi import FastAPI
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker(context=ContextRepo({"broker_dependency": 2}))
app = FastStream(broker, context=ContextRepo({"application_dependency": 1}))


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
    context: ContextRepo,
):
    print("broker_dependency: ", context.get("broker_dependency"))  # None
    print("application_dependency: ", context.get("application_dependency"))  # 1


async def lifespan(*args, **kwargs):
    await broker.start()
    try:
        yield
    finally:
        await broker.stop()


asgi = FastAPI(lifespan=lifespan)

I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.
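
A possible workaround sketch, using only the ContextRepo constructor already shown above (an assumption about intended usage, not an endorsed FastStream pattern): share a single ContextRepo between the broker and the app, so the same values are visible regardless of whose context wins.

from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

# one shared repository passed to both objects
shared_context = ContextRepo({"broker_dependency": 2, "application_dependency": 1})

broker = KafkaBroker(context=shared_context)
app = FastStream(broker, context=shared_context)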

⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯

Running FastStream 0.6.3 with CPython 3.12.4 on Linux


#enhancement #good_first_issue #core #faststream #ag2ai
sent via relator
🚀 New issue to wemake-services/django-modern-rest by @sobolevn
📝 Unskip `test_msgspec` files on 3.14 with new released version (#334)


msgspec@0.20.0 is out and has been updated in #333. All tests in test_plugins/test_msgspec can now also be executed on 3.14.
We need a PR that removes all pytest.skip directives for 3.14 from these files.
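
For orientation, a hypothetical example of the kind of version-gated skip such a PR would delete; the exact call and reason string in the repository may differ:

import sys

import pytest

# a module-level guard of roughly this shape is what the PR would remove
if sys.version_info >= (3, 14):
    pytest.skip("msgspec does not yet support Python 3.14", allow_module_level=True)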


#enhancement #good_first_issue #help_wanted #django_modern_rest
sent via relator