Add dockerized python executor

This commit is contained in:
Aymeric 2024-12-12 18:51:06 +01:00
parent 8824ba68eb
commit 715351defd
3 changed files with 274 additions and 16 deletions

View File

@ -15,33 +15,32 @@ rendered properly in your Markdown viewer.
-->
# Introduction to Agents
### Why do we need agentic systems?
### What is an agent?
Current LLMs are like basic reasoning robots, that are trapped into a room.
They can be sometimes impressively smart and often impressively dumb but they can only take as input what we decide to provide to them. We pass notes under the door be it text, or text with images for vision models, or even audio , and they reply to each note by passing another note under the door, but they cannot do anything else.
Current LLMs are like basic reasoning robots that are trapped into a room.
They take as input what we decide to provide to them. We pass notes under the door — be it text, text with images for vision models, or even audio — and they reply to each note by passing another note under the door, but they cannot do anything else.
Wouldn't it be much more efficient to let them have some kind of access to the real world, either as a way to do their own research in order to better answer a question, or a way to accomplish a complex task for us?
Any efficient system using AI will need to provide LLMs some kind of access to the real world: for instance the possibility to call a search tool to get external information, or to act on certain programs in order to solve a task.
In other words, give them some agency.
In other words, give them some agency. Agentic programs are the gateway to the outside world for LLMs.
The whole idea of agentic systems is to embed LLMs into a program where their input and outputs are optimized to better leverage real-world interactions.
Our definition of AI Agents is: “programs in which the workflow is determined by LLM outputs”. Any system leveraging LLMs will embed them into code. The influence of the LLM's output on the code workflow is the level of agency of LLMs in the system.
Note that with this definition, "agent" is not a discrete, 0 or 1 definition: instead, "agency" evolves on a continuous spectrum, as you give more or less influence to the LLM on your workflow.
### What is an agentic system?
If the output of the LLM has no impact on the workflow, as in a program that just postprocesses a LLM's output and returns it, this system is not agentic at all.
Being "agentic" is not a discrete, 0 or 1 definition: instead, we should talk about "agency" being a continuous spectrum.
Any system leveraging LLMs will embed them into code. The influence of the LLM's output on the code workflow is the level of agency of LLMs in the system.
If the output of the LLM has no further impact on the workflow, as in a program that just postprocesses a LLM's output and returns it, this system is not agentic at all.
Once an LLM output is used to determine which branch of an `if/else` switch is ran, the system starts to have some level of agency: it's a router.
If an LLM output is used to determine which branch of an `if/else` switch is run, the system starts to have some level of agency: it's a router.
Then it can get more agentic.
- If you use an LLM output to determine which function is run and with which arguments, that's tool calling.
- If you use an LLM output to determine if you should keep iterating in a while loop, you get a multi-step agent.
And the workflow can become even more complex. That's up to you to decide.
Since the system's versatility goes in lockstep with the level of agency that you give to the LLM, agentic systems can perform much broader tasks than any classic program.
Programs are not just tools anymore, confined to an ultra-specialized task: they are agents.
### When to use an agentic system?

View File

@ -0,0 +1,259 @@
import sys
import json
import traceback
from pathlib import Path
import docker
import time
import uuid
import signal
from typing import Optional, Dict, Tuple, Any
import subprocess
def read_multiplexed_response(socket):
    """Read and demultiplex a JSON response from a Docker exec socket.

    Docker multiplexes stdout/stderr over the raw socket with 8-byte frame
    headers (stream byte + 3 zero bytes + 4-byte big-endian payload length).
    Splitting on b'\\x01\\x00\\x00\\x00\\x00\\x00' strips most of the header for
    frames under 64 KiB; the leftover length bytes are skipped by scanning
    for the first '{' of the JSON payload.

    Args:
        socket: Raw socket connected to the exec instance (as returned by
            ``exec_start(..., socket=True)._sock``).

    Returns:
        str: The decoded JSON document containing an ``"output"`` key.

    Raises:
        TimeoutError: If no complete JSON response arrives within the read
            budget, or the peer closes the connection first. (The previous
            implementation fell off the loop and implicitly returned None,
            making the caller crash inside ``json.loads``.)
    """
    socket.settimeout(10.0)
    # Bounded read loop: replaces the redundant `while True and i < 1000`.
    for _ in range(1000):
        response_data = socket.recv(4096)
        if not response_data:
            # Peer closed the connection; no point spinning further.
            break
        frames = response_data.split(b'\x01\x00\x00\x00\x00\x00')
        # The last non-empty chunk should be our JSON response.
        for chunk in reversed(frames):
            if not chunk or not chunk.strip():
                continue
            # Find the start of valid JSON by looking for '{'.
            json_start = chunk.find(b'{')
            if json_start == -1:
                continue
            try:
                decoded = chunk[json_start:].decode('utf-8')
                result = json.loads(decoded)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Partial frame or stray bytes; keep scanning/reading.
                continue
            if "output" in result:
                return decoded
    raise TimeoutError("No complete JSON response received from Docker exec socket")
