1
+ import sys
2
+ from urllib .parse import urlparse
3
+
4
+ import gitpod
5
+ import gitpod .lib as util
6
+ from gitpod import AsyncGitpod
7
+
8
+ async def handle_pat_auth (client : AsyncGitpod , user_id : str , runner_id : str , host : str ) -> None :
9
+ scm_info = {
10
+ "github.com" : {
11
+ "create_url" : f"https://{ host } /settings/tokens/new?scopes=repo,workflow,read:user,user:email&description=gitpod" ,
12
+ "docs_url" : "https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens" ,
13
+ "scopes" : "repo, workflow, read:user, user:email"
14
+ },
15
+ "gitlab.com" : {
16
+ "create_url" : f"https://{ host } /-/user_settings/personal_access_tokens?scopes=api,read_user,read_repository&name=gitpod" ,
17
+ "docs_url" : "https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html" ,
18
+ "scopes" : "api, read_user, read_repository"
19
+ },
20
+ "dev.azure.com" : {
21
+ "create_url" : None ,
22
+ "docs_url" : "https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/use-personal-access-tokens-to-authenticate" ,
23
+ "scopes" : "Code (Read & Write)"
24
+ }
25
+ }.get (host , {
26
+ "create_url" : None ,
27
+ "docs_url" : "https://www.gitpod.io/docs/flex/source-control" ,
28
+ "scopes" : "repository access"
29
+ })
30
+
31
+ print ("\n To create a Personal Access Token:" )
32
+ if scm_info ["create_url" ]:
33
+ print (f"1. Visit: { scm_info ['create_url' ]} " )
34
+ else :
35
+ print (f"1. Go to { host } > Settings > Developer Settings" )
36
+ print (f"2. Create a new token with the following scopes: { scm_info ['scopes' ]} " )
37
+ print (f"3. Copy the generated token" )
38
+ print (f"\n For detailed instructions, visit: { scm_info ['docs_url' ]} " )
39
+
40
+ pat = input ("\n Enter your Personal Access Token: " ).strip ()
41
+ if not pat :
42
+ return
43
+
44
+ await util .set_scm_pat (client , user_id , runner_id , host , pat )
45
+
46
+ async def verify_context_url (client : AsyncGitpod , context_url : str , runner_id : str ) -> None :
47
+ """Verify and handle authentication for a repository context URL."""
48
+ host = urlparse (context_url ).hostname
49
+ if host is None :
50
+ print ("Error: Invalid context URL" )
51
+ sys .exit (1 )
52
+
53
+ resp = await client .users .get_authenticated_user ()
54
+ assert resp is not None
55
+ user = resp .user
56
+ assert user is not None
57
+ user_id = user .id
58
+ assert user_id is not None
59
+
60
+ # Main authentication loop
61
+ first_attempt = True
62
+ while True :
63
+ try :
64
+ # Try to access the context URL
65
+ await client .runners .parse_context_url (context_url = context_url , runner_id = runner_id )
66
+ print ("\n ✓ Authentication verified successfully" )
67
+ return
68
+
69
+ except gitpod .APIError as e :
70
+ if e .code != "failed_precondition" :
71
+ raise e
72
+
73
+ # Show authentication required message only on first attempt
74
+ if first_attempt :
75
+ print (f"\n Authentication required for { host } " )
76
+ first_attempt = False
77
+
78
+ # Get authentication options for the host
79
+ auth_resp = await client .runners .check_authentication_for_host (
80
+ host = host ,
81
+ runner_id = runner_id
82
+ )
83
+
84
+ # Handle re-authentication case
85
+ if auth_resp .authenticated and not first_attempt :
86
+ print ("\n It looks like you are already authenticated." )
87
+ if input ("Would you like to re-authenticate? (y/n): " ).lower ().strip () != 'y' :
88
+ print ("\n Authentication cancelled" )
89
+ sys .exit (1 )
90
+ else :
91
+ print ("\n Retrying authentication..." )
92
+ continue
93
+
94
+ auth_methods : list [tuple [str , str ]] = []
95
+ if auth_resp .authentication_url :
96
+ auth_methods .append (("OAuth" , "Recommended" ))
97
+ if auth_resp .pat_supported :
98
+ auth_methods .append (("Personal Access Token (PAT)" , "" ))
99
+
100
+ if not auth_methods :
101
+ print (f"\n Error: No authentication method available for { host } " )
102
+ sys .exit (1 )
103
+
104
+ # Present authentication options
105
+ if len (auth_methods ) > 1 :
106
+ print ("\n Available authentication methods:" )
107
+ for i , (method , note ) in enumerate (auth_methods , 1 ):
108
+ note_text = f" ({ note } )" if note else ""
109
+ print (f"{ i } . { method } { note_text } " )
110
+
111
+ choice = input (f"\n Choose authentication method (1-{ len (auth_methods )} ): " ).strip ()
112
+ try :
113
+ method_index = int (choice ) - 1
114
+ if not 0 <= method_index < len (auth_methods ):
115
+ raise ValueError ()
116
+ except ValueError :
117
+ method_index = 0 # Default to OAuth if invalid input
118
+ else :
119
+ method_index = 0
120
+
121
+ # Handle chosen authentication method
122
+ chosen_method = auth_methods [method_index ][0 ]
123
+ if chosen_method == "Personal Access Token (PAT)" :
124
+ await handle_pat_auth (client , user_id , runner_id , host )
125
+ else :
126
+ print (f"\n Please visit the following URL to authenticate:" )
127
+ print (f"{ auth_resp .authentication_url } " )
128
+ print ("\n Waiting for authentication to complete..." )
129
+ input ("Press Enter after completing authentication in your browser..." )
0 commit comments