Files
kubernetes/dev/zos-web/backend/tn3270_service.py
T
2026-05-31 16:07:30 +02:00

110 lines
3.0 KiB
Python

import subprocess
import time
import os
import threading
import logging
# Configure the root logger at INFO level; this runs as an import-time side effect.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class TN3270Session:
    """Drive a TN3270 host through an ``s3270`` scripting subprocess.

    Commands are written to the subprocess's stdin one per line and the reply
    is read back from its stdout.  All public screen/keyboard methods hold
    ``self.lock`` for the whole request/response exchange so replies cannot be
    interleaved between threads.
    """

    # Pauses (in seconds) that give the emulator/host time to settle.  They are
    # class attributes so deployments and tests can tune them per instance.
    STARTUP_DELAY = 2
    CONNECT_DELAY = 2
    ENTER_DELAY = 0.5
    PF_DELAY = 0.3
    KEY_DELAY = 0.1
    # How long disconnect() waits for a graceful exit before killing s3270.
    TERMINATE_TIMEOUT = 5

    def __init__(self):
        """Read the target host/port from the environment; start nothing yet."""
        self.host = os.environ.get("ZOS_HOST", "192.168.2.243")
        self.port = os.environ.get("ZOS_PORT", "23")
        self.s3270 = None  # subprocess.Popen handle, set by _start_s3270()
        self.lock = threading.Lock()
        self.connected = False

    def _start_s3270(self):
        """Spawn ``s3270`` pointed at ``host:port`` with line-buffered text pipes."""
        self.s3270 = subprocess.Popen(
            ["s3270", "-model", "3279-2", f"{self.host}:{self.port}"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
        )
        time.sleep(self.STARTUP_DELAY)

    @staticmethod
    def _escape(text):
        """Return *text* made safe for use inside a double-quoted action argument.

        Backslashes and double quotes are each prefixed with a backslash.
        Backslashes are handled first so the ones added for quotes are not
        doubled again.
        """
        return text.replace("\\", "\\\\").replace('"', '\\"')

    def _send(self, command):
        """Write one command line to s3270 and return ``(ok, lines)`` for its reply.

        Raises:
            ValueError: if *command* contains a line break.  That would submit
                several commands while only one reply is read, leaving every
                later reply out of step with its request.
        """
        if "\n" in command or "\r" in command:
            raise ValueError("s3270 command must not contain line breaks")
        self.s3270.stdin.write(command + "\n")
        self.s3270.stdin.flush()
        return self._read_response()

    def _read_response(self):
        """Collect reply lines until a terminating ``ok`` or ``error`` line.

        Returns:
            ``(True, lines)`` after ``ok``; ``(False, lines)`` after ``error``
            or when stdout reaches EOF (in which case ``connected`` is cleared).
        """
        lines = []
        while True:
            line = self.s3270.stdout.readline()
            if not line:
                # EOF: the subprocess closed its stdout, so the session is gone.
                self.connected = False
                return False, lines
            line = line.rstrip("\n")
            if line == "ok":
                return True, lines
            if line == "error":
                return False, lines
            if line.startswith("data: "):
                line = line[6:]
            # NOTE(review): lines without the "data: " prefix are kept verbatim.
            # Per the x3270 scripting docs s3270 emits a status line before
            # ok/error, so it presumably ends up as the last returned line —
            # confirm callers expect that.
            lines.append(line)

    def _screen(self):
        """Return the ``Ascii()`` reply joined with newlines.  Caller holds the lock."""
        _, lines = self._send("Ascii()")
        return "\n".join(lines)

    def connect(self):
        """Start s3270 and record whether the subprocess is still running.

        ``connected`` only reflects that the subprocess survived start-up; the
        state of the host session itself is not queried here.
        """
        logger.info("Connecting to %s:%s", self.host, self.port)
        if self.s3270 is not None:
            # Do not leak a subprocess left over from an earlier connect().
            self.disconnect()
        self._start_s3270()
        time.sleep(self.CONNECT_DELAY)
        if self.s3270.poll() is not None:
            # The process has exited, so reading stderr to EOF cannot block.
            details = self.s3270.stderr.read().strip()
            self.connected = False
            logger.error(
                "s3270 exited early (code %s): %s",
                self.s3270.returncode,
                details,
            )
            return
        self.connected = True
        logger.info("TN3270 connected - waiting for user input")

    def disconnect(self):
        """Stop the s3270 subprocess (best effort) and mark the session closed."""
        proc = self.s3270
        try:
            if proc is None:
                return
            proc.terminate()
            try:
                # Reap the child so it does not linger as a zombie.
                proc.wait(timeout=self.TERMINATE_TIMEOUT)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        except OSError:
            logger.exception("Failed to stop s3270 cleanly")
        finally:
            self.connected = False

    def get_screen(self):
        """Return the current screen contents as text."""
        with self.lock:
            return self._screen()

    def send_command(self, command):
        """Type *command*, press Enter, pause briefly, and return the screen."""
        with self.lock:
            self._send(f'String("{self._escape(command)}")')
            self._send("Enter()")
            time.sleep(self.ENTER_DELAY)
            return self._screen()

    def send_pf(self, key):
        """Press program-function key *key*, pause briefly, and return the screen."""
        with self.lock:
            # NOTE(review): key is interpolated verbatim — presumably a PF key
            # number; validate at the caller if it can come from a client.
            self._send(f"PF({key})")
            time.sleep(self.PF_DELAY)
            return self._screen()

    def send_tab(self):
        """Send ``Tab()`` and return the screen."""
        with self.lock:
            self._send("Tab()")
            return self._screen()

    def send_enter(self):
        """Press Enter, pause briefly, and return the screen."""
        with self.lock:
            self._send("Enter()")
            time.sleep(self.ENTER_DELAY)
            return self._screen()

    def send_key(self, key):
        """Send *key* verbatim as an s3270 action, pause, and return the screen."""
        with self.lock:
            # NOTE(security): key is passed through unmodified, so the caller
            # chooses the action that runs; restrict it to an allow-list if it
            # can originate from untrusted clients.
            self._send(key)
            time.sleep(self.KEY_DELAY)
            return self._screen()

    def send_char(self, char):
        """Type *char* (quotes and backslashes escaped) and return the screen."""
        with self.lock:
            self._send(f'String("{self._escape(char)}")')
            return self._screen()