1
+ import sys
2
+ from urllib .parse import urlparse
3
+
4
+ import gitpod
5
+ import gitpod .lib as util
6
+ from gitpod import AsyncGitpod
7
+ from gitpod .types .runner_check_authentication_for_host_response import SupportsPat
8
+
9
+
10
+ async def handle_pat_auth (client : AsyncGitpod , user_id : str , runner_id : str , host : str , supports_pat : SupportsPat ) -> None :
11
+ print ("\n To create a Personal Access Token:" )
12
+ create_url = supports_pat .create_url
13
+
14
+ if create_url :
15
+ print (f"1. Visit: { create_url } " )
16
+ else :
17
+ print (f"1. Go to { host } > Settings > Developer Settings" )
18
+
19
+ if supports_pat .required_scopes and len (supports_pat .required_scopes ) > 0 :
20
+ required_scopes = ", " .join (supports_pat .required_scopes )
21
+ print (f"2. Create a new token with the following scopes: { required_scopes } " )
22
+ else :
23
+ print (f"2. Create a new token" )
24
+
25
+ if supports_pat .example :
26
+ print (f"3. Copy the generated token (example: { supports_pat .example } )" )
27
+ else :
28
+ print (f"3. Copy the generated token" )
29
+
30
+ if supports_pat .docs_url :
31
+ print (f"\n For detailed instructions, visit: { supports_pat .docs_url } " )
32
+
33
+ pat = input ("\n Enter your Personal Access Token: " ).strip ()
34
+ if not pat :
35
+ return
36
+
37
+ await util .set_scm_pat (client , user_id , runner_id , host , pat )
38
+
39
+ async def verify_context_url (client : AsyncGitpod , context_url : str , runner_id : str ) -> None :
40
+ """Verify and handle authentication for a repository context URL.
41
+
42
+ This function checks if the user has access to the specified repository and manages
43
+ the authentication process if needed. Git access to the repository is required for
44
+ environments to function properly.
45
+
46
+ As an alternative, you can authenticate once via the Gitpod dashboard:
47
+ 1. Start a new environment
48
+ 2. Complete the browser-based authentication flow
49
+
50
+ See https://www.gitpod.io/docs/flex/source-control for more details.
51
+ """
52
+ host = urlparse (context_url ).hostname
53
+ if host is None :
54
+ print ("Error: Invalid context URL" )
55
+ sys .exit (1 )
56
+
57
+ resp = await client .users .get_authenticated_user ()
58
+ user = resp .user
59
+ assert user is not None
60
+ user_id = user .id
61
+ assert user_id is not None
62
+
63
+ # Main authentication loop
64
+ first_attempt = True
65
+ while True :
66
+ try :
67
+ # Try to access the context URL
68
+ await client .runners .parse_context_url (context_url = context_url , runner_id = runner_id )
69
+ print ("\n ✓ Authentication verified successfully" )
70
+ return
71
+
72
+ except gitpod .APIError as e :
73
+ if e .code != "failed_precondition" :
74
+ raise e
75
+
76
+ # Show authentication required message only on first attempt
77
+ if first_attempt :
78
+ print (f"\n Authentication required for { host } " )
79
+ first_attempt = False
80
+
81
+ # Get authentication options for the host
82
+ auth_resp = await client .runners .check_authentication_for_host (
83
+ host = host ,
84
+ runner_id = runner_id
85
+ )
86
+
87
+ # Handle re-authentication case
88
+ if auth_resp .authenticated and not first_attempt :
89
+ print ("\n It looks like you are already authenticated." )
90
+ if input ("Would you like to re-authenticate? (y/n): " ).lower ().strip () != 'y' :
91
+ print ("\n Authentication cancelled" )
92
+ sys .exit (1 )
93
+ else :
94
+ print ("\n Retrying authentication..." )
95
+ continue
96
+
97
+ auth_methods : list [tuple [str , str ]] = []
98
+ if auth_resp .supports_oauth2 :
99
+ auth_methods .append (("OAuth" , "Recommended" ))
100
+ if auth_resp .supports_pat :
101
+ auth_methods .append (("Personal Access Token (PAT)" , "" ))
102
+
103
+ if not auth_methods :
104
+ print (f"\n Error: No authentication method available for { host } " )
105
+ sys .exit (1 )
106
+
107
+ # Present authentication options
108
+ if len (auth_methods ) > 1 :
109
+ print ("\n Available authentication methods:" )
110
+ for i , (method , note ) in enumerate (auth_methods , 1 ):
111
+ note_text = f" ({ note } )" if note else ""
112
+ print (f"{ i } . { method } { note_text } " )
113
+
114
+ choice = input (f"\n Choose authentication method (1-{ len (auth_methods )} ): " ).strip ()
115
+ try :
116
+ method_index = int (choice ) - 1
117
+ if not 0 <= method_index < len (auth_methods ):
118
+ raise ValueError ()
119
+ except ValueError :
120
+ method_index = 0 # Default to OAuth if invalid input
121
+ else :
122
+ method_index = 0
123
+
124
+ # Handle chosen authentication method
125
+ chosen_method = auth_methods [method_index ][0 ]
126
+ if chosen_method == "Personal Access Token (PAT)" :
127
+ assert auth_resp .supports_pat
128
+ await handle_pat_auth (client , user_id , runner_id , host , auth_resp .supports_pat )
129
+ else :
130
+ assert auth_resp .supports_oauth2
131
+ print (f"\n Please visit the following URL to authenticate:" )
132
+ print (f"{ auth_resp .supports_oauth2 .auth_url } " )
133
+ if auth_resp .supports_oauth2 .docs_url :
134
+ print (f"\n For detailed instructions, visit: { auth_resp .supports_oauth2 .docs_url } " )
135
+ print ("\n Waiting for authentication to complete..." )
136
+ input ("Press Enter after completing authentication in your browser..." )
0 commit comments