class DockerInterpreter:
    """Persistent Python interpreter hosted inside a Docker container.

    Lifecycle: ``start()`` launches (or reuses) a container and opens a
    persistent exec socket to an interpreter process running
    ``/workspace/interpreter.py``; ``execute()`` sends one JSON command per
    line over that socket and returns the reply; ``stop()`` closes the
    socket and stops (optionally removes) the container. ``work_dir`` is
    bind-mounted into the container at ``/workspace``.
    """

    def __init__(self, work_dir: Path = Path(".")):
        # Docker client configured from the environment (DOCKER_HOST, etc.).
        self.client = docker.from_env()
        self.work_dir = work_dir
        self.work_dir.mkdir(exist_ok=True)
        # Populated by start(); None until then.
        self.container = None
        self.exec_id = None
        self.socket = None

    def create_interpreter_script(self) -> str:
        """Create the interpreter script that will run inside the container.

        Writes ``interpreter.py`` into ``work_dir`` (visible in the container
        via the /workspace bind mount) and returns its host path.
        """
        script = """
import sys
import code
import json
import traceback
import signal
from threading import Lock

class PersistentInterpreter(code.InteractiveInterpreter):
    def __init__(self):
        self.locals_dict = {'__name__': '__console__', '__doc__': None}
        super().__init__(self.locals_dict)
        self.lock = Lock()
        self.output_buffer = []

    def write(self, data):
        self.output_buffer.append(data)

    def run_command(self, source):
        with self.lock:
            self.output_buffer = []
            try:
                more = self.runsource(source)
                output = ''.join(self.output_buffer)
                if not more and not output and source.strip():
                    try:
                        result = eval(source, self.locals_dict)
                        if result is not None:
                            output = repr(result) + '\\n'
                    except:
                        pass
                return json.dumps({'output': output, 'more': more, 'error': None}) + '\\n'
            except KeyboardInterrupt:
                return json.dumps({'output': '\\nKeyboardInterrupt\\n', 'more': False, 'error': 'interrupt'}) + '\\n'
            except Exception as e:
                return json.dumps({'output': f"Error: {str(e)}\\n", 'more': False, 'error': str(e)}) + '\\n'

def main():
    interpreter = PersistentInterpreter()
    # Make sure interrupts are handled
    signal.signal(signal.SIGINT, signal.default_int_handler)
    while True:
        try:
            line = sys.stdin.readline()
            if not line:
                break
            try:
                command = json.loads(line)
                result = interpreter.run_command(command['code'])
                sys.stdout.write(result)
                sys.stdout.flush()
            except json.JSONDecodeError:
                sys.stdout.write(json.dumps({'output': 'Invalid command\\n', 'more': False, 'error': 'invalid_json'}) + '\\n')
                sys.stdout.flush()
        except KeyboardInterrupt:
            sys.stdout.write(json.dumps({'output': '\\nKeyboardInterrupt\\n', 'more': False, 'error': 'interrupt'}) + '\\n')
            sys.stdout.flush()
            continue
        except Exception as e:
            sys.stderr.write(f"Fatal error: {str(e)}\\n")
            break

if __name__ == '__main__':
    main()
"""
        script_path = self.work_dir / "interpreter.py"
        with open(script_path, "w") as f:
            f.write(script)
        return str(script_path)

    def wait_for_ready(self, container: Any, timeout: int = 60) -> bool:
        """Poll until the container reports status "running".

        Returns True when running, False on timeout or if the container
        disappears (docker.errors.NotFound).
        """
        elapsed_time = 0
        while elapsed_time < timeout:
            try:
                container.reload()
                if container.status == "running":
                    return True
                time.sleep(0.2)
                elapsed_time += 0.2
            except docker.errors.NotFound:
                return False
        return False

    def start(self, container_name: Optional[str] = None):
        """Start (or reuse) the container and open a persistent exec socket.

        NOTE(review): create_interpreter_script() is never called here, so
        /workspace/interpreter.py must already exist in work_dir — confirm
        the caller writes it before start().
        """
        if container_name is None:
            # Random suffix avoids clashing with other anonymous instances.
            container_name = f"python-interpreter-{uuid.uuid4().hex[:8]}"
        # Bind-mount the host work dir as the container's /workspace.
        volumes = {
            str(self.work_dir.resolve()): {"bind": "/workspace", "mode": "rw"}
        }
        # Reuse an existing container with the requested name if present.
        for container in self.client.containers.list(all=True):
            if container_name == container.name:
                print(f"Found existing container: {container.name}")
                if container.status != "running":
                    container.start()
                self.container = container
                break
        else:  # Create new container
            self.container = self.client.containers.run(
                "python:3.9",
                name=container_name,
                command=["python", "/workspace/interpreter.py"],
                detach=True,
                tty=True,
                stdin_open=True,
                working_dir="/workspace",
                volumes=volumes
            )
            # Install packages in the new container
            print("Installing packages...")
            packages = ["pandas", "numpy"]  # Add your required packages here
            result = self.container.exec_run(
                f"pip install {' '.join(packages)}",
                workdir="/workspace"
            )
            if result.exit_code != 0:
                # Best-effort: a failed install is reported but not fatal.
                print(f"Warning: Failed to install: {result.output.decode()}")
            else:
                print(f"Installed {packages}.")
        if not self.wait_for_ready(self.container):
            raise Exception("Failed to start container")
        # Start a persistent exec instance running a second interpreter
        # process; execute() talks to this one, not the container command.
        self.exec_id = self.client.api.exec_create(
            self.container.id,
            ["python", "/workspace/interpreter.py"],
            stdin=True,
            stdout=True,
            stderr=True,
            tty=True
        )
        # Connect to the exec instance and keep the raw socket.
        self.socket = self.client.api.exec_start(
            self.exec_id['Id'],
            socket=True,
            demux=True
        )._sock

    def execute(self, code: str) -> Tuple[str, bool]:
        """Run `code` in the container and return (output, more).

        `more` is True when the interpreter expects a continuation line
        (incomplete source), mirroring code.InteractiveInterpreter.
        """
        if not self.container:
            raise Exception("Container not started")
        if not self.socket:
            raise Exception("Socket not started")
        # One JSON command per line; the in-container loop reads line-wise.
        command = json.dumps({'code': code}) + '\n'
        self.socket.send(command.encode())
        response = read_multiplexed_response(self.socket)
        try:
            result = json.loads(response)
            return result['output'], result['more']
        except json.JSONDecodeError:
            return f"Error: Invalid response from interpreter: {response}", False

    def stop(self, remove: bool = False):
        """Close the exec socket and stop the container (remove if asked)."""
        if self.socket:
            try:
                self.socket.close()
            except:
                # Best-effort close; the socket may already be dead.
                pass
        if self.container:
            try:
                self.container.stop()
                if remove:
                    self.container.remove()
                self.container = None
            except docker.errors.APIError as e:
                print(f"Error stopping container: {e}")
                raise
