"""Append layers from an Esri file geodatabase into one PostgreSQL table.

Each layer listed in ``SOURCE_LAYERS`` is loaded with ``ogr2ogr`` (reprojected
from ``S_SRS`` to ``T_SRS``) into ``PGSCHEMA.PGTABLE``, which must already
exist.  After loading, a GIST index on the ``geom`` column is created if
missing and the table is analyzed.  Progress is written both to stdout and to
a timestamped log file next to the geodatabase.
"""
import os
import subprocess
import sys
from datetime import datetime

PGHOST = "127.0.0.1"
PGPORT = 5433
PGDB = "postgres"
PGUSER = "postgres"
# NOTE(review): the password is hard-coded in source; consider supplying it
# from the environment or a pgpass file instead.
PGPASS = "postgres123"
PGSCHEMA = "public"
PGTABLE = "foresttype_2025"

S_SRS = "EPSG:5179"
T_SRS = "EPSG:5186"

GDB_PATH = r"C:\Users\ASUS\Desktop\dadrim\T2025imsangdo.gdb"

# Explicitly list the layer names that exist inside the GDB.
SOURCE_LAYERS = [
    "T2025_일반임상도",
    "T2025_접경임상도",
]

TRUNCATE_BEFORE_LOAD = False

LOG_DIR = os.path.dirname(GDB_PATH)
LOGFILE = os.path.join(
    LOG_DIR, f"import_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
)

# Return codes synthesized by run_cmd when the child process gives none.
_RC_TIMEOUT = 999
_RC_NOT_STARTED = 127


