import subprocess import time import os import threading import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TN3270Session: def __init__(self): self.host = os.environ.get("ZOS_HOST", "192.168.2.243") self.port = os.environ.get("ZOS_PORT", "23") self.s3270 = None self.lock = threading.Lock() self.connected = False def _start_s3270(self): self.s3270 = subprocess.Popen( ["s3270", "-model", "3279-2", f"{self.host}:{self.port}"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) time.sleep(2) def _send(self, command): self.s3270.stdin.write(command + "\n") self.s3270.stdin.flush() return self._read_response() def _read_response(self): lines = [] while True: line = self.s3270.stdout.readline() if not line: return False, lines line = line.rstrip("\n") if line == "ok": return True, lines if line == "error": return False, lines if line.startswith("data: "): line = line[6:] lines.append(line) def connect(self): logger.info(f"Connecting to {self.host}:{self.port}") self._start_s3270() time.sleep(2) self.connected = True logger.info("TN3270 connected - waiting for user input") def disconnect(self): try: self.s3270.terminate() except Exception: pass finally: self.connected = False def get_screen(self): with self.lock: ok, lines = self._send("Ascii()") return "\n".join(lines) def send_command(self, command): with self.lock: self._send(f'String("{command}")') self._send("Enter()") time.sleep(0.5) ok, lines = self._send("Ascii()") return "\n".join(lines) def send_pf(self, key): with self.lock: self._send(f"PF({key})") time.sleep(0.3) ok, lines = self._send("Ascii()") return "\n".join(lines) def send_tab(self): with self.lock: self._send("Tab()") ok, lines = self._send("Ascii()") return "\n".join(lines) def send_enter(self): with self.lock: self._send("Enter()") time.sleep(0.5) ok, lines = self._send("Ascii()") return "\n".join(lines) def send_key(self, key): with self.lock: self._send(key) time.sleep(0.1) ok, lines = self._send("Ascii()") return "\n".join(lines) def send_char(self, char): with self.lock: escaped = char.replace('"', '\"') self._send(f'String("{escaped}")') ok, lines = self._send("Ascii()") return "\n".join(lines)