def main():
    """Smoke-test the Dockerized interpreter: run two snippets, then clean up."""
    workspace = Path("interpreter_workspace")
    interpreter = DockerInterpreter(workspace)

    def handle_sigint(signum, frame):
        # Tear the container down before exiting on Ctrl-C.
        print("\nExiting...")
        interpreter.stop(remove=True)
        sys.exit(0)

    signal.signal(signal.SIGINT, handle_sigint)

    print("Starting Python interpreter in Docker...")
    interpreter.start("persistent_python_interpreter2")

    # Run the snippets in sequence; state (the `pd` import) persists
    # across calls because the interpreter process stays alive.
    for label, snippet in (
        ("OUTPUT1", "import pandas as pd"),
        ("OUTPUT2", "pd.DataFrame()"),
    ):
        output, more = interpreter.execute(snippet)
        print(label)
        print(output, end='')

    print("\nStopping interpreter...")
    interpreter.stop(remove=True)

if __name__ == '__main__':
    main()

View File

@ -963,7 +963,7 @@ def evaluate_python_code(
code (`str`):
The code to evaluate.
static_tools (`Dict[str, Callable]`):
The functions that may be called during the evaluation.
The functions that may be called during the evaluation. These can also be agents in a multiagent setting.
These tools cannot be overwritten in the code: any assignment to their name will raise an error.
custom_tools (`Dict[str, Callable]`):
The functions that may be called during the evaluation.