def logger(message: str):
    """Print *message* with a timestamp and append the same line to LOGFILE.

    A failure to write the log file is reported on stderr but does not abort
    the caller: losing a log line must not kill a multi-hour load.
    """
    ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f"[{ts}] {message}"
    try:
        with open(LOGFILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError as exc:
        print(f"[{ts}] (log file write failed: {exc})", file=sys.stderr)
    print(line)


def _as_text(data) -> str:
    """Return *data* as ``str``; ``None`` becomes ``""`` and bytes are decoded."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        # TimeoutExpired may carry raw bytes even when text mode was requested.
        return data.decode("utf-8", errors="replace")
    return data


def _sql_literal(value: str) -> str:
    """Quote *value* as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def _sql_ident(name: str) -> str:
    """Quote *name* as a SQL identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def run_cmd(cmd, env=None, timeout=None):
    """Run *cmd* and return ``(returncode, stdout, stderr)`` as text.

    Args:
        cmd: Argument list passed to ``subprocess.run`` (no shell).
        env: Environment for the child process, or ``None`` to inherit.
        timeout: Seconds to wait before giving up, or ``None`` for no limit.

    Returns:
        The child's return code and its captured stdout/stderr.  Two codes
        are synthesized instead of raising: 999 when the timeout expires and
        127 when the program could not be started (e.g. not on PATH).
    """
    try:
        res = subprocess.run(
            cmd,
            env=env,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as e:
        return _RC_TIMEOUT, _as_text(e.stdout), f"TIMEOUT after {timeout}s"
    except OSError as e:
        # Missing or non-executable binary: report it through the normal
        # (code, out, err) channel so callers can log it.
        program = cmd[0] if cmd else ""
        return _RC_NOT_STARTED, "", f"FAILED TO START {program}: {e}"
    return res.returncode, res.stdout, res.stderr


def _pg_env():
    """Return a copy of the current environment with PGPASSWORD set."""
    env = os.environ.copy()
    env["PGPASSWORD"] = PGPASS
    return env


def psql_cmd(sql: str, timeout=60, extra_args=None):
    """Execute *sql* through the ``psql`` command-line client.

    Args:
        sql: SQL text passed to ``psql -c``.
        timeout: Seconds before the call is abandoned.
        extra_args: Optional extra psql options inserted before ``-c``
            (for example ``["-t", "-A"]`` for bare, unaligned output).

    Returns:
        ``(returncode, stdout, stderr)`` as produced by ``run_cmd``.
    """
    cmd = [
        "psql",
        "-h", PGHOST,
        "-p", str(PGPORT),
        "-U", PGUSER,
        "-d", PGDB,
        "-v", "ON_ERROR_STOP=1",
    ]
    if extra_args:
        cmd.extend(extra_args)
    cmd.extend(["-c", sql])
    return run_cmd(cmd, env=_pg_env(), timeout=timeout)


def check_connection() -> bool:
    """Return True when a trivial query succeeds against the database."""
    code, out, err = psql_cmd("SELECT 'DB_CONNECTED';", timeout=10)
    if code == 0 and "DB_CONNECTED" in out:
        logger(">> DB 접속 확인 성공!")
        return True
    logger(f">> DB 접속 실패: {err.strip() or out.strip()}")
    return False


def table_exists(schema: str, table: str) -> bool:
    """Return True when ``schema.table`` is listed in information_schema.tables.

    Raises:
        RuntimeError: If the psql call fails; the message is psql's output.
    """
    sql = (
        "SELECT EXISTS ("
        " SELECT 1 FROM information_schema.tables"
        f" WHERE table_schema = {_sql_literal(schema)}"
        f" AND table_name = {_sql_literal(table)}"
        ");"
    )
    # -t/-A strip headers and alignment so the output is just "t" or "f".
    code, out, err = psql_cmd(sql, timeout=30, extra_args=["-t", "-A"])
    if code != 0:
        raise RuntimeError(err.strip() or out.strip())
    return out.strip().lower() == "t"


def truncate_target_table():
    """Empty the target table.

    Raises:
        RuntimeError: If the TRUNCATE statement fails.
    """
    sql = f'TRUNCATE TABLE {_sql_ident(PGSCHEMA)}.{_sql_ident(PGTABLE)};'
    code, out, err = psql_cmd(sql, timeout=60)
    if code != 0:
        raise RuntimeError(err.strip() or out.strip())
    logger(f'대상 테이블 TRUNCATE 완료: {PGSCHEMA}.{PGTABLE}')


def run_ogr2ogr_layer(layer_name: str):
    """Append one geodatabase layer to the target table with ``ogr2ogr``.

    Args:
        layer_name: Name of the layer inside ``GDB_PATH``.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, message)`` where
        *message* is the tool's stderr (or stdout when stderr is empty).
    """
    # The password is passed through PGPASSWORD rather than the connection
    # string so it does not appear in the process list.
    conn_str = f"PG:host={PGHOST} port={PGPORT} dbname={PGDB} user={PGUSER}"
    cmd = [
        "ogr2ogr",
        "-f", "PostgreSQL",
        conn_str,
        GDB_PATH,
        layer_name,
        "-nln", f"{PGSCHEMA}.{PGTABLE}",
        "-append",
        "-nlt", "PROMOTE_TO_MULTI",
        "-s_srs", S_SRS,
        "-t_srs", T_SRS,
        "--config", "PG_USE_COPY", "YES",
        "-gt", "10000",
        "-progress",
    ]

    logger(f"ogr2ogr 실행 레이어명: {layer_name}")
    code, out, err = run_cmd(cmd, env=_pg_env(), timeout=60 * 60 * 6)
    if code != 0:
        return False, (err.strip() or out.strip())
    return True, ""


def create_index_and_analyze():
    """Create the GIST index on ``geom`` if missing, then ANALYZE the table.

    Raises:
        RuntimeError: If either statement fails.
    """
    qualified = f"{_sql_ident(PGSCHEMA)}.{_sql_ident(PGTABLE)}"
    index_name = _sql_ident(f"{PGTABLE}_geom_idx")
    sql = (
        f'CREATE INDEX IF NOT EXISTS {index_name} '
        f'ON {qualified} USING GIST (geom); '
        f'ANALYZE {qualified};'
    )
    code, out, err = psql_cmd(sql, timeout=60 * 60)
    if code != 0:
        raise RuntimeError(err.strip() or out.strip())


def main():
    """Run the full load and exit non-zero if any step or layer failed."""
    if not check_connection():
        logger("DB 접속 실패로 프로그램을 종료합니다.")
        sys.exit(1)

    try:
        exists = table_exists(PGSCHEMA, PGTABLE)
    except RuntimeError as e:
        logger(f"대상 테이블 존재 여부 확인 실패: {e}")
        sys.exit(1)

    logger(
        f'대상 테이블 존재 여부: {"EXISTS" if exists else "NOT EXISTS"} '
        f'({PGSCHEMA}.{PGTABLE})'
    )
    if not exists:
        logger("대상 테이블이 없습니다. Postgres에 테이블을 미리 생성한 뒤 다시 실행하세요.")
        sys.exit(1)

    if TRUNCATE_BEFORE_LOAD:
        try:
            truncate_target_table()
        except RuntimeError as e:
            logger(f"TRUNCATE 실패: {e}")
            sys.exit(1)

    total_start = datetime.now()
    error_layers = []
    layer_count = len(SOURCE_LAYERS)

    for i, layer in enumerate(SOURCE_LAYERS, start=1):
        logger(f"[{i}/{layer_count}] 레이어 적재 시작: {layer}")
        start = datetime.now()
        ok, msg = run_ogr2ogr_layer(layer)
        sec = (datetime.now() - start).total_seconds()

        if ok:
            logger(f"[{i}/{layer_count}] 적재 완료: {layer} ({sec:.2f}s)")
        else:
            logger(
                f"[{i}/{layer_count}] 적재 실패: {layer} ({sec:.2f}s) / 사유: {msg}"
            )
            error_layers.append(layer)

    logger("GIST 인덱스 생성 및 ANALYZE 시작")
    try:
        create_index_and_analyze()
        logger("GIST 인덱스 생성 및 ANALYZE 완료")
    except RuntimeError as e:
        # Deliberately non-fatal: the data is loaded, only the index step
        # needs manual follow-up.
        logger(f"인덱스 생성/ANALYZE 실패(수동 확인 필요): {e}")

    total_sec = (datetime.now() - total_start).total_seconds()
    logger("=" * 60)
    logger("전체 종료")
    logger(f"총 소요시간: {total_sec:.2f}초 ({total_sec/60:.1f}분)")
    logger(f"로그: {LOGFILE}")
    logger("=" * 60)

    if error_layers:
        # Record the failures in the log file too, and signal failure to the
        # calling shell or scheduler.
        logger(f"!! 실패한 레이어 목록: {error_layers}")
        sys.exit(1)


if __name__ == "__main__":
    main()