接上文, rq worker 里面又创建新的 worker 需要怎么处理
感谢大家在前文 [请教下各位 Gunicorn...]( https://www.v2ex.com/t/876794)提供的帮助,前段时间有些忙,没有一一回复大家。在这里感谢各位了,中秋快乐。
听取了大家的意见,由于 celery 对于我来说过于复杂,所以使用 [rq]( https://python-rq.org/) 来做。
叨扰一下新的问题:
1. 长耗时任务分为两个步骤
2. 步骤 1 执行完,才执行步骤 2
3. 步骤 2 内部需要拆分为多个 worker 并行执行。
## 目前遇到的问题是
1. step_two 里面的 worker 变成串行了。
2. 要怎么汇总 step_two 的运行结果
```python
@app.route('/task')
def add_task():
user = request.args.get('user')
if user:
print('执行步骤 1')
d = datetime.now().strftime('%Y%m%d%H%M%S')
name = f"{user}_{d}"
queue.enqueue(
step_one,
args=(name,)
)
worker = Worker([queue], connection=redis, name=name)
worker.work(burst=True)
print('执行步骤 2')
step_two(name)
# TODO: 汇总上面的结果返回给网页端
return f"Task {name} complete"
return 'No value for n'
def step_one(name):
delay = 5
print(f"Running {name}, Simulating {delay} second delay")
time.sleep(delay)
f = open(f"step_one_{name}.txt", 'a')
d = datetime.now().strftime('%Y%m%d-%H%M%S')
f.write(d)
print(f"Task {name} complete")
return name
def step_two_fn(name):
delay = 5
print(f"Running {name}, Simulating {delay} second delay")
time.sleep(delay)
f = open(f"step_two_fn_{name}.txt", 'a')
d = datetime.now().strftime('%Y%m%d-%H%M%S')
f.write(d)
print(f"Task {name} complete")
return name
def step_two(name):
for i in range(2):
worker_name = f"{name}_{i}"
queue.enqueue(step_two_fn, worker_name)
worker = Worker([queue], connection=redis, name=name)
worker.work(burst=True)
```
#name #step #worker #delay #two #print #步骤 #def #user #queue
感谢大家在前文 [请教下各位 Gunicorn...]( https://www.v2ex.com/t/876794)提供的帮助,前段时间有些忙,没有一一回复大家。在这里感谢各位了,中秋快乐。
听取了大家的意见,由于 celery 对于我来说过于复杂,所以使用 [rq]( https://python-rq.org/) 来做。
叨扰一下新的问题:
1. 长耗时任务分为两个步骤
2. 步骤 1 执行完,才执行步骤 2
3. 步骤 2 内部需要拆分为多个 worker 并行执行。
## 目前遇到的问题是
1. step_two 里面的 worker 变成串行了。
2. 要怎么汇总 step_two 的运行结果
```python
@app.route('/task')
def add_task():
user = request.args.get('user')
if user:
print('执行步骤 1')
d = datetime.now().strftime('%Y%m%d%H%M%S')
name = f"{user}_{d}"
queue.enqueue(
step_one,
args=(name,)
)
worker = Worker([queue], connection=redis, name=name)
worker.work(burst=True)
print('执行步骤 2')
step_two(name)
# TODO: 汇总上面的结果返回给网页端
return f"Task {name} complete"
return 'No value for n'
def step_one(name):
delay = 5
print(f"Running {name}, Simulating {delay} second delay")
time.sleep(delay)
f = open(f"step_one_{name}.txt", 'a')
d = datetime.now().strftime('%Y%m%d-%H%M%S')
f.write(d)
print(f"Task {name} complete")
return name
def step_two_fn(name):
delay = 5
print(f"Running {name}, Simulating {delay} second delay")
time.sleep(delay)
f = open(f"step_two_fn_{name}.txt", 'a')
d = datetime.now().strftime('%Y%m%d-%H%M%S')
f.write(d)
print(f"Task {name} complete")
return name
def step_two(name):
for i in range(2):
worker_name = f"{name}_{i}"
queue.enqueue(step_two_fn, worker_name)
worker = Worker([queue], connection=redis, name=name)
worker.work(burst=True)
```
#name #step #worker #delay #two #print #步骤 #def #user #queue